diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToField.java index cdf9907..2e96a51 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToField.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +101,32 @@ Conversion conversion(Schema schema) { }); } + @Override + protected SchemaAndValue processMap(R record, Map input) { + if (record.headers().isEmpty()) { + return new SchemaAndValue(null, input); + } + + Map headers = new HashMap<>(); + if (this.config.mappings.isEmpty()) { + for (Header header: record.headers()) { + headers.put(header.key(), header.value()); + break; + } + } else { + this.config.mappings.forEach(mapping -> { + for (Header header: record.headers()) { + if (header.key().equals(mapping.header)) { + headers.put(mapping.field, header.value()); + break; + } + } + }); + } + + input.put("_headers", headers); + return new SchemaAndValue(null, input); + } @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToFieldTest.java index 38d0644..a0f806f 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToFieldTest.java @@ -14,19 +14,16 @@ import java.io.IOException; import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; public class HeaderToFieldTest { Transformation transformation; - @BeforeEach - public void before() { - this.transformation = new HeaderToField.Value<>(); - } - - @Test public void apply() throws IOException { + this.transformation = new HeaderToField.Value<>(); + this.transformation.configure( ImmutableMap.of(HeaderToFieldConfig.HEADER_MAPPINGS_CONF, "applicationId:STRING") ); @@ -71,4 +68,44 @@ public void apply() throws IOException { assertStruct(expectedStruct, (Struct) actualRecord.value()); } + @Test + public void applyWithMap() throws IOException { + this.transformation = new HeaderToField.Key<>(); + + this.transformation.configure( + ImmutableMap.of(HeaderToFieldConfig.HEADER_MAPPINGS_CONF, "applicationId:STRING") + ); + + ConnectHeaders inputHeaders = new ConnectHeaders(); + inputHeaders.addString("applicationId", "testing"); + + Schema inputSchema = SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .parameter("firstName", "example") + .parameter("lastName", "user") + .build(); + + Schema expectedSchema = SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .parameter("firstName", "example") + .parameter("lastName", "user") + .parameter("applicationId", "testing") + .build(); + + SinkRecord inputRecord = new SinkRecord( + "testing", + 1, + null, + null, + expectedSchema.schema(), + inputSchema, + 12345L, + 123412351L, + TimestampType.NO_TIMESTAMP_TYPE, + inputHeaders + ); + + SinkRecord actualRecord = this.transformation.apply(inputRecord); + assertNotNull(actualRecord, "record should not be null."); + assertEquals(expectedSchema.parameters().size(), 3); + } + }