From 198d876253cefd7f50f47b4f887202aae293cb9f Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Thu, 18 Jun 2026 12:05:27 +0200 Subject: [PATCH] KAFKA-20712: Fix transaction state name in log decoder output The TransactionLogMessageParser and TransactionLogMessageFormatter output the numeric transactionStatus (e.g. 4) instead of the human readable state name (e.g. CompleteCommit). This patch eesolve the byte to the state name via TransactionState.fromId() in both the dump log and consumer formatter code paths. Signed-off-by: Federico Valeri --- .../java/org/apache/kafka/tools/DumpLogSegments.java | 9 ++++++++- .../tools/consumer/TransactionLogMessageFormatter.java | 10 +++++++++- .../org/apache/kafka/tools/DumpLogSegmentsTest.java | 4 ++-- .../consumer/TransactionLogMessageFormatterTest.java | 4 ++-- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java index 8e6f3d8d09607..888ebd807e3d1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java +++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java @@ -53,6 +53,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde; import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde; import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.TransactionState; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; import org.apache.kafka.metadata.MetadataRecordSerde; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -725,8 +727,13 @@ protected JsonNode keyAsJson(ApiMessage message) { @Override protected JsonNode valueAsJson(ApiMessage message, short version) { - return org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters + JsonNode json = org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters .writeRecordValueAsJson(message, version); + if (message instanceof TransactionLogValue) { + ((ObjectNode) json).put("transactionStatus", + TransactionState.fromId(((TransactionLogValue) message).transactionStatus()).stateName()); + } + return json; } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java index cf9c540da3e63..56e1b587b3e70 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java @@ -18,9 +18,12 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.TransactionState; import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter { public TransactionLogMessageFormatter() { @@ -39,6 +42,11 @@ protected JsonNode keyAsJson(ApiMessage message) { @Override protected JsonNode valueAsJson(ApiMessage message, short version) { - return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); + JsonNode json = CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); + if (message instanceof TransactionLogValue) { + ((ObjectNode) json).put("transactionStatus", + TransactionState.fromId(((TransactionLogValue) message).transactionStatus()).stateName()); + } + return json; } } diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java index 77fba98097cc9..629bf9b62d6e0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java @@ -990,7 +990,7 @@ public void testTransactionLogMessageParser() { )), Optional.of("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), Optional.of("{\"version\":\"0\",\"data\":{\"producerId\":123,\"producerEpoch\":0,\"transactionTimeoutMs\":0," + - "\"transactionStatus\":0,\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," + + "\"transactionStatus\":\"Empty\",\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," + "\"transactionStartTimestampMs\":0}}") ); @@ -1047,7 +1047,7 @@ public void testTransactionLogMessageParser() { )), Optional.of("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), Optional.of("{\"version\":\"1\",\"data\":{\"producerId\":12,\"previousProducerId\":11,\"nextProducerId\":10," + - "\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":0," + + "\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":\"Empty\"," + "\"transactionPartitions\":[{\"topic\":\"topic1\",\"partitionIds\":[0,1,2]}," + "{\"topic\":\"topic2\",\"partitionIds\":[3,4,5]}],\"transactionLastUpdateTimestampMs\":123," + "\"transactionStartTimestampMs\":13}}") diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java index 7a6f86ace8002..79aef4ce13c18 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java @@ -60,7 +60,7 @@ protected Stream parameters() { "data":{"producerId":100, "producerEpoch":50, "transactionTimeoutMs":500, - "transactionStatus":4, + "transactionStatus":"CompleteCommit", "transactionPartitions":[], "transactionLastUpdateTimestampMs":1000, "transactionStartTimestampMs":750}}} @@ -75,7 +75,7 @@ protected Stream parameters() { "data":{"producerId":100, "producerEpoch":50, "transactionTimeoutMs":500, - "transactionStatus":4, + "transactionStatus":"CompleteCommit", "transactionPartitions":[], "transactionLastUpdateTimestampMs":1000, "transactionStartTimestampMs":750}}}