From 1c68b9031dc215d4400a120fc0bd25a72c87b729 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Wed, 19 Nov 2025 12:35:30 +0000
Subject: [PATCH 01/13] complete build success

# Conflicts:
#	commons/src/main/java/io/aiven/commons/collections/Scale.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/BackoffPolicyFragment.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/FragmentDataAccess.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
#	commons/src/main/java/io/aiven/kafka/connect/common/config/validators/FilenameTemplateValidator.java
#	commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
#	commons/src/test/java/io/aiven/kafka/connect/common/config/FileNameFragmentTest.java
#	commons/src/test/java/io/aiven/kafka/connect/common/config/validators/FilenameTemplateValidatorTest.java
#	commons/src/test/java/io/aiven/kafka/connect/common/source/input/ExampleSourceRecordIteratorTest.java
#	gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
#	s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
---
 .../common/config/CompressionType.java        |  17 --
 .../common/output/avro/AvroOutputWriter.java  | 122 +++----------
 .../common/output/parquet/ParquetConfig.java  |  12 +-
 .../source/AbstractSourceRecordIterator.java  |  19 +-
 .../AbstractSourceRecordIteratorTest.java     | 162 +-----------------
 5 files changed, 37 insertions(+), 295 deletions(-)

diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java
index 2de5216cb..669aca3fd 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java
@@ -30,7 +30,6 @@
 import com.github.luben.zstd.ZstdInputStream;
 import com.github.luben.zstd.ZstdOutputStream;
 import org.apache.commons.io.function.IOFunction;
-import org.apache.commons.io.function.IOSupplier;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;

@@ -139,22 +138,6 @@ public final InputStream decompress(final InputStream input) throws IOException
         return decompressor.apply(input);
     }

-    /**
-     * Decompresses an input stream wrapped in an IOSupplier
-     *
-     * @param input
-     *            the input stream to read compressed data from.
-     * @return An input stream that returns decompressed data.
-     */
-    public final IOSupplier decompress(final IOSupplier input) {
-        return new IOSupplier() {
-            @Override
-            public InputStream get() throws IOException {
-                return decompress(input.get());
-            }
-        };
-    }
-
     /**
      * Compresses an output stream.
* diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java b/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java index f320a34cd..f9380b4c4 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.kafka.connect.sink.SinkRecord; @@ -34,120 +35,49 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.io.output.CloseShieldOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * An instance of OutputWriter that writes to Avro files. - */ public final class AvroOutputWriter extends OutputWriter { private static final Logger LOGGER = LoggerFactory.getLogger(AvroOutputWriter.class); + private final AvroSchemaBuilder avroSchemaBuilder; + private final SinkRecordConverter sinkRecordConverter; + public AvroOutputWriter(final Collection fields, final OutputStream out, final Map externalConfig, final boolean envelopeEnabled) { - super(out, new AvroOutputStreamWriter(fields, externalConfig, envelopeEnabled), externalConfig); + super(out, new OutputStreamWriterStub(), externalConfig); + final AvroData avroData = new AvroData(new AvroDataConfig(externalConfig)); + this.sinkRecordConverter = new SinkRecordConverter(fields, avroData, envelopeEnabled); + this.avroSchemaBuilder = new AvroSchemaBuilder(fields, avroData, envelopeEnabled); } - /** - * An instance of OutputStreamWriter that handles writing the Avro format - */ - private static final class AvroOutputStreamWriter implements OutputStreamWriter { - /** - * The sink record converter for Avro. - */ - private final SinkRecordConverter sinkRecordConverter; - /** - * The Avro schema builder. - */ - private final AvroSchemaBuilder avroSchemaBuilder; - /** - * The Avro configuration. - */ - private final AvroConfig avroConfiguration; + @Override + public void writeRecords(final Collection sinkRecords) throws IOException { + final AvroConfig avroConfiguration = AvroConfig.createAvroConfiguration(externalConfiguration); + final Schema avroSchema = avroSchemaBuilder.buildSchema(sinkRecords.iterator().next()); + LOGGER.debug("Record schema is: {}", avroSchema); - /** - * Lazily constructed Avro schema used in the output stream. - */ - private Schema avroSchema; - /** - * Lazily constructed Avro DataFileWriter. - */ - private DataFileWriter dataFileWriter; - - /** - * Constructor. - * - * @param fields - * the fields to output. - * @param externalConfig - * the configuration data for the Avro configuration. - * @param envelopeEnabled - * {@code true if the envelope is enabled} - */ - AvroOutputStreamWriter(final Collection fields, final Map externalConfig, - final boolean envelopeEnabled) { - final AvroData avroData = new AvroData(new AvroDataConfig(externalConfig)); - this.sinkRecordConverter = new SinkRecordConverter(fields, avroData, envelopeEnabled); - this.avroSchemaBuilder = new AvroSchemaBuilder(fields, avroData, envelopeEnabled); - this.avroConfiguration = AvroConfig.createAvroConfiguration(externalConfig); - } - - /** - * Create the data file writer if it does not exist. 
Requires that {@link #getAvroSchema(SinkRecord)} be called - * at least once prior. - * - * @return the DataFileWriter. - * @throws IOException - * if the writer can not be created. - */ - private DataFileWriter getDataFileWriter(final OutputStream outputStream) throws IOException { - if (dataFileWriter == null) { - final GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); - dataFileWriter = new DataFileWriter<>(writer); - dataFileWriter.setCodec(avroConfiguration.codecFactory()); - // create with output stream that does not close the underlying stream. - dataFileWriter.create(avroSchema, CloseShieldOutputStream.wrap(outputStream)); + final GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); + try (DataFileWriter dataFileWriter = new DataFileWriter<>(writer)) { + dataFileWriter.setCodec(avroConfiguration.codecFactory()); + dataFileWriter.create(avroSchema, outputStream); + for (final SinkRecord record : sinkRecords) { + final GenericRecord datum = sinkRecordConverter.convert(record, avroSchema); + dataFileWriter.append(datum); } - return dataFileWriter; } + } - /** - * Creates the Avro schema if necessary. Will throw an exception if the record schema does not match the output - * Avro schema. - * - * @param sinkRecord - * the record to be written. - * @return the file Avro schema. - * @throws IOException - * if the record schema does not match the file schema. - */ - private Schema getAvroSchema(final SinkRecord sinkRecord) throws IOException { - if (avroSchema == null) { - avroSchema = avroSchemaBuilder.buildSchema(sinkRecord); - LOGGER.debug("Record schema is: {}", avroSchema); - } else { - final Schema otherSchema = avroSchemaBuilder.buildSchema(sinkRecord); - if (!avroSchema.equals(otherSchema)) { - LOGGER.error("Illegal Schema Change. 
{}", otherSchema); - throw new IOException("Illegal schema change"); - } - } - return avroSchema; - } + @Override + public void writeRecord(final SinkRecord record) throws IOException { + writeRecords(List.of(record)); + } + private static final class OutputStreamWriterStub implements OutputStreamWriter { @Override public void writeOneRecord(final OutputStream outputStream, final SinkRecord record) throws IOException { - final GenericRecord datum = sinkRecordConverter.convert(record, getAvroSchema(record)); - getDataFileWriter(outputStream).append(datum); - } - - @Override - public void stopWriting(final OutputStream outputStream) throws IOException { - if (dataFileWriter != null) { - dataFileWriter.close(); - } } } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java index 6d1fa5f55..eaa692214 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -public final class ParquetConfig extends AbstractConfig { +final class ParquetConfig extends AbstractConfig { public ParquetConfig(final Map originals) { super(new ConfigDef(), originals); @@ -50,13 +50,10 @@ public Configuration parquetConfiguration() { } public CompressionCodecName compressionCodecName() { - return compressionCodecName(CompressionType.forName( + final var connectorCompressionType = CompressionType.forName( originals().getOrDefault(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG, CompressionType.NONE.name) - .toString())); - } - - public static CompressionCodecName compressionCodecName(final CompressionType compressionType) { - switch (compressionType) { + .toString()); + switch (connectorCompressionType) { case GZIP : return CompressionCodecName.GZIP; case SNAPPY : @@ -67,4 +64,5 @@ public static CompressionCodecName compressionCodecName(final CompressionType co return CompressionCodecName.UNCOMPRESSED; } } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java index 0abb5565c..6c88ca1cf 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java @@ -27,17 +27,14 @@ import org.apache.kafka.connect.data.SchemaAndValue; import io.aiven.commons.collections.RingBuffer; -import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.config.SourceConfigFragment; -import io.aiven.kafka.connect.common.source.input.ParquetTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; import io.aiven.kafka.connect.common.source.task.Context; import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.common.source.task.DistributionType; -import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.io.function.IOSupplier; import 
org.apache.commons.lang3.ObjectUtils; @@ -239,25 +236,17 @@ final public void remove() { * the SourceRecord that drives the creation of source records with values. * @return a stream of T created from the input stream of the native item. */ - @VisibleForTesting - Stream convert(final T sourceRecord) { + private Stream convert(final T sourceRecord) { sourceRecord .setKeyData(transformer.getKeyData(sourceRecord.getNativeKey(), sourceRecord.getTopic(), sourceConfig)); lastSeenNativeKey = sourceRecord.getNativeKey(); - // parquet handles compression internally. - final CompressionType compressionType = transformer instanceof ParquetTransformer - ? CompressionType.NONE - : sourceConfig.getCompressionType(); - // create an IOSupplier with the specified compression - final IOSupplier inputStream = transformer instanceof ParquetTransformer - ? getInputStream(sourceRecord) - : compressionType.decompress(getInputStream(sourceRecord)); return transformer - .getRecords(inputStream, sourceRecord.getNativeItemSize(), sourceRecord.getContext(), sourceConfig, - sourceRecord.getRecordCount()) + .getRecords(getInputStream(sourceRecord), sourceRecord.getNativeItemSize(), sourceRecord.getContext(), + sourceConfig, sourceRecord.getRecordCount()) .map(new Mapper(sourceRecord)); + } /** diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java index 7a913ab81..7d3ac0fde 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -39,31 +38,18 @@ import java.util.NoSuchElementException; import java.util.Queue; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.json.JsonDeserializer; -import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; -import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.common.config.FormatType; -import io.aiven.kafka.connect.common.config.OutputField; -import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; -import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.format.AvroTestDataFixture; import io.aiven.kafka.connect.common.format.JsonTestDataFixture; import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; -import io.aiven.kafka.connect.common.output.OutputWriter; import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; import io.aiven.kafka.connect.common.source.task.DistributionType; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; @@ -83,7 +69,6 @@ * @param * The concrete implementation of the {@link AbstractSourceRecord} . */ -@SuppressWarnings("PMD.ExcessiveImports") public abstract class AbstractSourceRecordIteratorTest, N, O extends OffsetManager.OffsetManagerEntry, T extends AbstractSourceRecord> { /** The offset manager */ private OffsetManager offsetManager; @@ -115,7 +100,7 @@ public abstract class AbstractSourceRecordIteratorTest, * @param offsetManager * A mock offset manager. * @param transformer - * The transformer to use for the test. + * The trnasformer to use for the test. * @return A configured AbstractSourceRecordIterator. */ abstract protected AbstractSourceRecordIterator createSourceRecordIterator( @@ -150,7 +135,7 @@ public void setUp() { } /** - * Create a mock SourceConfig with our necessary items added. + * Create a mock SourceCOnfig with our necessary items added. * * @param filePattern * The file pattern to match. @@ -195,7 +180,6 @@ void testOneObjectReturnsOneObject(final InputFormat format, final byte[] data) final Transformer transformer = TransformerFactory.getTransformer(format); final SourceCommonConfig mockConfig = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(mockConfig.getInputFormat()).thenReturn(format); - when(mockConfig.getCompressionType()).thenReturn(CompressionType.NONE); // verify one data has one data createClientMutator().reset().addObject(key, ByteBuffer.wrap(data)).endOfBlock().build(); @@ -261,8 +245,6 @@ void testMultipleRecordsReturned(final InputFormat format, final byte[] data) { final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(config.getTransformerMaxBufferSize()).thenReturn(4096); when(config.getInputFormat()).thenReturn(format); - when(config.getCompressionType()).thenReturn(CompressionType.NONE); - final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, transformer); @@ -333,8 +315,6 @@ void testIteratorProcessesMultipleObjectsFromByteArrayTransformer() { final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(config.getTransformerMaxBufferSize()).thenReturn(4096); when(config.getInputFormat()).thenReturn(InputFormat.BYTES); - when(config.getCompressionType()).thenReturn(CompressionType.NONE); - final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, transformer); @@ -388,144 +368,6 @@ static List parameterizedNativeStartKey() { return List.of(Arguments.of("startKeyOne", 2), Arguments.of("startKeyOne", 2), Arguments.of(null, 1)); } - /** - * Gets a configured Transformer. - * - * @param formatType - * The input format for the transformer. - * @return the Transformer for the specified input format. 
- */ - private static InputFormat formatTypeConversion(final FormatType formatType) { - switch (formatType) { - case AVRO : - return InputFormat.AVRO; - case PARQUET : - return InputFormat.PARQUET; - case JSONL : - return InputFormat.JSONL; - case CSV : - case JSON : - return InputFormat.BYTES; - default : - throw new IllegalArgumentException("Unknown format type in configuration: " + formatType); - } - } - @ParameterizedTest - @MethodSource("testDecompressionData") - @SuppressWarnings("PMD.NcssCount") - void testDecompression(final FormatType formatType, final CompressionType compressionType) throws IOException { - // setup the data - final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); - when(config.getTransformerMaxBufferSize()).thenReturn(4096); - when(config.getCompressionType()).thenReturn(compressionType); - when(config.getInputFormat()).thenReturn(formatTypeConversion(formatType)); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - Object value = null; - Schema valueSchema = null; - try (OutputWriter outputWriter = OutputWriter.builder() - .withCompressionType(compressionType) - .withOutputFields( - Collections.singletonList(new OutputField(OutputFieldType.VALUE, OutputFieldEncodingType.NONE))) - .build(byteArrayOutputStream, formatType)) { - - switch (formatType) { - case AVRO : - valueSchema = SchemaBuilder.struct() - .field("message", SchemaBuilder.STRING_SCHEMA) - .field("id", SchemaBuilder.INT32_SCHEMA); - value = new Struct(valueSchema).put("message", "Hello").put("id", 1); - break; - case PARQUET : - valueSchema = ParquetTestDataFixture.PARQUET_SCHEMA; - value = new Struct(valueSchema).put("name", "TheDude") - .put("age", 32) - .put("email", "thedude@example.com"); - break; - case JSONL : - valueSchema = SchemaBuilder.struct() - .field("message", SchemaBuilder.STRING_SCHEMA) - .field("id", SchemaBuilder.INT32_SCHEMA); - value = new Struct(valueSchema).put("message", "Hello").put("id", 2); - break; - case CSV : - valueSchema = SchemaBuilder.BYTES_SCHEMA; - value = "'test','one'".getBytes(StandardCharsets.UTF_8); - break; - case JSON : - valueSchema = SchemaBuilder.STRING_SCHEMA; - value = "json is here"; - break; - default : - throw new IllegalArgumentException("Unknown format type: " + formatType); - } - - final SinkRecord sinkRecord = new SinkRecord("testTopic", 0, Schema.STRING_SCHEMA, "testRecord", - valueSchema, value, 0); - outputWriter.writeRecord(sinkRecord); - } - createClientMutator().addObject(key, ByteBuffer.wrap(byteArrayOutputStream.toByteArray())).endOfBlock().build(); - final Transformer transformer = TransformerFactory.getTransformer(formatTypeConversion(formatType)); - - // Start the test - final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, - transformer); - assertThat(iterator).hasNext(); - final T sourceRecord = iterator.next(); - assertThat(sourceRecord).isNotNull(); - switch (formatType) { - case AVRO : - case PARQUET : - Struct struct = (Struct) sourceRecord.getValue().value(); - struct = (Struct) struct.get("value"); - assertEquivalent(valueSchema, struct.schema()); - for (final Field field : valueSchema.fields()) { - assertThat(struct.get(field)).describedAs("field: " + field).isEqualTo(((Struct) value).get(field)); - } - break; - case CSV : - assertThat(sourceRecord.getValue().schema()).isNull(); - assertThat(sourceRecord.getValue().value()).isEqualTo(value); - break; - case JSON : - assertThat(sourceRecord.getValue().schema()).isNull(); - try 
(JsonDeserializer jsonDeserializer = new JsonDeserializer()) { - final ArrayNode arrayNode = (ArrayNode) jsonDeserializer.deserialize("topic", - (byte[]) sourceRecord.getValue().value()); - assertThat(arrayNode.size()).isEqualTo(1); - assertThat(arrayNode.get(0).get("value").asText()) - .describedAs(new String((byte[]) sourceRecord.getValue().value(), StandardCharsets.UTF_8) - + " == " + String.format("[%n{\"value\":\"%s\"}%n]", value)) - .isEqualTo(value); - } - break; - case JSONL : - assertThat(sourceRecord.getValue().schema()).isNull(); - Map values = (Map) sourceRecord.getValue().value(); - values = (Map) values.get("value"); - assertThat(values.get("id")).isEqualTo(2L); - assertThat(values.get("message")).isEqualTo("Hello"); - break; - default : - throw new IllegalArgumentException("Unknown format type: " + formatType); - } - } - - private void assertEquivalent(final Schema expected, final Schema actual) { - assertThat(actual.type()).isEqualTo(expected.type()); - assertThat(actual.fields()).containsExactlyElementsOf(expected.fields()); - } - - static List testDecompressionData() { - final List result = new ArrayList<>(); - for (final FormatType formatType : FormatType.values()) { - for (final CompressionType compressionType : CompressionType.values()) { - result.add(Arguments.of(formatType, compressionType)); - } - } - return result; - } - /** * A mutator of the mocked client used by the iterator under test. *

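Note on the AvroOutputWriter rewrite in the patch above: writeRecords now derives a single Avro schema from the first record and streams every datum through one DataFileWriter. The following is a minimal, self-contained sketch of that container-file pattern using the plain Avro API; the class name, schema, codec choice and sample value are illustrative and are not taken from the connector.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroContainerFileSketch {
    public static void main(final String[] args) throws IOException {
        // Illustrative schema; the connector builds its schema from the first SinkRecord instead.
        final Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\","
                        + "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

        // One DataFileWriter per output object: set the codec, create with the schema,
        // then append every record that shares that schema.
        try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
            fileWriter.setCodec(CodecFactory.nullCodec());
            fileWriter.create(schema, out);

            final GenericRecord record = new GenericData.Record(schema);
            record.put("message", "hello");
            fileWriter.append(record);
        }
        System.out.println("wrote " + out.size() + " bytes");
    }
}

Closing the DataFileWriter also closes the stream it was created on, which is why the lazily created writer restored in the next patch shields the target stream with CloseShieldOutputStream.
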
From c2ade3df5a82a32e1f88cf5596aff24a35e7947e Mon Sep 17 00:00:00 2001 From: Claude Warren Date: Wed, 19 Nov 2025 13:51:47 +0000 Subject: [PATCH 02/13] cleaned up PR --- .../common/config/CompressionType.java | 17 ++ .../common/config/SinkCommonConfig.java | 5 - .../common/output/avro/AvroOutputWriter.java | 122 ++++++++++--- .../common/output/parquet/ParquetConfig.java | 12 +- .../source/AbstractSourceRecordIterator.java | 19 +- .../AbstractSourceRecordIteratorTest.java | 162 +++++++++++++++++- 6 files changed, 295 insertions(+), 42 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java index 669aca3fd..2de5216cb 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java @@ -30,6 +30,7 @@ import com.github.luben.zstd.ZstdInputStream; import com.github.luben.zstd.ZstdOutputStream; import org.apache.commons.io.function.IOFunction; +import org.apache.commons.io.function.IOSupplier; import org.xerial.snappy.SnappyInputStream; import org.xerial.snappy.SnappyOutputStream; @@ -138,6 +139,22 @@ public final InputStream decompress(final InputStream input) throws IOException return decompressor.apply(input); } + /** + * Decompresses an input stream wrapped in an IOSupplier + * + * @param input + * the input stream to read compressed data from. + * @return An input stream that returns decompressed data. + */ + public final IOSupplier decompress(final IOSupplier input) { + return new IOSupplier() { + @Override + public InputStream get() throws IOException { + return decompress(input.get()); + } + }; + } + /** * Compresses an output stream. * diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java index 956c2093f..7fb68a2b2 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java @@ -142,11 +142,6 @@ public final TimestampSource getFilenameTimestampSource() { return fileNameFragment.getFilenameTimestampSource(); } - /** - * Gets the maximum records allowed in a single file. - * - * @return the maximum records allowed in a single file. - */ public final int getMaxRecordsPerFile() { return fileNameFragment.getMaxRecordsPerFile(); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java b/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java index f9380b4c4..f320a34cd 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/output/avro/AvroOutputWriter.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; -import java.util.List; import java.util.Map; import org.apache.kafka.connect.sink.SinkRecord; @@ -35,49 +34,120 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.output.CloseShieldOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * An instance of OutputWriter that writes to Avro files. 
+ */ public final class AvroOutputWriter extends OutputWriter { private static final Logger LOGGER = LoggerFactory.getLogger(AvroOutputWriter.class); - private final AvroSchemaBuilder avroSchemaBuilder; - private final SinkRecordConverter sinkRecordConverter; - public AvroOutputWriter(final Collection fields, final OutputStream out, final Map externalConfig, final boolean envelopeEnabled) { - super(out, new OutputStreamWriterStub(), externalConfig); - final AvroData avroData = new AvroData(new AvroDataConfig(externalConfig)); - this.sinkRecordConverter = new SinkRecordConverter(fields, avroData, envelopeEnabled); - this.avroSchemaBuilder = new AvroSchemaBuilder(fields, avroData, envelopeEnabled); + super(out, new AvroOutputStreamWriter(fields, externalConfig, envelopeEnabled), externalConfig); } - @Override - public void writeRecords(final Collection sinkRecords) throws IOException { - final AvroConfig avroConfiguration = AvroConfig.createAvroConfiguration(externalConfiguration); - final Schema avroSchema = avroSchemaBuilder.buildSchema(sinkRecords.iterator().next()); - LOGGER.debug("Record schema is: {}", avroSchema); + /** + * An instance of OutputStreamWriter that handles writing the Avro format + */ + private static final class AvroOutputStreamWriter implements OutputStreamWriter { + /** + * The sink record converter for Avro. + */ + private final SinkRecordConverter sinkRecordConverter; + /** + * The Avro schema builder. + */ + private final AvroSchemaBuilder avroSchemaBuilder; + /** + * The Avro configuration. + */ + private final AvroConfig avroConfiguration; - final GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); - try (DataFileWriter dataFileWriter = new DataFileWriter<>(writer)) { - dataFileWriter.setCodec(avroConfiguration.codecFactory()); - dataFileWriter.create(avroSchema, outputStream); - for (final SinkRecord record : sinkRecords) { - final GenericRecord datum = sinkRecordConverter.convert(record, avroSchema); - dataFileWriter.append(datum); + /** + * Lazily constructed Avro schema used in the output stream. + */ + private Schema avroSchema; + /** + * Lazily constructed Avro DataFileWriter. + */ + private DataFileWriter dataFileWriter; + + /** + * Constructor. + * + * @param fields + * the fields to output. + * @param externalConfig + * the configuration data for the Avro configuration. + * @param envelopeEnabled + * {@code true if the envelope is enabled} + */ + AvroOutputStreamWriter(final Collection fields, final Map externalConfig, + final boolean envelopeEnabled) { + final AvroData avroData = new AvroData(new AvroDataConfig(externalConfig)); + this.sinkRecordConverter = new SinkRecordConverter(fields, avroData, envelopeEnabled); + this.avroSchemaBuilder = new AvroSchemaBuilder(fields, avroData, envelopeEnabled); + this.avroConfiguration = AvroConfig.createAvroConfiguration(externalConfig); + } + + /** + * Create the data file writer if it does not exist. Requires that {@link #getAvroSchema(SinkRecord)} be called + * at least once prior. + * + * @return the DataFileWriter. + * @throws IOException + * if the writer can not be created. + */ + private DataFileWriter getDataFileWriter(final OutputStream outputStream) throws IOException { + if (dataFileWriter == null) { + final GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); + dataFileWriter = new DataFileWriter<>(writer); + dataFileWriter.setCodec(avroConfiguration.codecFactory()); + // create with output stream that does not close the underlying stream. 
+ dataFileWriter.create(avroSchema, CloseShieldOutputStream.wrap(outputStream)); } + return dataFileWriter; } - } - @Override - public void writeRecord(final SinkRecord record) throws IOException { - writeRecords(List.of(record)); - } + /** + * Creates the Avro schema if necessary. Will throw an exception if the record schema does not match the output + * Avro schema. + * + * @param sinkRecord + * the record to be written. + * @return the file Avro schema. + * @throws IOException + * if the record schema does not match the file schema. + */ + private Schema getAvroSchema(final SinkRecord sinkRecord) throws IOException { + if (avroSchema == null) { + avroSchema = avroSchemaBuilder.buildSchema(sinkRecord); + LOGGER.debug("Record schema is: {}", avroSchema); + } else { + final Schema otherSchema = avroSchemaBuilder.buildSchema(sinkRecord); + if (!avroSchema.equals(otherSchema)) { + LOGGER.error("Illegal Schema Change. {}", otherSchema); + throw new IOException("Illegal schema change"); + } + } + return avroSchema; + } - private static final class OutputStreamWriterStub implements OutputStreamWriter { @Override public void writeOneRecord(final OutputStream outputStream, final SinkRecord record) throws IOException { + final GenericRecord datum = sinkRecordConverter.convert(record, getAvroSchema(record)); + getDataFileWriter(outputStream).append(datum); + } + + @Override + public void stopWriting(final OutputStream outputStream) throws IOException { + if (dataFileWriter != null) { + dataFileWriter.close(); + } } } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java index eaa692214..6d1fa5f55 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/output/parquet/ParquetConfig.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -final class ParquetConfig extends AbstractConfig { +public final class ParquetConfig extends AbstractConfig { public ParquetConfig(final Map originals) { super(new ConfigDef(), originals); @@ -50,10 +50,13 @@ public Configuration parquetConfiguration() { } public CompressionCodecName compressionCodecName() { - final var connectorCompressionType = CompressionType.forName( + return compressionCodecName(CompressionType.forName( originals().getOrDefault(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG, CompressionType.NONE.name) - .toString()); - switch (connectorCompressionType) { + .toString())); + } + + public static CompressionCodecName compressionCodecName(final CompressionType compressionType) { + switch (compressionType) { case GZIP : return CompressionCodecName.GZIP; case SNAPPY : @@ -64,5 +67,4 @@ public CompressionCodecName compressionCodecName() { return CompressionCodecName.UNCOMPRESSED; } } - } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java index 6c88ca1cf..0abb5565c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java @@ -27,14 +27,17 @@ import org.apache.kafka.connect.data.SchemaAndValue; import io.aiven.commons.collections.RingBuffer; +import 
io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.config.SourceConfigFragment; +import io.aiven.kafka.connect.common.source.input.ParquetTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; import io.aiven.kafka.connect.common.source.task.Context; import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.common.source.task.DistributionType; +import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.io.function.IOSupplier; import org.apache.commons.lang3.ObjectUtils; @@ -236,17 +239,25 @@ final public void remove() { * the SourceRecord that drives the creation of source records with values. * @return a stream of T created from the input stream of the native item. */ - private Stream convert(final T sourceRecord) { + @VisibleForTesting + Stream convert(final T sourceRecord) { sourceRecord .setKeyData(transformer.getKeyData(sourceRecord.getNativeKey(), sourceRecord.getTopic(), sourceConfig)); lastSeenNativeKey = sourceRecord.getNativeKey(); + // parquet handles compression internally. + final CompressionType compressionType = transformer instanceof ParquetTransformer + ? CompressionType.NONE + : sourceConfig.getCompressionType(); + // create an IOSupplier with the specified compression + final IOSupplier inputStream = transformer instanceof ParquetTransformer + ? getInputStream(sourceRecord) + : compressionType.decompress(getInputStream(sourceRecord)); return transformer - .getRecords(getInputStream(sourceRecord), sourceRecord.getNativeItemSize(), sourceRecord.getContext(), - sourceConfig, sourceRecord.getRecordCount()) + .getRecords(inputStream, sourceRecord.getNativeItemSize(), sourceRecord.getContext(), sourceConfig, + sourceRecord.getRecordCount()) .map(new Mapper(sourceRecord)); - } /** diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java index 7d3ac0fde..7a913ab81 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIteratorTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -38,18 +39,31 @@ import java.util.NoSuchElementException; import java.util.Queue; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputField; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import 
io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.format.AvroTestDataFixture; import io.aiven.kafka.connect.common.format.JsonTestDataFixture; import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; +import io.aiven.kafka.connect.common.output.OutputWriter; import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; import io.aiven.kafka.connect.common.source.task.DistributionType; +import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,6 +83,7 @@ * @param * The concrete implementation of the {@link AbstractSourceRecord} . */ +@SuppressWarnings("PMD.ExcessiveImports") public abstract class AbstractSourceRecordIteratorTest, N, O extends OffsetManager.OffsetManagerEntry, T extends AbstractSourceRecord> { /** The offset manager */ private OffsetManager offsetManager; @@ -100,7 +115,7 @@ public abstract class AbstractSourceRecordIteratorTest, * @param offsetManager * A mock offset manager. * @param transformer - * The trnasformer to use for the test. + * The transformer to use for the test. * @return A configured AbstractSourceRecordIterator. */ abstract protected AbstractSourceRecordIterator createSourceRecordIterator( @@ -135,7 +150,7 @@ public void setUp() { } /** - * Create a mock SourceCOnfig with our necessary items added. + * Create a mock SourceConfig with our necessary items added. * * @param filePattern * The file pattern to match. @@ -180,6 +195,7 @@ void testOneObjectReturnsOneObject(final InputFormat format, final byte[] data) final Transformer transformer = TransformerFactory.getTransformer(format); final SourceCommonConfig mockConfig = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(mockConfig.getInputFormat()).thenReturn(format); + when(mockConfig.getCompressionType()).thenReturn(CompressionType.NONE); // verify one data has one data createClientMutator().reset().addObject(key, ByteBuffer.wrap(data)).endOfBlock().build(); @@ -245,6 +261,8 @@ void testMultipleRecordsReturned(final InputFormat format, final byte[] data) { final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(config.getTransformerMaxBufferSize()).thenReturn(4096); when(config.getInputFormat()).thenReturn(format); + when(config.getCompressionType()).thenReturn(CompressionType.NONE); + final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, transformer); @@ -315,6 +333,8 @@ void testIteratorProcessesMultipleObjectsFromByteArrayTransformer() { final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); when(config.getTransformerMaxBufferSize()).thenReturn(4096); when(config.getInputFormat()).thenReturn(InputFormat.BYTES); + when(config.getCompressionType()).thenReturn(CompressionType.NONE); + final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, transformer); @@ -368,6 +388,144 @@ static List parameterizedNativeStartKey() { return List.of(Arguments.of("startKeyOne", 2), Arguments.of("startKeyOne", 2), Arguments.of(null, 1)); } + /** + * Gets a configured Transformer. + * + * @param formatType + * The input format for the transformer. + * @return the Transformer for the specified input format. 
+ */ + private static InputFormat formatTypeConversion(final FormatType formatType) { + switch (formatType) { + case AVRO : + return InputFormat.AVRO; + case PARQUET : + return InputFormat.PARQUET; + case JSONL : + return InputFormat.JSONL; + case CSV : + case JSON : + return InputFormat.BYTES; + default : + throw new IllegalArgumentException("Unknown format type in configuration: " + formatType); + } + } + @ParameterizedTest + @MethodSource("testDecompressionData") + @SuppressWarnings("PMD.NcssCount") + void testDecompression(final FormatType formatType, final CompressionType compressionType) throws IOException { + // setup the data + final SourceCommonConfig config = mockSourceConfig(FILE_PATTERN, 0, 1, null); + when(config.getTransformerMaxBufferSize()).thenReturn(4096); + when(config.getCompressionType()).thenReturn(compressionType); + when(config.getInputFormat()).thenReturn(formatTypeConversion(formatType)); + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + Object value = null; + Schema valueSchema = null; + try (OutputWriter outputWriter = OutputWriter.builder() + .withCompressionType(compressionType) + .withOutputFields( + Collections.singletonList(new OutputField(OutputFieldType.VALUE, OutputFieldEncodingType.NONE))) + .build(byteArrayOutputStream, formatType)) { + + switch (formatType) { + case AVRO : + valueSchema = SchemaBuilder.struct() + .field("message", SchemaBuilder.STRING_SCHEMA) + .field("id", SchemaBuilder.INT32_SCHEMA); + value = new Struct(valueSchema).put("message", "Hello").put("id", 1); + break; + case PARQUET : + valueSchema = ParquetTestDataFixture.PARQUET_SCHEMA; + value = new Struct(valueSchema).put("name", "TheDude") + .put("age", 32) + .put("email", "thedude@example.com"); + break; + case JSONL : + valueSchema = SchemaBuilder.struct() + .field("message", SchemaBuilder.STRING_SCHEMA) + .field("id", SchemaBuilder.INT32_SCHEMA); + value = new Struct(valueSchema).put("message", "Hello").put("id", 2); + break; + case CSV : + valueSchema = SchemaBuilder.BYTES_SCHEMA; + value = "'test','one'".getBytes(StandardCharsets.UTF_8); + break; + case JSON : + valueSchema = SchemaBuilder.STRING_SCHEMA; + value = "json is here"; + break; + default : + throw new IllegalArgumentException("Unknown format type: " + formatType); + } + + final SinkRecord sinkRecord = new SinkRecord("testTopic", 0, Schema.STRING_SCHEMA, "testRecord", + valueSchema, value, 0); + outputWriter.writeRecord(sinkRecord); + } + createClientMutator().addObject(key, ByteBuffer.wrap(byteArrayOutputStream.toByteArray())).endOfBlock().build(); + final Transformer transformer = TransformerFactory.getTransformer(formatTypeConversion(formatType)); + + // Start the test + final AbstractSourceRecordIterator iterator = createSourceRecordIterator(config, offsetManager, + transformer); + assertThat(iterator).hasNext(); + final T sourceRecord = iterator.next(); + assertThat(sourceRecord).isNotNull(); + switch (formatType) { + case AVRO : + case PARQUET : + Struct struct = (Struct) sourceRecord.getValue().value(); + struct = (Struct) struct.get("value"); + assertEquivalent(valueSchema, struct.schema()); + for (final Field field : valueSchema.fields()) { + assertThat(struct.get(field)).describedAs("field: " + field).isEqualTo(((Struct) value).get(field)); + } + break; + case CSV : + assertThat(sourceRecord.getValue().schema()).isNull(); + assertThat(sourceRecord.getValue().value()).isEqualTo(value); + break; + case JSON : + assertThat(sourceRecord.getValue().schema()).isNull(); + try 
(JsonDeserializer jsonDeserializer = new JsonDeserializer()) { + final ArrayNode arrayNode = (ArrayNode) jsonDeserializer.deserialize("topic", + (byte[]) sourceRecord.getValue().value()); + assertThat(arrayNode.size()).isEqualTo(1); + assertThat(arrayNode.get(0).get("value").asText()) + .describedAs(new String((byte[]) sourceRecord.getValue().value(), StandardCharsets.UTF_8) + + " == " + String.format("[%n{\"value\":\"%s\"}%n]", value)) + .isEqualTo(value); + } + break; + case JSONL : + assertThat(sourceRecord.getValue().schema()).isNull(); + Map values = (Map) sourceRecord.getValue().value(); + values = (Map) values.get("value"); + assertThat(values.get("id")).isEqualTo(2L); + assertThat(values.get("message")).isEqualTo("Hello"); + break; + default : + throw new IllegalArgumentException("Unknown format type: " + formatType); + } + } + + private void assertEquivalent(final Schema expected, final Schema actual) { + assertThat(actual.type()).isEqualTo(expected.type()); + assertThat(actual.fields()).containsExactlyElementsOf(expected.fields()); + } + + static List testDecompressionData() { + final List result = new ArrayList<>(); + for (final FormatType formatType : FormatType.values()) { + for (final CompressionType compressionType : CompressionType.values()) { + result.add(Arguments.of(formatType, compressionType)); + } + } + return result; + } + /** * A mutator of the mocked client used by the iterator under test. *

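The patch above restores CompressionType.decompress(IOSupplier) and has the iterator wrap getInputStream(sourceRecord) with it for every transformer except ParquetTransformer, since Parquet handles compression inside the file format. The wrapper is a small decorator that defers opening and decompressing the stream until get() is called. Below is a minimal sketch of the same idea, assuming only commons-io's IOSupplier and a GZIP payload; the class name, helper name and sample data are illustrative and not part of the connector.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOSupplier;

public class LazyDecompressSketch {

    /** Wrap a supplier so the stream it produces is decompressed only when requested. */
    static IOSupplier<InputStream> gunzip(final IOSupplier<InputStream> raw) {
        return () -> new GZIPInputStream(raw.get());
    }

    public static void main(final String[] args) throws IOException {
        // Build a small gzip payload to stand in for a compressed object in storage.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write("hello".getBytes(StandardCharsets.UTF_8));
        }

        // Nothing is opened or decompressed until get() is invoked
        // (in the connector, that happens inside the transformer).
        final IOSupplier<InputStream> supplier = gunzip(() -> new ByteArrayInputStream(buffer.toByteArray()));
        try (InputStream in = supplier.get()) {
            System.out.println(IOUtils.toString(in, StandardCharsets.UTF_8));
        }
    }
}
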
From 375a8a6c398381d43638cd539dfec7cfdf870f03 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Wed, 19 Nov 2025 14:09:09 +0000
Subject: [PATCH 03/13] clean up spotless

---
 .../aiven/kafka/connect/common/config/SinkCommonConfig.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java
index 7fb68a2b2..956c2093f 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java
@@ -142,6 +142,11 @@ public final TimestampSource getFilenameTimestampSource() {
         return fileNameFragment.getFilenameTimestampSource();
     }

+    /**
+     * Gets the maximum records allowed in a single file.
+     *
+     * @return the maximum records allowed in a single file.
+     */
     public final int getMaxRecordsPerFile() {
         return fileNameFragment.getMaxRecordsPerFile();
     }

From c31d693e77778cf4eae900d71012490f1c677d0a Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Fri, 21 Nov 2025 13:06:41 +0000
Subject: [PATCH 04/13] updated backoffpolicyfragment

---
 .../java/io/aiven/commons/collections/TimeScale.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/commons/src/main/java/io/aiven/commons/collections/TimeScale.java b/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
index bd05292cf..42ddc30e0 100644
--- a/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
+++ b/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
@@ -33,9 +33,12 @@ public enum TimeScale {
         public String format(final long milliseconds) {
             return String.format("%s %s", milliseconds, unitName());
         }
-    },
-    SECONDS(MILLISECONDS.milliseconds * 1000), MINUTES(SECONDS.milliseconds * 60), HOURS(
-            MINUTES.milliseconds * 60), DAYS(HOURS.milliseconds * 24);
+    }, //
+    SECONDS(MILLISECONDS.milliseconds * 1000), //
+    MINUTES( SECONDS.milliseconds * 60), //
+    HOURS(MINUTES.milliseconds * 60 ), //
+    DAYS(HOURS.milliseconds * 24);
+

     /**
      * The Decimal format for the TimeUnit displays.

From f4e45cf89556cc449878948e96d7e1c4f37aca-placeholder Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Wed, 5 Nov 2025 13:41:39 +0000
Subject: [PATCH 05/13] Added S3 sink configuration documentation.

* Added Scale class to present file sizes and memory sizes in human readable form.
* fixed validators to show correct information.
* Updated the S3SinkConfigDef so that it contains all the S3SinkConfiguration definition information.
* Removed S3Sink configuration information out of S3SinkConfig # Conflicts: # commons/src/main/java/io/aiven/commons/collections/Scale.java # commons/src/main/java/io/aiven/kafka/connect/common/config/validators/ScaleValidator.java # s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java # s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java --- .../validators/OutputFieldsValidator.java | 5 ++ .../connect/config/s3/S3ConfigFragment.java | 2 +- s3-sink-connector/build.gradle.kts | 47 +++++++++++++ .../s3/AivenKafkaConnectS3SinkConnector.java | 5 +- .../kafka/connect/s3/S3OutputStream.java | 3 +- .../kafka/connect/s3/config/S3SinkConfig.java | 68 +------------------ .../connect/s3/config/S3SinkConfigDef.java | 56 +++++++++++++++ site/pom.xml | 2 +- 8 files changed, 116 insertions(+), 72 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/OutputFieldsValidator.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/OutputFieldsValidator.java index f07ed852a..36749b3ef 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/OutputFieldsValidator.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/OutputFieldsValidator.java @@ -44,4 +44,9 @@ public void ensureValid(final String name, final Object value) { } } + @Override + public String toString() { + return OutputField.SUPPORTED_OUTPUT_FIELDS; + } + } diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 0059bb1d9..8dbe2da57 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -463,7 +463,7 @@ public void ensureValid(final String name, final Object value) { @Override public String toString() { - return "Supported values are: " + SUPPORTED_AWS_REGIONS; + return SUPPORTED_AWS_REGIONS; } } diff --git a/s3-sink-connector/build.gradle.kts b/s3-sink-connector/build.gradle.kts index 9017b1430..effeef5bc 100644 --- a/s3-sink-connector/build.gradle.kts +++ b/s3-sink-connector/build.gradle.kts @@ -72,6 +72,9 @@ idea { dependencies { compileOnly(apache.kafka.connect.api) compileOnly(apache.kafka.connect.runtime) + compileOnly(project(":site")) + compileOnly(apache.velocity.engine.core) + compileOnly(apache.velocity.tools) implementation(project(":commons")) implementation(project(":s3-commons")) @@ -254,3 +257,47 @@ signing { } signatureTypes = ASCSignatureProvider() } + + +/** ******************************* */ +/* Documentation building section */ +/** ******************************* */ +tasks.register("buildDocs") { + dependsOn("buildConfigMd") + dependsOn("buildConfigYml") +} + +tasks.register("buildConfigMd") { + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + "io.aiven.kafka.connect.s3.config.S3SinkConfigDef", + "src/templates/configData.md.vm", + "build/site/markdown/s3-sink-connector/S3SinkConfig.md") +} + +tasks.register("buildConfigYml") { + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + 
"io.aiven.kafka.connect.s3.config.S3SinkConfigDef", + "src/templates/configData.yml.vm", + "build/site/s3-sink-connector/S3SinkConfig.yml") +} + +/** ****************************** */ +/* End of documentation section */ +/** ****************************** */ + diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java index bc2ee745f..a9a68d19f 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java @@ -21,12 +21,11 @@ import java.util.Map; import java.util.Objects; +import io.aiven.kafka.connect.s3.config.S3SinkConfigDef; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; -import io.aiven.kafka.connect.s3.config.S3SinkConfig; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +37,7 @@ public class AivenKafkaConnectS3SinkConnector extends SinkConnector { @Override public ConfigDef config() { - return S3SinkConfig.configDef(); + return new S3SinkConfigDef(); } @Override diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java index fd4fcd3b2..f68d7f210 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java @@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.UploadPartRequest; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.aiven.commons.collections.Scale; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class S3OutputStream extends OutputStream { private final Logger logger = LoggerFactory.getLogger(S3OutputStream.class); - public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; + public static final int DEFAULT_PART_SIZE = (int) Scale.MiB.asBytes(5); private final AmazonS3 client; diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index 7125cff29..ca915b303 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -17,7 +17,6 @@ package io.aiven.kafka.connect.s3.config; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -25,8 +24,6 @@ import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; import io.aiven.kafka.connect.common.config.CompressionType; @@ -35,13 +32,10 @@ import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.TimestampSource; -import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; -import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; 
import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.config.s3.S3CommonConfig; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.config.s3.S3SinkBaseConfig; -import io.aiven.kafka.connect.s3.S3OutputStream; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; @@ -54,8 +48,7 @@ final public class S3SinkConfig extends S3SinkBaseConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkConfig.class); - private static final String GROUP_AWS = "AWS"; - private static final String GROUP_FILE = "File"; + // Default values from AWS SDK, since they are hidden public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100; public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000; @@ -68,67 +61,11 @@ final public class S3SinkConfig extends S3SinkBaseConfig { public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3; public S3SinkConfig(final Map properties) { - super(configDef(), preprocessProperties(properties)); + super(new S3SinkConfigDef(), preprocessProperties(properties)); } static Map preprocessProperties(final Map properties) { final Map result = S3ConfigFragment.handleDeprecations(properties); - // Add other preprocessings when needed here. Mind the order. - return S3CommonConfig.handleDeprecatedYyyyUppercase(result); - } - - public static S3SinkConfigDef configDef() { - final var configDef = new S3SinkConfigDef(); - S3ConfigFragment.update(configDef); - addS3partSizeConfig(configDef); - addDeprecatedTimestampConfig(configDef); - - return configDef; - } - - private static void addS3partSizeConfig(final ConfigDef configDef) { - - // add awsS3SinkCounter if more S3 Sink Specific config is added - // This is used to set orderInGroup - configDef.define(S3ConfigFragment.AWS_S3_PART_SIZE, Type.INT, S3OutputStream.DEFAULT_PART_SIZE, - new ConfigDef.Validator() { - - static final int MAX_BUFFER_SIZE = 2_000_000_000; - - @Override - public void ensureValid(final String name, final Object value) { - if (value == null) { - throw new ConfigException(name, null, "Part size must be non-null"); - } - final var number = (Number) value; - if (number.longValue() <= 0) { - throw new ConfigException(name, value, "Part size must be greater than 0"); - } - if (number.longValue() > MAX_BUFFER_SIZE) { - throw new ConfigException(name, value, - "Part size must be no more: " + MAX_BUFFER_SIZE + " bytes (2GB)"); - } - } - }, Importance.MEDIUM, - "The Part Size in S3 Multi-part Uploads in bytes. Maximum is " + Integer.MAX_VALUE - + " (2GB) and default is " + S3OutputStream.DEFAULT_PART_SIZE + " (5MB)", - GROUP_AWS, 0, ConfigDef.Width.NONE, S3ConfigFragment.AWS_S3_PART_SIZE); - - } - - private static void addDeprecatedTimestampConfig(final ConfigDef configDef) { - int timestampGroupCounter = 0; - - configDef.define(S3ConfigFragment.TIMESTAMP_TIMEZONE, Type.STRING, ZoneOffset.UTC.toString(), - new TimeZoneValidator(), Importance.LOW, - "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " - + "Use standard shot and long names. Default is UTC", - GROUP_FILE, timestampGroupCounter++, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_TIMEZONE); - - configDef.define(S3ConfigFragment.TIMESTAMP_SOURCE, Type.STRING, TimestampSource.Type.WALLCLOCK.name(), - new TimestampSourceValidator(), Importance.LOW, - "Specifies the the timestamp variable source. 
Default is wall-clock.", GROUP_FILE, - timestampGroupCounter, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_SOURCE); } @Override @@ -162,7 +99,6 @@ public List getOutputFields() { return outputFormatFragment.getOutputFields(S3ConfigFragment.OUTPUT_FIELDS); } return List.of(new OutputField(OutputFieldType.VALUE, OutputFieldEncodingType.BASE64)); - } /** diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java index 55447370a..33d54216e 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java @@ -16,12 +16,68 @@ package io.aiven.kafka.connect.s3.config; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Map; import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.SinkCommonConfig; +import io.aiven.commons.collections.Scale; +import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.config.validators.ScaleValidator; +import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; +import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; +import io.aiven.kafka.connect.config.s3.S3ConfigFragment; +import io.aiven.kafka.connect.s3.S3OutputStream; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; + +import static io.aiven.commons.collections.Scale.GiB; + public class S3SinkConfigDef extends SinkCommonConfig.SinkCommonConfigDef { + private static final String GROUP_AWS = "AWS"; + private static final String GROUP_FILE = "File"; + public S3SinkConfigDef() { super(null, CompressionType.GZIP); + S3ConfigFragment.update(this); + addS3partSizeConfig(this); + addDeprecatedTimestampConfig(this); } + +// @Override +// public List validate(final Map props) { +// return super.validate(S3SinkConfig.preprocessProperties(props)); +// } + + private static void addDeprecatedTimestampConfig(final ConfigDef configDef) { + int timestampGroupCounter = 0; + + configDef.define(S3ConfigFragment.TIMESTAMP_TIMEZONE, Type.STRING, ZoneOffset.UTC.toString(), + new TimeZoneValidator(), Importance.LOW, + "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " + + "Use standard shot and long names. Default is UTC", + GROUP_FILE, ++timestampGroupCounter, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_TIMEZONE); + + configDef.define(S3ConfigFragment.TIMESTAMP_SOURCE, Type.STRING, TimestampSource.Type.WALLCLOCK.name(), + new TimestampSourceValidator(), Importance.LOW, + "Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, + ++timestampGroupCounter, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_SOURCE); + } + + private static void addS3partSizeConfig(final ConfigDef configDef) { + + // add awsS3SinkCounter if more S3 Sink Specific config is added + // This is used to set orderInGroup + configDef.define(S3ConfigFragment.AWS_S3_PART_SIZE, Type.INT, S3OutputStream.DEFAULT_PART_SIZE, + ScaleValidator.between(0, GiB.asBytes(2), Scale.IEC) + , Importance.MEDIUM, + "The Part Size in S3 Multi-part Uploads in bytes. 
Maximum is " + GiB.units(2) + " and default is " + Scale.size(S3OutputStream.DEFAULT_PART_SIZE, Scale.IEC), + GROUP_AWS, 0, ConfigDef.Width.NONE, S3ConfigFragment.AWS_S3_PART_SIZE); + } + + } diff --git a/site/pom.xml b/site/pom.xml index b9fcf7645..95b55ac95 100644 --- a/site/pom.xml +++ b/site/pom.xml @@ -20,7 +20,7 @@ 4.0.0 io.aiven cloud-storage-connectors-for-apache-kafka - 3.3.0-SNAPSHOT + 3.4.2-SNAPSHOT jar Cloud Storage Connectors For Apache Kafka From 431ab99f4d933f5c007edd2d177b4cc76cdb8b6f Mon Sep 17 00:00:00 2001 From: Claude Warren Date: Mon, 10 Nov 2025 15:47:31 +0000 Subject: [PATCH 06/13] updated S3SinkConfig and tests --- .../connect/config/s3/S3ConfigFragment.java | 6 +-- s3-sink-connector/build.gradle.kts | 54 +++++++++---------- .../s3/AivenKafkaConnectS3SinkConnector.java | 3 +- .../kafka/connect/s3/S3OutputStream.java | 3 +- .../kafka/connect/s3/config/S3SinkConfig.java | 1 - .../connect/s3/config/S3SinkConfigDef.java | 17 +++--- .../connect/s3/config/S3SinkConfigTest.java | 46 +++++++++------- 7 files changed, 67 insertions(+), 63 deletions(-) diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 8dbe2da57..dcb7091d4 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -456,7 +456,7 @@ public void ensureValid(final String name, final Object value) { if (Objects.nonNull(value)) { final String valueStr = (String) value; if (Region.regions().stream().noneMatch(r -> r.id().equals(valueStr))) { - throw new ConfigException(name, valueStr, toString()); + throw new ConfigException(name, valueStr, "See documentation for list of valid regions."); } } } @@ -600,7 +600,7 @@ public String getAwsS3Prefix() { } public int getAwsS3PartSize() { - return getInt(AWS_S3_PART_SIZE); + return getLong(AWS_S3_PART_SIZE).intValue(); } public long getS3RetryBackoffDelayMs() { @@ -702,7 +702,7 @@ public Setter fetchPageSize(final int fetchPageSize) { return setValue(FETCH_PAGE_SIZE, fetchPageSize); } - public Setter partSize(final int partSize) { + public Setter partSize(final long partSize) { return setValue(AWS_S3_PART_SIZE, partSize); } diff --git a/s3-sink-connector/build.gradle.kts b/s3-sink-connector/build.gradle.kts index effeef5bc..a7b0c04e5 100644 --- a/s3-sink-connector/build.gradle.kts +++ b/s3-sink-connector/build.gradle.kts @@ -258,46 +258,44 @@ signing { signatureTypes = ASCSignatureProvider() } - /** ******************************* */ /* Documentation building section */ /** ******************************* */ tasks.register("buildDocs") { - dependsOn("buildConfigMd") - dependsOn("buildConfigYml") + dependsOn("buildConfigMd") + dependsOn("buildConfigYml") } tasks.register("buildConfigMd") { - mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" - classpath = - sourceSets.main - .get() - .compileClasspath - .plus(files(tasks.jar)) - .plus(sourceSets.main.get().runtimeClasspath) - args = - listOf( - "io.aiven.kafka.connect.s3.config.S3SinkConfigDef", - "src/templates/configData.md.vm", - "build/site/markdown/s3-sink-connector/S3SinkConfig.md") + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + "io.aiven.kafka.connect.s3.config.S3SinkConfigDef", + 
"src/templates/configData.md.vm", + "build/site/markdown/s3-sink-connector/S3SinkConfig.md") } tasks.register("buildConfigYml") { - mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" - classpath = - sourceSets.main - .get() - .compileClasspath - .plus(files(tasks.jar)) - .plus(sourceSets.main.get().runtimeClasspath) - args = - listOf( - "io.aiven.kafka.connect.s3.config.S3SinkConfigDef", - "src/templates/configData.yml.vm", - "build/site/s3-sink-connector/S3SinkConfig.yml") + mainClass = "io.aiven.kafka.connect.tools.ConfigDoc" + classpath = + sourceSets.main + .get() + .compileClasspath + .plus(files(tasks.jar)) + .plus(sourceSets.main.get().runtimeClasspath) + args = + listOf( + "io.aiven.kafka.connect.s3.config.S3SinkConfigDef", + "src/templates/configData.yml.vm", + "build/site/s3-sink-connector/S3SinkConfig.yml") } /** ****************************** */ /* End of documentation section */ /** ****************************** */ - diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java index a9a68d19f..8bf366d19 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/AivenKafkaConnectS3SinkConnector.java @@ -21,11 +21,12 @@ import java.util.Map; import java.util.Objects; -import io.aiven.kafka.connect.s3.config.S3SinkConfigDef; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; +import io.aiven.kafka.connect.s3.config.S3SinkConfigDef; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java index f68d7f210..4572c7d41 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Objects; +import io.aiven.commons.collections.Scale; + import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; @@ -33,7 +35,6 @@ import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.UploadPartRequest; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.aiven.commons.collections.Scale; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index ca915b303..5780705e7 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -48,7 +48,6 @@ final public class S3SinkConfig extends S3SinkBaseConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkConfig.class); - // Default values from AWS SDK, since they are hidden public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100; public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000; diff --git 
a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java index 33d54216e..31059e649 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java @@ -22,6 +22,9 @@ import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.SinkCommonConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; + import io.aiven.commons.collections.Scale; import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.OutputFormatFragment; @@ -31,10 +34,6 @@ import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.s3.S3OutputStream; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigValue; - -import static io.aiven.commons.collections.Scale.GiB; public class S3SinkConfigDef extends SinkCommonConfig.SinkCommonConfigDef { @@ -72,12 +71,12 @@ private static void addS3partSizeConfig(final ConfigDef configDef) { // add awsS3SinkCounter if more S3 Sink Specific config is added // This is used to set orderInGroup - configDef.define(S3ConfigFragment.AWS_S3_PART_SIZE, Type.INT, S3OutputStream.DEFAULT_PART_SIZE, - ScaleValidator.between(0, GiB.asBytes(2), Scale.IEC) - , Importance.MEDIUM, - "The Part Size in S3 Multi-part Uploads in bytes. Maximum is " + GiB.units(2) + " and default is " + Scale.size(S3OutputStream.DEFAULT_PART_SIZE, Scale.IEC), + configDef.define(S3ConfigFragment.AWS_S3_PART_SIZE, Type.LONG, S3OutputStream.DEFAULT_PART_SIZE, + ScaleValidator.between(Scale.MiB.asBytes(1), Integer.MAX_VALUE, Scale.IEC), Importance.MEDIUM, + "The Part Size in S3 Multi-part Uploads in bytes. 
Maximum is " + + Scale.scaleOf(Integer.MAX_VALUE, Scale.IEC) + " and default is " + + Scale.size(S3OutputStream.DEFAULT_PART_SIZE, Scale.IEC), GROUP_AWS, 0, ConfigDef.Width.NONE, S3ConfigFragment.AWS_S3_PART_SIZE); } - } diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java index 5da53bd23..ecdc42688 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java @@ -182,19 +182,25 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() { @Test void wrongPartSize() { - final var wrongMaxPartSizeProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-key-id", - S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "bla-bla-access-key", S3ConfigFragment.AWS_S3_PART_SIZE, - Long.toString(2_000_000_001L)); - assertThatThrownBy(() -> new S3SinkConfig(wrongMaxPartSizeProps)).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value 2000000001 for configuration aws.s3.part.size.bytes: " - + "Part size must be no more: 2000000000 bytes (2GB)"); - - final var wrongMinPartSizeProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-key-id", - S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "bla-bla-access-key", S3ConfigFragment.AWS_S3_PART_SIZE, - "0"); - assertThatThrownBy(() -> new S3SinkConfig(wrongMinPartSizeProps)).isInstanceOf(ConfigException.class) + final Map wrongPartSizeProps = new HashMap<>(); + // Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-key-id", + // S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "bla-bla-access-key", S3ConfigFragment.AWS_S3_PART_SIZE, + // Long.toString(2_000_000_001L), + // S3ConfigFragment.setter()); + S3ConfigFragment.setter(wrongPartSizeProps) + .accessKeyId("blah-blah-key-id") + .accessKeySecret("bla-bla-access-key") + .bucketName("bla-bucket-name") + .partSize((long) Integer.MAX_VALUE + 1); + + assertThatThrownBy(() -> new S3SinkConfig(wrongPartSizeProps)).isInstanceOf(ConfigException.class) .hasMessage( - "Invalid value 0 for configuration aws.s3.part.size.bytes: Part size must be greater than 0"); + "Invalid value 2147483648 for configuration aws.s3.part.size.bytes: Value must be no more than 2.0 GiB (2147483647 B)"); + + S3ConfigFragment.setter(wrongPartSizeProps).partSize(0); + assertThatThrownBy(() -> new S3SinkConfig(wrongPartSizeProps)).isInstanceOf(ConfigException.class) + .hasMessage( + "Invalid value 0 for configuration aws.s3.part.size.bytes: Value must be at least 1.0 MiB (1048576 B)"); } @Test @@ -241,16 +247,16 @@ void emptyAwsS3Region() { props.put(S3ConfigFragment.AWS_S3_BUCKET, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_REGION, ""); assertThatThrownBy(() -> new S3SinkConfig(props)).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value for configuration aws_s3_region: " + "Supported values are: " - + Arrays.stream(Regions.values()).map(Regions::getName).collect(Collectors.joining(", "))); + .hasMessage( + "Invalid value for configuration aws_s3_region: See documentation for list of valid regions."); props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, ""); assertThatThrownBy(() -> new 
S3SinkConfig(props)).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value for configuration aws.s3.region: " + "Supported values are: " - + Arrays.stream(Regions.values()).map(Regions::getName).collect(Collectors.joining(", "))); + .hasMessage( + "Invalid value for configuration aws.s3.region: See documentation for list of valid regions."); } @Test @@ -261,16 +267,16 @@ void unknownAwsS3Region() { props.put(S3ConfigFragment.AWS_S3_BUCKET, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_REGION, "unknown"); assertThatThrownBy(() -> new S3SinkConfig(props)).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value unknown for configuration aws_s3_region: " + "Supported values are: " - + Arrays.stream(Regions.values()).map(Regions::getName).collect(Collectors.joining(", "))); + .hasMessage( + "Invalid value unknown for configuration aws_s3_region: See documentation for list of valid regions."); props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, ""); assertThatThrownBy(() -> new S3SinkConfig(props)).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value for configuration aws.s3.region: " + "Supported values are: " - + Arrays.stream(Regions.values()).map(Regions::getName).collect(Collectors.joining(", "))); + .hasMessage( + "Invalid value for configuration aws.s3.region: See documentation for list of valid regions."); } @Test From 87d03147ec45536e7799037af3fee689bcf72a03 Mon Sep 17 00:00:00 2001 From: Claude Warren Date: Thu, 20 Nov 2025 12:16:48 +0000 Subject: [PATCH 07/13] cleaned up s3 configuration --- .../connect/common/config/CommonConfig.java | 5 +- .../common/config/FileNameFragment.java | 4 +- .../common/config/OutputFormatFragment.java | 10 +- .../config/OutputFormatFragmentFixture.java | 2 +- .../connect/config/s3/S3CommonConfig.java | 70 ----- .../connect/config/s3/S3ConfigFragment.java | 289 ++++++++---------- .../connect/config/s3/S3SinkBaseConfig.java | 143 --------- .../iam/AwsCredentialProviderFactoryTest.java | 17 +- .../iam/AwsCredentialTestingConfig.java | 63 ++++ .../AwsCredentialV2ProviderFactoryTest.java | 17 +- .../tools/AwsCredentialBaseConfig.java | 36 --- .../kafka/connect/s3/S3OutputStream.java | 4 - .../io/aiven/kafka/connect/s3/S3SinkTask.java | 3 +- .../kafka/connect/s3/config/S3SinkConfig.java | 98 ++++-- .../connect/s3/config/S3SinkConfigDef.java | 60 +--- .../connect/s3/config/S3SinkConfigTest.java | 5 +- .../connect/s3/source/S3SourceConnector.java | 4 +- .../s3/source/config/S3SourceConfig.java | 23 +- .../s3/source/config/S3SourceConfigDef.java | 20 +- 19 files changed, 323 insertions(+), 550 deletions(-) delete mode 100644 s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3CommonConfig.java delete mode 100644 s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3SinkBaseConfig.java create mode 100644 s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java delete mode 100644 s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java index ca28ff927..ef9f752c0 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java +++ 
b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java @@ -175,13 +175,14 @@ final Collection multiValidateValues(final Map prop } @Override + @SuppressWarnings("PMD.AvoidCatchingGenericException") public final List validate(final Map props) { final Map valueMap = validateAll(props); try { return new ArrayList<>(multiValidate(valueMap).values()); - } catch (RuntimeException e) { // NOPMD AvoidCatchingGenericException - // any exceptions thrown in the above block are accounted for in the super.validate(props) call. + } catch (RuntimeException e) { + // any exceptions thrown in the above block are accounted for in the validateAll(props) call. return new ArrayList<>(valueMap.values()); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java index 579d9a1b1..1f5c46aca 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java @@ -58,9 +58,9 @@ public final class FileNameFragment extends ConfigFragment { @VisibleForTesting static final String FILE_MAX_RECORDS = "file.max.records"; @VisibleForTesting - static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; + public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; @VisibleForTesting - static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; + public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; @VisibleForTesting public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; @VisibleForTesting diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java index 52fcc3fdb..7c00b0362 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/OutputFormatFragment.java @@ -32,7 +32,7 @@ */ public final class OutputFormatFragment extends ConfigFragment { @VisibleForTesting - static final String GROUP_FORMAT = "Format"; + public static final String GROUP_NAME = "Format"; @VisibleForTesting static public final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields"; @VisibleForTesting @@ -90,13 +90,13 @@ public static int update(final ConfigDef configDef, final OutputFieldType defaul int formatGroupCounter = 0; configDef.define(FORMAT_OUTPUT_TYPE_CONFIG, ConfigDef.Type.STRING, FormatType.CSV.name, OUTPUT_TYPE_VALIDATOR, - ConfigDef.Importance.MEDIUM, "The format type of output content.", GROUP_FORMAT, ++formatGroupCounter, + ConfigDef.Importance.MEDIUM, "The format type of output content.", GROUP_NAME, ++formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_TYPE_CONFIG, FixedSetRecommender.ofSupportedValues(FormatType.names())); configDef.define(FORMAT_OUTPUT_FIELDS_CONFIG, ConfigDef.Type.LIST, Objects.isNull(defaultFieldType) ? 
null : defaultFieldType.name, // NOPMD NullAssignment - OUTPUT_FIELDS_VALIDATOR, ConfigDef.Importance.MEDIUM, "Fields to put into output files.", GROUP_FORMAT, + OUTPUT_FIELDS_VALIDATOR, ConfigDef.Importance.MEDIUM, "Fields to put into output files.", GROUP_NAME, ++formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_CONFIG, FixedSetRecommender.ofSupportedValues(OutputFieldType.names())); @@ -104,11 +104,11 @@ public static int update(final ConfigDef configDef, final OutputFieldType defaul OutputFieldEncodingType.BASE64.name, OUTPUT_FIELDS_ENCODING_VALIDATOR, ConfigDef.Importance.MEDIUM, "The type of encoding for the value field. " + "The supported values are: " + OutputFieldEncodingType.SUPPORTED_FIELD_ENCODING_TYPES + ".", - GROUP_FORMAT, ++formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, + GROUP_NAME, ++formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, FixedSetRecommender.ofSupportedValues(OutputFieldEncodingType.names())); configDef.define(FORMAT_OUTPUT_ENVELOPE_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, - "Whether to enable envelope for entries with single field.", GROUP_FORMAT, ++formatGroupCounter, + "Whether to enable envelope for entries with single field.", GROUP_NAME, ++formatGroupCounter, ConfigDef.Width.SHORT, FORMAT_OUTPUT_ENVELOPE_CONFIG); return formatGroupCounter; } diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/config/OutputFormatFragmentFixture.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/config/OutputFormatFragmentFixture.java index 335143ecc..ef65aab54 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/config/OutputFormatFragmentFixture.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/config/OutputFormatFragmentFixture.java @@ -19,7 +19,7 @@ public class OutputFormatFragmentFixture {// NOPMD public enum OutputFormatArgs { - GROUP_FORMAT(OutputFormatFragment.GROUP_FORMAT), FORMAT_OUTPUT_FIELDS_CONFIG( + GROUP_FORMAT(OutputFormatFragment.GROUP_NAME), FORMAT_OUTPUT_FIELDS_CONFIG( OutputFormatFragment.FORMAT_OUTPUT_FIELDS_CONFIG), FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG( OutputFormatFragment.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG), FORMAT_OUTPUT_ENVELOPE_CONFIG( OutputFormatFragment.FORMAT_OUTPUT_ENVELOPE_CONFIG), FORMAT_OUTPUT_TYPE_CONFIG( diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3CommonConfig.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3CommonConfig.java deleted file mode 100644 index 2c742bb22..000000000 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3CommonConfig.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.aiven.kafka.connect.config.s3; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -import io.aiven.kafka.connect.common.config.FileNameFragment; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This common config handles a specific deprecated date format and is extensible to allow other common configuration - * that is not specific to a Config Fragment to be available for both sink and source configurations in S3 connectors. - */ - -public final class S3CommonConfig { - public static final Logger LOGGER = LoggerFactory.getLogger(S3CommonConfig.class); - - private S3CommonConfig() { - - } - - public static Map handleDeprecatedYyyyUppercase(final Map properties) { - final List keysToProcess = List.of(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, - FileNameFragment.FILE_NAME_TEMPLATE_CONFIG, FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG); - if (keysToProcess.stream().noneMatch(properties::containsKey)) { - return properties; - } - - final var result = new HashMap<>(properties); - for (final var prop : keysToProcess) { - if (properties.containsKey(prop)) { - String template = properties.get(prop); - final String originalTemplate = template; - - final var unitYyyyPattern = Pattern.compile("\\{\\{\\s*timestamp\\s*:\\s*unit\\s*=\\s*YYYY\\s*}}"); - template = unitYyyyPattern.matcher(template) - .replaceAll(matchResult -> matchResult.group().replace("YYYY", "yyyy")); - - if (!template.equals(originalTemplate)) { - LOGGER.warn("{{timestamp:unit=YYYY}} is no longer supported, " - + "please use {{timestamp:unit=yyyy}} instead. " + "It was automatically replaced: {}", - template); - } - - result.put(prop, template); - } - } - return result; - } - -} diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index dcb7091d4..99893ca5f 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -18,25 +18,29 @@ import java.net.URI; import java.time.Duration; +import java.time.ZoneOffset; import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.utils.Utils; +import io.aiven.commons.collections.Scale; import io.aiven.kafka.connect.common.config.AbstractFragmentSetter; import io.aiven.kafka.connect.common.config.ConfigFragment; import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.FragmentDataAccess; import io.aiven.kafka.connect.common.config.OutputFormatFragment; import io.aiven.kafka.connect.common.config.SourceConfigFragment; +import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; +import io.aiven.kafka.connect.common.config.validators.ScaleValidator; +import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; +import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; import io.aiven.kafka.connect.common.config.validators.UrlValidator; import 
io.aiven.kafka.connect.common.config.validators.UsageLoggingValidator; import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; @@ -48,6 +52,7 @@ import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.internal.BucketNameUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -61,16 +66,10 @@ @SuppressWarnings({ "PMD.ExcessivePublicCount", "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.GodClass" }) public final class S3ConfigFragment extends ConfigFragment { - private static final Pattern UNIT_YYYY_PATTERN = Pattern - .compile("\\{\\{\\s*timestamp\\s*:\\s*unit\\s*=\\s*YYYY\\s*}}"); - + public static final int DEFAULT_PART_SIZE = (int) Scale.MiB.asBytes(5); private static final Logger LOGGER = LoggerFactory.getLogger(S3ConfigFragment.class); @Deprecated public static final String OUTPUT_COMPRESSION = "output_compression"; - @Deprecated - public static final String OUTPUT_COMPRESSION_TYPE_GZIP = "gzip"; - @Deprecated - public static final String OUTPUT_COMPRESSION_TYPE_NONE = "none"; @Deprecated public static final String OUTPUT_FIELDS = "output_fields"; @@ -155,11 +154,11 @@ public final class S3ConfigFragment extends ConfigFragment { /** * Constructor. * - * @param cfg + * @param dataAccess * the configuration to resolve requests against. */ - public S3ConfigFragment(final AbstractConfig cfg) { - super(FragmentDataAccess.from(cfg)); + public S3ConfigFragment(final FragmentDataAccess dataAccess) { + super(dataAccess); } /** @@ -169,10 +168,9 @@ public S3ConfigFragment(final AbstractConfig cfg) { * the Configuration definition. * @return the update configuration definition */ - public static ConfigDef update(final ConfigDef configDef) { - addAwsConfigGroup(configDef); + public static ConfigDef update(final ConfigDef configDef, final boolean isSink) { + addAwsConfigGroup(configDef, isSink); addAwsStsConfigGroup(configDef); - addDeprecatedConfiguration(configDef); addS3RetryPolicies(configDef); return configDef; } @@ -204,7 +202,7 @@ static void addS3RetryPolicies(final ConfigDef configDef) { AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG); } - static void addAwsConfigGroup(final ConfigDef configDef) { + static void addAwsConfigGroup(final ConfigDef configDef, final boolean isSink) { int awsGroupCounter = 0; configDef.define(AWS_ACCESS_KEY_ID_CONFIG, ConfigDef.Type.PASSWORD, null, new NonEmptyPassword(), @@ -219,7 +217,6 @@ static void addAwsConfigGroup(final ConfigDef configDef) { "When you initialize a new service client without supplying any arguments, " + "the AWS SDK for Java attempts to find temporary " + "credentials by using the default credential provider chain.", - GROUP_AWS, ++awsGroupCounter, ConfigDef.Width.NONE, AWS_CREDENTIALS_PROVIDER_CONFIG); configDef.define(AWS_S3_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, null, new BucketNameValidator(), @@ -249,24 +246,87 @@ static void addAwsConfigGroup(final ConfigDef configDef) { ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, ++awsGroupCounter, ConfigDef.Width.NONE, FETCH_PAGE_SIZE); - configDef.define(AWS_S3_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1000, new ConfigDef.Validator() { - ConfigDef.Range range = ConfigDef.Range.atLeast(1); + if (isSink) { + configDef.define(AWS_S3_PART_SIZE, ConfigDef.Type.LONG, DEFAULT_PART_SIZE, + ScaleValidator.between(Scale.MiB.asBytes(1), Integer.MAX_VALUE, Scale.IEC), + ConfigDef.Importance.MEDIUM, + "The Part 
Size in S3 Multi-part Uploads in bytes. Maximum is " + + Scale.scaleOf(Integer.MAX_VALUE, Scale.IEC) + " and default is " + + Scale.size(S3ConfigFragment.DEFAULT_PART_SIZE, Scale.IEC), + GROUP_AWS, ++awsGroupCounter, ConfigDef.Width.NONE, S3ConfigFragment.AWS_S3_PART_SIZE); + } + + // deprecated options + addDeprecatedConfiguration(configDef, awsGroupCounter, isSink); - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_S3_FETCH_BUFFER_SIZE, SourceConfigFragment.RING_BUFFER_SIZE); - } - range.ensureValid(name, value); - } + } + + private static String deprecatedDescription(final String deprecatedKey, final ConfigDef.ConfigKey validKey) { + return String.format("%s property is deprecated, use %s. %s", deprecatedKey, validKey.name, + validKey.documentation); + } + + private static int deprecation(final int counter, final ConfigDef configDef, final String deprecatedKey, + final String validKey) { + final int result = counter + 1; + final ConfigDef.ConfigKey key = configDef.configKeys().get(validKey); + final String description = deprecatedDescription(deprecatedKey, key); + configDef.define(deprecatedKey, key.type(), null, + new UsageLoggingValidator(key.validator, (n, v) -> logDeprecated(LOGGER, deprecatedKey, validKey)), + key.importance, description, GROUP_AWS, result, key.width, deprecatedKey); + return result; + } + + static void addDeprecatedConfiguration(final ConfigDef configDef, final int awsGroupCounter, final boolean isSink) { + int counter = awsGroupCounter; + if (isSink) { + // Output fields in different group + final ConfigDef.ConfigKey key = configDef.configKeys() + .get(OutputFormatFragment.FORMAT_OUTPUT_FIELDS_CONFIG); + final String description = deprecatedDescription(OUTPUT_FIELDS, key); + configDef.define(OUTPUT_FIELDS, key.type(), null, + new UsageLoggingValidator(OutputFormatFragment.OUTPUT_FIELDS_VALIDATOR, + (name, value) -> logDeprecated(LOGGER, name, + OutputFormatFragment.FORMAT_OUTPUT_FIELDS_CONFIG)), + key.importance, description, OutputFormatFragment.GROUP_NAME, 50, key.width, OUTPUT_FIELDS); + } else { + // allow null values for deprecated value. 
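Note: the deprecation(...) helper above re-defines each deprecated key with the replacement key's type and validator, wrapped so that any use of the old name is logged. A minimal sketch of that wrapping idea, using only the Kafka ConfigDef.Validator interface; the real UsageLoggingValidator constructor and logDeprecated callback are assumed to behave like this and are not reproduced:

import org.apache.kafka.common.config.ConfigDef;

// Delegating validator: report that the deprecated key was used, then run the replacement key's validation.
final class DeprecationLoggingValidatorSketch implements ConfigDef.Validator {
    private final ConfigDef.Validator delegate;
    private final String deprecatedKey;
    private final String replacementKey;

    DeprecationLoggingValidatorSketch(final ConfigDef.Validator delegate, final String deprecatedKey,
            final String replacementKey) {
        this.delegate = delegate;
        this.deprecatedKey = deprecatedKey;
        this.replacementKey = replacementKey;
    }

    @Override
    public void ensureValid(final String name, final Object value) {
        if (value != null) {
            System.out.printf("%s is deprecated, use %s instead.%n", deprecatedKey, replacementKey);
        }
        if (delegate != null) {
            delegate.ensureValid(name, value);
        }
    }
}
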
+ final ConfigDef.ConfigKey key = configDef.configKeys().get(SourceConfigFragment.RING_BUFFER_SIZE); + final String description = deprecatedDescription(AWS_S3_FETCH_BUFFER_SIZE, key); + configDef.define(AWS_S3_FETCH_BUFFER_SIZE, key.type(), null, new UsageLoggingValidator((n, v) -> { + }, (n, v) -> logDeprecated(LOGGER, AWS_S3_FETCH_BUFFER_SIZE, SourceConfigFragment.RING_BUFFER_SIZE)), + key.importance, description, GROUP_AWS, ++counter, key.width, AWS_S3_FETCH_BUFFER_SIZE); + } + counter = deprecation(counter, configDef, AWS_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID_CONFIG); + counter = deprecation(counter, configDef, AWS_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY_CONFIG); + counter = deprecation(counter, configDef, AWS_S3_BUCKET, AWS_S3_BUCKET_NAME_CONFIG); + counter = deprecation(counter, configDef, AWS_S3_ENDPOINT, AWS_S3_ENDPOINT_CONFIG); + counter = deprecation(counter, configDef, AWS_S3_REGION, AWS_S3_REGION_CONFIG); + deprecation(counter, configDef, AWS_S3_PREFIX, AWS_S3_PREFIX_CONFIG); + + // deprecations in different groups + + configDef.define(OUTPUT_COMPRESSION, ConfigDef.Type.STRING, null, + new UsageLoggingValidator(FileNameFragment.COMPRESSION_TYPE_VALIDATOR, + (name, value) -> logDeprecated(LOGGER, name, FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG)), + ConfigDef.Importance.MEDIUM, "Output compression.", FileNameFragment.GROUP_NAME, 50, + ConfigDef.Width.SHORT, OUTPUT_COMPRESSION); + + configDef.define(TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(), + new UsageLoggingValidator(new TimeZoneValidator(), + (n, v) -> logDeprecated(LOGGER, TIMESTAMP_TIMEZONE, + FileNameFragment.FILE_NAME_TIMESTAMP_TIMEZONE)), + ConfigDef.Importance.LOW, + "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " + + "Use standard shot and long names. Default is UTC", + FileNameFragment.GROUP_NAME, 51, ConfigDef.Width.SHORT, TIMESTAMP_TIMEZONE); + + configDef.define(TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(), + new UsageLoggingValidator(new TimestampSourceValidator(), + (n, v) -> logDeprecated(LOGGER, TIMESTAMP_SOURCE, FileNameFragment.FILE_NAME_TIMESTAMP_SOURCE)), + ConfigDef.Importance.LOW, "Specifies the the timestamp variable source. 
Default is wall-clock.", + FileNameFragment.GROUP_NAME, 52, ConfigDef.Width.SHORT, TIMESTAMP_SOURCE); - @Override - public String toString() { - return range.toString(); - } - }, ConfigDef.Importance.MEDIUM, - "AWS S3 Fetch buffer size, this is the number of s3object keys kept in a buffer to ensure lexically older objet keys aren't skipped for processing if they are slower to upload.", - GROUP_AWS, ++awsGroupCounter, ConfigDef.Width.NONE, AWS_S3_FETCH_BUFFER_SIZE); } static void addAwsStsConfigGroup(final ConfigDef configDef) { @@ -293,137 +353,32 @@ static void addAwsStsConfigGroup(final ConfigDef configDef) { ++awsStsGroupCounter, ConfigDef.Width.NONE, AWS_STS_CONFIG_ENDPOINT); } - @SuppressWarnings({ "PMD.CognitiveComplexity", "PMD.NPathComplexity" }) - static void addDeprecatedConfiguration(final ConfigDef configDef) { - - configDef.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.PASSWORD, null, new NonEmptyPassword() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.MEDIUM, "AWS Access Key ID"); - - configDef.define(AWS_SECRET_ACCESS_KEY, ConfigDef.Type.PASSWORD, null, new NonEmptyPassword() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.MEDIUM, "AWS Secret Access Key"); - - configDef.define(AWS_S3_BUCKET, ConfigDef.Type.STRING, null, new BucketNameValidator() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_S3_BUCKET, AWS_S3_BUCKET_NAME_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.MEDIUM, "AWS S3 Bucket name"); - - configDef.define(AWS_S3_ENDPOINT, ConfigDef.Type.STRING, null, new UrlValidator() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_S3_ENDPOINT, AWS_S3_ENDPOINT_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.LOW, "Explicit AWS S3 Endpoint Address, mainly for testing"); - - configDef.define(AWS_S3_REGION, ConfigDef.Type.STRING, null, new AwsRegionValidator() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_S3_REGION, AWS_S3_REGION_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.MEDIUM, "AWS S3 Region, for example us-east-1"); - - configDef.define(AWS_S3_PREFIX, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString() { - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - logDeprecated(LOGGER, AWS_S3_PREFIX, AWS_S3_PREFIX_CONFIG); - } - super.ensureValid(name, value); - } - }, ConfigDef.Importance.MEDIUM, "Prefix for stored objects, for example cluster-1/"); - - configDef.define(OUTPUT_FIELDS, ConfigDef.Type.LIST, null, - new UsageLoggingValidator(OutputFormatFragment.OUTPUT_FIELDS_VALIDATOR, - (name, value) -> logDeprecated(LOGGER, name, OutputFormatFragment.FORMAT_OUTPUT_FIELDS_CONFIG)), - ConfigDef.Importance.MEDIUM, - "Output fields. 
A comma separated list of one or more: " + OUTPUT_FIELD_NAME_KEY + ", " - + OUTPUT_FIELD_NAME_OFFSET + ", " + OUTPUT_FIELD_NAME_TIMESTAMP + ", " + OUTPUT_FIELD_NAME_VALUE - + ", " + OUTPUT_FIELD_NAME_HEADERS); - - configDef.define(OUTPUT_COMPRESSION, ConfigDef.Type.STRING, null, - new UsageLoggingValidator(FileNameFragment.COMPRESSION_TYPE_VALIDATOR, - (name, value) -> logDeprecated(LOGGER, name, FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG)), - ConfigDef.Importance.MEDIUM, "Output compression."); - } - - public static Map handleDeprecations(final Map properties) { - handleDeprecatedYyyyUppercase(properties); - if (properties.containsKey(AWS_S3_FETCH_BUFFER_SIZE)) { - SourceConfigFragment.setter(properties) - .ringBufferSize(Integer.parseInt(properties.get(AWS_S3_FETCH_BUFFER_SIZE))); - properties.remove(AWS_S3_FETCH_BUFFER_SIZE); - } - if (Objects.nonNull(properties.get(OUTPUT_COMPRESSION)) - && Objects.isNull(properties.get(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG))) { - properties.put(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG, properties.get(OUTPUT_COMPRESSION)); - } - return properties; - } - - private static String doReplaceYYYY(final String template) { - if (template != null) { - final String newTemplate = UNIT_YYYY_PATTERN.matcher(template) - .replaceAll(matchResult -> matchResult.group().replace("YYYY", "yyyy")); - - if (!newTemplate.equals(template)) { - LOGGER.warn( - "{{timestamp:unit=YYYY}} is no longer supported, please use {{timestamp:unit=yyyy}} instead. It was automatically replaced: {}", - newTemplate); - } - return newTemplate; - } - return template; - } - - public static Map handleDeprecatedYyyyUppercase(final Map properties) { - if (!properties.containsKey(AWS_S3_PREFIX_CONFIG)) { - return properties; - } - - final var result = new HashMap<>(properties); - result.put(AWS_S3_PREFIX_CONFIG, doReplaceYYYY(properties.get(AWS_S3_PREFIX_CONFIG))); - return result; - } - + /** + * Validate the various variables do not conflict. 
+ * + * @param configMap + * the distribution type for the validator + */ @Override - public void validate() { - validateCredentials(); - validateBucket(); + public void validate(final Map configMap) { + validateCredentials(configMap); + validateBucket(configMap); } - public void validateCredentials() { + public void validateCredentials(final Map configMap) { final AwsStsRole awsStsRole = getStsRole(); if (awsStsRole.isValid()) { final AwsStsEndpointConfig stsEndpointConfig = getStsEndpointConfig(); if (!stsEndpointConfig.isValid() && !AwsStsEndpointConfig.AWS_STS_GLOBAL_ENDPOINT.equals(stsEndpointConfig.getServiceEndpoint())) { - throw new ConfigException(String.format("%s should be specified together with %s", AWS_S3_REGION_CONFIG, - AWS_STS_CONFIG_ENDPOINT)); + if (StringUtils.isEmpty(getString(AWS_S3_REGION_CONFIG))) { + registerIssue(configMap, AWS_S3_REGION_CONFIG, getString(AWS_S3_REGION_CONFIG), String.format( + "%s should be specified together with %s", AWS_S3_REGION_CONFIG, AWS_STS_CONFIG_ENDPOINT)); + } else { + registerIssue(configMap, AWS_STS_CONFIG_ENDPOINT, getString(AWS_STS_CONFIG_ENDPOINT), String.format( + "%s should be specified together with %s", AWS_S3_REGION_CONFIG, AWS_STS_CONFIG_ENDPOINT)); + } } } else { final BasicAWSCredentials awsCredentials = getAwsCredentials(); @@ -438,10 +393,10 @@ public void validateCredentials() { } } - public void validateBucket() { + public void validateBucket(final Map configMap) { if (Objects.isNull(getString(AWS_S3_BUCKET_NAME_CONFIG)) && Objects.isNull(getString(AWS_S3_BUCKET))) { - throw new ConfigException(String.format("Neither %s nor %s properties have been set", - AWS_S3_BUCKET_NAME_CONFIG, AWS_S3_BUCKET)); + registerIssue(configMap, AWS_S3_BUCKET_NAME_CONFIG, getString(AWS_S3_BUCKET_NAME_CONFIG), + AWS_S3_BUCKET_NAME_CONFIG + " should be specified"); } } @@ -655,9 +610,25 @@ public static Map handleDeprecatedOptions(final Map originals) { // NOPMD - // UnusedAssignment - super(definition, handleDeprecatedYyyyUppercase(originals)); - s3ConfigFragment = new S3ConfigFragment(this); - validate(); - } - - private void validate() { - s3ConfigFragment.validate(); - } - - /** - * - */ - @Deprecated - protected static void addDeprecatedConfiguration(final ConfigDef configDef) { - - } - - /** - * - */ - @Deprecated - protected static void addAwsStsConfigGroup(final ConfigDef configDef) { - - } - - /** - * - */ - @Deprecated - protected static void addAwsConfigGroup(final ConfigDef configDef) { - - } - - /** - * - */ - @Deprecated - protected static void addS3RetryPolicies(final ConfigDef configDef) { - - } - - public AwsStsRole getStsRole() { - return s3ConfigFragment.getStsRole(); - } - - public boolean hasAwsStsRole() { - return s3ConfigFragment.hasAwsStsRole(); - } - - public boolean hasStsEndpointConfig() { - return s3ConfigFragment.hasStsEndpointConfig(); - } - - public AwsStsEndpointConfig getStsEndpointConfig() { - return s3ConfigFragment.getStsEndpointConfig(); - } - - public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { - return s3ConfigFragment.getAwsEndpointConfiguration(); - } - - public BasicAWSCredentials getAwsCredentials() { - return s3ConfigFragment.getAwsCredentials(); - } - - public String getAwsS3EndPoint() { - return s3ConfigFragment.getAwsS3EndPoint(); - } - - public Region getAwsS3Region() { - return s3ConfigFragment.getAwsS3Region(); - } - - public String getAwsS3BucketName() { - return s3ConfigFragment.getAwsS3BucketName(); - } - - public String getServerSideEncryptionAlgorithmName() { - 
return s3ConfigFragment.getServerSideEncryptionAlgorithmName(); - } - - public String getAwsS3Prefix() { - return s3ConfigFragment.getAwsS3Prefix(); - } - - public int getAwsS3PartSize() { - return s3ConfigFragment.getAwsS3PartSize(); - } - - public long getS3RetryBackoffDelayMs() { - return s3ConfigFragment.getS3RetryBackoffDelayMs(); - } - - public long getS3RetryBackoffMaxDelayMs() { - return s3ConfigFragment.getS3RetryBackoffMaxDelayMs(); - } - - public int getS3RetryBackoffMaxRetries() { - return s3ConfigFragment.getS3RetryBackoffMaxRetries(); - } - - public AWSCredentialsProvider getCustomCredentialsProvider() { - return s3ConfigFragment.getCustomCredentialsProvider(); - } -} diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java index 9dbfd5288..33a8318bd 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.Configurable; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; @@ -59,9 +58,9 @@ void createsStsCredentialProviderIfSpecified() { props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); props.put(S3ConfigFragment.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getProvider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getProvider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(STSAssumeRoleSessionCredentialsProvider.class); } @@ -70,26 +69,26 @@ void createStaticCredentialProviderByDefault() { props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getProvider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getProvider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(AWSStaticCredentialsProvider.class); } @Test void createDefaultCredentialsWhenNoCredentialsSpecified() { - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getProvider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getProvider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class); } @Test void customCredentialProviderTest() { props.put(S3ConfigFragment.AWS_CREDENTIALS_PROVIDER_CONFIG, DummyCredentialsProvider.class.getName()); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getProvider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getProvider(config.getS3ConfigFragment()); 
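Note: with S3SinkBaseConfig removed, callers obtain the S3ConfigFragment from the config itself, as these tests and S3SinkTask now do. A minimal usage sketch, assuming the no-arg AwsCredentialProviderFactory constructor and the minimal property set used by the tests are sufficient (illustrative, not a definitive wiring):

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.auth.AWSCredentialsProvider;

import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;

public class CredentialWiringSketch {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "key-id");
        props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "secret");
        props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "bucket");
        final S3SinkConfig config = new S3SinkConfig(props);
        // The config owns its S3ConfigFragment; the credential factory consumes the fragment directly.
        final AWSCredentialsProvider provider = new AwsCredentialProviderFactory()
                .getProvider(config.getS3ConfigFragment());
        System.out.println(provider.getClass().getSimpleName());
    }
}
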
assertThat(credentialProvider).isInstanceOf(DummyCredentialsProvider.class); assertThat(((DummyCredentialsProvider) credentialProvider).configured).isTrue(); } diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java new file mode 100644 index 000000000..5aef87ea1 --- /dev/null +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import java.util.Map; + +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; +import io.aiven.kafka.connect.config.s3.S3ConfigFragment; + +public class AwsCredentialTestingConfig extends SinkCommonConfig { + private final S3ConfigFragment s3ConfigFragment; + + public AwsCredentialTestingConfig(final Map properties) { + super(new CredTestingDef(), properties); + s3ConfigFragment = new S3ConfigFragment(FragmentDataAccess.from(this)); + } + + public static CredTestingDef configDef() { // NOPMD UnusedAssignment + return new CredTestingDef(); + } + + S3ConfigFragment getS3ConfigFragment() { + return s3ConfigFragment; + } + + // private static ConfigDef getBaseConfigDefinition() { + // final ConfigDef definition = new ConfigDef(); + // addOutputFieldsFormatConfigGroup(definition, OutputFieldType.VALUE); + // definition.define(FileNameFragment.FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, + // ConfigDef.Importance.MEDIUM, "File name template"); + // definition.define(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, + // CompressionType.NONE.name, ConfigDef.Importance.MEDIUM, "File compression"); + // definition.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, + // "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. 
" + // + "0 is interpreted as \"unlimited\", which is the default."); + // return definition; + // } + // + public static class CredTestingDef extends SinkCommonConfig.SinkCommonConfigDef { + + public CredTestingDef() { + super(null, CompressionType.NONE); + S3ConfigFragment.update(this, true); + } + } + +} diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialV2ProviderFactoryTest.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialV2ProviderFactoryTest.java index 56a537cdb..55f9ee0bf 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialV2ProviderFactoryTest.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialV2ProviderFactoryTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.Configurable; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,9 +57,9 @@ void createsStsCredentialProviderIfSpecified() { props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id()); props.put(S3ConfigFragment.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getAwsV2Provider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(StsAssumeRoleCredentialsProvider.class); } @@ -69,26 +68,26 @@ void createStaticCredentialProviderByDefault() { props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getAwsV2Provider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(StaticCredentialsProvider.class); } @Test void createDefaultCredentialsWhenNoCredentialsSpecified() { - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getAwsV2Provider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(AwsCredentialsProvider.class); } @Test void customCredentialProviderTest() { props.put(S3ConfigFragment.AWS_CREDENTIALS_PROVIDER_CONFIG, DummyCredentialsProvider.class.getName()); - final var config = new AwsCredentialBaseConfig(props); + final AwsCredentialTestingConfig config = new AwsCredentialTestingConfig(props); - final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config)); + final var credentialProvider = factory.getAwsV2Provider(config.getS3ConfigFragment()); assertThat(credentialProvider).isInstanceOf(DummyCredentialsProvider.class); assertThat(((DummyCredentialsProvider) credentialProvider).configured).isTrue(); } diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java b/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java deleted file mode 100644 index 
44fecb11b..000000000 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.tools; - -import java.util.Map; - -import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.common.config.OutputFieldType; -import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.config.s3.S3SinkBaseConfig; - -public class AwsCredentialBaseConfig extends S3SinkBaseConfig { - public AwsCredentialBaseConfig(final Map properties) { - super(configDef(), properties); - } - - public static SinkCommonConfigDef configDef() { // NOPMD UnusedAssignment - final SinkCommonConfigDef configDef = new SinkCommonConfigDef(OutputFieldType.VALUE, CompressionType.NONE); - S3ConfigFragment.update(configDef); - return configDef; - } -} diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java index 4572c7d41..cc89e9296 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Objects; -import io.aiven.commons.collections.Scale; - import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; @@ -42,8 +40,6 @@ public class S3OutputStream extends OutputStream { private final Logger logger = LoggerFactory.getLogger(S3OutputStream.class); - public static final int DEFAULT_PART_SIZE = (int) Scale.MiB.asBytes(5); - private final AmazonS3 client; private final ByteBuffer byteBuffer; diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 189655569..f9ff0d868 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -42,7 +42,6 @@ import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; import io.aiven.kafka.connect.common.output.OutputWriter; import io.aiven.kafka.connect.common.templating.VariableTemplatePart; -import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import io.aiven.kafka.connect.s3.config.S3SinkConfig; @@ -97,7 +96,7 @@ private AmazonS3 createAmazonS3Client(final S3SinkConfig config) { Math.toIntExact(config.getS3RetryBackoffMaxDelayMs())), config.getS3RetryBackoffMaxRetries(), false)); final var s3ClientBuilder = AmazonS3ClientBuilder.standard() - .withCredentials(credentialFactory.getProvider(new S3ConfigFragment(config))) + 
.withCredentials(credentialFactory.getProvider(config.getS3ConfigFragment())) .withClientConfiguration(clientConfig); if (Objects.isNull(awsEndpointConfig)) { s3ClientBuilder.withRegion(config.getAwsS3Region().getName()); diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index 5780705e7..5c453fa2b 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -17,34 +17,31 @@ package io.aiven.kafka.connect.s3.config; import java.time.ZoneId; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; - import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.FileNameFragment; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; import io.aiven.kafka.connect.common.config.OutputField; import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.templating.Template; -import io.aiven.kafka.connect.config.s3.S3CommonConfig; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.config.s3.S3SinkBaseConfig; +import io.aiven.kafka.connect.iam.AwsStsRole; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Region; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.regions.Regions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({ "PMD.TooManyMethods", "PMD.GodClass", "PMD.ExcessiveImports", "PMD.TooManyStaticImports" }) -final public class S3SinkConfig extends S3SinkBaseConfig { +public final class S3SinkConfig extends SinkCommonConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkConfig.class); @@ -59,12 +56,19 @@ final public class S3SinkConfig extends S3SinkBaseConfig { // in other words we can't use values greater than 30 public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3; + private final S3ConfigFragment s3ConfigFragment; + public S3SinkConfig(final Map properties) { super(new S3SinkConfigDef(), preprocessProperties(properties)); + s3ConfigFragment = new S3ConfigFragment(FragmentDataAccess.from(this)); + } + + public S3ConfigFragment getS3ConfigFragment() { + return s3ConfigFragment; } static Map preprocessProperties(final Map properties) { - final Map result = S3ConfigFragment.handleDeprecations(properties); + return S3ConfigFragment.handleDeprecatedOptions(properties); } @Override @@ -81,6 +85,57 @@ public CompressionType getCompressionType() { return CompressionType.GZIP; } + public long getS3RetryBackoffDelayMs() { + return s3ConfigFragment.getS3RetryBackoffDelayMs(); + } + + public long getS3RetryBackoffMaxDelayMs() { + return s3ConfigFragment.getS3RetryBackoffMaxDelayMs(); + } + + public int getS3RetryBackoffMaxRetries() { + return s3ConfigFragment.getS3RetryBackoffMaxRetries(); + } + + @Deprecated + public Region getAwsS3Region() { + return s3ConfigFragment.getAwsS3Region(); + } + + public 
String getAwsS3BucketName() { + return s3ConfigFragment.getAwsS3BucketName(); + } + + public int getAwsS3PartSize() { + return s3ConfigFragment.getAwsS3PartSize(); + } + + public String getServerSideEncryptionAlgorithmName() { + return s3ConfigFragment.getServerSideEncryptionAlgorithmName(); + } + + public String getAwsS3EndPoint() { + return s3ConfigFragment.getAwsS3EndPoint(); + } + + @Deprecated + public BasicAWSCredentials getAwsCredentials() { + return s3ConfigFragment.getAwsCredentials(); + } + + @Deprecated + public AWSCredentialsProvider getCustomCredentialsProvider() { + return s3ConfigFragment.getCustomCredentialsProvider(); + } + + public String getAwsS3Prefix() { + return s3ConfigFragment.getAwsS3Prefix(); + } + + public AwsStsRole getStsRole() { + return s3ConfigFragment.getStsRole(); + } + /** * Gets the list of output fields. Will check {OutputFormatFragment#FORMAT_OUTPUT_FIELDS_CONFIG} and then * {OutputFormatFragment#OUTPUT_FIELDS}. If neither is set will create an output field of @@ -119,7 +174,7 @@ public List getOutputFields(final String format) { } public Template getPrefixTemplate() { - final var template = Template.of(getAwsS3Prefix()); + final var template = Template.of(s3ConfigFragment.getAwsS3Prefix()); template.instance().bindVariable("utc_date", () -> { LOGGER.info("utc_date variable is deprecated please read documentation for the new name"); return ""; @@ -138,27 +193,6 @@ public TimestampSource getTimestampSource() { return TimestampSource.of(getTimezone(), TimestampSource.Type.of(getString(S3ConfigFragment.TIMESTAMP_SOURCE))); } - /** - * Deprecated please use S3ConfigFragment.AwsRegionValidator - */ - @Deprecated - protected static class AwsRegionValidator implements ConfigDef.Validator { - private static final String SUPPORTED_AWS_REGIONS = Arrays.stream(Regions.values()) - .map(Regions::getName) - .collect(Collectors.joining(", ")); - - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.nonNull(value)) { - final String valueStr = (String) value; - final Region region = RegionUtils.getRegion(valueStr); - if (!RegionUtils.getRegions().contains(region)) { - throw new ConfigException(name, valueStr, "supported values are: " + SUPPORTED_AWS_REGIONS); - } - } - } - } - public Boolean usesFileNameTemplate() { return Objects.isNull(getString(S3ConfigFragment.AWS_S3_PREFIX_CONFIG)) && Objects.isNull(getString(S3ConfigFragment.AWS_S3_PREFIX)); diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java index 31059e649..13d3d4b60 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfigDef.java @@ -16,67 +16,27 @@ package io.aiven.kafka.connect.s3.config; -import java.time.ZoneOffset; -import java.util.List; import java.util.Map; -import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.common.config.SinkCommonConfig; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; -import io.aiven.commons.collections.Scale; -import io.aiven.kafka.connect.common.config.FileNameFragment; -import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.TimestampSource; -import io.aiven.kafka.connect.common.config.validators.ScaleValidator; -import 
io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; -import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FragmentDataAccess; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.s3.S3OutputStream; public class S3SinkConfigDef extends SinkCommonConfig.SinkCommonConfigDef { - private static final String GROUP_AWS = "AWS"; - private static final String GROUP_FILE = "File"; - public S3SinkConfigDef() { super(null, CompressionType.GZIP); - S3ConfigFragment.update(this); - addS3partSizeConfig(this); - addDeprecatedTimestampConfig(this); - } - -// @Override -// public List validate(final Map props) { -// return super.validate(S3SinkConfig.preprocessProperties(props)); -// } - - private static void addDeprecatedTimestampConfig(final ConfigDef configDef) { - int timestampGroupCounter = 0; - - configDef.define(S3ConfigFragment.TIMESTAMP_TIMEZONE, Type.STRING, ZoneOffset.UTC.toString(), - new TimeZoneValidator(), Importance.LOW, - "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " - + "Use standard shot and long names. Default is UTC", - GROUP_FILE, ++timestampGroupCounter, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_TIMEZONE); - - configDef.define(S3ConfigFragment.TIMESTAMP_SOURCE, Type.STRING, TimestampSource.Type.WALLCLOCK.name(), - new TimestampSourceValidator(), Importance.LOW, - "Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, - ++timestampGroupCounter, ConfigDef.Width.SHORT, S3ConfigFragment.TIMESTAMP_SOURCE); + S3ConfigFragment.update(this, true); } - private static void addS3partSizeConfig(final ConfigDef configDef) { - - // add awsS3SinkCounter if more S3 Sink Specific config is added - // This is used to set orderInGroup - configDef.define(S3ConfigFragment.AWS_S3_PART_SIZE, Type.LONG, S3OutputStream.DEFAULT_PART_SIZE, - ScaleValidator.between(Scale.MiB.asBytes(1), Integer.MAX_VALUE, Scale.IEC), Importance.MEDIUM, - "The Part Size in S3 Multi-part Uploads in bytes. 
Maximum is " - + Scale.scaleOf(Integer.MAX_VALUE, Scale.IEC) + " and default is " - + Scale.size(S3OutputStream.DEFAULT_PART_SIZE, Scale.IEC), - GROUP_AWS, 0, ConfigDef.Width.NONE, S3ConfigFragment.AWS_S3_PART_SIZE); + @Override + public Map multiValidate(final Map valueMap) { + final Map values = super.multiValidate(valueMap); + final FragmentDataAccess fragmentDataAccess = FragmentDataAccess.from(valueMap); + new S3ConfigFragment(fragmentDataAccess).validate(values); + return values; } - } diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java index ecdc42688..5f58a3052 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java @@ -37,7 +37,6 @@ import io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs; import io.aiven.kafka.connect.common.config.StableTimeFormatter; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.s3.S3OutputStream; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; @@ -87,7 +86,7 @@ void correctFullConfig() { new OutputField(OutputFieldType.TIMESTAMP, OutputFieldEncodingType.NONE), new OutputField(OutputFieldType.HEADERS, OutputFieldEncodingType.NONE)); assertThat(conf.getFormatType()).isEqualTo(FormatType.forName("csv")); - assertThat(conf.getAwsS3PartSize()).isEqualTo(S3OutputStream.DEFAULT_PART_SIZE); + assertThat(conf.getAwsS3PartSize()).isEqualTo(S3ConfigFragment.DEFAULT_PART_SIZE); assertThat(conf.getKafkaRetryBackoffMs()).isNull(); assertThat(conf.getS3RetryBackoffDelayMs()).isEqualTo(S3SinkConfig.AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT); assertThat(conf.getS3RetryBackoffMaxDelayMs()) @@ -671,7 +670,7 @@ void stsEndpointShouldNotBeSetWithoutRegion() { props.put(S3ConfigFragment.AWS_STS_CONFIG_ENDPOINT, "https://sts.eu-north-1.amazonaws.com"); assertThatThrownBy(() -> new S3SinkConfig(props)).isInstanceOf(ConfigException.class) - .hasMessage("aws.s3.region should be specified together with aws.sts.config.endpoint"); + .hasMessageContaining("aws.s3.region should be specified together with aws.sts.config.endpoint"); } @ParameterizedTest diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceConnector.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceConnector.java index b5dd8c312..e7c96f728 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceConnector.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceConnector.java @@ -28,7 +28,7 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfigDef; import io.aiven.kafka.connect.s3.source.utils.Version; import org.slf4j.Logger; @@ -46,7 +46,7 @@ public class S3SourceConnector extends SourceConnector { @Override public ConfigDef config() { - return S3SourceConfig.configDef(); + return new S3SourceConfigDef(); } @Override diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index 95f40882e..9aa0e6ef7 100644 --- 
a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
@@ -16,10 +16,9 @@
 package io.aiven.kafka.connect.s3.source.config;
 
-import static io.aiven.kafka.connect.config.s3.S3CommonConfig.handleDeprecatedYyyyUppercase;
-
 import java.util.Map;
 
+import io.aiven.kafka.connect.common.config.FragmentDataAccess;
 import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
 import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory;
@@ -37,25 +36,9 @@ final public class S3SourceConfig extends SourceCommonConfig {
     private final AwsCredentialProviderFactory awsCredentialsProviderFactory;
 
     public S3SourceConfig(final Map properties) {
-        super(configDef(), handleDeprecatedYyyyUppercase(properties));
-        s3ConfigFragment = new S3ConfigFragment(this);
+        super(new S3SourceConfigDef(), properties);
+        s3ConfigFragment = new S3ConfigFragment(FragmentDataAccess.from(this));
         awsCredentialsProviderFactory = new AwsCredentialProviderFactory();
-        validate(); // NOPMD ConstructorCallsOverridableMethod getStsRole is called
-    }
-
-    public static S3SourceConfigDef configDef() {
-        final var configDef = new S3SourceConfigDef();
-        S3ConfigFragment.update(configDef);
-        return configDef;
-    }
-
-    private void validate() {
-
-        // s3ConfigFragment is validated in this method as it is created here.
-        // Other Fragments created in the ConfigDef are validated in the parent classes their instances are created in.
-        // e.g. SourceConfigFragment, FileNameFragment, TransformerFragment and OutputFormatFragment are all
-        // validated in SourceCommonConfig.
-        s3ConfigFragment.validate();
     }
 
     public AwsStsRole getStsRole() {
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
index c0ead6b37..126979cce 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
@@ -16,8 +16,26 @@
 package io.aiven.kafka.connect.s3.source.config;
 
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigValue;
+
+import io.aiven.kafka.connect.common.config.FragmentDataAccess;
 import io.aiven.kafka.connect.common.config.SourceCommonConfig;
+import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
+
+public final class S3SourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef {
-public class S3SourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef {
+    public S3SourceConfigDef() {
+        super();
+        S3ConfigFragment.update(this, false);
+    }
+    @Override
+    public Map multiValidate(final Map valueMap) {
+        final Map values = super.multiValidate(valueMap);
+        final FragmentDataAccess fragmentDataAccess = FragmentDataAccess.from(valueMap);
+        new S3ConfigFragment(fragmentDataAccess).validate(values);
+        return values;
+    }
 }

From e2714afc2614e3ea06e29ad710029a0792793850 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Thu, 20 Nov 2025 15:46:27 +0000
Subject: [PATCH 08/13] spotless changes

---
 .../aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
index 126979cce..7b0b00e7a 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
@@ -26,7 +26,7 @@ public final class S3SourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef {
-    public S3SourceConfigDef() {
+    public S3SourceConfigDef() {
         super();
         S3ConfigFragment.update(this, false);
     }

From 7cbcedd463a5a7318dc27a3abc8aed4fb333a523 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Thu, 20 Nov 2025 15:55:31 +0000
Subject: [PATCH 09/13] spotless changes

---
 .../aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
index 7b0b00e7a..126979cce 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigDef.java
@@ -26,7 +26,7 @@ public final class S3SourceConfigDef extends SourceCommonConfig.SourceCommonConfigDef {
-    public S3SourceConfigDef() {
+    public S3SourceConfigDef() {
         super();
         S3ConfigFragment.update(this, false);
     }

From e3f0ce88b2b9c741ddfa9793da843f6602177812 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Fri, 21 Nov 2025 13:12:27 +0000
Subject: [PATCH 10/13] updated timescales

---
 .../io/aiven/kafka/connect/config/s3/S3ConfigFragment.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
index 99893ca5f..291828ce7 100644
--- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
+++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
@@ -24,6 +24,7 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
@@ -182,13 +183,13 @@ public static Setter setter(final Map configData) {
     static void addS3RetryPolicies(final ConfigDef configDef) {
         var retryPolicyGroupCounter = 0;
         configDef.define(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG, ConfigDef.Type.LONG,
-                AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.MEDIUM,
+                AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT, TimeScaleValidator.atLeast(1), ConfigDef.Importance.MEDIUM,
                 "S3 default base sleep time for non-throttled exceptions in milliseconds. " + "Default is "
                         + AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT + ".",
                 GROUP_S3_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, ConfigDef.Width.NONE,
                 AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG);
         configDef.define(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, ConfigDef.Type.LONG,
-                AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.MEDIUM,
+                AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, TimeScaleValidator.atLeast(1), ConfigDef.Importance.MEDIUM,
                 "S3 maximum back-off time before retrying a request in milliseconds. " + "Default is "
                         + AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT + ".",
                 GROUP_S3_RETRY_BACKOFF_POLICY, ++retryPolicyGroupCounter, ConfigDef.Width.NONE,

From 21e9aac3e87a478bbbbed7ccff5a4689ad11a8d6 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Fri, 21 Nov 2025 14:22:14 +0000
Subject: [PATCH 11/13] updated validators

---
 .../io/aiven/kafka/connect/config/s3/S3ConfigFragment.java | 2 +-
 .../io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
index 291828ce7..dc365e13f 100644
--- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
+++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
@@ -24,7 +24,6 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
@@ -39,6 +39,7 @@
 import io.aiven.kafka.connect.common.config.TimestampSource;
 import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword;
 import io.aiven.kafka.connect.common.config.validators.ScaleValidator;
+import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
 import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator;
 import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator;
 import io.aiven.kafka.connect.common.config.validators.UrlValidator;
diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java
index 5f58a3052..a0d7e52ef 100644
--- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java
+++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java
@@ -435,14 +435,16 @@ void wrongAwsS3BackoffPolicy() {
                 S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah",
                 S3ConfigFragment.AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG, "0");
         assertThatThrownBy(() -> new S3SinkConfig(wrongDelayProps)).isInstanceOf(ConfigException.class)
-                .hasMessage("Invalid value 0 for configuration aws.s3.backoff.delay.ms: Value must be at least 1");
+                .hasMessage(
+                        "Invalid value 0 for configuration aws.s3.backoff.delay.ms: Value must be at least 1 Milliseconds");
 
         final var wrongMaxDelayProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah",
                 S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah",
                 S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah",
                 S3ConfigFragment.AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, "0");
         assertThatThrownBy(() -> new S3SinkConfig(wrongMaxDelayProps)).isInstanceOf(ConfigException.class)
-                .hasMessage("Invalid value 0 for configuration aws.s3.backoff.max.delay.ms: Value must be at least 1");
+                .hasMessage(
+                        "Invalid value 0 for configuration aws.s3.backoff.max.delay.ms: Value must be at least 1 Milliseconds");
 
         final var wrongMaxRetriesProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah",
                 S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah",

From 45dd6010812fd45e07839f0501fd3bd351ccb005 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Tue, 25 Nov 2025 10:12:17 +0000
Subject:
[PATCH 12/13] fixed spotless issues

---
 .../main/java/io/aiven/commons/collections/TimeScale.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/commons/src/main/java/io/aiven/commons/collections/TimeScale.java b/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
index 42ddc30e0..65c3e507d 100644
--- a/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
+++ b/commons/src/main/java/io/aiven/commons/collections/TimeScale.java
@@ -35,11 +35,10 @@ public String format(final long milliseconds) {
         }
     }, //
     SECONDS(MILLISECONDS.milliseconds * 1000), //
-    MINUTES( SECONDS.milliseconds * 60), //
-    HOURS(MINUTES.milliseconds * 60 ), //
+    MINUTES(SECONDS.milliseconds * 60), //
+    HOURS(MINUTES.milliseconds * 60), //
     DAYS(HOURS.milliseconds * 24);
-
     /**
      * The Decimal format for the TimeUnit displays.
      */

From 601798708f52ac02c8ea21aaa2435ea13da50188 Mon Sep 17 00:00:00 2001
From: Claude Warren
Date: Tue, 25 Nov 2025 10:45:37 +0000
Subject: [PATCH 13/13] removed dead code

---
 .../connect/iam/AwsCredentialTestingConfig.java | 14 --------------
 .../kafka/connect/s3/config/S3SinkConfigTest.java | 4 ----
 2 files changed, 18 deletions(-)

diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java
index 5aef87ea1..7e917b06e 100644
--- a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java
+++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialTestingConfig.java
@@ -39,19 +39,6 @@ S3ConfigFragment getS3ConfigFragment() {
         return s3ConfigFragment;
     }
 
-    // private static ConfigDef getBaseConfigDefinition() {
-    //     final ConfigDef definition = new ConfigDef();
-    //     addOutputFieldsFormatConfigGroup(definition, OutputFieldType.VALUE);
-    //     definition.define(FileNameFragment.FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null,
-    //         ConfigDef.Importance.MEDIUM, "File name template");
-    //     definition.define(FileNameFragment.FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
-    //         CompressionType.NONE.name, ConfigDef.Importance.MEDIUM, "File compression");
-    //     definition.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM,
-    //         "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. 
" - // + "0 is interpreted as \"unlimited\", which is the default."); - // return definition; - // } - // public static class CredTestingDef extends SinkCommonConfig.SinkCommonConfigDef { public CredTestingDef() { @@ -59,5 +46,4 @@ public CredTestingDef() { S3ConfigFragment.update(this, true); } } - } diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java index a0d7e52ef..88d06b8d0 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java @@ -182,10 +182,6 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() { @Test void wrongPartSize() { final Map wrongPartSizeProps = new HashMap<>(); - // Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-key-id", - // S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "bla-bla-access-key", S3ConfigFragment.AWS_S3_PART_SIZE, - // Long.toString(2_000_000_001L), - // S3ConfigFragment.setter()); S3ConfigFragment.setter(wrongPartSizeProps) .accessKeyId("blah-blah-key-id") .accessKeySecret("bla-bla-access-key")