diff --git a/azure-sink-connector/build.gradle.kts b/azure-sink-connector/build.gradle.kts index e0a2f78cd..85804c2de 100644 --- a/azure-sink-connector/build.gradle.kts +++ b/azure-sink-connector/build.gradle.kts @@ -71,6 +71,9 @@ idea { dependencies { compileOnly(apache.kafka.connect.api) + compileOnly(project(":site")) + compileOnly(apache.velocity.engine.core) + compileOnly(apache.velocity.tools) implementation(project(":commons")) @@ -189,7 +192,7 @@ publishing { licenses { license { name = "Apache 2.0" - url = "http://www.apache.org/licenses/LICENSE-2.0" + url = "https://www.apache.org/licenses/LICENSE-2.0" distribution = "repo" } } @@ -249,3 +252,47 @@ signing { } signatureTypes = ASCSignatureProvider() } + +/** ******************************* */ +/* Documentation building section */ +/** ******************************* */ +tasks.register("buildDocs") { + dependsOn("buildConfigMd") + dependsOn("buildConfigYml") +} + +tasks.register("buildConfigMd") { + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + "io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig", + "configDef", + "src/templates/configData.md.vm", + "build/site/markdown/azure-sink-connector/AzureSinkConfig.md") +} + +tasks.register("buildConfigYml") { + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + "io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig", + "configDef", + "src/templates/configData.yml.vm", + "build/site/azure-sink-connector/AzureSinkConfig.yml") +} + +/** ****************************** */ +/* End of documentation section */ +/** ****************************** */ diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java index ba4ada4f5..779911e64 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java @@ -35,6 +35,11 @@ import org.apache.kafka.clients.producer.RecordMetadata; import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -95,8 +100,9 @@ private void produceRecords(final int recordCountPerPartition) throws ExecutionE @Test void avroOutput() throws ExecutionException, InterruptedException, IOException { final Map connectorConfig = basicConnectorConfig(); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro"); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.AVRO) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE); createConnector(connectorConfig); final int recordCountPerPartition = 10; @@ -161,10 +167,11 @@ 
private byte[] getBlobBytes(final byte[] blobBytes, final String compression) th void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String compression) throws ExecutionException, InterruptedException, IOException { final Map connectorConfig = basicConnectorConfig(); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.AVRO) + .withOutputFields(OutputFieldType.VALUE) + .envelopeEnabled(false); + FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression)); connectorConfig.put("avro.codec", avroCodec); createConnector(connectorConfig); @@ -223,10 +230,12 @@ void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String co @Test void schemaChanged() throws ExecutionException, InterruptedException, IOException { final Map connectorConfig = basicConnectorConfig(); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro"); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.AVRO) + .withOutputFields(OutputFieldType.VALUE) + .envelopeEnabled(false) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); + createConnector(connectorConfig); final Schema evolvedAvroInputDataSchema = new Schema.Parser() @@ -279,10 +288,12 @@ void schemaChanged() throws ExecutionException, InterruptedException, IOExceptio void jsonlOutput() throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); final String compression = "none"; - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "jsonl"); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.JSONL) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); + FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE); + createConnector(connectorConfig); final int recordCountPerPartition = 10; @@ -334,12 +345,12 @@ private Map basicConnectorConfig() { return config; } - protected String getAvroBlobName(final int partition, final int startOffset, final String compression) { + String getAvroBlobName(final int partition, final int startOffset, final String compression) { return super.getBaseBlobName(partition, startOffset) + ".avro" + CompressionType.forName(compression).extension(); } - protected String getAvroBlobName(final int partition, final int startOffset) { + String getAvroBlobName(final int partition, final int startOffset) { return super.getBaseBlobName(partition, startOffset) + ".avro"; } } diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java 
b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java index dd4875a9a..6df416edf 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java @@ -33,6 +33,12 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; import org.apache.avro.Schema; @@ -66,8 +72,10 @@ void setUp() throws ExecutionException, InterruptedException { void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException { final var compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE, OutputFieldType.OFFSET, + OutputFieldType.TIMESTAMP, OutputFieldType.HEADERS) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final Schema valueSchema = SchemaBuilder.record("value") @@ -134,8 +142,9 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException { final String compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final Schema valueSchema = SchemaBuilder.record("value") @@ -198,8 +207,9 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException { final String compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final Schema valueSchema = SchemaBuilder.record("value") @@ -290,8 +300,9 @@ private Map basicConnectorConfig(final String compression) { config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, testContainerName); config.put(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, azurePrefix); config.put("topics", 
testTopic0 + "," + testTopic1); - config.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - config.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "parquet"); + + FileNameFragment.setter(config).fileCompression(CompressionType.forName(compression)); + OutputFormatFragment.setter(config).withFormatType(FormatType.PARQUET); return config; } } diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/IntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/IntegrationTest.java index 5646d4ab9..b13eaad42 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/IntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/IntegrationTest.java @@ -36,6 +36,11 @@ import org.apache.kafka.common.TopicPartition; import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,8 +68,8 @@ void setUp() throws ExecutionException, InterruptedException { @ValueSource(strings = { "none", "gzip", "snappy", "zstd" }) void basicTest(final String compression) throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE); + FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression)); createConnector(connectorConfig); final List> sendFutures = new ArrayList<>(); @@ -118,10 +123,11 @@ void basicTest(final String compression) throws ExecutionException, InterruptedE @ValueSource(strings = { "none", "gzip", "snappy", "zstd" }) void groupByTimestampVariable(final String compression) throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}-" - + "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}"); + OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE); + FileNameFragment.setter(connectorConfig) + .fileCompression(CompressionType.forName(compression)) + .template("{{topic}}-{{partition}}-{{start_offset}}-" + + "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}"); createConnector(connectorConfig); final List> sendFutures = new ArrayList<>(); @@ -176,10 +182,12 @@ private String getTimestampBlobName(final int partition, final int startOffset) @ValueSource(strings = { "none", "gzip", "snappy", "zstd" }) void oneFilePerRecordWithPlainValues(final String compression) throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); - 
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); - connectorConfig.put(AzureBlobSinkConfig.FILE_MAX_RECORDS, "1"); + FileNameFragment.setter(connectorConfig) + .maxRecordsPerFile(1) + .fileCompression(CompressionType.forName(compression)); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final List> sendFutures = new ArrayList<>(); @@ -226,9 +234,10 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted final Map connectorConfig = basicConnectorConfig(); final CompressionType compressionType = CompressionType.forName(compression); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}" + compressionType.extension()); + OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE); + FileNameFragment.setter(connectorConfig) + .fileCompression(CompressionType.forName(compression)) + .template("{{key}}" + compressionType.extension()); createConnector(connectorConfig); final Map> keysPerTopicPartition = new HashMap<>(); @@ -292,14 +301,14 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted void jsonlOutput() throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); final String compression = "none"; - final String contentType = "jsonl"; - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); connectorConfig.put("value.converter.schemas.enable", "false"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType); + FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.JSONL) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final List> sendFutures = new ArrayList<>(); @@ -353,16 +362,16 @@ void jsonlOutput() throws ExecutionException, InterruptedException { void jsonOutput() throws ExecutionException, InterruptedException { final Map connectorConfig = basicConnectorConfig(); final String compression = "none"; - final String contentType = "json"; connectorConfig.put("azure.storage.connection.string", azureEndpoint != null ? 
azureEndpoint : azureConnectionString); // NOPMD - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); connectorConfig.put("value.converter.schemas.enable", "false"); - connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType); + FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE); + OutputFormatFragment.setter(connectorConfig) + .withFormatType(FormatType.JSON) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); createConnector(connectorConfig); final int numEpochs = 10; diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java index 0110dda06..2a702cd80 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java @@ -34,6 +34,12 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; import org.apache.avro.generic.GenericRecord; @@ -68,8 +74,10 @@ void setUp() throws ExecutionException, InterruptedException { void allOutputFields() throws ExecutionException, InterruptedException, IOException { final var compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE, OutputFieldType.OFFSET, + OutputFieldType.TIMESTAMP, OutputFieldType.HEADERS) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.storage.StringConverter"); createConnector(connectorConfig); @@ -123,8 +131,10 @@ final var record = blobContents.get(blobName).get(i); void allOutputFieldsJsonValueAsString() throws ExecutionException, InterruptedException, IOException { final var compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + 
OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE, OutputFieldType.OFFSET, + OutputFieldType.TIMESTAMP, OutputFieldType.HEADERS) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.storage.StringConverter"); createConnector(connectorConfig); @@ -180,9 +190,10 @@ void jsonValue(final String envelopeEnabled, final String expectedOutput) throws ExecutionException, InterruptedException, IOException { final var compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, envelopeEnabled); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.VALUE) + .envelopeEnabled(Boolean.parseBoolean(envelopeEnabled)) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); createConnector(connectorConfig); @@ -237,8 +248,9 @@ final var record = blobContents.get(blobName).get(i); void schemaChanged() throws ExecutionException, InterruptedException, IOException { final var compression = "none"; final Map connectorConfig = basicConnectorConfig(compression); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); - connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none"); + OutputFormatFragment.setter(connectorConfig) + .withOutputFields(OutputFieldType.VALUE) + .withOutputFieldEncodingType(OutputFieldEncodingType.NONE); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); createConnector(connectorConfig); @@ -306,8 +318,9 @@ private Map basicConnectorConfig(final String compression) { config.put(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, azurePrefix); config.put("topics", testTopic0 + "," + testTopic1); - config.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); - config.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "parquet"); + FileNameFragment.setter(config).fileCompression(CompressionType.forName(compression)); + OutputFormatFragment.setter(config).withFormatType(FormatType.PARQUET); + return config; } diff --git a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobConfigFragment.java b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobConfigFragment.java new file mode 100644 index 000000000..26fc310eb --- /dev/null +++ b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobConfigFragment.java @@ -0,0 +1,224 @@ +/* + * Copyright 2025 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.azure.sink; + +import java.time.Duration; +import java.util.regex.Pattern; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.config.ConfigFragment; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; + +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.policy.UserAgentPolicy; +import com.azure.storage.blob.BlobServiceAsyncClient; +import com.azure.storage.blob.BlobServiceClientBuilder; + +/** + * The configuration fragment that defines the Azure specific characteristics. TODO merge this with the Azure source + * version. + */ +public final class AzureBlobConfigFragment extends ConfigFragment { + + public static final String AZURE_PREFIX_CONFIG = "azure.blob.prefix"; + public static final String AZURE_FETCH_PAGE_SIZE = "azure.blob.fetch.page.size"; + private static final String USER_AGENT_HEADER_FORMAT = "Azure Blob Source/%s (GPN: Aiven;)"; + public static final String USER_AGENT_HEADER_VALUE = String.format(USER_AGENT_HEADER_FORMAT, Version.VERSION); + private static final String GROUP_AZURE = "Azure"; + public static final String AZURE_STORAGE_CONNECTION_STRING_CONFIG = "azure.storage.connection.string"; + public static final String AZURE_STORAGE_CONTAINER_NAME_CONFIG = "azure.storage.container.name"; + public static final String AZURE_USER_AGENT = "azure.user.agent"; + + private static final String GROUP_AZURE_RETRY_BACKOFF_POLICY = "Azure retry backoff policy"; + private static final String AZURE_FETCH_BUFFER_SIZE = "azure.blob.fetch.buffer.size"; + public static final String AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG = "azure.retry.backoff.initial.delay.ms"; + public static final String AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = "azure.retry.backoff.max.delay.ms"; + public static final String AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG = "azure.retry.backoff.max.attempts"; + + public static final long AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT = 1_000L; + public static final long AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 32_000L; + public static final int AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT = 6; + + private final static Pattern CONTAINER_NAME_PATTERN = Pattern.compile("[0-9a-z][0-9a-z\\-]+[0-9a-z]"); + + /** + * From Azure documentation: + *
+ * <ul>
+ * <li>Container names must start or end with a letter or number, and can contain only letters, numbers, and the
+ * hyphen/minus (-) character.</li>
+ * <li>Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive
+ * hyphens aren't permitted in container names.</li>
+ * <li>All letters in a container name must be lowercase.</li>
+ * <li>Container names must be from 3 through 63 characters long.</li>
+ * </ul>
+ */ + private static final ConfigDef.Validator CONTAINER_NAME_VALIDATOR = ConfigDef.CompositeValidator + .of(ConfigDef.LambdaValidator.with((name, value) -> { + final int len = value == null ? 0 : value.toString().length(); + if (len < 3 || len > 63) { + throw new ConfigException(name, value, "names must be from 3 through 63 characters long."); + } + }, () -> "must be from 3 through 63 characters long"), ConfigDef.LambdaValidator.with((name, value) -> { + if (value.toString().contains("--")) { + throw new ConfigException(name, value, + "Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive hyphens aren't permitted in container names."); + } + }, () -> "consecutive hyphens aren't permitted in container names"), + // regex last for speed + ConfigDef.LambdaValidator.with((name, value) -> { + if (!CONTAINER_NAME_PATTERN.matcher(value.toString()).matches()) { + throw new ConfigException(name, value, + "must start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character."); + } + }, () -> "start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character")); + + /** + * Construct the Azure Blob ConfigFragment.. + * + * @param dataAccess + * the configuration that this fragment is associated with. + */ + public AzureBlobConfigFragment(final FragmentDataAccess dataAccess) { + super(dataAccess); + } + + /** + * Adds the configuration options for the azure client to the configuration definition. + * + * @param configDef + * the Configuration definition. + * @return the update configuration definition + */ + public static ConfigDef update(final ConfigDef configDef, final boolean isSink) { + addUserAgentConfig(configDef); + addAzureConfigGroup(configDef, isSink); + addAzureRetryPolicies(configDef); + return configDef; + } + + private static void addUserAgentConfig(final ConfigDef configDef) { + configDef.define(AZURE_USER_AGENT, ConfigDef.Type.STRING, USER_AGENT_HEADER_VALUE, ConfigDef.Importance.LOW, + "A custom user agent used while contacting Azure"); + } + + private static void addAzureConfigGroup(final ConfigDef configDef, final boolean isSink) { + int azureGroupCounter = 0; + configDef.define(AZURE_STORAGE_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.Importance.HIGH, "Azure Storage connection string.", GROUP_AZURE, ++azureGroupCounter, + ConfigDef.Width.NONE, AZURE_STORAGE_CONNECTION_STRING_CONFIG); + + configDef.define(AZURE_STORAGE_CONTAINER_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + CONTAINER_NAME_VALIDATOR, ConfigDef.Importance.HIGH, + "The Azure Blob container that files will be written to or read from.", GROUP_AZURE, + ++azureGroupCounter, ConfigDef.Width.NONE, AZURE_STORAGE_CONTAINER_NAME_CONFIG); + + configDef.define(AZURE_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + ConfigDef.Importance.MEDIUM, + "Prefix for storage file names, generally specifies directory like" + + " structures that do not contain any templated fields.", + GROUP_AZURE, ++azureGroupCounter, ConfigDef.Width.NONE, AZURE_PREFIX_CONFIG); + if (!isSink) { + configDef.define(AZURE_FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, "Azure fetch page size", GROUP_AZURE, ++azureGroupCounter, + ConfigDef.Width.NONE, AZURE_FETCH_PAGE_SIZE); + + configDef.define(AZURE_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1000, 
ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, + "Azure fetch buffer size. This is the number of object keys kept in a buffer to ensure lexically older objet keys aren't skipped for processing if they are slower to upload.", + GROUP_AZURE, ++azureGroupCounter, ConfigDef.Width.NONE, AZURE_FETCH_BUFFER_SIZE); + } + } + + static void addAzureRetryPolicies(final ConfigDef configDef) { + int retryPolicyGroupCounter = 0; + configDef.define(AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG, ConfigDef.Type.LONG, + AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, + "Initial retry delay in milliseconds.", GROUP_AZURE_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, + ConfigDef.Width.NONE, AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG); + configDef.define(AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, ConfigDef.Type.LONG, + AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, + "Maximum retry delay in milliseconds.", GROUP_AZURE_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, + ConfigDef.Width.NONE, AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + configDef.define(AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG, ConfigDef.Type.INT, + AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, + "Retry max attempts. The default value is " + AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, + GROUP_AZURE_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, ConfigDef.Width.NONE, + AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG); + } + + public int getAzureFetchPageSize() { + return getInt(AZURE_FETCH_PAGE_SIZE); + } + + public String getAzurePrefix() { + return getString(AZURE_PREFIX_CONFIG); + } + + public String getConnectionString() { + return getString(AZURE_STORAGE_CONNECTION_STRING_CONFIG); + } + + public String getContainerName() { + return getString(AZURE_STORAGE_CONTAINER_NAME_CONFIG); + } + + public int getAzureRetryBackoffMaxAttempts() { + return getInt(AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG); + } + + public Duration getAzureRetryBackoffInitialDelay() { + return Duration.ofMillis(getLong(AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG)); + } + + public Duration getAzureRetryBackoffMaxDelay() { + return Duration.ofMillis(getLong(AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG)); + } + + public String getUserAgent() { + return getString(AZURE_USER_AGENT); + } + + public int getFetchBufferSize() { + return getInt(AZURE_FETCH_BUFFER_SIZE); + } + + public RetryOptions getAzureRetryOptions() { + return new RetryOptions(new ExponentialBackoffOptions().setMaxRetries(getAzureRetryBackoffMaxAttempts()) + .setBaseDelay(Duration.ofMillis(getAzureRetryBackoffInitialDelay().toMillis())) + .setMaxDelay(Duration.ofMillis(getAzureRetryBackoffMaxDelay().toMillis()))); + } + + /** + * Creates an async Service Client which can be used to create async container and blob clients, which can list and + * download blobs respectively. 
+ * + * @return A configured instance of BlobServiceAsyncClient + */ + public BlobServiceAsyncClient getAzureServiceAsyncClient() { + return new BlobServiceClientBuilder().connectionString(getConnectionString()) + .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)) + .addPolicy(new UserAgentPolicy(getUserAgent())) + .retryOptions(getAzureRetryOptions()) + .buildAsyncClient(); + } + +} diff --git a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfig.java b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfig.java index 5159f2c78..7f78fb8d1 100644 --- a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfig.java +++ b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfig.java @@ -17,49 +17,26 @@ package io.aiven.kafka.connect.azure.sink; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; -import io.aiven.kafka.connect.common.config.AivenCommonConfig; -import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; import io.aiven.kafka.connect.common.config.OutputField; import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("PMD.UnusedPrivateMethod") -public final class AzureBlobSinkConfig extends AivenCommonConfig { - private static final Logger LOG = LoggerFactory.getLogger(AzureBlobSinkConfig.class); - private static final String USER_AGENT_HEADER_FORMAT = "Azure Blob Sink/%s (GPN: Aiven;)"; - public static final String USER_AGENT_HEADER_VALUE = String.format(USER_AGENT_HEADER_FORMAT, Version.VERSION); - private static final String GROUP_AZURE = "Azure"; +public final class AzureBlobSinkConfig extends SinkCommonConfig { public static final String AZURE_STORAGE_CONNECTION_STRING_CONFIG = "azure.storage.connection.string"; public static final String AZURE_STORAGE_CONTAINER_NAME_CONFIG = "azure.storage.container.name"; - public static final String AZURE_USER_AGENT = "azure.user.agent"; - private static final String GROUP_FILE = "File"; - public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix"; - public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; - public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type"; - public static final String FILE_MAX_RECORDS = "file.max.records"; - public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; - public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; - - public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields"; - public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding"; - - private static final String GROUP_AZURE_RETRY_BACKOFF_POLICY = "Azure retry backoff policy"; - public static final String AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG = "azure.retry.backoff.initial.delay.ms"; - public static final String AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = 
"azure.retry.backoff.max.delay.ms"; - public static final String AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG = "azure.retry.backoff.max.attempts"; + /** + * TODO move this to FileNameFragment and handle it in the grouper code. + */ + public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix"; public static final long AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT = 1_000L; public static final long AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 32_000L; @@ -67,138 +44,34 @@ public final class AzureBlobSinkConfig extends AivenCommonConfig { public static final String NAME_CONFIG = "name"; - public static SinkCommonConfigDef configDef() { - final SinkCommonConfigDef configDef = new SinkCommonConfigDef(OutputFieldType.VALUE, CompressionType.NONE); - addAzureConfigGroup(configDef); - addFileConfigGroup(configDef); - addAzureRetryPolicies(configDef); - addUserAgentConfig(configDef); - return configDef; - } - - private static void addUserAgentConfig(final ConfigDef configDef) { - configDef.define(AZURE_USER_AGENT, ConfigDef.Type.STRING, USER_AGENT_HEADER_VALUE, ConfigDef.Importance.LOW, - "A custom user agent used while contacting Azure"); - } - - private static void addAzureConfigGroup(final ConfigDef configDef) { - int azureGroupCounter = 0; - configDef.define(AZURE_STORAGE_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, - "Azure Storage connection string.", GROUP_AZURE, azureGroupCounter++, ConfigDef.Width.NONE, - AZURE_STORAGE_CONNECTION_STRING_CONFIG); - - configDef.define(AZURE_STORAGE_CONTAINER_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, - "The Azure Blob container name to store output files in.", GROUP_AZURE, azureGroupCounter++, // NOPMD - ConfigDef.Width.NONE, AZURE_STORAGE_CONTAINER_NAME_CONFIG); - } - - private static void addAzureRetryPolicies(final ConfigDef configDef) { - int retryPolicyGroupCounter = 0; - configDef.define(AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG, ConfigDef.Type.LONG, - AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, - "Initial retry delay in milliseconds. The default value is " - + AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, - AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG); - configDef.define(AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, ConfigDef.Type.LONG, - AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, - "Maximum retry delay in milliseconds. The default value is " + AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, - AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); - configDef.define(AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG, ConfigDef.Type.INT, - AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, - "Retry max attempts. 
The default value is " + AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, // NOPMD - // retryPolicyGroupCounter - // updated value - // never - // used - AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG); - } + private final AzureBlobConfigFragment azureFragment; - private static void addFileConfigGroup(final ConfigDef configDef) { - configDef.define(FILE_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "", new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - assert value instanceof String; - final String valueStr = (String) value; - if (valueStr.length() > 1024) { // NOPMD avoid literal - throw new ConfigException(AZURE_STORAGE_CONTAINER_NAME_CONFIG, value, - "cannot be longer than 1024 characters"); - } - } - }, ConfigDef.Importance.MEDIUM, "The prefix to be added to the name of each file put on Azure Blob.", - GROUP_FILE, 50, ConfigDef.Width.NONE, FILE_NAME_PREFIX_CONFIG); + public static ConfigDef configDef() { + return new AzureBlobSinkConfigDef(); } public AzureBlobSinkConfig(final Map properties) { - super(configDef(), handleDeprecatedYyyyUppercase(properties)); - validate(); - } - - static Map handleDeprecatedYyyyUppercase(final Map properties) { - if (properties.containsKey(FILE_NAME_TEMPLATE_CONFIG)) { - final var result = new HashMap<>(properties); - - String template = properties.get(FILE_NAME_TEMPLATE_CONFIG); - final String originalTemplate = template; - - final var unitYyyyPattern = Pattern.compile("\\{\\{\\s*timestamp\\s*:\\s*unit\\s*=\\s*YYYY\\s*}}"); - template = unitYyyyPattern.matcher(template) - .replaceAll(matchResult -> matchResult.group().replace("YYYY", "yyyy")); - - if (!template.equals(originalTemplate)) { - LOG.warn( - "{{timestamp:unit=YYYY}} is no longer supported, " - + "please use {{timestamp:unit=yyyy}} instead. 
" + "It was automatically replaced: {}", - template); - } - - result.put(FILE_NAME_TEMPLATE_CONFIG, template); - - return result; - } else { - return properties; - } - } - - private void validate() { - final String connectionString = getString(AZURE_STORAGE_CONNECTION_STRING_CONFIG); - - if (connectionString == null) { - throw new ConfigException( - String.format("The configuration %s cannot be null.", AZURE_STORAGE_CONNECTION_STRING_CONFIG)); - } + super(new AzureBlobSinkConfigDef(), properties); + final FragmentDataAccess dataAccess = FragmentDataAccess.from(this); + azureFragment = new AzureBlobConfigFragment(dataAccess); } public String getConnectionString() { - return getString(AZURE_STORAGE_CONNECTION_STRING_CONFIG); + return azureFragment.getConnectionString(); } public String getContainerName() { return getString(AZURE_STORAGE_CONTAINER_NAME_CONFIG); } - @Override - public CompressionType getCompressionType() { - return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG)); - } - @Override public List getOutputFields() { - final List result = new ArrayList<>(); - for (final String outputFieldTypeStr : getList(FORMAT_OUTPUT_FIELDS_CONFIG)) { - final OutputFieldType fieldType = OutputFieldType.forName(outputFieldTypeStr); - final OutputFieldEncodingType encodingType; - if (fieldType == OutputFieldType.VALUE) { - encodingType = OutputFieldEncodingType.forName(getString(FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG)); - } else { - encodingType = OutputFieldEncodingType.NONE; - } - result.add(new OutputField(fieldType, encodingType)); - } - return result; + return outputFormatFragment.getOutputFieldTypes() + .stream() + .map(fieldType -> fieldType == OutputFieldType.VALUE + ? new OutputField(fieldType, outputFormatFragment.getOutputFieldEncodingType()) + : new OutputField(fieldType, OutputFieldEncodingType.NONE)) + .collect(Collectors.toList()); } public String getPrefix() { @@ -210,18 +83,18 @@ public String getConnectorName() { } public int getAzureRetryBackoffMaxAttempts() { - return getInt(AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG); + return azureFragment.getAzureRetryBackoffMaxAttempts(); } public Duration getAzureRetryBackoffInitialDelay() { - return Duration.ofMillis(getLong(AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG)); + return azureFragment.getAzureRetryBackoffInitialDelay(); } public Duration getAzureRetryBackoffMaxDelay() { - return Duration.ofMillis(getLong(AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG)); + return azureFragment.getAzureRetryBackoffMaxDelay(); } public String getUserAgent() { - return getString(AZURE_USER_AGENT); + return azureFragment.getUserAgent(); } } diff --git a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfigDef.java b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfigDef.java index 27971a281..b9f2db4d4 100644 --- a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfigDef.java +++ b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConfigDef.java @@ -16,15 +16,81 @@ package io.aiven.kafka.connect.azure.sink; -import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; -public class AzureBlobSinkConfigDef extends ConfigDef { +import io.aiven.kafka.connect.common.config.CompressionType; +import 
io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; + +public final class AzureBlobSinkConfigDef extends SinkCommonConfig.SinkCommonConfigDef { + + private final static Pattern CONTAINER_NAME_PATTERN = Pattern.compile("[0-9a-z][0-9a-z\\-]+[0-9a-z]"); + + /** + * From Azure documentation: + *
+ * <ul>
+ * <li>Container names must start or end with a letter or number, and can contain only letters, numbers, and the
+ * hyphen/minus (-) character.</li>
+ * <li>Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive
+ * hyphens aren't permitted in container names.</li>
+ * <li>All letters in a container name must be lowercase.</li>
+ * <li>Container names must be from 3 through 63 characters long.</li>
+ * </ul>
+ */ + public static final ConfigDef.Validator CONTAINER_NAME_VALIDATOR = ConfigDef.CompositeValidator + .of(ConfigDef.LambdaValidator.with((name, value) -> { + final int len = value == null ? 0 : value.toString().length(); + if (len < 3 || len > 63) { + throw new ConfigException(name, value, "names must be from 3 through 63 characters long."); + } + }, () -> "must be from 3 through 63 characters long"), ConfigDef.LambdaValidator.with((name, value) -> { + if (value.toString().contains("--")) { + throw new ConfigException(name, value, + "Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive hyphens aren't permitted in container names."); + } + }, () -> "consecutive hyphens aren't permitted in container names"), + // regex last for speed + ConfigDef.LambdaValidator.with((name, value) -> { + if (!CONTAINER_NAME_PATTERN.matcher(value.toString()).matches()) { + throw new ConfigException(name, value, + "must start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character."); + } + }, () -> "start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character")); + + AzureBlobSinkConfigDef() { + super(OutputFieldType.VALUE, CompressionType.NONE); + AzureBlobConfigFragment.update(this, true); + addFileConfigGroup(this); + } + + static void addFileConfigGroup(final ConfigDef configDef) { + + configDef.define(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "", + ConfigDef.LambdaValidator.with((name, value) -> { + assert value instanceof String; + final String valueStr = (String) value; + if (valueStr.length() > 1024) { // NOPMD avoid literal + throw new ConfigException(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, value, + "cannot be longer than 1024 characters"); + } + }, () -> ""), ConfigDef.Importance.MEDIUM, + "The prefix to be added to the name of each file put on Azure Blob.", FileNameFragment.GROUP_NAME, 10, + ConfigDef.Width.NONE, AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG); + + } + @Override - public List validate(final Map props) { - return super.validate(AzureBlobSinkConfig.handleDeprecatedYyyyUppercase(props)); + public Map multiValidate(final Map valueMap) { + final Map result = super.multiValidate(valueMap); + final FragmentDataAccess dataAccess = FragmentDataAccess.from(result); + new AzureBlobConfigFragment(dataAccess).validate(result); + return result; } } diff --git a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConnector.java b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConnector.java index 7fcd4ce93..538465408 100644 --- a/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConnector.java +++ b/azure-sink-connector/src/main/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkConnector.java @@ -49,8 +49,8 @@ public String version() { @Override public void start(final Map props) { Objects.requireNonNull(props, "props cannot be null"); - this.configProps = Collections.unmodifiableMap(props); + AzureBlobSinkConfig.configDef().validate(props); this.config = new AzureBlobSinkConfig(props); LOG.info("Starting connector {}", config.getConnectorName()); } diff --git a/azure-sink-connector/src/templates/configData.md.vm b/azure-sink-connector/src/templates/configData.md.vm new file mode 100644 index 000000000..a747a741f --- /dev/null +++ 
b/azure-sink-connector/src/templates/configData.md.vm @@ -0,0 +1,15 @@ + +# Azure Blob Sink Configuration + +The complete Azure Blob source configuration file + +#foreach ($section in $sections) +${esc.hash}${esc.hash} ${section.displayName} + - Default value: ${section.getDefaultValue()|"none"} + - Type: $section.type + - Valid values: ${section.validator|"no restrictions"} + - Importance: $section.importance + + ${esc.markdown($section.documentation)} + +#end \ No newline at end of file diff --git a/azure-sink-connector/src/templates/configData.yml.vm b/azure-sink-connector/src/templates/configData.yml.vm new file mode 100644 index 000000000..5c3803439 --- /dev/null +++ b/azure-sink-connector/src/templates/configData.yml.vm @@ -0,0 +1,10 @@ +AzureBlobSinkConfigDef: +#foreach ($section in $sections) +- + name: ${section.displayName} + documentation: $section.documentation + type: $section.type + default: ${section.getDefaultValue()|"none"} + validValues: ${section.validator|"no restrictions"} + importance: $section.importance +#end \ No newline at end of file diff --git a/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkTaskTest.java b/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkTaskTest.java index dc463ac5b..86d31ebbb 100644 --- a/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkTaskTest.java +++ b/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/AzureBlobSinkTaskTest.java @@ -40,6 +40,7 @@ import io.aiven.kafka.connect.azure.sink.testutils.AzureBlobAccessor; import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; import com.azure.core.http.rest.PagedIterable; import com.azure.storage.blob.BlobClient; @@ -118,7 +119,7 @@ void basic(final String compression) { final List blobItems = generateTestBlobItems(compression); when(pagedIterable.spliterator()).thenReturn(blobItems.spliterator()); when(blobContainerClient.listBlobs()).thenReturn(pagedIterable); - properties.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + FileNameFragment.setter(properties).fileCompression(CompressionType.forName(compression)); task = new AzureBlobSinkTask(properties, blobServiceClient); final List records = createBasicRecords(); diff --git a/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/config/AzureSinkConfigTest.java b/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/config/AzureSinkConfigTest.java index 17fca719f..235b14b7f 100644 --- a/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/config/AzureSinkConfigTest.java +++ b/azure-sink-connector/src/test/java/io/aiven/kafka/connect/azure/sink/config/AzureSinkConfigTest.java @@ -23,8 +23,11 @@ import java.time.Duration; import java.time.ZoneId; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -33,6 +36,7 @@ import io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig; import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.FormatType; import io.aiven.kafka.connect.common.config.OutputField; import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; @@ -41,6 +45,7 @@ import 
io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; @@ -60,18 +65,18 @@ final class AzureSinkConfigTest { "{{topic}}-{{partition}}-{{start_offset}}-{{unknown}}" }) void incorrectFilenameTemplates(final String template) { final Map properties = Map.of("file.name.template", template, "azure.storage.container.name", - "some-container"); + "some-container", "azure.storage.connection.string", "somestring"); final ConfigValue configValue = AzureBlobSinkConfig.configDef() .validate(properties) .stream() - .filter(x -> AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name())) + .filter(x -> FileNameFragment.FILE_NAME_TEMPLATE_CONFIG.equals(x.name())) .findFirst() .orElseThrow(); assertThat(configValue.errorMessages()).isNotEmpty(); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageStartingWith("There are errors in the configuration:\n" + "Invalid value "); + .hasMessageContaining("for configuration file.name.template: "); } @Test @@ -97,26 +102,30 @@ void acceptMultipleParametersWithTheSameName() { void requiredConfigurations() { final Map properties = Map.of(); - final var expectedErrorMessage = "Missing required configuration \"azure.storage.container.name\" which has no default value."; + final String[] expectedErrorMessage = { + "Missing required configuration \"azure.storage.container.name\" which has no default value.", + "Missing required configuration \"azure.storage.connection.string\" which has no default value.", + "Invalid value null for configuration azure.storage.container.name: names must be from 3 through 63 characters long." 
}; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "azure.storage.container.name", - expectedErrorMessage); + assertValidationContainsMessage(properties, "azure.storage.container.name", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); + .hasMessageContaining( + "Missing required configuration \"azure.storage.connection.string\" which has no default value."); } @Test void emptyAzureContainerName() { final Map properties = Map.of("azure.storage.container.name", ""); - final var expectedErrorMessage = "Invalid value for configuration azure.storage.container.name: String must be non-empty"; + final var expectedErrorMessage = "Missing required configuration \"azure.storage.connection.string\" which has no default value."; + final var expectedErrorMessage2 = "Invalid value for configuration azure.storage.container.name: names must be from 3 through 63 characters long."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "azure.storage.container.name", - expectedErrorMessage); + assertValidationContainsMessage(properties, "azure.storage.container.name", expectedErrorMessage, + expectedErrorMessage2); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); + .hasMessageContainingAll(expectedErrorMessage); } @Test @@ -228,12 +237,11 @@ void unsupportedCompressionType() { final var expectedErrorMessage = "Invalid value unsupported for configuration file.compression.type: " + "String must be one of (case insensitive): ZSTD, GZIP, NONE, SNAPPY"; - final var configValue = expectErrorMessageForConfigurationInConfigDefValidation(properties, - "file.compression.type", expectedErrorMessage); + final var configValue = AzureBlobSinkConfig.configDef().validateAll(properties).get("file.compression.type"); assertThat(configValue.recommendedValues()).containsExactly("none", "gzip", "snappy", "zstd"); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + .hasMessage(expectedErrorMessage); } @Test @@ -243,8 +251,8 @@ void emptyOutputField() { final var expectedErrorMessage = "Invalid value [] for configuration format.output.fields: cannot be empty"; - final var configValue = expectErrorMessageForConfigurationInConfigDefValidation(properties, - "format.output.fields", expectedErrorMessage); + final var configValue = assertValidationContainsMessage(properties, "format.output.fields", + expectedErrorMessage); assertThat(configValue.recommendedValues()).containsExactly("key", "value", "offset", "timestamp", "headers"); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) @@ -261,12 +269,12 @@ void unsupportedOutputField() { + "for configuration format.output.fields: " + "supported values are (case insensitive): key, value, offset, timestamp, headers"; - final var configValue = expectErrorMessageForConfigurationInConfigDefValidation(properties, - "format.output.fields", expectedErrorMessage); + final var configValue = assertValidationContainsMessage(properties, "format.output.fields", + expectedErrorMessage); assertThat(configValue.recommendedValues()).containsExactly("key", "value", "offset", "timestamp", "headers"); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - 
.hasMessageContaining(expectedErrorMessage); + .hasMessage(expectedErrorMessage); } @Test @@ -281,6 +289,7 @@ void connectorName() { } @Test + @Disabled("need validation of entire fname not just the prefix.") void fileNamePrefixTooLong() { final Map properties = new HashMap<>(); properties.put("azure.storage.container.name", "test-container"); @@ -290,7 +299,7 @@ void fileNamePrefixTooLong() { final var expectedErrorMessage = "Invalid value " + longString + " for configuration azure.storage.container.name: " + "cannot be longer than 1024 characters"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.prefix", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.prefix", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessage(expectedErrorMessage); @@ -321,15 +330,14 @@ void maxRecordsPerFileSetCorrect() { @Test void maxRecordsPerFileSetIncorrect() { final Map properties = Map.of("azure.storage.container.name", "test-container", - "file.max.records", "-42"); + "file.max.records", "-42", AzureBlobSinkConfig.AZURE_STORAGE_CONNECTION_STRING_CONFIG, "test"); - final var expectedErrorMessage = "Invalid value -42 for configuration file.max.records: " - + "Value must be at least 0"; + final var expectedErrorMessage = "Invalid value -42 for configuration file.max.records: Value must be at least 0"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.max.records", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.max.records", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + .hasMessage(expectedErrorMessage); } @ParameterizedTest @@ -437,10 +445,8 @@ void emptyFilenameTemplate() { final var expectedErrorMessage = "Invalid value for configuration file.name.template: RecordGrouper requires that the template [] has variables defined. 
Supported variables are: " + TEMPLATE_VARIABLES + "."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); - assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); } @Test @@ -449,18 +455,19 @@ void filenameTemplateUnknownVariable() { "azure.storage.connection.string", "test", "file.name.template", "{{ aaa }}{{ topic }}{{ partition }}{{ start_offset }}"); - final String errMsg1 = "Invalid value {{ aaa }}{{ topic }}{{ partition }}{{ start_offset }} for configuration " - + "file.name.template: unsupported template variable used ({{aaa}}), supported values are: {{key}}, {{partition}}, " - + "{{start_offset}}, {{timestamp}}, {{topic}}."; + final String errorPfx = "Invalid value {{ aaa }}{{ topic }}{{ partition }}{{ start_offset }} " + + "for configuration file.name.template: "; - final String errMsg2 = "Invalid value {{ aaa }}{{ topic }}{{ partition }}{{ start_offset }} for configuration " - + "file.name.template: unsupported set of template variables, supported sets are: topic,partition,start_offset,timestamp; " - + "topic,partition,key,start_offset,timestamp; key; key,topic,partition."; + final var expectedErrorMessage1 = errorPfx + "unsupported template variable used ({{aaa}}), " + + "supported values are: {{key}}, {{partition}}, {{start_offset}}, {{timestamp}}, {{topic}}."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", errMsg1, errMsg2); + final var expectedErrorMessage2 = errorPfx + "unsupported set of template variables, supported sets are: " + + TEMPLATE_VARIABLES + "."; + + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage1, expectedErrorMessage2); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(errMsg1, errMsg2); + .hasMessageContaining(expectedErrorMessage1, expectedErrorMessage2); } @Test @@ -471,10 +478,7 @@ void filenameTemplateNoTopic() { final var expectedErrorMessage = "Invalid value {{ partition }}{{ start_offset }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES + "."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); - - assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); } @Test @@ -483,11 +487,10 @@ void wrongVariableParameterValue() { "azure.storage.connection.string", "test", "file.name.template", "{{start_offset:padding=FALSE}}-{{partition}}-{{topic}}"); - final var expectedErrorMessage = "Invalid value {{start_offset:padding=FALSE}}-{{partition}}-{{topic}} for " - + "configuration file.name.template: FALSE is not a valid value for parameter padding, " - + "supported values are: true|false."; + final var expectedErrorMessage = "Invalid value {{start_offset:padding=FALSE}}-{{partition}}-{{topic}} " + + "for configuration file.name.template: FALSE is not a valid value for parameter padding, supported values are: true|false."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + 
assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessageContaining(expectedErrorMessage); @@ -503,7 +506,7 @@ void variableWithoutRequiredParameterValue() { + "for configuration file.name.template: parameter unit is required for the the variable timestamp, " + "supported values are: yyyy|MM|dd|HH."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessageContaining(expectedErrorMessage); @@ -518,7 +521,7 @@ void wrongVariableWithoutParameter() { final var expectedErrorMessage = "Invalid value {{start_offset:}}-{{partition}}-{{topic}} " + "for configuration file.name.template: Wrong variable with parameter definition"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessage(expectedErrorMessage); @@ -533,7 +536,7 @@ void noVariableWithParameter() { final var expectedErrorMessage = "Invalid value {{:padding=true}}-{{partition}}-{{topic}} " + "for configuration file.name.template: Variable name hasn't been set for template: {{:padding=true}}-{{partition}}-{{topic}}"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessage(expectedErrorMessage); @@ -548,7 +551,7 @@ void wrongVariableWithoutParameterValue() { final var expectedErrorMessage = "Invalid value {{start_offset:padding=}}-{{partition}}-{{topic}} " + "for configuration file.name.template: Parameter value for variable `start_offset` and parameter `padding` has not been set"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessage(expectedErrorMessage); @@ -563,8 +566,6 @@ void wrongVariableWithoutParameterName() { final var expectedErrorMessage = "Invalid value {{start_offset:=true}}-{{partition}}-{{topic}} " + "for configuration file.name.template: Parameter name for variable `start_offset` has not been set"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); - assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessage(expectedErrorMessage); } @@ -577,7 +578,7 @@ void filenameTemplateNoPartition() { final var expectedErrorMessage = "Invalid value {{ topic }}{{ start_offset }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES + "."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, 
"file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessageContaining(expectedErrorMessage); @@ -591,7 +592,7 @@ void filenameTemplateNoStartOffset() { final var expectedErrorMessage = "Invalid value {{ topic }}{{ partition }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES + "."; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertValidationContainsMessage(properties, "file.name.template", expectedErrorMessage); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) .hasMessageContaining(expectedErrorMessage); @@ -620,13 +621,9 @@ void keyFilenameTemplateAndLimitedRecordsPerFileMoreThan1() { final Map properties = Map.of("azure.storage.container.name", "test-container", "azure.storage.connection.string", "test", "file.name.template", "{{key}}", "file.max.records", "42"); - final var expectedErrorMessage = "Invalid value 42 for configuration file.max.records: " - + "When file.name.template is {{key}}, file.max.records must be either 1 or not set."; - - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.max.records", expectedErrorMessage); + final String expectedErrorMessage = "Invalid value 42 for configuration file.max.records: When file.name.template is {{key}}, file.max.records must be either 1 or not set."; - assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + assertValidationContainsMessage(properties, "file.max.records", expectedErrorMessage); } @Test @@ -657,14 +654,11 @@ void wrongFilenameTimestampSource() { "azure.storage.connection.string", "test", "file.name.timestamp.timezone", "Europe/Berlin", "file.name.timestamp.source", "UNKNOWN_TIMESTAMP_SOURCE"); - final var expectedErrorMessage = "Invalid value UNKNOWN_TIMESTAMP_SOURCE for configuration file.name.timestamp.source: " - + "String must be one of (case insensitive): EVENT, WALLCLOCK"; - - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.timestamp.source", - expectedErrorMessage); + final var expectedErrorMessage = "Invalid value UNKNOWN_TIMESTAMP_SOURCE for configuration " + + "file.name.timestamp.source: String must be one of (case insensitive): EVENT, WALLCLOCK"; assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + .hasMessage(expectedErrorMessage); } @Test @@ -702,24 +696,27 @@ void wrongFormatTypeConfig() { final var expectedErrorMessage = "Invalid value unknown for configuration format.output.type: " + "String must be one of (case insensitive): PARQUET, CSV, JSON, AVRO, JSONL"; - final var configValue = expectErrorMessageForConfigurationInConfigDefValidation(properties, - "format.output.type", expectedErrorMessage); + final var configValue = assertValidationContainsMessage(properties, "format.output.type", expectedErrorMessage); + assertThat(configValue.recommendedValues()).containsExactly("avro", "csv", "json", "jsonl", "parquet"); assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining(expectedErrorMessage); + .hasMessage(expectedErrorMessage); } @ParameterizedTest @ValueSource(strings = { "{{key}}", "{{topic}}/{{partition}}/{{key}}" 
}) void notSupportedFileMaxRecords(final String fileNameTemplate) { - final Map properties = Map.of(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, fileNameTemplate, - AzureBlobSinkConfig.FILE_MAX_RECORDS, "2", AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, - "any_container"); - assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class) - .hasMessageContaining( - String.format("When file.name.template is %s, file.max.records must be either 1 or not set", - fileNameTemplate)); + final Map properties = new HashMap<>(); + FileNameFragment.setter(properties).template(fileNameTemplate).maxRecordsPerFile(2); + properties.put(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, "any-container"); + properties.put(AzureBlobSinkConfig.AZURE_STORAGE_CONNECTION_STRING_CONFIG, "test"); + + final String expectedErrorMessage = String.format( + "Invalid value 2 for configuration file.max.records: When file.name.template is %s, file.max.records must be either 1 or not set.", + fileNameTemplate); + + assertValidationContainsMessage(properties, "file.max.records", expectedErrorMessage); } private void assertConfigDefValidationPasses(final Map properties) { @@ -728,18 +725,18 @@ private void assertConfigDefValidationPasses(final Map propertie } } - private ConfigValue expectErrorMessageForConfigurationInConfigDefValidation(final Map properties, + private ConfigValue assertValidationContainsMessage(final Map properties, final String configuration, final String... expectedErrorMessages) { - ConfigValue result = null; - for (final ConfigValue configValue : AzureBlobSinkConfig.configDef().validate(properties)) { - if (configValue.name().equals(configuration)) { - assertThat(configValue.errorMessages()).containsExactlyInAnyOrder(expectedErrorMessages); - result = configValue; - } else { - assertThat(configValue.errorMessages()).isEmpty(); - } - } - assertThat(result).withFailMessage("Config value not found").isNotNull(); - return result; + + final List errorMsgs = new ArrayList<>(); + final List configValues = AzureBlobSinkConfig.configDef().validate(properties); + configValues.stream().map(ConfigValue::errorMessages).forEach(errorMsgs::addAll); + assertThat(errorMsgs).containsExactlyInAnyOrder(expectedErrorMessages); + + final Optional result = configValues.stream() + .filter(cv -> cv.name().equals(configuration)) + .findAny(); + assertThat(result).withFailMessage("Config value not found").isNotEmpty(); + return result.get(); } } diff --git a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/AzureBlobSourceConnector.java b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/AzureBlobSourceConnector.java index 82ef42c16..97876c4ab 100644 --- a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/AzureBlobSourceConnector.java +++ b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/AzureBlobSourceConnector.java @@ -28,7 +28,7 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; -import io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfig; +import io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfigDef; import io.aiven.kafka.connect.azure.source.utils.VersionInfo; import org.slf4j.Logger; @@ -46,7 +46,7 @@ public class AzureBlobSourceConnector extends SourceConnector { @Override public ConfigDef config() { - return AzureBlobSourceConfig.configDef(); + return new AzureBlobSourceConfigDef(); } @Override 
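Editor's note: the hunk above replaces the per-key expectErrorMessageForConfigurationInConfigDefValidation helper with assertValidationContainsMessage, which gathers the error messages from every ConfigValue returned by configDef().validate(properties), asserts the whole set, and returns the ConfigValue for the named key. Below is a minimal, hypothetical sketch of a test written against that helper; it is not part of this patch, the method name and the "ab" sample value are invented, and the expected message simply mirrors the "3 through 63 characters" rule already asserted earlier in this file. It assumes no other key reports a validation error for this property map.

```java
// Hypothetical usage sketch (not in this patch); would live inside AzureSinkConfigTest.
@Test
void tooShortAzureContainerName() {
    final Map<String, String> properties = Map.of(
            "azure.storage.container.name", "ab",            // invented sample value, too short
            "azure.storage.connection.string", "test");

    final var expectedErrorMessage = "Invalid value ab for configuration azure.storage.container.name: "
            + "names must be from 3 through 63 characters long.";

    // Collects the error messages of every ConfigValue produced by configDef().validate(properties)
    // and asserts they match exactly, then returns the ConfigValue for the named configuration.
    assertValidationContainsMessage(properties, "azure.storage.container.name", expectedErrorMessage);

    assertThatThrownBy(() -> new AzureBlobSinkConfig(properties)).isInstanceOf(ConfigException.class)
            .hasMessageContaining(expectedErrorMessage);
}
```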
diff --git a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobConfigFragment.java b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobConfigFragment.java index 9973832bc..4a45e46c4 100644 --- a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobConfigFragment.java +++ b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobConfigFragment.java @@ -17,14 +17,15 @@ package io.aiven.kafka.connect.azure.source.config; import java.time.Duration; +import java.util.regex.Pattern; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import io.aiven.kafka.connect.azure.source.utils.VersionInfo; import io.aiven.kafka.connect.common.config.ConfigFragment; import io.aiven.kafka.connect.common.config.FragmentDataAccess; +import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator; import com.azure.core.http.policy.ExponentialBackoffOptions; import com.azure.core.http.policy.HttpLogDetailLevel; @@ -33,6 +34,7 @@ import com.azure.core.http.policy.UserAgentPolicy; import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClientBuilder; + /** * The configuration fragment that defines the Azure specific characteristics. */ @@ -58,14 +60,47 @@ public class AzureBlobConfigFragment extends ConfigFragment { public static final long AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 32_000L; public static final int AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT = 6; + private final static Pattern CONTAINER_NAME_PATTERN = Pattern.compile("[0-9a-z][0-9a-z\\-]+[0-9a-z]"); + + /** + * From Azure documentation: + *
+ * <ul>
+ * <li>Container names must start or end with a letter or number, and can contain only letters, numbers, and the + * hyphen/minus (-) character.</li>
+ * <li>Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive + * hyphens aren't permitted in container names.</li>
+ * <li>All letters in a container name must be lowercase.</li>
+ * <li>Container names must be from 3 through 63 characters long.</li>
+ * </ul>
+ */ + private static final ConfigDef.Validator CONTAINER_NAME_VALIDATOR = ConfigDef.CompositeValidator + .of(ConfigDef.LambdaValidator.with((name, value) -> { + final int len = value == null ? 0 : value.toString().length(); + if (len < 3 || len > 63) { + throw new ConfigException(name, value, "names must be from 3 through 63 characters long."); + } + }, () -> "must be from 3 through 63 characters long"), ConfigDef.LambdaValidator.with((name, value) -> { + if (value.toString().contains("--")) { + throw new ConfigException(name, value, + "Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive hyphens aren't permitted in container names."); + } + }, () -> "consecutive hyphens aren't permitted in container names"), + // regex last for speed + ConfigDef.LambdaValidator.with((name, value) -> { + if (!CONTAINER_NAME_PATTERN.matcher(value.toString()).matches()) { + throw new ConfigException(name, value, + "must start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character."); + } + }, () -> "start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character")); + /** * Construct the Azure Blob ConfigFragment.. * - * @param cfg + * @param dataAccess * the configuration that this fragment is associated with. */ - protected AzureBlobConfigFragment(final AbstractConfig cfg) { - super(FragmentDataAccess.from(cfg)); + protected AzureBlobConfigFragment(final FragmentDataAccess dataAccess) { + super(dataAccess); } /** @@ -75,9 +110,9 @@ protected AzureBlobConfigFragment(final AbstractConfig cfg) { * the Configuration definition. * @return the update configuration definition */ - public static ConfigDef update(final ConfigDef configDef) { + public static ConfigDef update(final ConfigDef configDef, final boolean isSink) { addUserAgentConfig(configDef); - addAzureConfigGroup(configDef); + addAzureConfigGroup(configDef, isSink); addAzureRetryPolicies(configDef); return configDef; } @@ -87,67 +122,51 @@ private static void addUserAgentConfig(final ConfigDef configDef) { "A custom user agent used while contacting Azure"); } - private static void addAzureConfigGroup(final ConfigDef configDef) { + private static void addAzureConfigGroup(final ConfigDef configDef, final boolean isSink) { int azureGroupCounter = 0; - configDef.define(AZURE_STORAGE_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, - "Azure Storage connection string.", GROUP_AZURE, azureGroupCounter++, ConfigDef.Width.NONE, - AZURE_STORAGE_CONNECTION_STRING_CONFIG); + configDef.define(AZURE_STORAGE_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.Importance.HIGH, "Azure Storage connection string.", GROUP_AZURE, ++azureGroupCounter, + ConfigDef.Width.NONE, AZURE_STORAGE_CONNECTION_STRING_CONFIG); configDef.define(AZURE_STORAGE_CONTAINER_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, + CONTAINER_NAME_VALIDATOR, ConfigDef.Importance.HIGH, "The Azure Blob container that files will be written to or read from.", GROUP_AZURE, - azureGroupCounter++, ConfigDef.Width.NONE, AZURE_STORAGE_CONTAINER_NAME_CONFIG); - configDef.define(AZURE_FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1), - ConfigDef.Importance.MEDIUM, "Azure fetch page size", GROUP_AZURE, azureGroupCounter++, - ConfigDef.Width.NONE, AZURE_FETCH_PAGE_SIZE); + 
++azureGroupCounter, ConfigDef.Width.NONE, AZURE_STORAGE_CONTAINER_NAME_CONFIG); + configDef.define(AZURE_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "Prefix for storage file names, generally specifies directory like" + " structures that do not contain any templated fields.", - GROUP_AZURE, azureGroupCounter++, ConfigDef.Width.NONE, AZURE_PREFIX_CONFIG); // NOPMD increment value - // never used - - configDef.define(AZURE_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1000, ConfigDef.Range.atLeast(1), - ConfigDef.Importance.MEDIUM, - "Azure fetch buffer size. This is the number of object keys kept in a buffer to ensure lexically older objet keys aren't skipped for processing if they are slower to upload.", - GROUP_AZURE, azureGroupCounter++, // NOPMD - // UnusedAssignment - ConfigDef.Width.NONE, AZURE_FETCH_BUFFER_SIZE); + GROUP_AZURE, ++azureGroupCounter, ConfigDef.Width.NONE, AZURE_PREFIX_CONFIG); + if (!isSink) { + configDef.define(AZURE_FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, "Azure fetch page size", GROUP_AZURE, ++azureGroupCounter, + ConfigDef.Width.NONE, AZURE_FETCH_PAGE_SIZE); + + configDef.define(AZURE_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1000, ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, + "Azure fetch buffer size. This is the number of object keys kept in a buffer to ensure lexically older objet keys aren't skipped for processing if they are slower to upload.", + GROUP_AZURE, ++azureGroupCounter, ConfigDef.Width.NONE, AZURE_FETCH_BUFFER_SIZE); + } } private static void addAzureRetryPolicies(final ConfigDef configDef) { int retryPolicyGroupCounter = 0; configDef.define(AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG, ConfigDef.Type.LONG, - AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, - "Initial retry delay in milliseconds. The default value is " - + AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, - AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG); + AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_DEFAULT, TimeScaleValidator.atLeast(0), + ConfigDef.Importance.MEDIUM, "Initial retry delay in milliseconds.", GROUP_AZURE_RETRY_BACKOFF_POLICY, + ++retryPolicyGroupCounter, ConfigDef.Width.NONE, AZURE_RETRY_BACKOFF_INITIAL_DELAY_MS_CONFIG); configDef.define(AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, ConfigDef.Type.LONG, - AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, - "Maximum retry delay in milliseconds. The default value is " + AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, - AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + AZURE_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, TimeScaleValidator.atLeast(0), ConfigDef.Importance.MEDIUM, + "Maximum retry delay in milliseconds.", GROUP_AZURE_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, + ConfigDef.Width.NONE, AZURE_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); configDef.define(AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG, ConfigDef.Type.INT, AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.MEDIUM, "Retry max attempts. 
The default value is " + AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_DEFAULT, - GROUP_AZURE_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, // NOPMD - // retryPolicyGroupCounter - // updated value - // never - // used + GROUP_AZURE_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, ConfigDef.Width.NONE, AZURE_RETRY_BACKOFF_MAX_ATTEMPTS_CONFIG); } - @Override - public void validate() { - final String connectionString = getString(AZURE_STORAGE_CONNECTION_STRING_CONFIG); - - if (connectionString == null) { - throw new ConfigException( - String.format("The configuration %s cannot be null.", AZURE_STORAGE_CONNECTION_STRING_CONFIG)); - } - } public int getAzureFetchPageSize() { return getInt(AZURE_FETCH_PAGE_SIZE); } diff --git a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfig.java b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfig.java index 2449481e2..9ba402b45 100644 --- a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfig.java +++ b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfig.java @@ -18,10 +18,8 @@ import java.util.Map; -import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; import io.aiven.kafka.connect.common.config.SourceCommonConfig; -import io.aiven.kafka.connect.common.config.SourceConfigFragment; -import io.aiven.kafka.connect.common.config.TransformerFragment; import com.azure.storage.blob.BlobServiceAsyncClient; @@ -30,22 +28,8 @@ public class AzureBlobSourceConfig extends SourceCommonConfig { // TODO AzureBlobFragment needs to be extracted from Azure Sink. 
private final AzureBlobConfigFragment azureBlobConfigFragment; public AzureBlobSourceConfig(final Map properties) { - super(configDef(), properties); - azureBlobConfigFragment = new AzureBlobConfigFragment(this); - validate(); - } - - public static AzureBlobSourceConfigDef configDef() { - - final var configDef = new AzureBlobSourceConfigDef(); - - FileNameFragment.update(configDef); - SourceConfigFragment.update(configDef); - TransformerFragment.update(configDef); - AzureBlobConfigFragment.update(configDef); - return configDef; - } - private void validate() { + super(new AzureBlobSourceConfigDef(), properties); + azureBlobConfigFragment = new AzureBlobConfigFragment(FragmentDataAccess.from(this)); } public int getAzureFetchPageSize() { diff --git a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfigDef.java b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfigDef.java index 0699125a9..6dac67b59 100644 --- a/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfigDef.java +++ b/azure-source-connector/src/main/java/io/aiven/kafka/connect/azure/source/config/AzureBlobSourceConfigDef.java @@ -16,8 +16,25 @@ package io.aiven.kafka.connect.azure.source.config; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigValue; + +import io.aiven.kafka.connect.common.config.FragmentDataAccess; import io.aiven.kafka.connect.common.config.SourceCommonConfig; -public class AzureBlobSourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef { +public final class AzureBlobSourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef { + + public AzureBlobSourceConfigDef() { + super(); + AzureBlobConfigFragment.update(this, false); + } + @Override + public Map multiValidate(final Map valueMap) { + final Map result = super.multiValidate(valueMap); + final FragmentDataAccess dataAccess = FragmentDataAccess.from(result); + new AzureBlobConfigFragment(dataAccess).validate(result); + return result; + } } diff --git a/azure-source-connector/src/templates/configData.md.vm b/azure-source-connector/src/templates/configData.md.vm index 8c81ec487..6e5d1e8cb 100644 --- a/azure-source-connector/src/templates/configData.md.vm +++ b/azure-source-connector/src/templates/configData.md.vm @@ -10,6 +10,6 @@ ${esc.hash}${esc.hash} ${section.displayName} - Valid values: ${section.validator|"no restrictions"} - Importance: $section.importance - $section.documentation + ${esc.markdown($section.documentation)} #end \ No newline at end of file diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfigFragment.java index 82bb28e80..87054a2b8 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfigFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfigFragment.java @@ -18,6 +18,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import java.util.Collections; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; @@ -52,13 +53,15 @@ public static ConfigDef update(final ConfigDef configDef) { int orderInGroup = 0; final String commonGroup = "commons"; - return configDef - .define(ConnectorConfig.TASKS_MAX_CONFIG, ConfigDef.Type.INT, 1, atLeast(1), ConfigDef.Importance.HIGH, - "Maximum number of tasks to use for this connector.", commonGroup, ++orderInGroup, 
- ConfigDef.Width.SHORT, ConnectorConfig.TASKS_MAX_CONFIG) - .define(TASK_ID, ConfigDef.Type.INT, 1, atLeast(0), ConfigDef.Importance.HIGH, - "The task ID that this connector is working with.", commonGroup, ++orderInGroup, - ConfigDef.Width.SHORT, TASK_ID); + configDef.define(ConnectorConfig.TASKS_MAX_CONFIG, ConfigDef.Type.INT, 1, atLeast(1), ConfigDef.Importance.HIGH, + "Maximum number of tasks to use for this connector.", commonGroup, ++orderInGroup, + ConfigDef.Width.SHORT, ConnectorConfig.TASKS_MAX_CONFIG); + + // make TASK_ID an internal configuration (not visible to users) + final ConfigDef.ConfigKey key = new ConfigDef.ConfigKey(TASK_ID, ConfigDef.Type.INT, 0, atLeast(0), + ConfigDef.Importance.HIGH, "The task ID that this connector is working with.", commonGroup, + ++orderInGroup, ConfigDef.Width.SHORT, TASK_ID, Collections.emptyList(), null, true); + return configDef.define(key); } /** diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java index 7c00b0362..a6f7711d3 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java @@ -255,7 +255,7 @@ public Setter withOutputFieldEncodingType(final OutputFieldEncodingType encoding } /** - * Sets the list of output fields. + * Sets the list of output fields. The order of output fields will match the order they are added. * * @param outputFields * the list of output fields @@ -267,7 +267,7 @@ public Setter withOutputFields(final List outputFields) { } /** - * Sets the list of output fields. + * Sets the list of output fields. The order of output fields will match the order they are added. 
* * @param outputFields * the list of output fields diff --git a/settings.gradle.kts b/settings.gradle.kts index 1d1a8826d..2567b6ade 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -43,7 +43,7 @@ dependencyResolutionManagement { "org.apache.commons:commons-collections4:$commonsCollections4Version") library("commons-io", "commons-io:commons-io:$commonsIOVersion") library("commons-lang3", "org.apache.commons:commons-lang3:$commonsLang3Version") - library("kafka-connect-api", "org.apache.kafka:connect-api:$kafkaAPIVersion") + library("kafka-connect-api", "org.apache.kafka:connect-api:$kafkaVersion") library("kafka-connect-json", "org.apache.kafka:connect-json:$kafkaVersion") library("kafka-connect-runtime", "org.apache.kafka:connect-runtime:$kafkaVersion") library("kafka-connect-transforms", "org.apache.kafka:connect-transforms:$kafkaVersion") diff --git a/site/src/main/java/io/aiven/kafka/connect/tools/ConfigDoc.java b/site/src/main/java/io/aiven/kafka/connect/tools/ConfigDoc.java index 1b421a23d..df0d8f567 100644 --- a/site/src/main/java/io/aiven/kafka/connect/tools/ConfigDoc.java +++ b/site/src/main/java/io/aiven/kafka/connect/tools/ConfigDoc.java @@ -74,11 +74,13 @@ public static void execute(final ConfigDef configDef, final String templateFile, final Collection keys = configDef.configKeys().values(); final Map sections = new TreeMap<>(); for (final ConfigDef.ConfigKey key : keys) { - sections.put(key.name, new ConfigData(key)); + if (!key.internalConfig) { + sections.put(key.name, new ConfigData(key)); + } } context.put("sections", sections.values()); - context.put("esc", new EscapeTool()); + context.put("esc", new Escaper()); final File file = new File(output); if (!file.getParentFile().exists() && !file.getParentFile().mkdirs()) { @@ -138,4 +140,67 @@ public static void main(final String[] args) throws IOException, ClassNotFoundEx } } + public static class Escaper extends EscapeTool { + + /** + * The characters to escape for markdown. + */ + private static final String[] MARKDOWN_CHARS = charParser("\\`*_{}[]<>()#+-.!|"); + /** + * The characters to escape for APT (Almost Plain Text). + */ + private static final String[] APT_CHARS = charParser("\\~=-+*[]<>{}"); + + private static String[] charParser(final String charText) { + final char[] chars = charText.toCharArray(); + final String[] result = new String[chars.length]; + for (int i = 0; i < chars.length; i++) { + result[i] = String.valueOf(chars[i]); + } + return result; + } + + /** + * Escapes a text string. + * + * @param text + * the text to escape. + * @param chars + * the characters to escape. + * @return the escaped string. + */ + private String escape(final String text, final String[] chars) { + if (text == null) { + return ""; + } + String result = text; + for (final String chrStr : chars) { + result = result.replace(chrStr, "\\" + chrStr); + } + return result; + } + + /** + * Escapes a string for markdown. + * + * @param text + * the text to escape. + * @return the text with the markdown specific characters escaped. + */ + public String markdown(final String text) { + return escape(text, MARKDOWN_CHARS); + } + + /** + * Escapes a string for APT (almost plain text). + * + * @param text + * the text to escape. + * @return the text with the APT specific characters escaped. + */ + public String apt(final String text) { + return escape(text, APT_CHARS); + } + + } }
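Editor's note: the Escaper added to ConfigDoc backslash-escapes Markdown (and APT) control characters so that ${esc.markdown($section.documentation)} in the Velocity templates can emit configuration descriptions literally rather than as markup. A short illustrative sketch of its effect follows; it is not part of the patch, the demo class and sample string are invented, and it assumes the patched ConfigDoc and velocity-tools are on the classpath.

```java
import io.aiven.kafka.connect.tools.ConfigDoc;

public final class EscaperDemo {
    public static void main(final String[] args) {
        final ConfigDoc.Escaper esc = new ConfigDoc.Escaper();

        // A configuration description containing characters that Markdown treats as markup.
        final String doc = "Use {{topic}}-{{partition}} (see *templating* docs).";

        // Each character in MARKDOWN_CHARS is prefixed with a backslash, so the generated
        // site renders the description verbatim instead of interpreting it as Markdown.
        System.out.println(esc.markdown(doc));
        // -> Use \{\{topic\}\}\-\{\{partition\}\} \(see \*templating\* docs\)\.
    }
}
```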