diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java index 8e6f3d8d09607..888ebd807e3d1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java +++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java @@ -53,6 +53,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde; import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde; import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.TransactionState; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; import org.apache.kafka.metadata.MetadataRecordSerde; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -725,8 +727,13 @@ protected JsonNode keyAsJson(ApiMessage message) { @Override protected JsonNode valueAsJson(ApiMessage message, short version) { - return org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters + JsonNode json = org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters .writeRecordValueAsJson(message, version); + if (message instanceof TransactionLogValue) { + ((ObjectNode) json).put("transactionStatus", + TransactionState.fromId(((TransactionLogValue) message).transactionStatus()).stateName()); + } + return json; } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java index cf9c540da3e63..56e1b587b3e70 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java @@ -18,9 +18,12 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.TransactionState; import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter { public TransactionLogMessageFormatter() { @@ -39,6 +42,11 @@ protected JsonNode keyAsJson(ApiMessage message) { @Override protected JsonNode valueAsJson(ApiMessage message, short version) { - return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); + JsonNode json = CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); + if (message instanceof TransactionLogValue) { + ((ObjectNode) json).put("transactionStatus", + TransactionState.fromId(((TransactionLogValue) message).transactionStatus()).stateName()); + } + return json; } } diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java index 77fba98097cc9..629bf9b62d6e0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java @@ -990,7 +990,7 @@ public void testTransactionLogMessageParser() { )), Optional.of("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), Optional.of("{\"version\":\"0\",\"data\":{\"producerId\":123,\"producerEpoch\":0,\"transactionTimeoutMs\":0," + - "\"transactionStatus\":0,\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," + + "\"transactionStatus\":\"Empty\",\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," + "\"transactionStartTimestampMs\":0}}") ); @@ -1047,7 +1047,7 @@ public void testTransactionLogMessageParser() { )), Optional.of("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), Optional.of("{\"version\":\"1\",\"data\":{\"producerId\":12,\"previousProducerId\":11,\"nextProducerId\":10," + - "\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":0," + + "\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":\"Empty\"," + "\"transactionPartitions\":[{\"topic\":\"topic1\",\"partitionIds\":[0,1,2]}," + "{\"topic\":\"topic2\",\"partitionIds\":[3,4,5]}],\"transactionLastUpdateTimestampMs\":123," + "\"transactionStartTimestampMs\":13}}") diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java index 7a6f86ace8002..79aef4ce13c18 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java @@ -60,7 +60,7 @@ protected Stream parameters() { "data":{"producerId":100, "producerEpoch":50, "transactionTimeoutMs":500, - "transactionStatus":4, + "transactionStatus":"CompleteCommit", "transactionPartitions":[], "transactionLastUpdateTimestampMs":1000, "transactionStartTimestampMs":750}}} @@ -75,7 +75,7 @@ protected Stream parameters() { "data":{"producerId":100, "producerEpoch":50, "transactionTimeoutMs":500, - "transactionStatus":4, + "transactionStatus":"CompleteCommit", "transactionPartitions":[], "transactionLastUpdateTimestampMs":1000, "transactionStartTimestampMs":750}}}