Skip to content

Commit 6bcb923

Browse files
authored
Merge pull request #3962 from confluentinc/pr_merge_from_8_1_x_to_master
Merge Conflict Resolution (from 8.1.x to master)
2 parents 2654410 + 02148b1 commit 6bcb923

File tree

14 files changed: +173 −34 lines changed

avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
2020
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
21+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
2122
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
2223
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
2324
import io.confluent.kafka.schemaregistry.utils.ExceptionUtils;
@@ -28,7 +29,10 @@
2829
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
2930
import io.confluent.kafka.serializers.NonRecordContainer;
3031
import org.apache.avro.generic.GenericContainer;
32+
import org.apache.avro.generic.GenericData;
33+
import org.apache.avro.generic.GenericDatumWriter;
3134
import org.apache.avro.generic.IndexedRecord;
35+
import org.apache.avro.io.DatumWriter;
3236
import org.apache.kafka.common.config.ConfigException;
3337
import org.apache.kafka.common.errors.InvalidConfigurationException;
3438
import org.apache.kafka.common.errors.NetworkException;
@@ -199,6 +203,16 @@ public byte[] serialize(
199203
value,
200204
schema);
201205
}
206+
207+
@Override
208+
protected DatumWriter<?> getDatumWriter(
209+
Object value, org.apache.avro.Schema schema, boolean useLogicalTypes, boolean allowNull) {
210+
GenericData data = AvroSchemaUtils.getThreadLocalGenericData();
211+
if (data == null) {
212+
data = AvroSchemaUtils.getGenericData(useLogicalTypes);
213+
}
214+
return new GenericDatumWriter<>(schema, data);
215+
}
202216
}
203217

204218
static class Deserializer extends AbstractKafkaAvroDeserializer {

avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ public void testFromConnectOptionalWithInvalidDefault() {
999999
arraySchema.addProp("connect.default", arrayNode);
10001000
org.apache.avro.Schema expectedAvroSchema = org.apache.avro.SchemaBuilder.builder()
10011001
.record("ConnectDefault").namespace("io.confluent.connect.avro").fields()
1002-
.name("array").type(arraySchema).noDefault() // no default
1002+
.name("array").type(arraySchema).withDefault(Arrays.asList("a", "b", "c"))
10031003
.endRecord();
10041004

10051005
assertEquals(expectedAvroSchema, avroSchema);

avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,17 @@ private void writeDatum(ByteArrayOutputStream out, Object value, Schema rawSchem
209209
throws ExecutionException, IOException {
210210
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
211211

212-
DatumWriter<Object> writer = getDatumWriter(value, rawSchema);
212+
DatumWriter<Object> writer;
213+
writer = datumWriterCache.get(rawSchema,
214+
() -> (DatumWriter<Object>) getDatumWriter(
215+
value, rawSchema, avroUseLogicalTypeConverters, avroReflectionAllowNull)
216+
);
213217
writer.write(value, encoder);
214218
encoder.flush();
215219
}
216220

217-
protected DatumWriter<Object> getDatumWriter(Object value, Schema rawSchema)
218-
throws ExecutionException {
219-
return datumWriterCache.get(rawSchema,
220-
() -> (DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(
221-
value, rawSchema, avroUseLogicalTypeConverters, avroReflectionAllowNull)
222-
);
221+
protected DatumWriter<?> getDatumWriter(
222+
Object value, Schema schema, boolean useLogicalTypes, boolean allowNull) {
223+
return AvroSchemaUtils.getDatumWriter(value, schema, useLogicalTypes, allowNull);
223224
}
224225
}

avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private IndexedRecord createInvalidAvroRecord() {
281281
+ " \"name\": \"test\",\n"
282282
+ " \"items\": {\n"
283283
+ "\"type\": \"record\",\n"
284-
+ "\"namespace\": \"example.avro\",\n"
284+
+ "\"namespace\": \"io.confluent.kafka.example\",\n"
285285
+ "\"name\": \"User\",\n"
286286
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");
287287

@@ -291,7 +291,7 @@ private IndexedRecord createInvalidAvroRecord() {
291291
+ " \"name\": \"test\",\n"
292292
+ " \"values\": {\n"
293293
+ "\"type\": \"record\",\n"
294-
+ "\"namespace\": \"example.avro\",\n"
294+
+ "\"namespace\": \"io.confluent.kafka.example\",\n"
295295
+ "\"name\": \"User\",\n"
296296
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");
297297

@@ -912,6 +912,7 @@ public void testKafkaAvroSerializerWithCyclicReference() throws IOException, Res
912912
new SchemaReference("io.confluent.kafka.example.User", "user", -1)
913913
)));
914914
}
915+
915916
@Test
916917
public void testKafkaAvroSerializerWithArraySpecific() throws IOException, RestClientException {
917918
Map serializerConfigs = ImmutableMap.of(
@@ -958,7 +959,7 @@ public void testKafkaAvroSerializerWithMapSpecific() throws IOException, RestCli
958959
true
959960
);
960961
Map<Utf8, IndexedRecord> data = new HashMap<>();
961-
data.put(new Utf8("one"), createUserRecordUtf8());
962+
data.put(new Utf8("one"), createSpecificAvroRecord());
962963
schemaRegistry.register(topic + "-value", new AvroSchema(mapSchema));
963964
avroSerializer.configure(serializerConfigs, false);
964965
avroDeserializer.configure(deserializerConfigs, false);

client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Set;
5757
import java.util.stream.Collectors;
5858
import java.util.stream.StreamSupport;
59+
import org.apache.avro.NameValidator;
5960
import org.apache.avro.Schema;
6061
import org.apache.avro.SchemaCompatibility;
6162
import org.apache.avro.generic.GenericContainer;
@@ -224,8 +225,12 @@ public ParsedSchema copy(Map<SchemaEntity, Set<String>> tagsToAdd,
224225
}
225226

226227
protected Schema.Parser getParser() {
227-
Schema.Parser parser = new Schema.Parser();
228-
parser.setValidateDefaults(isNew());
228+
boolean isNew = isNew();
229+
NameValidator nameValidator = isNew
230+
? NameValidator.STRICT_VALIDATOR
231+
: NameValidator.NO_VALIDATION;
232+
Schema.Parser parser = new Schema.Parser(nameValidator);
233+
parser.setValidateDefaults(isNew);
229234
return parser;
230235
}
231236

client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchemaUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,24 @@ public static SpecificData getSpecificData() {
170170
}
171171

172172
public static void addLogicalTypeConversion(GenericData avroData) {
173+
avroData.addLogicalTypeConversion(new Conversions.BigDecimalConversion());
173174
avroData.addLogicalTypeConversion(new Conversions.DecimalConversion());
175+
avroData.addLogicalTypeConversion(new Conversions.DurationConversion());
174176
avroData.addLogicalTypeConversion(new Conversions.UUIDConversion());
175177

176178
avroData.addLogicalTypeConversion(new TimeConversions.DateConversion());
177179

178180
avroData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
179181
avroData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
182+
avroData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
180183

181184
avroData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
182185
avroData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
186+
avroData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
183187

184188
avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
185189
avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
190+
avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
186191
}
187192

188193
private static final EncoderFactory encoderFactory = EncoderFactory.get();

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Config.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class Config {
3333
private String alias;
3434
private Boolean normalize;
3535
private Boolean validateFields;
36+
private Boolean validateNames;
3637
private Boolean validateRules;
3738
private String compatibilityLevel;
3839
private String compatibilityPolicy;
@@ -46,6 +47,7 @@ public class Config {
4647
public Config(@JsonProperty("alias") String alias,
4748
@JsonProperty("normalize") Boolean normalize,
4849
@JsonProperty("validateFields") Boolean validateFields,
50+
@JsonProperty("validateNames") Boolean validateNames,
4951
@JsonProperty("validateRules") Boolean validateRules,
5052
@JsonProperty("compatibilityLevel") String compatibilityLevel,
5153
@JsonProperty("compatibilityPolicy") String compatibilityPolicy,
@@ -57,6 +59,7 @@ public Config(@JsonProperty("alias") String alias,
5759
this.alias = alias;
5860
this.normalize = normalize;
5961
this.validateFields = validateFields;
62+
this.validateNames = validateNames;
6063
this.validateRules = validateRules;
6164
this.compatibilityLevel = compatibilityLevel;
6265
this.compatibilityPolicy = compatibilityPolicy;
@@ -100,6 +103,7 @@ public Config(ConfigUpdateRequest request) {
100103
this.alias = request.getAlias();
101104
this.normalize = request.isNormalize();
102105
this.validateFields = request.isValidateFields();
106+
this.validateNames = request.isValidateNames();
103107
this.validateRules = request.isValidateRules();
104108
this.compatibilityLevel = request.getCompatibilityLevel();
105109
this.compatibilityPolicy = request.getCompatibilityPolicy();
@@ -140,6 +144,16 @@ public void setValidateFields(Boolean validateFields) {
140144
this.validateFields = validateFields;
141145
}
142146

147+
@JsonProperty("validateNames")
148+
public Boolean isValidateNames() {
149+
return validateNames;
150+
}
151+
152+
@JsonProperty("validateNames")
153+
public void setValidateNames(Boolean validateNames) {
154+
this.validateNames = validateNames;
155+
}
156+
143157
@JsonProperty("validateRules")
144158
public Boolean isValidateRules() {
145159
return validateRules;
@@ -244,6 +258,7 @@ public boolean equals(Object o) {
244258
return Objects.equals(alias, config.alias)
245259
&& Objects.equals(normalize, config.normalize)
246260
&& Objects.equals(validateFields, config.validateFields)
261+
&& Objects.equals(validateNames, config.validateNames)
247262
&& Objects.equals(validateRules, config.validateRules)
248263
&& Objects.equals(compatibilityLevel, config.compatibilityLevel)
249264
&& Objects.equals(compatibilityPolicy, config.compatibilityPolicy)
@@ -256,7 +271,7 @@ public boolean equals(Object o) {
256271

257272
@Override
258273
public int hashCode() {
259-
return Objects.hash(alias, normalize, validateFields, validateRules,
274+
return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
260275
compatibilityLevel, compatibilityPolicy, compatibilityGroup,
261276
defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
262277
}
@@ -267,6 +282,7 @@ public String toString() {
267282
+ "alias='" + alias + '\''
268283
+ ", normalize=" + normalize
269284
+ ", validateFields=" + validateFields
285+
+ ", validateNames=" + validateNames
270286
+ ", validateRules=" + validateRules
271287
+ ", compatibilityLevel='" + compatibilityLevel + '\''
272288
+ ", compatibilityPolicy='" + compatibilityPolicy + '\''

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class ConfigUpdateRequest {
3939
private Optional<String> alias;
4040
private Optional<Boolean> normalize;
4141
private Optional<Boolean> validateFields;
42+
private Optional<Boolean> validateNames;
4243
private Optional<Boolean> validateRules;
4344
private Optional<String> compatibilityLevel;
4445
private Optional<String> compatibilityPolicy;
@@ -55,6 +56,7 @@ public ConfigUpdateRequest(Config config) {
5556
setAlias(config.getAlias());
5657
setNormalize(config.isNormalize());
5758
setValidateFields(config.isValidateFields());
59+
setValidateNames(config.isValidateNames());
5860
setValidateRules(config.isValidateRules());
5961
setCompatibilityLevel(config.getCompatibilityLevel());
6062
setCompatibilityPolicy(config.getCompatibilityPolicy());
@@ -129,6 +131,26 @@ public void setValidateFields(Boolean validateFields) {
129131
this.validateFields = validateFields != null ? Optional.of(validateFields) : null;
130132
}
131133

134+
@JsonProperty("validateNames")
135+
public Optional<Boolean> isOptionalValidateNames() {
136+
return validateNames;
137+
}
138+
139+
@JsonIgnore
140+
public Boolean isValidateNames() {
141+
return validateNames != null ? validateNames.orElse(null) : null;
142+
}
143+
144+
@JsonProperty("validateNames")
145+
public void setValidateNames(Optional<Boolean> validateNames) {
146+
this.validateNames = validateNames;
147+
}
148+
149+
@JsonIgnore
150+
public void setValidateNames(Boolean validateNames) {
151+
this.validateNames = validateNames != null ? Optional.of(validateNames) : null;
152+
}
153+
132154
@JsonProperty("validateRules")
133155
public Optional<Boolean> isOptionalValidateRules() {
134156
return validateRules;
@@ -311,6 +333,7 @@ public boolean equals(Object o) {
311333
return Objects.equals(alias, that.alias)
312334
&& Objects.equals(normalize, that.normalize)
313335
&& Objects.equals(validateFields, that.validateFields)
336+
&& Objects.equals(validateNames, that.validateNames)
314337
&& Objects.equals(validateRules, that.validateRules)
315338
&& Objects.equals(compatibilityLevel, that.compatibilityLevel)
316339
&& Objects.equals(compatibilityPolicy, that.compatibilityPolicy)
@@ -323,7 +346,7 @@ public boolean equals(Object o) {
323346

324347
@Override
325348
public int hashCode() {
326-
return Objects.hash(alias, normalize, validateFields, validateRules,
349+
return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
327350
compatibilityLevel, compatibilityPolicy, compatibilityGroup,
328351
defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
329352
}

core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ public class SchemaRegistryConfig extends RestConfig {
186186
public static final String SCHEMA_VALIDATE_FIELDS_CONFIG = "schema.validate.fields";
187187
public static final boolean SCHEMA_VALIDATE_FIELDS_DEFAULT = false;
188188

189+
public static final String SCHEMA_VALIDATE_NAMES_CONFIG = "schema.validate.names";
190+
public static final boolean SCHEMA_VALIDATE_NAMES_DEFAULT = true;
191+
189192
/**
190193
* <code>schema.cache.size</code>
191194
*/
@@ -402,6 +405,8 @@ public class SchemaRegistryConfig extends RestConfig {
402405
+ "enabled or not. If enabled, it checks whether any top level fields conflict with the "
403406
+ "reserved fields in metadata. It also checks for the presence of any field names "
404407
+ "beginning with $$";
408+
protected static final String VALIDATE_NAMES_DOC = "Determines whether name validation is "
409+
+ "enabled or not. If enabled, it validates both namespaces and names in Avro.";
405410
protected static final String SCHEMA_CACHE_SIZE_DOC =
406411
"The maximum size of the schema cache.";
407412
protected static final String SCHEMA_CACHE_EXPIRY_SECS_DOC =
@@ -628,6 +633,9 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
628633
.define(SCHEMA_VALIDATE_FIELDS_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_FIELDS_DEFAULT,
629634
ConfigDef.Importance.LOW, VALIDATE_FIELDS_DOC
630635
)
636+
.define(SCHEMA_VALIDATE_NAMES_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_NAMES_DEFAULT,
637+
ConfigDef.Importance.LOW, VALIDATE_NAMES_DOC
638+
)
631639
.define(SCHEMA_CACHE_SIZE_CONFIG, ConfigDef.Type.INT, SCHEMA_CACHE_SIZE_DEFAULT,
632640
ConfigDef.Importance.LOW, SCHEMA_CACHE_SIZE_DOC
633641
)

core/src/main/java/io/confluent/kafka/schemaregistry/storage/AbstractSchemaRegistry.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public abstract class AbstractSchemaRegistry implements SchemaRegistry,
117117
protected final LoadingCache<RawSchema, ParsedSchema> oldSchemaCache;
118118
protected final CompatibilityLevel defaultCompatibilityLevel;
119119
protected final boolean defaultValidateFields;
120+
protected final boolean defaultValidateNames;
120121
protected final Mode defaultMode;
121122
protected final int schemaSearchDefaultLimit;
122123
protected final int schemaSearchMaxLimit;
@@ -170,6 +171,8 @@ protected AbstractSchemaRegistry(SchemaRegistryConfig config, MetricsContainer m
170171
this.defaultCompatibilityLevel = config.compatibilityType();
171172
this.defaultValidateFields =
172173
config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_FIELDS_CONFIG);
174+
this.defaultValidateNames =
175+
config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_NAMES_CONFIG);
173176
this.defaultMode = Mode.READWRITE;
174177
this.schemaSearchDefaultLimit =
175178
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
@@ -508,10 +511,14 @@ protected void logSchemaOp(Schema schema, String operation) {
508511
tenant(), schema.getId(), schema.getSubject(), operation);
509512
}
510513

511-
private boolean isSchemaFieldValidationEnabled(Config config) {
514+
protected boolean isSchemaFieldValidationEnabled(Config config) {
512515
return config.isValidateFields() != null ? config.isValidateFields() : defaultValidateFields;
513516
}
514517

518+
protected boolean isSchemaNameValidationEnabled(Config config) {
519+
return config.isValidateNames() != null ? config.isValidateNames() : defaultValidateNames;
520+
}
521+
515522
private ParsedSchema maybeValidateAndNormalizeSchema(ParsedSchema parsedSchema,
516523
Schema schema,
517524
Config config,
@@ -1263,7 +1270,8 @@ public List<String> isCompatible(String subject,
12631270
}
12641271

12651272
Config config = getConfigInScope(subject);
1266-
ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, true, normalize);
1273+
boolean doValidation = isSchemaNameValidationEnabled(config);
1274+
ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, doValidation, normalize);
12671275
if (parsedSchema == null) {
12681276
log.error("Empty schema");
12691277
throw new InvalidSchemaException("Empty schema");

0 commit comments

Comments (0)