Skip to content

Commit 1320ffa

Browse files
authored
Recurse into structs and arrays when changing case (#102)
1 parent 12f3084 commit 1320ffa

File tree

2 files changed

+115
-63
lines changed

2 files changed

+115
-63
lines changed

src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java

+76-35
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,13 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.HashMap;
33-
import java.util.LinkedHashMap;
33+
import java.util.List;
3434
import java.util.Map;
35+
import java.util.stream.Collectors;
3536

3637
public abstract class ChangeCase<R extends ConnectRecord<R>> extends BaseTransformation<R> {
3738
private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
3839

39-
class State {
40-
public final Map<String, String> columnMapping;
41-
public final Schema schema;
42-
43-
State(Map<String, String> columnMapping, Schema schema) {
44-
this.columnMapping = columnMapping;
45-
this.schema = schema;
46-
}
47-
}
48-
4940
private ChangeCaseConfig config;
5041

5142
@Override
@@ -63,39 +54,89 @@ public void configure(Map<String, ?> map) {
6354
this.config = new ChangeCaseConfig(map);
6455
}
6556

66-
Map<Schema, State> schemaState = new HashMap<>();
57+
Map<Schema, Schema> schemaState = new HashMap<>();
6758

6859
@Override
6960
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
70-
final State state = this.schemaState.computeIfAbsent(inputSchema, schema -> {
71-
final SchemaBuilder builder = SchemaBuilder.struct();
72-
if (!Strings.isNullOrEmpty(schema.name())) {
73-
builder.name(schema.name());
74-
}
75-
if (schema.isOptional()) {
76-
builder.optional();
77-
}
61+
final Schema outputSchema = this.schemaState.computeIfAbsent(inputSchema, schema -> convertSchema(schema));
62+
final Struct outputStruct = convertStruct(inputSchema, outputSchema, input);
63+
return new SchemaAndValue(outputSchema, outputStruct);
64+
}
7865

79-
final Map<String, String> columnMapping = new LinkedHashMap<>();
66+
private Struct convertStruct(Schema inputSchema, Schema outputSchema, Struct input) {
67+
final Struct struct = new Struct(outputSchema);
68+
for (Field inputField : inputSchema.fields()) {
69+
final int index = inputField.index();
70+
final Field outputField = outputSchema.fields().get(index);
71+
final Schema inputFieldSchema = inputField.schema();
72+
final Schema outputFieldSchema = outputField.schema();
73+
final Object value = convertValue(inputFieldSchema, outputFieldSchema, input.get(inputField));
74+
struct.put(outputField, value);
75+
}
76+
return struct;
77+
}
8078

81-
for (Field field : schema.fields()) {
82-
final String newFieldName = this.config.from.to(this.config.to, field.name());
83-
log.trace("processStruct() - Mapped '{}' to '{}'", field.name(), newFieldName);
84-
columnMapping.put(field.name(), newFieldName);
85-
builder.field(newFieldName, field.schema());
79+
private Object convertValue(Schema inputFieldSchema, Schema outputFieldSchema, Object value) {
80+
switch (outputFieldSchema.type()) {
81+
case STRUCT: {
82+
return convertStruct(inputFieldSchema, outputFieldSchema, (Struct) value);
8683
}
84+
case ARRAY: {
85+
return convertArray(inputFieldSchema, outputFieldSchema, (List<Object>) value);
86+
}
87+
}
88+
return value;
89+
}
8790

88-
return new State(columnMapping, builder.build());
89-
});
90-
91-
final Struct outputStruct = new Struct(state.schema);
92-
93-
for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
94-
final Object value = input.get(kvp.getKey());
95-
outputStruct.put(kvp.getValue(), value);
91+
private Object convertArray(Schema inputFieldSchema, Schema outputFieldSchema, List<Object> value) {
92+
final Schema inputSchema = inputFieldSchema.valueSchema();
93+
final Schema outputSchema = outputFieldSchema.valueSchema();
94+
switch (outputSchema.type()) {
95+
case STRUCT: {
96+
return value.stream().map(entry -> convertStruct(
97+
inputSchema,
98+
outputSchema,
99+
(Struct) entry
100+
)).collect(Collectors.toList());
101+
}
102+
case ARRAY: {
103+
return value.stream().map(entry -> convertArray(
104+
inputSchema,
105+
outputSchema,
106+
(List<Object>) entry
107+
)).collect(Collectors.toList());
108+
}
96109
}
110+
return value;
111+
}
97112

98-
return new SchemaAndValue(state.schema, outputStruct);
113+
private Schema convertSchema(Schema inputSchema) {
114+
switch (inputSchema.type()) {
115+
case ARRAY: {
116+
log.trace("convertSchema() - Recurse into array");
117+
final SchemaBuilder builder = SchemaBuilder.array(convertSchema(inputSchema.valueSchema()));
118+
if (inputSchema.isOptional()) {
119+
builder.optional();
120+
}
121+
return builder.build();
122+
}
123+
case STRUCT: {
124+
final SchemaBuilder builder = SchemaBuilder.struct();
125+
if (!Strings.isNullOrEmpty(inputSchema.name())) {
126+
builder.name(inputSchema.name());
127+
}
128+
if (inputSchema.isOptional()) {
129+
builder.optional();
130+
}
131+
for (Field field : inputSchema.fields()) {
132+
final String newFieldName = this.config.from.to(this.config.to, field.name());
133+
log.trace("convertSchema() - Mapped '{}' to '{}'", field.name(), newFieldName);
134+
builder.field(newFieldName, convertSchema(field.schema()));
135+
}
136+
return builder.build();
137+
}
138+
}
139+
return inputSchema;
99140
}
100141

101142
@Title("ChangeCase(Key)")

src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java

+39-28
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.kafka.connect.transforms.Transformation;
2626
import org.junit.jupiter.api.Test;
2727

28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.function.Function;
31+
2832
import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema;
2933
import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct;
3034
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -37,44 +41,51 @@ protected ChangeCaseTest(boolean isKey) {
3741
@Test
3842
public void test() {
3943
this.transformation.configure(
40-
ImmutableMap.of(
41-
ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_UNDERSCORE.toString(),
42-
ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString()
43-
)
44-
);
45-
final Schema inputSchema = SchemaBuilder.struct()
46-
.field("FIRST_NAME", Schema.STRING_SCHEMA)
47-
.field("LAST_NAME", Schema.STRING_SCHEMA)
48-
.build();
49-
final Schema expectedSchema = SchemaBuilder.struct()
50-
.field("first_name", Schema.STRING_SCHEMA)
51-
.field("last_name", Schema.STRING_SCHEMA)
52-
.build();
53-
final Struct inputStruct = new Struct(inputSchema)
54-
.put("FIRST_NAME", "test")
55-
.put("LAST_NAME", "user");
56-
final Struct expectedStruct = new Struct(expectedSchema)
57-
.put("first_name", "test")
58-
.put("last_name", "user");
44+
ImmutableMap.of(ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_UNDERSCORE.toString(),
45+
ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString()));
46+
final Schema inputSchema = makeSchema(CaseFormat.UPPER_UNDERSCORE);
47+
final Schema expectedSchema = makeSchema(CaseFormat.LOWER_UNDERSCORE);
5948

60-
final SinkRecord inputRecord = new SinkRecord(
61-
"topic",
62-
1,
63-
null,
64-
null,
65-
inputSchema,
66-
inputStruct,
67-
1L
68-
);
49+
final Struct inputStruct = makeStruct(inputSchema, CaseFormat.UPPER_UNDERSCORE);
50+
final Struct expectedStruct = makeStruct(expectedSchema, CaseFormat.LOWER_UNDERSCORE);
51+
52+
final SinkRecord inputRecord = new SinkRecord("topic", 1, null, null, inputSchema, inputStruct, 1L);
6953
for (int i = 0; i < 50; i++) {
7054
final SinkRecord transformedRecord = this.transformation.apply(inputRecord);
7155
assertNotNull(transformedRecord, "transformedRecord should not be null.");
7256
assertSchema(expectedSchema, transformedRecord.valueSchema());
7357
assertStruct(expectedStruct, (Struct) transformedRecord.value());
7458
}
59+
}
7560

61+
private Schema makeSchema(CaseFormat caseFormat) {
62+
final Function<String, String> convert = s -> CaseFormat.LOWER_UNDERSCORE.to(caseFormat, s);
63+
return SchemaBuilder.struct().field(convert.apply("contacts"),
64+
SchemaBuilder.array(SchemaBuilder.struct()
65+
.field(convert.apply("contact"),
66+
SchemaBuilder.struct()
67+
.field(convert.apply("first_name"), Schema.STRING_SCHEMA)
68+
.field(convert.apply("last_name"), Schema.STRING_SCHEMA)
69+
.build()
70+
).build())
71+
).build();
7672
}
7773

74+
private Struct makeStruct(Schema schema, CaseFormat caseFormat) {
75+
final Function<String, String> convert = s -> CaseFormat.LOWER_UNDERSCORE.to(caseFormat, s);
76+
final Schema contacts = schema.fields().get(0).schema().valueSchema();
77+
final Schema contact = contacts.fields().get(0).schema();
78+
return new Struct(schema).put(convert.apply("contacts"),
79+
new ArrayList<>(
80+
Collections.singletonList(
81+
new Struct(contacts).put(convert.apply("contact"),
82+
new Struct(contact)
83+
.put(convert.apply("first_name"), "test")
84+
.put(convert.apply("last_name"), "user"))
85+
)
86+
)
87+
);
88+
}
7889

7990
public static class ValueTest<R extends ConnectRecord<R>> extends ChangeCaseTest {
8091
protected ValueTest() {

0 commit comments

Comments
 (0)