Skip to content

Commit e1d8c55

Browse files
Added support for map as input type. Fixes #52. (#53)
1 parent ba7c357 commit e1d8c55

File tree

2 files changed

+38
-11
lines changed
  • src
    • main/java/com/github/jcustenborder/kafka/connect/transform/common
    • test/java/com/github/jcustenborder/kafka/connect/transform/common

2 files changed

+38
-11
lines changed

src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ToJSON.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input
6767
return new SchemaAndValue(inputSchema, input);
6868
}
6969

70-
@Override
71-
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
70+
SchemaAndValue schemaAndValue(Schema inputSchema, Object input) {
7271
final byte[] buffer = this.converter.fromConnectData("dummy", inputSchema, input);
7372
final Schema schema;
7473
final Object value;
@@ -95,6 +94,16 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu
9594
return new SchemaAndValue(schema, value);
9695
}
9796

97+
@Override
98+
protected SchemaAndValue processMap(R record, Map<String, Object> input) {
99+
return schemaAndValue(null, input);
100+
}
101+
102+
@Override
103+
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
104+
return schemaAndValue(inputSchema, input);
105+
}
106+
98107
@Title("ToJson(Key)")
99108
@Description("This transformation is used to take structured data such as AVRO and output it as " +
100109
"JSON by way of the JsonConverter built into Kafka Connect.")

src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ToJsonTest.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.github.jcustenborder.kafka.connect.transform.common;
1717

18-
import com.google.common.base.CaseFormat;
1918
import com.google.common.collect.ImmutableMap;
2019
import org.apache.kafka.connect.connector.ConnectRecord;
2120
import org.apache.kafka.connect.data.Schema;
@@ -24,10 +23,10 @@
2423
import org.apache.kafka.connect.sink.SinkRecord;
2524
import org.apache.kafka.connect.transforms.Transformation;
2625
import org.junit.jupiter.api.Test;
27-
import org.junit.jupiter.api.TestFactory;
2826

29-
import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema;
30-
import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct;
27+
import java.util.LinkedHashMap;
28+
import java.util.Map;
29+
3130
import static org.junit.jupiter.api.Assertions.assertEquals;
3231
import static org.junit.jupiter.api.Assertions.assertNotNull;
3332

@@ -37,7 +36,7 @@ protected ToJsonTest(boolean isKey) {
3736
}
3837

3938
@Test
40-
public void test() {
39+
public void struct() {
4140
this.transformation.configure(ImmutableMap.of());
4241
final Schema inputSchema = SchemaBuilder.struct()
4342
.field("FIRST_NAME", Schema.STRING_SCHEMA)
@@ -64,13 +63,32 @@ public void test() {
6463
1L
6564
);
6665

67-
for (int i = 0; i < 50; i++) {
68-
final SinkRecord transformedRecord = this.transformation.apply(inputRecord);
69-
assertNotNull(transformedRecord, "transformedRecord should not be null.");
70-
}
66+
final SinkRecord transformedRecord = this.transformation.apply(inputRecord);
67+
assertNotNull(transformedRecord, "transformedRecord should not be null.");
68+
}
69+
70+
@Test
71+
public void map() {
72+
this.transformation.configure(ImmutableMap.of());
73+
Map<String, String> input = new LinkedHashMap<>();
74+
input.put("FIRST_NAME", "test");
75+
input.put("LAST_NAME", "user");
76+
77+
final SinkRecord inputRecord = new SinkRecord(
78+
"topic",
79+
1,
80+
null,
81+
null,
82+
null,
83+
input,
84+
1L
85+
);
7186

87+
final SinkRecord transformedRecord = this.transformation.apply(inputRecord);
88+
assertNotNull(transformedRecord, "transformedRecord should not be null.");
7289
}
7390

91+
7492
@Test
7593
public void ignoreNonStruct() {
7694
final SinkRecord inputRecord = new SinkRecord(

0 commit comments

Comments
 (0)