From b64c2f0a8a8538900e58fb25083121d9379cfc1b Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 22 Dec 2023 12:51:36 +0100 Subject: [PATCH 1/7] Import code from old PR --- .gitignore | 3 +- .../flink-avro-glue-schema-registry/pom.xml | 47 +++++ .../registry/AvroGlueFormatOptions.java | 92 ++++++++++ .../GlueSchemaRegistryAvroFormatFactory.java | 172 ++++++++++++++++++ ...SchemaRegistryAvroSchemaCoderProvider.java | 26 +++ .../org.apache.flink.table.factories.Factory | 16 ++ .../GlueSchemaRegistryFormatFactoryTest.java | 156 ++++++++++++++++ 7 files changed, 511 insertions(+), 1 deletion(-) create mode 100644 flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java create mode 100644 flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java create mode 100644 flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory create mode 100644 flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java diff --git a/.gitignore b/.gitignore index 973e8d522..27f696e0c 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,5 @@ tools/flink tools/flink-* tools/releasing/release tools/japicmp-output -*/.idea/ \ No newline at end of file +*/.idea/ +.java-version diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml index 35a1977c9..29692b3a3 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml @@ -39,6 +39,18 @@ under the License. ${flink.version} provided + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + org.apache.flink flink-avro @@ -61,6 +73,41 @@ under the License. ${glue.schema.registry.version} + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + + org.apache.flink + flink-table-api-java + ${flink.version} + test + test-jar + + + + org.apache.flink + flink-table-common + ${flink.version} + test + test-jar + + + + org.apache.flink + flink-avro + ${flink.version} + test + test-jar + + + + + diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java new file mode 100644 index 000000000..4ddbe95c6 --- /dev/null +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.glue.schema.registry; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import software.amazon.awssdk.services.glue.model.Compatibility; + +import java.time.Duration; + + +/** Options for AWS Glue Schema Registry Avro format. */ +@PublicEvolving +public class AvroGlueFormatOptions { + public static final ConfigOption AWS_REGION = + ConfigOptions.key("aws.region") + .stringType() + .noDefaultValue() + .withDescription("AWS region"); + + public static final ConfigOption AWS_ENDPOINT = + ConfigOptions.key("aws.endpoint").stringType().noDefaultValue(); + + public static final ConfigOption CACHE_SIZE = + ConfigOptions.key("cache.size") + .intType() + .defaultValue(200) + .withDescription("Cache maximum size in *items*. Defaults to 200"); + + public static final ConfigOption CACHE_TTL_MS = + ConfigOptions.key("cache.ttlMs") + .longType() + .defaultValue(Duration.ofDays(1L).toMillis()) + .withDescription("Cache TTL in milliseconds. Defaults to 1 day"); + + public static final ConfigOption REGISTRY_NAME = + ConfigOptions.key("registry.name") + .stringType() + .noDefaultValue() + .withDescription("Registry name"); + + public static final ConfigOption SCHEMA_AUTO_REGISTRATION = + ConfigOptions.key("schema.autoRegistration") + .booleanType() + .defaultValue(false) + .withDescription("Whether auto-registration is enabled. 
Defaults to true."); + + public static final ConfigOption SCHEMA_COMPATIBILITY = + ConfigOptions.key("schema.compatibility") + .enumType(Compatibility.class) + .defaultValue(AWSSchemaRegistryConstants.DEFAULT_COMPATIBILITY_SETTING); + + public static final ConfigOption SCHEMA_COMPRESSION = + ConfigOptions.key("schema.compression") + .enumType(AWSSchemaRegistryConstants.COMPRESSION.class) + .defaultValue(AWSSchemaRegistryConstants.COMPRESSION.NONE) + .withDescription("Compression type"); + + public static final ConfigOption SCHEMA_NAME = + ConfigOptions.key("schema.name") + .stringType() + .noDefaultValue() + .withDescription("The Schema name under which to register the schema used by this format during serialization."); + + public static final ConfigOption SCHEMA_TYPE = + ConfigOptions.key("schema.type") + .enumType(AvroRecordType.class) + .defaultValue(AvroRecordType.GENERIC_RECORD) + .withDescription("Record type"); + + private AvroGlueFormatOptions() { + } +} diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java new file mode 100644 index 000000000..a05a7bb7d --- /dev/null +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.glue.schema.registry; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.*; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Table format factory for creating {@link SerializationSchema} and {@link DeserializationSchema} + * for Glue Schema Registry to RowData. 
+ */ +@Internal +public class GlueSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { + + public static final String IDENTIFIER = "avro-glue"; + + @Override + public DecodingFormat> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + final Map configMap = buildConfigMap(formatOptions); + + return new DecodingFormat>() { + @Override + public DeserializationSchema createRuntimeDecoder( + DynamicTableSource.Context context, DataType producedDataType) { + final RowType rowType = (RowType) producedDataType.getLogicalType(); + final TypeInformation rowDataTypeInfo = + context.createTypeInformation(producedDataType); + return new AvroRowDataDeserializationSchema( + GlueSchemaRegistryAvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType), configMap), + AvroToRowDataConverters.createRowConverter(rowType), + rowDataTypeInfo); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public EncodingFormat> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + return new EncodingFormat>() { + @Override + public SerializationSchema createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + final RowType rowType = (RowType) consumedDataType.getLogicalType(); + return new AvroRowDataSerializationSchema( + rowType, + GlueSchemaRegistryAvroSerializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType), + formatOptions.get(SCHEMA_NAME), + buildConfigMap(formatOptions)), + RowDataToAvroConverters.createConverter(rowType)); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> result = new HashSet<>(); + result.add(REGISTRY_NAME); + result.add(AWS_REGION); + result.add(SCHEMA_NAME); + return result; + } + + @Override + public Set> optionalOptions() { + Set> result = new HashSet<>(); + result.add(AWS_ENDPOINT); + result.add(CACHE_SIZE); + result.add(CACHE_TTL_MS); + result.add(SCHEMA_AUTO_REGISTRATION); + result.add(SCHEMA_COMPATIBILITY); + result.add(SCHEMA_COMPRESSION); + result.add(SCHEMA_TYPE); + return result; + } + + private Map buildConfigMap(ReadableConfig formatOptions) { + final Map properties = new HashMap(); + + formatOptions + .getOptional(AWS_REGION) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_REGION, v)); + formatOptions + .getOptional(AWS_ENDPOINT) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, v)); + formatOptions + .getOptional(CACHE_SIZE) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v)); + formatOptions + .getOptional(CACHE_TTL_MS) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v)); + formatOptions + .getOptional(REGISTRY_NAME) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v)); + formatOptions + .getOptional(SCHEMA_AUTO_REGISTRATION) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, v)); + formatOptions + .getOptional(SCHEMA_COMPATIBILITY) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v)); + 
formatOptions + .getOptional(SCHEMA_COMPRESSION) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v)); + formatOptions + .getOptional(SCHEMA_TYPE) + .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, v)); + return properties; + } +} diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java index d4c24f032..21f1b5dbd 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java @@ -22,6 +22,7 @@ import org.apache.flink.formats.avro.SchemaCoder; import java.util.Map; +import java.util.Objects; /** Provider for {@link GlueSchemaRegistryAvroSchemaCoder}. */ @PublicEvolving @@ -50,4 +51,29 @@ public GlueSchemaRegistryAvroSchemaCoderProvider(Map configs) { public GlueSchemaRegistryAvroSchemaCoder get() { return new GlueSchemaRegistryAvroSchemaCoder(transportName, configs); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + GlueSchemaRegistryAvroSchemaCoderProvider that = (GlueSchemaRegistryAvroSchemaCoderProvider) o; + if (transportName == null) { + if (that.transportName != null) { + return false; + } + } else if (!transportName.equals(that.transportName)) { + return false; + } + return configs.equals(that.configs); + } + + @Override + public int hashCode() { + return Objects.hash(transportName, configs); + } } diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..28a2da31d --- /dev/null +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroFormatFactory \ No newline at end of file diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java new file mode 100644 index 000000000..788829270 --- /dev/null +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java @@ -0,0 +1,156 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.glue.schema.registry; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.TestDynamicTableFactory; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link GlueSchemaRegistryAvroFormatFactory}. 
+ */ +public class GlueSchemaRegistryFormatFactoryTest { + + private static final ResolvedSchema SCHEMA = + ResolvedSchema.of( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.INT()), + Column.physical("c", DataTypes.BOOLEAN())); + + private static final RowType ROW_TYPE = + (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType(); + + private static final String SCHEMA_NAME = "test-subject"; + private static final String REGISTRY_NAME = "test-registry-name"; + private static final String REGION = "us-middle-1"; + private static final Map REGISTRY_CONFIG = + Map.of( + AWSSchemaRegistryConstants.REGISTRY_NAME, REGISTRY_NAME, + AWSSchemaRegistryConstants.AWS_REGION, REGION); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDeserializationSchema() { + final AvroRowDataDeserializationSchema expectedDeser = + new AvroRowDataDeserializationSchema( + GlueSchemaRegistryAvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(ROW_TYPE), REGISTRY_CONFIG), + AvroToRowDataConverters.createRowConverter(ROW_TYPE), + InternalTypeInfo.of(ROW_TYPE)); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions()); + assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class)); + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + DeserializationSchema actualDeser = + scanSourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType()); + + assertEquals(expectedDeser, actualDeser); + } + + @Test + public void testSerializationSchema() { + final AvroRowDataSerializationSchema expectedSer = + new AvroRowDataSerializationSchema( + ROW_TYPE, + GlueSchemaRegistryAvroSerializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(ROW_TYPE), + SCHEMA_NAME, + REGISTRY_CONFIG), + RowDataToAvroConverters.createConverter(ROW_TYPE)); + + final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions()); + assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class)); + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = + (TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + + SerializationSchema actualSer = + sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType()); + + assertEquals(expectedSer, actualSer); + } + + @Test + public void testMissingSubjectForSink() { + thrown.expect(ValidationException.class); + final Map options = + getModifiedOptions(opts -> opts.remove("avro-glue.schema.name")); + + createTableSink(SCHEMA, options); + } + + + /** + * Returns the full options modified by the given consumer {@code optionModifier}. 
+ * + * @param optionModifier Consumer to modify the options + */ + private Map getModifiedOptions(Consumer> optionModifier) { + Map options = getDefaultOptions(); + optionModifier.accept(options); + return options; + } + + private Map getDefaultOptions() { + final Map options = new HashMap<>(); + options.put("connector", TestDynamicTableFactory.IDENTIFIER); + options.put("target", "MyTarget"); + options.put("buffer-size", "1000"); + + options.put("format", GlueSchemaRegistryAvroFormatFactory.IDENTIFIER); + options.put("avro-glue.schema.name", SCHEMA_NAME); + options.put("avro-glue.registry.name", REGISTRY_NAME); + options.put("avro-glue.aws.region", REGION); + return options; + } +} From 7081f88d36e4a1e862ce74596243eaee9601b9f4 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 22 Dec 2023 13:26:26 +0100 Subject: [PATCH 2/7] Fixed style violations --- .../flink-avro-glue-schema-registry/pom.xml | 3 -- .../registry/AvroGlueFormatOptions.java | 12 +++--- .../GlueSchemaRegistryAvroFormatFactory.java | 37 +++++++++++++++---- ...SchemaRegistryAvroSchemaCoderProvider.java | 5 ++- .../GlueSchemaRegistryFormatFactoryTest.java | 6 +-- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml index 29692b3a3..8cd906e01 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml @@ -81,7 +81,6 @@ under the License. test - org.apache.flink flink-table-api-java ${flink.version} @@ -89,7 +88,6 @@ under the License. test-jar - org.apache.flink flink-table-common ${flink.version} @@ -97,7 +95,6 @@ under the License. test-jar - org.apache.flink flink-avro ${flink.version} diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java index 4ddbe95c6..017691500 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java @@ -18,16 +18,16 @@ package org.apache.flink.formats.avro.glue.schema.registry; -import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import software.amazon.awssdk.services.glue.model.Compatibility; import java.time.Duration; - /** Options for AWS Glue Schema Registry Avro format. 
*/ @PublicEvolving public class AvroGlueFormatOptions { @@ -79,7 +79,8 @@ public class AvroGlueFormatOptions { ConfigOptions.key("schema.name") .stringType() .noDefaultValue() - .withDescription("The Schema name under which to register the schema used by this format during serialization."); + .withDescription( + "The Schema name under which to register the schema used by this format during serialization."); public static final ConfigOption SCHEMA_TYPE = ConfigOptions.key("schema.type") @@ -87,6 +88,5 @@ public class AvroGlueFormatOptions { .defaultValue(AvroRecordType.GENERIC_RECORD) .withDescription("Record type"); - private AvroGlueFormatOptions() { - } + private AvroGlueFormatOptions() {} } diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java index a05a7bb7d..b414288cb 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.formats.avro.glue.schema.registry; -import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -43,24 +42,37 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.*; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_ENDPOINT; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_REGION; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_SIZE; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_TTL_MS; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.REGISTRY_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_AUTO_REGISTRATION; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_COMPATIBILITY; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_COMPRESSION; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_TYPE; + /** * Table format factory for creating {@link SerializationSchema} and {@link DeserializationSchema} * for Glue Schema Registry to RowData. 
*/ @Internal -public class GlueSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { +public class GlueSchemaRegistryAvroFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { public static final String IDENTIFIER = "avro-glue"; @Override - public DecodingFormat> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { + public DecodingFormat> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { FactoryUtil.validateFactoryOptions(this, formatOptions); final Map configMap = buildConfigMap(formatOptions); @@ -86,7 +98,8 @@ public ChangelogMode getChangelogMode() { } @Override - public EncodingFormat> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { + public EncodingFormat> createEncodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { FactoryUtil.validateFactoryOptions(this, formatOptions); return new EncodingFormat>() { @@ -151,16 +164,24 @@ private Map buildConfigMap(ReadableConfig formatOptions) { .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v)); formatOptions .getOptional(CACHE_TTL_MS) - .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v)); + .ifPresent( + v -> + properties.put( + AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v)); formatOptions .getOptional(REGISTRY_NAME) .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v)); formatOptions .getOptional(SCHEMA_AUTO_REGISTRATION) - .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, v)); + .ifPresent( + v -> + properties.put( + AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, + v)); formatOptions .getOptional(SCHEMA_COMPATIBILITY) - .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v)); + .ifPresent( + v -> properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v)); formatOptions .getOptional(SCHEMA_COMPRESSION) .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v)); diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java index 21f1b5dbd..c20e616f0 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java @@ -61,8 +61,9 @@ public boolean equals(Object o) { return false; } - GlueSchemaRegistryAvroSchemaCoderProvider that = (GlueSchemaRegistryAvroSchemaCoderProvider) o; - if (transportName == null) { + GlueSchemaRegistryAvroSchemaCoderProvider that = + (GlueSchemaRegistryAvroSchemaCoderProvider) o; + if (transportName == null) { if (that.transportName != null) { return false; } diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java 
b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java index 788829270..d39f0a167 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -53,9 +52,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -/** - * Tests for the {@link GlueSchemaRegistryAvroFormatFactory}. - */ +/** Tests for the {@link GlueSchemaRegistryAvroFormatFactory}. */ public class GlueSchemaRegistryFormatFactoryTest { private static final ResolvedSchema SCHEMA = @@ -129,7 +126,6 @@ public void testMissingSubjectForSink() { createTableSink(SCHEMA, options); } - /** * Returns the full options modified by the given consumer {@code optionModifier}. * From 9f54cc127b52016c9527b98f12be7bc9748dbf42 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 22 Dec 2023 14:54:00 +0100 Subject: [PATCH 3/7] Fix factories location --- .../schema/registry/GlueSchemaRegistryAvroFormatFactory.java | 4 ++-- .../services}/org.apache.flink.table.factories.Factory | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/{ => META-INF/services}/org.apache.flink.table.factories.Factory (100%) diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java index b414288cb..f2253d6c6 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java @@ -61,8 +61,8 @@ import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_TYPE; /** - * Table format factory for creating {@link SerializationSchema} and {@link DeserializationSchema} - * for Glue Schema Registry to RowData. + * Table format factory for providing configured instances of AWS Glue Schema Registry Avro to + * RowData {@link SerializationSchema} and {@link DeserializationSchema}. 
*/ @Internal public class GlueSchemaRegistryAvroFormatFactory diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/org.apache.flink.table.factories.Factory rename to flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory From 8da3a689ef9aaa10fecf8c00088ec4ce1e34440a Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 22 Dec 2023 15:50:38 +0100 Subject: [PATCH 4/7] Add SQL module --- .../pom.xml | 132 ++++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 29 ++++ .../META-INF/licenses/LICENSE.zstd-jni | 26 ++++ .../glue/schema/registry/PackagingITCase.java | 49 +++++++ flink-formats-aws/pom.xml | 1 + 5 files changed, 237 insertions(+) create mode 100644 flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml create mode 100644 flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE create mode 100644 flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/licenses/LICENSE.zstd-jni create mode 100644 flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml new file mode 100644 index 000000000..1fd6e0fa0 --- /dev/null +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml @@ -0,0 +1,132 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-formats-aws-parent + 4.3-SNAPSHOT + + + flink-sql-avro-glue-schema-registry + Flink : Formats : AWS : SQL : Avro Glue Schema Registry + jar + + + + + org.apache.flink + flink-avro-glue-schema-registry + ${project.version} + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-connector-aws-base + org.apache.flink:flink-connector-dynamodb + software.amazon.awssdk:* + org.reactivestreams:* + com.typesafe.netty:* + org.apache.httpcomponents:* + io.netty:* + commons-logging:commons-logging + commons-codec:commons-codec + + + + + software.amazon + org.apache.flink.avro.registry.glue.shaded.software.amazon + + + org.apache.kafka + org.apache.flink.avro.registry.glue.shaded.org.apache.kafka + + + org.reactivestreams + org.apache.flink.connector.dynamodb.shaded.org.reactivestreams + + + com.typesafe.netty + org.apache.flink.connector.dynamodb.shaded.com.typesafe.netty + + + org.apache.http + org.apache.flink.connector.dynamodb.shaded.org.apache.http + + + io.netty + org.apache.flink.connector.dynamodb.shaded.io.netty + + + + + com.fasterxml.jackson + org.apache.flink.avro.shaded.com.fasterxml.jackson + + + org.apache.avro + org.apache.flink.avro.shaded.org.apache.avro + + + org.apache.commons.compress + org.apache.flink.avro.shaded.org.apache.commons.compress + + + + + org.apache.flink:flink-avro-glue-schema-registry:* + + profile + + + + + + + + + + + \ No newline at end of file diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE 
b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000..658fd415b --- /dev/null +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE @@ -0,0 +1,29 @@ +link-sql-avro-glue-schema-registryu +Copyright 2014-2023 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.fasterxml.jackson.core:jackson-annotations:2.14.3 +- com.fasterxml.jackson.core:jackson-core:2.14.3 +- com.fasterxml.jackson.core:jackson-databind:2.14.3 +- com.fasterxml.jackson.core:jackson-dataformat-cbor:2.14.3 +- com.google.guava:guava:32.1.3-jre +- org.apache.avro:avro:1.11.3 +- org.apache.kafka:kafka-clients:2.8.1 +- org.xerial.snappy:snappy-java:1.1.10.4 +- joda-time:joda-time:2.8.1 +- org.apache.commons:commons-lang3:3.12.0 +- commons-logging:commons-logging:1.1.3 +- commons-lang:commons-lang:2.6 +- commons-io:commons-io:2.11.0 +- commons-codec:commons-codec:1.15 +- commons-validator:commons-validator:1.7 + + +This project bundles the following dependencies under the BSD license. +See bundled license files for details. + +- com.github.luben:zstd-jni:1.4.9-1 \ No newline at end of file diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/licenses/LICENSE.zstd-jni b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/licenses/LICENSE.zstd-jni new file mode 100644 index 000000000..7bdccb6a9 --- /dev/null +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/licenses/LICENSE.zstd-jni @@ -0,0 +1,26 @@ +Zstd-jni: JNI bindings to Zstd Library + +Copyright (c) 2015-present, Luben Karavelov/ All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
\ No newline at end of file diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java new file mode 100644 index 000000000..9dc9df996 --- /dev/null +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json.glue.schema.registry; + +import org.apache.flink.packaging.PackagingTestUtils; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.test.resources.ResourceTestUtils; + +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.util.Arrays; + +class PackagingITCase { + + @Test + void testPackaging() throws Exception { + final Path jar = + ResourceTestUtils.getResource( + ".*/flink-sql-connector-aws-kinesis-streams[^/]*\\.jar"); + + PackagingTestUtils.assertJarContainsOnlyFilesMatching( + jar, + Arrays.asList( + "org/apache/flink/", + "org/apache/commons/", + "META-INF/", + "mozilla/", + "mime.types", + "VersionInfo.java")); + PackagingTestUtils.assertJarContainsServiceEntry(jar, Factory.class); + } +} diff --git a/flink-formats-aws/pom.xml b/flink-formats-aws/pom.xml index 73063cbb5..80a6069eb 100644 --- a/flink-formats-aws/pom.xml +++ b/flink-formats-aws/pom.xml @@ -36,6 +36,7 @@ under the License. flink-avro-glue-schema-registry flink-json-glue-schema-registry + flink-sql-avro-glue-schema-registry From 6373dd77607f25be60c51d77fd92417a5d6b6c42 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Sat, 23 Dec 2023 19:31:36 +0100 Subject: [PATCH 5/7] Fixed uber-jar packaging --- .../flink-avro-glue-schema-registry/pom.xml | 2 +- .../GlueSchemaRegistryAvroFormatFactory.java | 4 +- .../pom.xml | 64 ++++++++++++++----- .../src/main/resources/META-INF/NOTICE | 1 - .../glue/schema/registry/PackagingITCase.java | 12 +--- 5 files changed, 54 insertions(+), 29 deletions(-) diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml index 8cd906e01..f7ec408b4 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml @@ -62,7 +62,7 @@ under the License. 
${project.version} - + software.amazon.glue schema-registry-serde ${glue.schema.registry.version} diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java index f2253d6c6..e7954ed4d 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java @@ -107,10 +107,12 @@ public EncodingFormat> createEncodingFormat( public SerializationSchema createRuntimeEncoder( DynamicTableSink.Context context, DataType consumedDataType) { final RowType rowType = (RowType) consumedDataType.getLogicalType(); + final org.apache.avro.Schema avroSchema = + AvroSchemaConverter.convertToSchema(rowType); return new AvroRowDataSerializationSchema( rowType, GlueSchemaRegistryAvroSerializationSchema.forGeneric( - AvroSchemaConverter.convertToSchema(rowType), + avroSchema, formatOptions.get(SCHEMA_NAME), buildConfigMap(formatOptions)), RowDataToAvroConverters.createConverter(rowType)); diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml index 1fd6e0fa0..e865cff01 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml @@ -63,40 +63,58 @@ org.apache.flink:flink-connector-aws-base - org.apache.flink:flink-connector-dynamodb + org.apache.flink:flink-avro-glue-schema-registry + com.amazonaws:* software.amazon.awssdk:* - org.reactivestreams:* - com.typesafe.netty:* - org.apache.httpcomponents:* - io.netty:* - commons-logging:commons-logging - commons-codec:commons-codec + software.amazon.glue:* + + org.apache.flink:flink-avro + org.apache.avro:avro + com.fasterxml.jackson.core:* + com.fasterxml.jackson.dataformat:* + org.apache.commons:commons-compress + org.reactivestreams:reactive-streams + com.google.guava:guava + com.google.guava:failureaccess + + software.amazon.glue:schema-registry-build-tools + + com.google.guava:listenablefuture + org.checkerframework:checker-qual + com.google.errorprone:error_prone_annotations + com.google.j2objc:j2objc-annotations + com.google.code.findbugs:jsr305 + software.amazon org.apache.flink.avro.registry.glue.shaded.software.amazon + + com.amazonaws + org.apache.flink.avro.registry.glue.shaded.com.amazonaws + org.apache.kafka org.apache.flink.avro.registry.glue.shaded.org.apache.kafka org.reactivestreams - org.apache.flink.connector.dynamodb.shaded.org.reactivestreams + org.apache.flink.avro.registry.glue.shaded.org.reactivestreams - com.typesafe.netty - org.apache.flink.connector.dynamodb.shaded.com.typesafe.netty + com.google + org.apache.flink.avro.registry.glue.shaded.com.google - org.apache.http - org.apache.flink.connector.dynamodb.shaded.org.apache.http + com.typesafe.netty + org.apache.flink.avro.registry.glue.shaded.com.typesafe.netty - io.netty - org.apache.flink.connector.dynamodb.shaded.io.netty + org.apache.http + org.apache.flink.avro.registry.glue.shaded.org.apache.http - profile + additionalTypes/** + java/** + metadata/** diff --git 
a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE index 658fd415b..008dad245 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE @@ -14,7 +14,6 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.avro:avro:1.11.3 - org.apache.kafka:kafka-clients:2.8.1 - org.xerial.snappy:snappy-java:1.1.10.4 -- joda-time:joda-time:2.8.1 - org.apache.commons:commons-lang3:3.12.0 - commons-logging:commons-logging:1.1.3 - commons-lang:commons-lang:2.6 diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java index 9dc9df996..3bbda71dc 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/PackagingITCase.java @@ -32,18 +32,10 @@ class PackagingITCase { @Test void testPackaging() throws Exception { final Path jar = - ResourceTestUtils.getResource( - ".*/flink-sql-connector-aws-kinesis-streams[^/]*\\.jar"); + ResourceTestUtils.getResource(".*/flink-sql-avro-glue-schema-registry[^/]*\\.jar"); PackagingTestUtils.assertJarContainsOnlyFilesMatching( - jar, - Arrays.asList( - "org/apache/flink/", - "org/apache/commons/", - "META-INF/", - "mozilla/", - "mime.types", - "VersionInfo.java")); + jar, Arrays.asList("org/apache/flink/", "org/apache/commons/", "META-INF/")); PackagingTestUtils.assertJarContainsServiceEntry(jar, Factory.class); } } From ff4e184e967316c019f146594dea218fbe24224d Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 29 Dec 2023 11:08:23 +0000 Subject: [PATCH 6/7] Documentation --- .../connectors/table/formats/avro-glue.md | 226 ++++++++++++++++++ .../GlueSchemaRegistryAvroFormatFactory.java | 26 +- .../pom.xml | 1 - 3 files changed, 242 insertions(+), 11 deletions(-) create mode 100644 docs/content/docs/connectors/table/formats/avro-glue.md diff --git a/docs/content/docs/connectors/table/formats/avro-glue.md b/docs/content/docs/connectors/table/formats/avro-glue.md new file mode 100644 index 000000000..eab089e6d --- /dev/null +++ b/docs/content/docs/connectors/table/formats/avro-glue.md @@ -0,0 +1,226 @@ +--- +title: Avro Glue Schema Registry +weight: 5 +type: docs +aliases: +- /dev/table/connectors/formats/avro-glue.html +--- + + +# AWS Glue Avro Format + +{{< label "Format: Serialization Schema" >}} +{{< label "Format: Deserialization Schema" >}} + +The AWS Glue Schema Registry (``avro-glue``) format allows you to read records that were serialized by ``com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer`` and to write records that can in turn be read by ``com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer``. +These records have their schemas stored out-of-band in a configured registry provided by the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas). 
+ +When reading (deserializing) a record with this format the Avro writer schema is fetched from the configured AWS Glue Schema Registry, based on the schema version id encoded in the record, while the reader schema is inferred from table schema. + +When writing (serializing) a record with this format the Avro schema is inferred from the table schema and used to retrieve a schema id to be encoded with the data. +The lookup is performed against the configured AWS Glue Schema Registry under the [value](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas) given in `avro-glue.schema-name`. +Optionally, you can enable schema auto-registration, allowing the writer to register a new schema version in the schema registry, directly. The new schema will be accepted only if it does not violate the compatibility mode that was set when the schema was created in the first place. + +The AWS Glue Schema format can only be used in conjunction with the [Apache Kafka SQL connector]({{< ref "docs/connectors/table/kafka" >}}) or the [Upsert Kafka SQL Connector]({{< ref "docs/connectors/table/upsert-kafka" >}}). + +Dependencies +------------ + +{{< sql_download_table "avro-glue" >}} + +How to create tables with Avro-Glue format +-------------- + + +Example of a table using raw UTF-8 string as Kafka key and Avro records registered in the Schema Registry as Kafka values: + +```sql +CREATE TABLE user_created ( + + -- one column mapped to the Kafka raw UTF-8 key + the_kafka_key STRING, + + -- a few columns mapped to the Avro fields of the Kafka value + id STRING, + name STRING, + email STRING + +) WITH ( + + 'connector' = 'kafka', + 'topic' = 'user_events_example1', + 'properties.bootstrap.servers' = 'localhost:9092', + + -- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column + 'key.format' = 'raw', + 'key.fields' = 'the_kafka_key', + + 'value.format' = 'avro-glue', + 'value.avro-glue.region' = 'us-east-1', + 'value.avro-glue.registry.name' = 'my-schema-registry', + 'value.avro-glue.schema.name' = 'my-schema-name', + 'avro-glue.schema.autoRegistration' = 'true', + 'value.fields-include' = 'EXCEPT_KEY' +) +``` + +You can write data into the Kafka table as follows: + +``` +INSERT INTO user_created +SELECT + -- replicating the user id into a column mapped to the kafka key + id as the_kafka_key, + + -- all values + id, name, email +FROM some_table +``` + +Format Options +---------------- + +Note that naming of the properties slightly diverges from the [AWS Glue client code](https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L20) properties, to match with the conventions used by other Flink formats. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Option | Required | Default | Type | Description |
+| ------ | -------- | ------- | ---- | ----------- |
+| `format` | required | (none) | String | Specify what format to use; for this format it must be `'avro-glue'`. |
+| `avro-glue.aws.region` | required | (none) | String | The AWS region in which the Glue Schema Registry lives, such as `'us-east-1'`. |
+| `avro-glue.aws.endpoint` | optional | (none) | String | The HTTP endpoint to use for AWS calls. |
+| `avro-glue.registry.name` | required | (none) | String | The name (not the ARN) of the Glue schema registry in which to store the schemas. |
+| `avro-glue.schema.name` | required | (none) | String | The name under which to store the schema in the registry. |
+| `avro-glue.schema.autoRegistration` | optional | false | Boolean | Whether new schemas should be automatically registered rather than treated as errors. Only used when writing (serializing); ignored when reading (deserializing). |
+| `avro-glue.schema.compression` | optional | NONE | String | What kind of compression to use. Valid values are `'NONE'` and `'ZLIB'`. |
+| `avro-glue.schema.compatibility` | optional | BACKWARD | String | The schema compatibility mode under which to store the schema. Valid values are `'NONE'`, `'DISABLED'`, `'BACKWARD'`, `'BACKWARD_ALL'`, `'FORWARD'`, `'FORWARD_ALL'`, `'FULL'`, and `'FULL_ALL'`. Only used when schema auto-registration is enabled and the schema is registered for the first time; ignored when reading, or when a new schema version is auto-registered in an existing schema. |
+| `avro-glue.cache.size` | optional | 200 | Integer | The size (in number of entries, not bytes) of the cache managed by the Glue client. |
+| `avro-glue.cache.ttlMs` | optional | 1 day (24 * 60 * 60 * 1000) | Integer | The time to live, in milliseconds, for cache entries. |
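+The optional keys combine with the usual Flink format-option prefixing: when `avro-glue` is used as the value format of a Kafka table, each option from the table above is set as `'value.avro-glue.<option>'`. As a sketch only (reusing the placeholder topic, registry and schema names from the example above, not a tested configuration), a sink table that enables auto-registration, ZLIB compression and a tighter cache could look like this:
+
+```sql
+CREATE TABLE user_created_compressed (
+    the_kafka_key STRING,
+    id STRING,
+    name STRING,
+    email STRING
+) WITH (
+    'connector' = 'kafka',
+    'topic' = 'user_events_example1',
+    'properties.bootstrap.servers' = 'localhost:9092',
+
+    'key.format' = 'raw',
+    'key.fields' = 'the_kafka_key',
+
+    'value.format' = 'avro-glue',
+    'value.avro-glue.aws.region' = 'us-east-1',
+    'value.avro-glue.registry.name' = 'my-schema-registry',
+    'value.avro-glue.schema.name' = 'my-schema-name',
+
+    -- optional keys from the table above, prefixed with 'value.avro-glue.'
+    'value.avro-glue.schema.autoRegistration' = 'true',
+    'value.avro-glue.schema.compression' = 'ZLIB',
+    'value.avro-glue.cache.size' = '100',
+    'value.avro-glue.cache.ttlMs' = '3600000',
+
+    'value.fields-include' = 'EXCEPT_KEY'
+)
+```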
+ +Note that the schema type (Generic or Specific Record) cannot be specified while using Table API. + +Schema Auto-registration +------------------------ + +By default, the schema auto-registration is disabled. When writing to a Kafka table new records are accepted only if a schema version that matches the table schema exactly is already registered in the Schema Registry at `registry.name` and `schema.name`. Otherwise, an exception is thrown. + +You can enable schema auto-registration setting the property `avro-glue.schema.autoRegistration` = `true`. + +When auto-registration is enabled, Flink will first check whether a schema matching the table schema is already registered in the Schema Registry. If the schema is already registered, the writer will reuse the schemaId. +If the table schema does not match any schema version already registered at the specified `registry.name` and `schema.name`, the writer will try to auto-register a new schema version. + +When auto-registering a new schema version, there are two different cases: + +1. No schema is registered at the specified `registry.name` and `schema.name`: a new schema, matching the table schema, will be registered. The compatibility mode is set to the value of the `schema.compatibility` property. +2. Another, different schema version is already registered at the specified `registry.name` and `schema.name`: in this case the new schema version will be accepted only it does not violate the schema evolution rules defined by the Compatibility Mode that has been set when the Schema has been created in the first place. + +When auto-registering a new schema, the schema compatibility mode is set based on the `avro-glue.schema.compatibility` property. + +Note that `avro-glue.schema.compatibility` is used only when a new schema is auto-registered in the first place. When a new schema version is auto-registered in an existing schema, the compatibility mode of the schema is never changed and the `avro-glue.schema.compatibility` is ignored. + +Data Type Mapping +---------------- + +Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet. +See the [Apache Avro Format]({{< ref "docs/connectors/table/formats/avro" >}}#data-type-mapping) for the mapping between Avro and Flink DataTypes. + +In addition to the types listed there, Flink supports reading/writing nullable types. Flink maps nullable types to Avro `union(something, null)`, where `something` is the Avro type converted from Flink type. + +You can refer to [Avro Specification](https://avro.apache.org/docs/current/spec.html) for more information about Avro types. 
\ No newline at end of file diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java index e7954ed4d..e4a250be6 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java @@ -43,6 +43,7 @@ import org.apache.flink.table.types.logical.RowType; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import org.apache.avro.generic.GenericRecord; import java.util.HashMap; import java.util.HashSet; @@ -83,11 +84,15 @@ public DeserializationSchema createRuntimeDecoder( final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = context.createTypeInformation(producedDataType); - return new AvroRowDataDeserializationSchema( + final org.apache.avro.Schema avroSchema = + AvroSchemaConverter.convertToSchema(rowType); + final GlueSchemaRegistryAvroDeserializationSchema nestedSchema = GlueSchemaRegistryAvroDeserializationSchema.forGeneric( - AvroSchemaConverter.convertToSchema(rowType), configMap), - AvroToRowDataConverters.createRowConverter(rowType), - rowDataTypeInfo); + avroSchema, configMap); + final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter = + AvroToRowDataConverters.createRowConverter(rowType); + return new AvroRowDataDeserializationSchema( + nestedSchema, runtimeConverter, rowDataTypeInfo); } @Override @@ -109,13 +114,14 @@ public SerializationSchema createRuntimeEncoder( final RowType rowType = (RowType) consumedDataType.getLogicalType(); final org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType); - return new AvroRowDataSerializationSchema( - rowType, + final String transportName = formatOptions.get(SCHEMA_NAME); + final Map configMap = buildConfigMap(formatOptions); + final GlueSchemaRegistryAvroSerializationSchema nestedSchema = GlueSchemaRegistryAvroSerializationSchema.forGeneric( - avroSchema, - formatOptions.get(SCHEMA_NAME), - buildConfigMap(formatOptions)), - RowDataToAvroConverters.createConverter(rowType)); + avroSchema, transportName, configMap); + final RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter = + RowDataToAvroConverters.createConverter(rowType); + return new AvroRowDataSerializationSchema(rowType, nestedSchema, runtimeConverter); } @Override diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml index e865cff01..5f9785748 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml @@ -32,7 +32,6 @@ Flink : Formats : AWS : SQL : Avro Glue Schema Registry jar - org.apache.flink From 3cced2a905fda4d0030a8d9cce25940dcfc8527d Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Sun, 12 May 2024 20:03:45 +0100 Subject: [PATCH 7/7] Removed unnecessary dependencies from shaded jar Fixed param description to match actual default Fix typo in NOTICE --- .../flink-avro-glue-schema-registry/pom.xml | 4 ---- 
.../glue/schema/registry/AvroGlueFormatOptions.java | 2 +- .../flink-sql-avro-glue-schema-registry/pom.xml | 13 ++----------- .../src/main/resources/META-INF/NOTICE | 2 +- 4 files changed, 4 insertions(+), 17 deletions(-) diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml index f7ec408b4..5265a68e9 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml @@ -102,11 +102,7 @@ under the License. test-jar - - - - org.apache.flink flink-architecture-tests-test diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java index 017691500..0fac1a8af 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java @@ -62,7 +62,7 @@ public class AvroGlueFormatOptions { ConfigOptions.key("schema.autoRegistration") .booleanType() .defaultValue(false) - .withDescription("Whether auto-registration is enabled. Defaults to true."); + .withDescription("Whether auto-registration is enabled. Defaults to false."); public static final ConfigOption SCHEMA_COMPATIBILITY = ConfigOptions.key("schema.compatibility") diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml index 5f9785748..bf195b656 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/pom.xml @@ -66,24 +66,23 @@ com.amazonaws:* software.amazon.awssdk:* software.amazon.glue:* - org.apache.flink:flink-avro org.apache.avro:avro com.fasterxml.jackson.core:* com.fasterxml.jackson.dataformat:* org.apache.commons:commons-compress - org.reactivestreams:reactive-streams com.google.guava:guava com.google.guava:failureaccess software.amazon.glue:schema-registry-build-tools - com.google.guava:listenablefuture org.checkerframework:checker-qual com.google.errorprone:error_prone_annotations com.google.j2objc:j2objc-annotations com.google.code.findbugs:jsr305 + org.apache.kafka:kafka-clients + org.reactivestreams:reactive-streams @@ -95,14 +94,6 @@ com.amazonaws org.apache.flink.avro.registry.glue.shaded.com.amazonaws - - org.apache.kafka - org.apache.flink.avro.registry.glue.shaded.org.apache.kafka - - - org.reactivestreams - org.apache.flink.avro.registry.glue.shaded.org.reactivestreams - com.google org.apache.flink.avro.registry.glue.shaded.com.google diff --git a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE index 008dad245..2f5241319 100644 --- a/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE +++ b/flink-formats-aws/flink-sql-avro-glue-schema-registry/src/main/resources/META-INF/NOTICE @@ -1,4 +1,4 @@ -link-sql-avro-glue-schema-registryu +flink-sql-avro-glue-schema-registry Copyright 2014-2023 The Apache Software Foundation This product includes software developed at