Skip to content

Commit 810423f

Browse files
committed
Add support for JSON/camelcase conversion
1 parent d2b6fec commit 810423f

File tree

3 files changed

+118
-11
lines changed

3 files changed

+118
-11
lines changed

protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ public class ProtobufData {
306306
private boolean useWrapperForRawPrimitives;
307307
private boolean generateStructForNulls;
308308
private boolean generateIndexForUnions;
309+
private boolean useJsonFieldNames;
309310

310311
public ProtobufData() {
311312
this(new ProtobufDataConfig.Builder().with(
@@ -332,6 +333,7 @@ public ProtobufData(ProtobufDataConfig protobufDataConfig) {
332333
this.useWrapperForRawPrimitives = protobufDataConfig.useWrapperForRawPrimitives();
333334
this.generateStructForNulls = protobufDataConfig.generateStructForNulls();
334335
this.generateIndexForUnions = protobufDataConfig.generateIndexForUnions();
336+
this.useJsonFieldNames = protobufDataConfig.useJsonFieldNames();
335337
}
336338

337339
/**
@@ -519,10 +521,14 @@ private Object fromConnectData(
519521
throw new DataException("Invalid message name: " + scopedStructName);
520522
}
521523
for (Field field : schema.fields()) {
522-
String fieldName = scrubName(field.name());
523-
Object fieldCtx = getFieldType(ctx, fieldName);
524+
String fieldName = field.name();
525+
if (useJsonFieldNames) {
526+
fieldName = fromJsonCase(fieldName);
527+
}
528+
String scrubbedFieldName = scrubName(fieldName);
529+
Object fieldCtx = getFieldType(ctx, scrubbedFieldName);
524530
Object connectFieldVal = ignoreDefaultForNullables
525-
? struct.getWithoutDefault(field.name()) : struct.get(field);
531+
? struct.getWithoutDefault(fieldName) : struct.get(field);
526532
Object fieldValue = fromConnectData(
527533
fieldCtx,
528534
field.schema(),
@@ -539,10 +545,10 @@ private Object fromConnectData(
539545
fieldValue = union.getValue();
540546
} else {
541547
fieldDescriptor = messageBuilder.getDescriptorForType()
542-
.findFieldByName(fieldName);
548+
.findFieldByName(scrubbedFieldName);
543549
}
544550
if (fieldDescriptor == null) {
545-
throw new DataException("Cannot find field with name " + fieldName);
551+
throw new DataException("Cannot find field with name " + scrubbedFieldName);
546552
}
547553
if (fieldValue != null) {
548554
messageBuilder.setField(fieldDescriptor, fieldValue);
@@ -727,12 +733,16 @@ private MessageDefinition messageDefinitionFromConnectSchema(
727733
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
728734
.get(PROTOBUF_TYPE_TAG) : null;
729735
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : index++;
736+
String fieldName = field.name();
737+
if (useJsonFieldNames) {
738+
fieldName = fromJsonCase(fieldName);
739+
}
730740
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
731741
ctx,
732742
schema,
733743
message,
734744
fieldSchema,
735-
scrubName(field.name()),
745+
scrubName(fieldName),
736746
tag
737747
);
738748
if (fieldDef != null) {
@@ -763,6 +773,22 @@ private MessageDefinition messageDefinitionFromConnectSchema(
763773
return message.build();
764774
}
765775

776+
private static String fromJsonCase(final String str) {
777+
final StringBuilder sb = new StringBuilder();
778+
for (int i = 0; i < str.length(); i++) {
779+
char c = str.charAt(i);
780+
if (Character.isUpperCase(c)) {
781+
if (i != 0) {
782+
sb.append("_");
783+
}
784+
sb.append(Character.toLowerCase(c));
785+
} else {
786+
sb.append(c);
787+
}
788+
}
789+
return sb.toString();
790+
}
791+
766792
private void oneofDefinitionFromConnectSchema(
767793
FromConnectContext ctx,
768794
DynamicSchema.Builder schema,
@@ -776,12 +802,17 @@ private void oneofDefinitionFromConnectSchema(
776802
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
777803
.get(PROTOBUF_TYPE_TAG) : null;
778804
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : 0;
805+
String fieldName = field.name();
806+
if (useJsonFieldNames) {
807+
fieldName = fromJsonCase(fieldName);
808+
}
809+
String scrubbedFieldName = scrubName(fieldName);
779810
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
780811
ctx,
781812
schema,
782813
message,
783814
field.schema(),
784-
scrubName(field.name()),
815+
scrubbedFieldName,
785816
tag
786817
);
787818
if (fieldDef != null) {
@@ -1363,7 +1394,8 @@ private void setStructField(
13631394
Struct result,
13641395
FieldDescriptor fieldDescriptor
13651396
) {
1366-
final String fieldName = fieldDescriptor.getName();
1397+
final String fieldName = useJsonFieldNames
1398+
? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
13671399
final Field field = schema.field(fieldName);
13681400
if ((isPrimitiveOrRepeated(fieldDescriptor) && !isOptional(fieldDescriptor))
13691401
|| (generateStructForNulls || message.hasField(fieldDescriptor))) {
@@ -1425,7 +1457,9 @@ private SchemaBuilder toConnectSchema(
14251457
// Already added field as oneof
14261458
continue;
14271459
}
1428-
builder.field(fieldDescriptor.getName(), toConnectSchema(ctx, fieldDescriptor));
1460+
final String fieldName = useJsonFieldNames
1461+
? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
1462+
builder.field(fieldName, toConnectSchema(ctx, fieldDescriptor));
14291463
}
14301464
}
14311465

protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public class ProtobufDataConfig extends AbstractDataConfig {
6969
public static final String GENERATE_INDEX_FOR_UNIONS_DOC = "Whether to suffix union"
7070
+ "names with an underscore followed by an index";
7171

72+
public static final String JSON_FIELD_NAMES_CONFIG = "json.field.names";
73+
public static final boolean JSON_FIELD_NAMES_DEFAULT = false;
74+
public static final String JSON_FIELD_NAMES_DOC = "Whether to convert protobuf field names "
75+
+ "to camelcase for internal data representation and vice-versa.";
76+
7277
public static ConfigDef baseConfigDef() {
7378
return AbstractDataConfig.baseConfigDef()
7479
.define(ENHANCED_PROTOBUF_SCHEMA_SUPPORT_CONFIG,
@@ -112,7 +117,12 @@ public static ConfigDef baseConfigDef() {
112117
ConfigDef.Type.BOOLEAN,
113118
GENERATE_INDEX_FOR_UNIONS_DEFAULT,
114119
ConfigDef.Importance.LOW,
115-
GENERATE_INDEX_FOR_UNIONS_DOC
120+
GENERATE_INDEX_FOR_UNIONS_DOC)
121+
.define(JSON_FIELD_NAMES_CONFIG,
122+
ConfigDef.Type.BOOLEAN,
123+
JSON_FIELD_NAMES_DEFAULT,
124+
ConfigDef.Importance.LOW,
125+
JSON_FIELD_NAMES_DOC
116126
);
117127
}
118128

@@ -156,6 +166,10 @@ public boolean generateIndexForUnions() {
156166
return this.getBoolean(GENERATE_INDEX_FOR_UNIONS_CONFIG);
157167
}
158168

169+
public boolean useJsonFieldNames() {
170+
return this.getBoolean(JSON_FIELD_NAMES_CONFIG);
171+
}
172+
159173
public static class Builder {
160174

161175
private final Map<String, Object> props = new HashMap<>();

protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue;
2929
import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2;
3030
import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue;
31-
import java.util.List;
31+
32+
import org.apache.kafka.connect.data.Field;
3233
import org.apache.kafka.connect.data.Schema;
3334
import org.apache.kafka.connect.data.SchemaAndValue;
3435
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -39,9 +40,12 @@
3940

4041
import java.io.IOException;
4142
import java.nio.ByteBuffer;
43+
import java.util.ArrayList;
4244
import java.util.Arrays;
4345
import java.util.Collections;
4446
import java.util.HashMap;
47+
import java.util.Iterator;
48+
import java.util.List;
4549
import java.util.Map;
4650

4751
import io.confluent.connect.protobuf.test.Key;
@@ -281,6 +285,23 @@ public void testFromConnectDataForValue() {
281285
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
282286
}
283287

288+
@Test
289+
public void testFromConnectDataForValueUseJsonFieldNames() {
290+
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
291+
292+
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
293+
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
294+
converter.configure(configs, false);
295+
296+
SchemaAndValue schemaAndValue = getExpectedTestMessageWithJsonFieldNames();
297+
298+
byte[] result = converter.fromConnectData("my-topic",
299+
schemaAndValue.schema(), schemaAndValue.value()
300+
);
301+
302+
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
303+
}
304+
284305
@Test
285306
public void testFromConnectDataForValueWithNamespace() {
286307
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
@@ -531,6 +552,44 @@ public void testToConnectDataForValue() throws Exception {
531552
assertEquals(expected, result);
532553
}
533554

555+
@Test
556+
public void testToConnectDataForValueUseJsonFieldNames() throws Exception {
557+
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
558+
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
559+
converter.configure(configs, false);
560+
// extra byte for message index
561+
final byte[] input = concat(new byte[]{0, 0, 0, 0, 1, 0}, HELLO_WORLD_MESSAGE.toByteArray());
562+
schemaRegistry.register("my-topic-value", getSchema(TestMessage.getDescriptor()));
563+
SchemaAndValue result = converter.toConnectData("my-topic", input);
564+
565+
SchemaAndValue expected = getExpectedTestMessageWithJsonFieldNames();
566+
567+
assertEquals(expected.schema(), result.schema());
568+
assertEquals(expected, result);
569+
}
570+
571+
private SchemaAndValue getExpectedTestMessageWithJsonFieldNames() {
572+
Struct testMessageStruct = getTestMessageStruct(TEST_MSG_STRING, 123);
573+
Schema testMessageSchema = getTestMessageSchema();
574+
575+
final SchemaBuilder builder = SchemaBuilder.struct();
576+
builder.name("TestMessage").version(1);
577+
List values = new ArrayList<>();
578+
for (Field field : testMessageSchema.fields()) {
579+
String jsonFieldName = TestMessage.getDescriptor()
580+
.findFieldByName(field.name()).getJsonName();
581+
builder.field(jsonFieldName, field.schema());
582+
values.add(testMessageStruct.get(field));
583+
}
584+
final Schema jsonSchema = builder.build();
585+
final Struct jsonStruct = new Struct(jsonSchema);
586+
final Iterator<Object> valuesIt = values.iterator();
587+
for (Field field : jsonSchema.fields()) {
588+
jsonStruct.put(field, valuesIt.next());
589+
}
590+
return new SchemaAndValue(jsonSchema, jsonStruct);
591+
}
592+
534593
@Test
535594
public void testToConnectDataForValueWithSecondMessage() throws Exception {
536595
converter.configure(SR_CONFIG, false);

0 commit comments

Comments
 (0)