From 24ef443c50f55e30608fbdb7eddedc7e0cc0277b Mon Sep 17 00:00:00 2001 From: okayhooni Date: Mon, 30 Oct 2023 20:07:16 +0900 Subject: [PATCH 1/2] implement processMap with recursion on ChangeCase --- .../connect/transform/common/ChangeCase.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java index 0adf819..cbc0a5a 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -56,6 +57,27 @@ public void configure(Map map) { Map schemaState = new HashMap<>(); + @Override + protected SchemaAndValue processMap(R record, Map input) { + return new SchemaAndValue(null, convertMap(input)); + } + + private Object convertMap(Map input) { + final Map outputMap = new LinkedHashMap<>(input.size()); + + for (final String inputFieldName : input.keySet()) { + log.trace("processMap() - Processing field '{}'", inputFieldName); + final String outputFieldName = this.config.from.to(this.config.to, inputFieldName); + Object value = input.get(inputFieldName); + if (value instanceof Map) { + value = convertMap((Map) value); + } + outputMap.put(outputFieldName, value); + } + + return outputMap; + } + @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { final Schema outputSchema = this.schemaState.computeIfAbsent(inputSchema, schema -> convertSchema(schema)); From 8e0aa2163de88f4c7cfa074342f0778a41836334 Mon Sep 17 00:00:00 2001 From: okayhooni Date: Sun, 5 Nov 2023 01:40:20 +0900 Subject: [PATCH 2/2] add simple test case for processMap of ChangeCase --- .../transform/common/ChangeCaseTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java index 78fedbf..c008075 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java @@ -27,8 +27,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Map; import java.util.function.Function; +import static com.github.jcustenborder.kafka.connect.transform.common.GenericAssertions.assertMap; import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -58,6 +60,47 @@ public void test() { } } + @Test + public void testSchemaLess() { + this.transformation.configure( + ImmutableMap.of(ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_CAMEL.toString(), + ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString())); + + final Map input = ImmutableMap.of( + "Field", "first value", + "NestingField", ImmutableMap.of( + "secondDepthField", "second depth", + "secondDepthNestingField", ImmutableMap.of( + "thirdDepthField", "third depth" + ) + ) + ); + final Map expected = ImmutableMap.of( + "field", "first value", + "nesting_field", ImmutableMap.of( + "second_depth_field", "second depth", + "second_depth_nesting_field", ImmutableMap.of( + "third_depth_field", "third depth" + ) + ) + ); + final SinkRecord inputRecord = new SinkRecord( + TOPIC, + 1, + null, + null, + null, + input, + 1L + ); + + final SinkRecord transformedRecord = this.transformation.apply(inputRecord); + assertNotNull(transformedRecord, "transformedRecord should not be null."); + @SuppressWarnings("unchecked") + final Map actual = (Map) transformedRecord.value(); + assertMap(expected, actual, "ChangeCase is not working on schema-less input, as expected"); + } + private Schema makeSchema(CaseFormat caseFormat) { final Function convert = s -> CaseFormat.LOWER_UNDERSCORE.to(caseFormat, s); return SchemaBuilder.struct().field(convert.apply("contacts"),