From 374d24d55a6f573debf940365b73fc1dfa90d1f6 Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Thu, 18 Jun 2026 01:23:04 +0000 Subject: [PATCH] KAFKA-19717: InsertHeader drops part of the value when header value is a number Signed-off-by: wilmerdooley --- .../connect/transforms/InsertHeader.java | 29 ++++++++++++++++++- .../connect/transforms/InsertHeaderTest.java | 13 +++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java index dbfc6adbb5503..d1eced262e674 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java @@ -19,11 +19,13 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.internals.AppInfoParser; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.math.BigDecimal; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; @@ -78,6 +80,31 @@ public void close() { public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); header = config.getString(HEADER_FIELD); - literalValue = Values.parseString(config.getString(VALUE_LITERAL_FIELD)); + String rawValue = config.getString(VALUE_LITERAL_FIELD); + SchemaAndValue parsed = Values.parseString(rawValue); + if (parsed != null && parsed.schema() != null) { + Schema.Type type = parsed.schema().type(); + boolean isIntegral = type == Schema.Type.INT8 || type == Schema.Type.INT16 || type == Schema.Type.INT32 || type == Schema.Type.INT64; + if (isIntegral && rawValue.contains(".")) { + try { + BigDecimal decimal = new BigDecimal(rawValue); + float fValue = decimal.floatValue(); + double dValue = decimal.doubleValue(); + if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY) { + literalValue = new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue); + } else if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) { + literalValue = new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue); + } else { + literalValue = new SchemaAndValue(org.apache.kafka.connect.data.Decimal.schema(decimal.scale()), decimal); + } + } catch (NumberFormatException e) { + literalValue = parsed; + } + } else { + literalValue = parsed; + } + } else { + literalValue = parsed; + } } } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java index 29e9dddcc2816..c938d47f7271b 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java @@ -81,6 +81,19 @@ public void insertionWithByteHeader() { assertEquals(expect, xformed.headers()); } + @Test + public void insertionWithFloatHeader() { + xform.configure(config("inserted", "2.0")); + ConnectHeaders headers = new ConnectHeaders(); + headers.addString("existing", "existing-value"); + Headers expect = headers.duplicate().addFloat("inserted", 2.0f); + + SourceRecord original = sourceRecord(headers); + SourceRecord xformed = xform.apply(original); + assertNonHeaders(original, xformed); + assertEquals(expect, xformed.headers()); + } + @Test public void configRejectsNullHeaderKey() { assertThrows(ConfigException.class, () -> xform.configure(config(null, "1")));