diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AbstractIntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AbstractIntegrationTest.java index 41debb191..79e463f93 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AbstractIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AbstractIntegrationTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase; +import io.aiven.commons.kafka.testkit.KafkaManager; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java index dd4875a9a..ac2d93259 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; +import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; diff --git a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java index 0110dda06..2e6f2ce03 100644 --- a/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java +++ b/azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/ParquetIntegrationTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; +import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; diff --git a/azure-source-connector/src/test/java/io/aiven/kafka/connect/azure/source/utils/AzureBlobClientTest.java b/azure-source-connector/src/test/java/io/aiven/kafka/connect/azure/source/utils/AzureBlobClientTest.java index d188bf9ac..d39c61af8 100644 --- a/azure-source-connector/src/test/java/io/aiven/kafka/connect/azure/source/utils/AzureBlobClientTest.java +++ b/azure-source-connector/src/test/java/io/aiven/kafka/connect/azure/source/utils/AzureBlobClientTest.java @@ -57,6 +57,7 @@ class AzureBlobClientTest { private BlobContainerAsyncClient containerClient; private BlobAsyncClient blobClient; + @BeforeEach public void setup() { this.config = mock(AzureBlobSourceConfig.class); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java index 669aca3fd..d65140617 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CompressionType.java @@ -117,7 +117,7 @@ public 
static Collection names() { } /** - * Gets the file name extension associated with the compression. + * Gets the file name extension associated with the compression. Includes the '.' separator. * * @return the file name extension. */ diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FormatType.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FormatType.java index 2f1714952..ccd6406c5 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FormatType.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FormatType.java @@ -34,15 +34,15 @@ public enum FormatType { /** Handles in Avro format */ - AVRO("avro", (stream, fields, config, envelope) -> new AvroOutputWriter(fields, stream, config, envelope)), + AVRO("avro", "avro", (stream, fields, config, envelope) -> new AvroOutputWriter(fields, stream, config, envelope)), /** Handles in CSV format */ - CSV("csv", (stream, fields, config, envelope) -> new PlainOutputWriter(fields, stream)), + CSV("csv", "", (stream, fields, config, envelope) -> new PlainOutputWriter(fields, stream)), /** Handles in JSON format */ - JSON("json", (stream, fields, config, envelope) -> new JsonOutputWriter(fields, stream, envelope)), + JSON("json", "", (stream, fields, config, envelope) -> new JsonOutputWriter(fields, stream, envelope)), /** Handles in JSONL format */ - JSONL("jsonl", (stream, fields, config, envelope) -> new JsonLinesOutputWriter(fields, stream, envelope)), + JSONL("jsonl", "", (stream, fields, config, envelope) -> new JsonLinesOutputWriter(fields, stream, envelope)), /** Handles Parquet format */ - PARQUET("parquet", (stream, fields, config, envelope) -> new ParquetOutputWriter(fields, stream, config, envelope)); + PARQUET("parquet", "", (stream, fields, config, envelope) -> new ParquetOutputWriter(fields, stream, config, envelope)); /** * A list of supported format types for display. @@ -57,6 +57,11 @@ public enum FormatType { */ public final String name; + /** + * File name segment to be added into file names, may be an empty string. Does not include separator. + */ + final String fileNameSegment; + /** The writer constructor for this type */ private final WriterConstructor writerConstructor; @@ -68,11 +73,16 @@ public enum FormatType { * @param writerConstructor * the writer constructor for this type. */ - FormatType(final String name, final WriterConstructor writerConstructor) { + FormatType(final String name, final String fileNameSegment, final WriterConstructor writerConstructor) { this.name = name; + this.fileNameSegment = fileNameSegment; this.writerConstructor = writerConstructor; } + public String getFileNameSegment() { + return fileNameSegment; + } + /** * Gets the format type by name. 
* diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java index 730b971c0..68ac24a80 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java @@ -27,8 +27,11 @@ import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.templating.Template; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class SchemaBasedTopicPartitionKeyRecordGrouper extends TopicPartitionKeyRecordGrouper { + private static final Logger LOG = LoggerFactory.getLogger(SchemaBasedTopicPartitionKeyRecordGrouper.class); private final SchemaBasedRotator schemaBasedRotator = new SchemaBasedRotator(); @@ -39,6 +42,7 @@ final class SchemaBasedTopicPartitionKeyRecordGrouper extends TopicPartitionKeyR @Override protected String resolveRecordKeyFor(final SinkRecord record) { + LOG.debug("resolveRecordKeyFor()"); if (schemaBasedRotator.rotate(record)) { return generateNewRecordKey(record); } else { @@ -53,7 +57,7 @@ public void clear() { } private static final class SchemaBasedRotator implements Rotator { - + private static final Logger LOG = LoggerFactory.getLogger(SchemaBasedRotator.class); private final Map keyValueSchemas = new HashMap<>(); @Override @@ -64,7 +68,8 @@ public boolean rotate(final SinkRecord record) { final var key = recordKey(record); final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); final var keyValueVersion = keyValueSchemas.computeIfAbsent(tpk, - ignored -> new KeyValueSchema(record.keySchema(), record.valueSchema())); + ignored -> {LOG.debug("Creating new KeyValueSchema"); return new KeyValueSchema(record.keySchema(), record.valueSchema());}); + final var schemaChanged = !keyValueVersion.keySchema.equals(record.keySchema()) || !keyValueVersion.valueSchema.equals(record.valueSchema()); if (schemaChanged) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouper.java index 675fd1433..6b953d20b 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouper.java @@ -27,10 +27,13 @@ import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.templating.Template; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class SchemaBasedTopicPartitionRecordGrouper extends TopicPartitionRecordGrouper { private final SchemaBasedRotator schemaBasedRotator = new SchemaBasedRotator(); + private final static Logger LOG = LoggerFactory.getLogger(SchemaBasedTopicPartitionRecordGrouper.class); SchemaBasedTopicPartitionRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile, final TimestampSource tsSource) { @@ -39,6 +42,7 @@ final class SchemaBasedTopicPartitionRecordGrouper extends TopicPartitionRecordG @Override protected String resolveRecordKeyFor(final SinkRecord record) { + LOG.debug("Checking schema based rotator"); if (schemaBasedRotator.rotate(record)) { return 
generateNewRecordKey(record); } else { @@ -61,12 +65,15 @@ public boolean rotate(final SinkRecord record) { if (Objects.isNull(record.valueSchema()) || Objects.isNull(record.keySchema())) { throw new SchemaProjectorException("Record must have schemas for key and value"); } + final KeyValueSchema recordSchemas = new KeyValueSchema(record.keySchema(), record.valueSchema()); final var topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - final var keyValueVersion = keyValueSchemas.computeIfAbsent(topicPartition, - ignored -> new KeyValueSchema(record.keySchema(), record.valueSchema())); - final var schemaChanged = !keyValueVersion.keySchema.equals(record.keySchema()) - || !keyValueVersion.valueSchema.equals(record.valueSchema()); + final KeyValueSchema topicPartitionSchemas = keyValueSchemas.computeIfAbsent(topicPartition, + ignored -> recordSchemas); + LOG.debug("comparing keys {} to {}", topicPartitionSchemas, recordSchemas); + + final var schemaChanged = ! topicPartitionSchemas.equals(recordSchemas); if (schemaChanged) { + LOG.debug("Schema change detected for topic partition {}", topicPartition); keyValueSchemas.put(topicPartition, new KeyValueSchema(record.keySchema(), record.valueSchema())); } return schemaChanged; @@ -99,6 +106,11 @@ public boolean equals(final Object other) { public int hashCode() { return Objects.hash(keySchema, valueSchema); } + + @Override + public String toString() { + return String.format("KeyValueSchema{keySchema=%s, valueSchema=%s}", keySchema, valueSchema); + } } void clear() { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java index 1b3737557..84e67dd72 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java @@ -33,9 +33,13 @@ import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TopicPartitionKeyRecordGrouper implements RecordGrouper { + private static final Logger LOG = LoggerFactory.getLogger(TopicPartitionKeyRecordGrouper.class); + private final Template filenameTemplate; private final Map currentHeadRecords = new HashMap<>(); @@ -68,12 +72,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); + LOG.debug("put({}}", recordKey); fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); } protected String resolveRecordKeyFor(final SinkRecord record) { final var key = recordKey(record); - + LOG.debug("resolveRecordKeyFor(): {}", key); final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java index 994baa62f..8de5de986 100644 --- 
a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java @@ -32,6 +32,8 @@ import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link RecordGrouper} that groups records by topic and partition. @@ -44,6 +46,7 @@ * The class supports limited and unlimited number of records in files. */ class TopicPartitionRecordGrouper implements RecordGrouper { + private final Logger LOG = LoggerFactory.getLogger(this.getClass()); private final Template filenameTemplate; @@ -87,6 +90,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); + LOG.debug("{} key is {}", record.kafkaOffset(), recordKey); fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java index 33b12c2b8..c0b6269db 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java @@ -570,7 +570,7 @@ public interface BackoffConfig { /** * Gets the abort timer rule flag. If there is no timer that may expire and shorten the time for the delay then * this value should be {@code false} otherwise if the delay time will exceed the maximum time remaining no - * delay is executed. By default, the false is {@code true}. + * delay is executed. By default, the flag is {@code true}. * * @return The abort time rule flag. */ diff --git a/commons/src/testFixtures/java/io/aiven/commons/kafka/testkit/KafkaConnectRunner.java b/commons/src/testFixtures/java/io/aiven/commons/kafka/testkit/KafkaConnectRunner.java index 3c91fc90a..b307bdf92 100644 --- a/commons/src/testFixtures/java/io/aiven/commons/kafka/testkit/KafkaConnectRunner.java +++ b/commons/src/testFixtures/java/io/aiven/commons/kafka/testkit/KafkaConnectRunner.java @@ -233,7 +233,7 @@ public void stopConnectCluster() { * @param connectorName * The name for the connector. * @param connectorConfig - * the map of data items for the configuraiton of the connector. + * the map of data items for the configuration of the connector. * @return the result of the cluster configuration call. 
*/ public String configureConnector(final String connectorName, final Map connectorConfig) { @@ -291,7 +291,7 @@ public void restartConnector(final String connectorName) { */ public Map getWorkerProperties(final Class connectorClass) { final Map workerProperties = new HashMap<>(); - workerProperties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getName()); + workerProperties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getCanonicalName()); workerProperties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getCanonicalName()); workerProperties.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, Long.toString(offsetFlushInterval.toMillis())); diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/StringHelpers.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/StringHelpers.java new file mode 100644 index 000000000..bf223c03a --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/StringHelpers.java @@ -0,0 +1,10 @@ +package io.aiven.kafka.connect.common; + +public class StringHelpers { + private StringHelpers() { + // do not instantiate. + } + public static final String quoted(String s) { + return "\"" + s + "\""; + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/AvroTestDataFixture.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/AvroTestDataFixture.java index 1b81cb02a..0deaf5547 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/AvroTestDataFixture.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/AvroTestDataFixture.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Function; +import java.util.function.IntFunction; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -129,10 +130,11 @@ public static List generateAvroRecords(final int numRecs) { * * @param numRecs * the numer of records to generate + * @param recordCreator A function to convert the record number into a generic record. * @return A byte array containing the specified number of records. */ public static List generateAvroRecords(final int numRecs, - final Function recordCreator) { + final IntFunction recordCreator) { return generateAvroRecords(0, numRecs, recordCreator); } @@ -158,11 +160,12 @@ public static List generateAvroRecords(final int messageId, final * the messageId to start with. * @param numOfRecs * the number of records to write. + * @param recordCreator The creator of the generic records * @return A byte array containing the specified number of records. */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") public static List generateAvroRecords(final int messageId, final int numOfRecs, - final Function recordCreator) { + final IntFunction recordCreator) { // Create Avro records final List avroRecords = new ArrayList<>(); final int limit = messageId + numOfRecs; @@ -196,6 +199,9 @@ public static GenericRecord generateAvroRecord(final int messageId, final Schema final GenericRecord avroRecord = new GenericData.Record(schema); avroRecord.put("message", "Hello, from Avro Test Data Fixture! 
object " + messageId); avroRecord.put("id", messageId); + if (schema.getField("age") != null) { + avroRecord.put("age", messageId); + } return avroRecord; } diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/JsonTestDataFixture.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/JsonTestDataFixture.java index b93b3777d..eebccfb31 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/JsonTestDataFixture.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/JsonTestDataFixture.java @@ -39,17 +39,30 @@ */ final public class JsonTestDataFixture { + private final static String MSG_FORMAT = "{\"id\" : %s, \"message\" : \"%s\", \"value\" : \"value%s\"}%n"; + /** + * A schema with fields {@code id} of type {@code int32} and {@code message} of type {@code string}. + */ public static final String SCHEMA_JSON = "{\n \"type\": \"struct\", \"name\": \"TestRecord\",\n " + " \"fields\": [\n {\"field\": \"message\", \"type\": \"string\"},\n" + " {\"field\": \"id\", \"type\": \"int32\"}\n ]\n}"; + public static final String CONNECT_EXTRA_SCHEMA_JSON = "{\n \"type\": \"struct\",\n \"name\": \"TestRecord\",\n" + " \"fields\": [\n {\"name\": \"message\", \"type\": \"string\"},\n" + " {\"name\": \"id\", \"type\": \"int32\"}\n ],\n" + " \"connect.version\":1, \"connect.name\": \"TestRecord\"}\n"; + /** + * message format for the {@link #EVOLVED_SCHEMA_JSON} with an added "value" field. + */ + private final static String EVOLVED_MSG_FORMAT = "{\"id\" : %s, \"message\" : \"%s\", \"value\" : \"value%s\", \"age\" : %s}%n"; + + /** + * A schema with fields from {@link #SCHEMA_JSON} and added field of {@code age} of type {@code int32} and default of {@code 0}. + */ public static final String EVOLVED_SCHEMA_JSON = "{\n \"type\": \"struct\",\n \"name\": \"TestRecord\",\n" + " \"fields\": [\n {\"field\": \"message\", \"type\": \"string\"},\n" + " {\"field\": \"id\", \"type\": \"int32\"},\n" @@ -83,7 +96,9 @@ public static String generateJsonRecs(final int recordCount) { } /** - * Generates a single JSON record + * Generates a single JSON record with the structure + * {@code {"id":[messageId], "message":"[msg]", "value":"value[messagId]"}\n}. + * The produced record has the "value" field that is not included in the schema. * * @param messageId * the id for the record @@ -95,6 +110,21 @@ public static String generateJsonRec(final int messageId, final String msg) { return String.format(MSG_FORMAT, messageId, msg, messageId); } + /** + * Generates a single evolved JSON record with the structure + * {@code {"id":[messageId], "message":"[msg]", "value":"value[messageId]", "age":[messageId]*10}\n}. + * The produced record has the "value" field that is not included in the schema. + * + * @param messageId + * the id for the record + * @param msg + * the message for the record + * @return a standard JSON test record. + */ + public static String generateEvolvedJsonRec(final int messageId, final String msg) { + return String.format(EVOLVED_MSG_FORMAT, messageId, msg, messageId, messageId * 10); + } + /** * Creates Json test data. 
* diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/ParquetTestDataFixture.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/ParquetTestDataFixture.java index 135e77d9a..d523cb62b 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/ParquetTestDataFixture.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/format/ParquetTestDataFixture.java @@ -19,9 +19,11 @@ import static org.apache.kafka.connect.data.Schema.INT32_SCHEMA; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; @@ -30,6 +32,7 @@ import java.util.Collections; import java.util.List; +import org.apache.avro.generic.GenericData; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; @@ -94,6 +97,33 @@ public final class ParquetTestDataFixture { private ParquetTestDataFixture() { // do not instantiate } + + /** + * Generates a Parquet test {@link Struct} record with the specified message id, a name prefix of {@code "name-"} and an age equal to the message id. + * + * @param messageId + * the message id. + * @return a Struct with the specified data. + */ + public static Struct generateParquetRecord(final int messageId) { + return generateParquetRecord(messageId, "name-", messageId); + } + + /** + * Generates a Parquet test {@link Struct} record with the specified message id, name prefix and age. + * + * @param messageId + * the message id. + * @param name the name prefix. Final value will be {@code name + messageId}. + * @param age the age value. + * @return a Struct with the specified data. + */ + public static Struct generateParquetRecord(final int messageId, final String name, final int age) { + return new Struct(PARQUET_SCHEMA).put("name", name + messageId).put("age", age).put("email", name + "@test"); + } + + /** * Generate the specified number of parquet records in a byte array.
* @@ -112,8 +142,7 @@ public static byte[] generateParquetData(final String name, final int numOfRecor final List allParquetRecords = new ArrayList<>(); // Write records to the Parquet file for (int i = 0; i < numOfRecords; i++) { - allParquetRecords - .add(new Struct(PARQUET_SCHEMA).put("name", name + i).put("age", 30).put("email", name + "@test")); + allParquetRecords.add(generateParquetRecord(i, name, 30)); } // Create a Parquet writer diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkAvroIntegrationTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkAvroIntegrationTest.java new file mode 100644 index 000000000..39fd302f2 --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkAvroIntegrationTest.java @@ -0,0 +1,312 @@ +package io.aiven.kafka.connect.common.integration.sink; + + +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.format.AvroTestDataFixture; + +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; + + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractSinkAvroIntegrationTest, N> extends AbstractSinkGenericRecordIntegrationTest { + + protected AbstractSinkAvroIntegrationTest() { + super(FormatType.AVRO); + } +// public enum AvroCodec { NULL, DEFLATE, BZIP2, SNAPPY, XZ, ZSTANDARD}; + + /** + * A record of the key, value and partition as written to kafka. + * Equality is only checked against key and value. + */ + public static class Record implements Comparable { + private final String key; + private final GenericRecord value; + private final int partition; + + /** + * Constructor + * @param key the key. + * @param value the value + * @param partition the partition + */ + public Record(String key, GenericRecord value, int partition) { + Objects.requireNonNull(value, "value must not be null"); + this.key = key; + this.value = value; + this.partition = partition; + } + + /** + * Gest the key value + * @return the key value. + */ + public String getKey() { + return key; + } + + /** + * Gets the value. + * @return the value. + */ + public GenericRecord getValue() { + return value; + } + + /** + * Gets the partition. + * @return the partition. 
+ */ + public int getPartition() { + return partition; + } + + @Override + public int compareTo(Record o) { + int result = value.toString().compareTo(o.value.toString()); +// String[] schema = value.getSchema().getFields().stream().map(Schema.Field::name).toArray(String[]::new); +// String[] schema2 = o.value.getSchema().getFields().stream().map(Schema.Field::name).toArray(String[]::new); +// int result = Arrays.compare(schema, schema2); +// if (result == 0) { +// Arrays.stream(schema).map(s -> value.hasField(s) ? value.get(s).toString() : null). +// +// } +// Comparator.(value.getSchema().getFields(), o.value.getSchema().getFields()); +// Map.value.getSchema().getObjectProps() +// int result = value.String.compare(value, o.value); + return result == 0 ? key.compareTo(o.key) : result; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof Record) { + return this.compareTo((Record) o) == 0; + } + return false; + } + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public String toString() { + return String.format("Record(key=%s, value=%s, partition=%d)", key, value, partition); + } + } + +// @Override +// protected Map basicConnectorConfig() { +// final Map config = super.basicConnectorConfig(); +// config.put("format.output.type", FormatType.AVRO.name); +// config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); +// config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); +// config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); +// config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); +// return config; +// } + +// @Override +// protected final Map getProducerConfig() { +// Map props = new HashMap<>(); +// props.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, +// KafkaAvroSerializer.class.getName()); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, +// KafkaAvroSerializer.class.getName()); +// return props; +// } + + @Test + void avroOutput() throws IOException { + final AvroSerDe serializer = new AvroSerDe(); + CompressionType compressionType = CompressionType.NONE; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + List avroRecords = serializer.sendRecords(40, Duration.ofSeconds(45)); + + final Map> expectedBlobs = new HashMap<>(); + avroRecords.forEach(record -> { + expectedBlobs.compute(getNativeKey(record.getPartition(), 0, CompressionType.NONE, FormatType.AVRO), (k, v) -> v == null ? 
new ArrayList<>() : v) + .add(record); + }); + + awaitAllBlobsWritten(expectedBlobs.keySet(), Duration.ofSeconds(45)); + + for (final K nativeKey : expectedBlobs.keySet()) { + final List items = serializer.extractRecords(bucketAccessor.readBytes(nativeKey)); + assertThat(items).containsExactlyInAnyOrderElementsOf(expectedBlobs.get(nativeKey)); + } + } + + @Test + void schemaChanged() throws IOException { + final AvroSerDe serializer = new AvroSerDe(); + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.envelope", "false"); + connectorConfig.put("format.output.fields", "value"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + createConnector(connectorConfig); + + // write one record with old schema, one with new, and one with old again. All written into partition 0 + List avroRecords = serializer.sendRecords(3, 1, Duration.ofSeconds(3), (i) -> { + // new schema every 3 records + boolean newSchema = (i % 2) == 1; + GenericRecord value = new GenericData.Record(newSchema ? AvroTestDataFixture.EVOLVED_SCHEMA : AvroTestDataFixture.DEFAULT_SCHEMA); + value.put("message", new Utf8("user-" + i)); + value.put("id", i); + if (newSchema) { + value.put("age", i * 10); + } + return value; + } + ); + + // since each record changes the schema each record should be in its own partition. + final List expectedKeys = new ArrayList<>(); + for (int i = 0; i < 3; i++) + { + expectedKeys.add(getNativeKey(0, i, CompressionType.NONE, FormatType.AVRO)); + } + + awaitAllBlobsWritten(expectedKeys, Duration.ofSeconds(45)); + + for (int i = 0; i < expectedKeys.size(); i++) { + final List items = serializer.extractGenericRecord(bucketAccessor.readBytes(expectedKeys.get(i))); + assertThat(items).size().isEqualTo(1); + boolean newSchema = (i % 2) == 1; + GenericRecord record = items.get(0); + assertThat(record.get("id")).isEqualTo(i); + assertThat(record.hasField("message")).isTrue(); + if (newSchema) { + assertThat(record.get("age")).isEqualTo(i*10); + } else { + assertThat(record.hasField("age")).isFalse(); + } + Map props = record.getSchema().getObjectProps(); + assertThat(record.getSchema().getObjectProp("connect.version")).isEqualTo( newSchema ? 2 : 1); + assertThat(record.getSchema().getObjectProp("connect.name")).isEqualTo("TestRecord"); + } + } + + /** + * Methods to serialize and deserialize records from the sink storage. + */ + public final class AvroSerDe { + + KafkaAvroSerializer serializer; + + AvroSerDe() { + Map serializerConfig = new HashMap<>(); + serializerConfig.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); + serializer = new KafkaAvroSerializer(); + serializer.configure(serializerConfig, false); + } + + /** + * Creates and sends records with the default testing schema. + * Will wait for the records to be sent before returning. + * @param recordCount the number of records to send. + * @param timeLimit the time limit to wait for the records to be sent. + * @return a list of {@link Record} representing each record sent. + */ + private List sendRecords(final int recordCount, Duration timeLimit) { + return sendRecords(recordCount, 4, timeLimit, AvroTestDataFixture::generateAvroRecord); + } + + /** + * Creates and sends records generated with the specified generator. + * Will wait for the records to be sent before returning. + * @param recordCount the number of records to send. + * @param partitionCount the number of partitions to split the records across. 
+ * @param timeLimit the time limit to wait for the records to be sent. + * @param generator The function to convert the current record number to a GenericRecord to send. + * @return a list of {@link Record} representing each record sent. + */ + private List sendRecords(final int recordCount, final int partitionCount, Duration timeLimit, final Function generator) { + final List> sendFutures = new ArrayList<>(); + final List result = new ArrayList<>(); + for (int cnt = 0; cnt < recordCount; cnt++) { + int partition = cnt % partitionCount; + final String key = "key-" + cnt; + GenericRecord value = generator.apply(cnt); + sendFutures.add(sendMessageAsync(testTopic, partition, key, value)); + result.add(new Record(key, value, partition)); + } + awaitFutures(sendFutures, timeLimit); + return result; + } + + /** + * Extract {@link Record} values from a blob of data read from the storage. + * @param blobBytes the bytes for the blob of data. + * @return a list of {@link Record} representing each record read from the blob. + * @throws IOException on error reading blob. + */ + private List extractRecords(byte[] blobBytes) throws IOException { + List items = new ArrayList<>(); + try (SeekableInput sin = new SeekableByteArrayInput(blobBytes)) { + final GenericDatumReader datumReader = new GenericDatumReader<>(); + try (DataFileReader reader = new DataFileReader<>(sin, datumReader)) { + reader.forEach( genericRecord -> { + String key = genericRecord.hasField("key") ? genericRecord.get("key").toString() : null; + GenericRecord value = (GenericRecord) genericRecord.get("value"); + items.add(new Record(key, value, -1)); + }); + } + } + return items; + } + + /** + * Extract {@link Record} values from a blob of data read from the storage. + * @param blobBytes the bytes for the blob of data. + * @return a list of {@link Record} representing each record read from the blob. + * @throws IOException on error reading blob. 
+ */ + private List extractGenericRecord(byte[] blobBytes) throws IOException { + List items = new ArrayList<>(); + try (SeekableInput sin = new SeekableByteArrayInput(blobBytes)) { + final GenericDatumReader datumReader = new GenericDatumReader<>(); + try (DataFileReader reader = new DataFileReader<>(sin, datumReader)) { + reader.forEach(items::add); + } + } + return items; + } + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkGenericRecordIntegrationTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkGenericRecordIntegrationTest.java new file mode 100644 index 000000000..59b82aa37 --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkGenericRecordIntegrationTest.java @@ -0,0 +1,316 @@ +package io.aiven.kafka.connect.common.integration.sink; + + +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.format.AvroTestDataFixture; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractSinkGenericRecordIntegrationTest, N> extends AbstractSinkIntegrationBase { + + protected final FormatType formatType; + + protected AbstractSinkGenericRecordIntegrationTest(FormatType formatType) { + this.formatType = Objects.requireNonNull(formatType); + } + + /** + * A record of the key, value and partition as written to kafka. + * Equality is only checked against key and value. + */ + public static class Record implements Comparable { + private final byte[] key; + private final byte[] value; + private final int partition; + private final GenericRecord record; + + /** + * Constructor + * @param key the key. + * @param value the value + * @param partition the partition + */ + public Record(byte[] key, byte[] value, int partition) { + Objects.requireNonNull(value, "value must not be null"); + this.key = key; + this.value = value; + this.partition = partition; + this.record = null; + } + + /** + * Constructor + * @param key the key. + * @param value the value + * @param genericRecord the partition + */ + public Record(byte[] key, byte[] value, GenericRecord genericRecord) { + Objects.requireNonNull(value, "value must not be null"); + this.key = key; + this.value = value; + this.partition = -1; + this.record = genericRecord; + } + + /** + * Gest the key value + * @return the key value. + */ + public byte[] getKey() { + return key; + } + + /** + * Gets the value. + * @return the value. 
+ */ + public byte[] getValue() { + return value; + } + + /** + * Gets the partition. + * @return the partition. + */ + public int getPartition() { + return partition; + } + + @Override + public int compareTo(Record o) { + int result = Arrays.compare(value, o.value); + return result == 0 ? Arrays.compare(key, o.key) : result; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof Record) { + return this.compareTo((Record) o) == 0; + } + return false; + } + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public String toString() { + return String.format("Record(key=%s, value=%s, partition=%d)", key == null ? key : new Utf8(key), new Utf8(value), partition); + } + } + + @Override + protected final Map basicConnectorConfig() { + final Map config = super.basicConnectorConfig(); + config.put("format.output.type", formatType.name()); + config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); + config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); + return config; + } + + @Override + protected final Map getProducerConfig() { + Map props = new HashMap<>(); + props.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + KafkaAvroSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + KafkaAvroSerializer.class.getName()); + return props; + } + +// @Test +// void avroOutput() throws IOException { +// final AvroSerDe serializer = new AvroSerDe(); +// CompressionType compressionType = CompressionType.NONE; +// final Map connectorConfig = basicConnectorConfig(); +// connectorConfig.put("format.output.fields", "key,value"); +// connectorConfig.put("file.compression.type", compressionType.name); +// createConnector(connectorConfig); +// +// List avroRecords = serializer.sendRecords(40, Duration.ofSeconds(45)); +// +// final Map> expectedBlobs = new HashMap<>(); +// avroRecords.forEach(record -> { +// expectedBlobs.compute(getNativeKey(record.getPartition(), 0, CompressionType.NONE, FormatType.AVRO), (k, v) -> v == null ? new ArrayList<>() : v) +// .add(record); +// }); +// +// awaitAllBlobsWritten(expectedBlobs.keySet(), Duration.ofSeconds(45)); +// +// for (final K nativeKey : expectedBlobs.keySet()) { +// final List items = serializer.extractRecords(bucketAccessor.readBytes(nativeKey)); +// assertThat(items).containsExactlyInAnyOrderElementsOf(expectedBlobs.get(nativeKey)); +// } +// } +// +// @Test +// void schemaChanged() throws IOException { +// final AvroSerDe serializer = new AvroSerDe(); +// final Map connectorConfig = basicConnectorConfig(); +// connectorConfig.put("format.output.envelope", "false"); +// connectorConfig.put("format.output.fields", "value"); +// connectorConfig.put("format.output.fields.value.encoding", "none"); +// createConnector(connectorConfig); +// +// // write one record with old schema, one with new, and one with old again. All written into partition 0 +// List avroRecords = serializer.sendRecords(3, 1, Duration.ofSeconds(3), (i) -> { +// // new schema every 3 records +// boolean newSchema = (i % 2) == 1; +// GenericRecord value = new GenericData.Record(newSchema ? 
AvroTestDataFixture.EVOLVED_SCHEMA : AvroTestDataFixture.DEFAULT_SCHEMA); +// value.put("message", new Utf8("user-" + i)); +// value.put("id", i); +// if (newSchema) { +// value.put("age", i * 10); +// } +// return value; +// } +// ); +// +// // since each record changes the schema each record should be in its own partition. +// final List expectedKeys = new ArrayList<>(); +// for (int i = 0; i < 3; i++) +// { +// expectedKeys.add(getNativeKey(0, i, CompressionType.NONE, FormatType.AVRO)); +// } +// +// awaitAllBlobsWritten(expectedKeys, Duration.ofSeconds(45)); +// +// for (int i = 0; i < expectedKeys.size(); i++) { +// final List items = serializer.extractGenericRecord(bucketAccessor.readBytes(expectedKeys.get(i))); +// assertThat(items).size().isEqualTo(1); +// boolean newSchema = (i % 2) == 1; +// GenericRecord record = items.get(0); +// assertThat(record.get("id")).isEqualTo(i); +// assertThat(record.hasField("message")).isTrue(); +// if (newSchema) { +// assertThat(record.get("age")).isEqualTo(i*10); +// } else { +// assertThat(record.hasField("age")).isFalse(); +// } +// Map props = record.getSchema().getObjectProps(); +// assertThat(record.getSchema().getObjectProp("connect.version")).isEqualTo( newSchema ? 2 : 1); +// assertThat(record.getSchema().getObjectProp("connect.name")).isEqualTo("TestRecord"); +// } +// } +// +// /** +// * Methods to serialize and deserialize records from the sink storage. +// */ +// public final class AvroSerDe { +// +// KafkaAvroSerializer serializer; +// +// AvroSerDe() { +// Map serializerConfig = new HashMap<>(); +// serializerConfig.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl()); +// serializer = new KafkaAvroSerializer(); +// serializer.configure(serializerConfig, false); +// } +// +// /** +// * Creates and sends records with the default testing schema. +// * Will wait for the records to be sent before returning. +// * @param recordCount the number of records to send. +// * @param timeLimit the time limit to wait for the records to be sent. +// * @return a list of {@link Record} representing each record sent. +// */ +// private List sendRecords(final int recordCount, Duration timeLimit) { +// return sendRecords(recordCount, 4, timeLimit, AvroTestDataFixture::generateAvroRecord); +// } +// +// /** +// * Creates and sends records generated with the specified generator. +// * Will wait for the records to be sent before returning. +// * @param recordCount the number of records to send. +// * @param partitionCount the number of partitions to split the records across. +// * @param timeLimit the time limit to wait for the records to be sent. +// * @param generator The function to convert the current record number to a GenericRecord to send. +// * @return a list of {@link Record} representing each record sent. 
+// */ +// private List sendRecords(final int recordCount, final int partitionCount, Duration timeLimit, final Function generator) { +// final List> sendFutures = new ArrayList<>(); +// final List result = new ArrayList<>(); +// for (int cnt = 0; cnt < recordCount; cnt++) { +// int partition = cnt % partitionCount; +// final String key = "key-" + cnt; +// GenericRecord value = generator.apply(cnt); +// byte[] serializedValue = serializer.serialize(testTopic, value); +// byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8); +// sendFutures.add(sendMessageAsync(testTopic, partition, key, value)); +// result.add(new Record(serializedKey, serializedValue, partition)); +// } +// awaitFutures(sendFutures, timeLimit); +// return result; +// } +// +// /** +// * Extract {@link Record} values from a blob of data read from the storage. +// * @param blobBytes the bytes for the blob of data. +// * @return a list of {@link Record} representing each record read from the blob. +// * @throws IOException on error reading blob. +// */ +// private List extractRecords(byte[] blobBytes) throws IOException { +// List items = new ArrayList<>(); +// try (SeekableInput sin = new SeekableByteArrayInput(blobBytes)) { +// final GenericDatumReader datumReader = new GenericDatumReader<>(); +// try (DataFileReader reader = new DataFileReader<>(sin, datumReader)) { +// reader.forEach( genericRecord -> { +// byte[] key = genericRecord.hasField("key") ? ((ByteBuffer)genericRecord.get("key")).array() : null; +// byte[] value = ((ByteBuffer)genericRecord.get("value")).array(); +// items.add(new Record(key, value, -1)); +// }); +// } +// } +// return items; +// } +// +// /** +// * Extract {@link Record} values from a blob of data read from the storage. +// * @param blobBytes the bytes for the blob of data. +// * @return a list of {@link Record} representing each record read from the blob. +// * @throws IOException on error reading blob. +// */ +// private List extractGenericRecord(byte[] blobBytes) throws IOException { +// List items = new ArrayList<>(); +// try (SeekableInput sin = new SeekableByteArrayInput(blobBytes)) { +// final GenericDatumReader datumReader = new GenericDatumReader<>(); +// try (DataFileReader reader = new DataFileReader<>(sin, datumReader)) { +// reader.forEach(items::add); +// } +// } +// return items; +// } +// } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationBase.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationBase.java new file mode 100644 index 000000000..61a8459d4 --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationBase.java @@ -0,0 +1,221 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.integration.sink; + +import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase; +import io.aiven.commons.kafka.testkit.KafkaManager; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.junit.jupiter.Testcontainers; + + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Tests to ensure that data written to the storage layer in various formats can be correctly read by the source + * implementation. + * + * @param + * the native key type. + * @param + * the kafka producer index type + * @param the kafka producer value type. + */ +@SuppressWarnings({ "PMD.TestClassWithoutTestCases" }) +@Testcontainers +public abstract class AbstractSinkIntegrationBase, I, V> extends KafkaIntegrationTestBase { + + private KafkaManager kafkaManager; + + protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000; + + private static final Set CONNECTOR_NAMES = new HashSet<>(); + + protected String testTopic; + + protected String prefix; + + protected BucketAccessor bucketAccessor; + + protected SinkStorage sinkStorage; + + protected KafkaProducer producer; + + /** + * Gets the SinkStorage implementation for these tests. + * Sink storage encapsulates the functionality needed to verify items writen to storage. + * + * @return the SinkStorage implementation for these tests. + */ + protected abstract SinkStorage getSinkStorage(); + + /** + * Retrieves the topic name from the test info. This ensures that each test has its own topic. + * + * @param testInfo the test info to create the topic name from. + * @return the topic name. + */ + static String topicName(final TestInfo testInfo) { + return testInfo.getTestMethod().get().getName() + "-" + testInfo.getDisplayName().hashCode(); + } + + /** + * The connector configuration for the specified sink. + *
+ * <ul>
+ * <li>connector specific settings from sink storage</li>
+ * <li>name - connector class simple name</li>
+ * <li>class - sink storage provided class</li>
+ * <li>tasks.max - 1</li>
+ * <li>topics - testTopic</li>
+ * <li>file.name.prefix - specified prefix</li>
+ * </ul>
+ * @return a Map of configuration properties to string representations. + */ + protected Map basicConnectorConfig() { + final Map config = getSinkStorage().createSinkProperties(bucketAccessor.bucketName); + config.put("name", getSinkStorage().getConnectorClass().getSimpleName()); + config.put("connector.class", getSinkStorage().getConnectorClass().getName()); + config.put("tasks.max", "1"); + config.put("topics", testTopic); + config.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, Long.toString(OFFSET_FLUSH_INTERVAL_MS)); + if (prefix != null) { + config.put("file.name.prefix", prefix); + } + return config; + } + + /** + * Sends a message in an async manner. All messages are sent with a byte[] key and value type. + * @param topicName the topic to send the message on. + * @param partition the partition for the message. + * @param key the key for the message. + * @param value the value for the message. + * @return A future that will return the {@link RecordMetadata} for the message. + */ + protected Future sendMessageAsync(final String topicName, final int partition, final I key, final V value) { + final ProducerRecord msg = new ProducerRecord<>(topicName, partition, key, value); + return producer.send(msg); + } + + /** + * Sets the prefix used for files in testing. + * + * @param prefix the testing prefix. May be {@code null}. + */ + protected final void setPrefix(String prefix) { + this.prefix = prefix; + } + + @BeforeEach + void setUp() throws ExecutionException, InterruptedException, IOException { + sinkStorage = getSinkStorage(); + kafkaManager = setupKafka(sinkStorage.getConnectorClass()); + testTopic = topicName(testInfo); + kafkaManager.createTopic(testTopic); + bucketAccessor = sinkStorage.getBucketAccessor("testBucket"); + bucketAccessor.createBucket(); + producer = createProducer(); + } + + @AfterEach + void tearDown() { + producer.close(); + CONNECTOR_NAMES.forEach(kafkaManager::deleteConnector); + CONNECTOR_NAMES.clear(); + bucketAccessor.removeBucket(); + } + + protected void createConnector(final Map connectorConfig) { + CONNECTOR_NAMES.add(connectorConfig.get("name")); + kafkaManager.configureConnector(connectorConfig.get("name"), connectorConfig); + } + + protected abstract Map getProducerConfig(); + + private KafkaProducer createProducer() { + final Map producerProps = getProducerConfig(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaManager.bootstrapServers()); + return new KafkaProducer<>(producerProps); + } + + protected final K getNativeKeyForTimestamp(final int partition, final int startOffset, CompressionType compressionType, FormatType formatType) { + return getSinkStorage().getTimestampNativeKey(prefix, testTopic, partition, startOffset, compressionType, formatType); + } + + protected final K getNativeKey(final int partition, final int startOffset, final CompressionType compressionType, FormatType formatType) { + return getSinkStorage().getNativeKey(prefix, testTopic, partition, startOffset, compressionType, formatType); + } + + protected final K getNativeKeyForKey(final byte[] key, final CompressionType compressionType, final FormatType formatType) { + return getSinkStorage().getKeyNativeKey(prefix, new String(key), compressionType, formatType); + } + + + /** + * Wait until all the specified futures have completed. + * + * @param futures the futures to wait for. + * @param timeout the maximum time to wait for the futures to complete. 
+ */ + protected void awaitFutures(List> futures, Duration timeout) { + producer.flush(); + await("All futures written").atMost(timeout).until(() -> { + for (final Future future : futures) { + future.get(); + } + return true; + }); + } + + /** + * Wait until the keys specified in the expectedKeys, and only those keys, are found in the storage. + * System will check every {@link #OFFSET_FLUSH_INTERVAL_MS} for updates. + * + * @param expectedKeys the expected keys + * @param timeout the maximum time to wait. + */ + protected void awaitAllBlobsWritten(final Collection expectedKeys, Duration timeout) { + await("All expected files on storage").atMost(timeout) + .pollInterval(Duration.ofMillis(OFFSET_FLUSH_INTERVAL_MS)) + .untilAsserted(() -> assertThat(bucketAccessor.listKeys(prefix)).containsExactlyInAnyOrderElementsOf(expectedKeys)); + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationTest.java new file mode 100644 index 000000000..c0a11d844 --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkIntegrationTest.java @@ -0,0 +1,732 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.integration.sink; + +import io.aiven.kafka.connect.common.StringHelpers; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.format.AvroTestDataFixture; +import io.aiven.kafka.connect.common.format.JsonTestDataFixture; +import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.storage.StringConverter; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.IntFunction; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractSinkIntegrationTest> extends AbstractSinkIntegrationBase { + + private static final String VALUE_KEY_JSON_FMT = "{\"value\":%s,\"key\":%s}"; + + @Override + protected final Map getProducerConfig() { + Map props = new HashMap<>(); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + return props; + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + void standardGrouping(final CompressionType compression) throws IOException { + final FormatType formatType = FormatType.CSV; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("file.compression.type", compression.name); + createConnector(connectorConfig); + + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobsAndContent = new HashMap<>(); + + int cnt = 0; + for (int i = 0; i < 1000; i++) { + for (int partition = 0; partition < 4; partition++) { + final String key = "key-" + cnt; + final String value = "value-" + cnt; + cnt += 1; + + sendFutures.add(sendMessageAsync(testTopic, partition, key, + value.getBytes(StandardCharsets.UTF_8))); + K objectKey = getNativeKey(partition, 0, compression, formatType); + expectedBlobsAndContent.compute(objectKey, (k, v) -> v == null ? 
new ArrayList<>() : v).add(String.format("%s,%s", key, value)); + } + } + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + awaitAllBlobsWritten(expectedBlobsAndContent.keySet(), Duration.ofSeconds(10)); + + for (final K expectedBlobName : expectedBlobsAndContent.keySet()) { + final List blobContent = bucketAccessor.readAndDecodeLines(expectedBlobName, compression, 0, 1) + .stream() + .map(fields -> String.join(",", fields).trim()) + .collect(Collectors.toList()); + assertThat(blobContent).containsExactlyInAnyOrderElementsOf(expectedBlobsAndContent.get(expectedBlobName)); + } + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + @Disabled + void groupByTimestampVariable(final CompressionType compression) throws IOException { + final FormatType formatType = FormatType.CSV; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("file.compression.type", compression.name()); + connectorConfig.put("file.name.template", "{{topic}}-{{partition}}-{{start_offset}}-" + + "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}"); + createConnector(connectorConfig); + + final List> sendFutures = new ArrayList<>(); + + sendFutures.add(sendMessageAsync(testTopic, 0, "key-0", + "value-0".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 0, "key-1", + "value-1".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 0, "key-2", + "value-2".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 1, "key-3", + "value-3".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 3, "key-4", + "value-4".getBytes(StandardCharsets.UTF_8))); + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + final Map expectedBlobsAndContent = new HashMap<>(); + expectedBlobsAndContent.put(getNativeKeyForTimestamp(0, 0, compression, formatType), + new String[] { "key-0,value-0", "key-1,value-1", "key-2,value-2" }); + expectedBlobsAndContent.put(getNativeKeyForTimestamp(1, 0, compression, formatType), new String[] { "key-3,value-3" }); + expectedBlobsAndContent.put(getNativeKeyForTimestamp(3, 0, compression, formatType), new String[] { "key-4,value-4" }); + + + awaitAllBlobsWritten(expectedBlobsAndContent.keySet(), Duration.ofSeconds(10)); + + + for (final K expectedBlobName : expectedBlobsAndContent.keySet()) { + final List blobContent = bucketAccessor.readAndDecodeLines(expectedBlobName, compression, 0, 1) + .stream() + .map(fields -> String.join(",", fields).trim()) + .collect(Collectors.toList()); + assertThat(blobContent).containsExactlyInAnyOrder(expectedBlobsAndContent.get(expectedBlobName)); + } + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + void oneRecordPerFileWithPlainValues(final CompressionType compression) throws IOException { + final FormatType formatType = FormatType.CSV; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "value"); + connectorConfig.put("file.compression.type", compression.name); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("file.max.records", "1"); + createConnector(connectorConfig); + + final List> sendFutures = new ArrayList<>(); + + sendFutures.add(sendMessageAsync(testTopic, 0, "key-0", + "value-0".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 0, "key-1", + "value-1".getBytes(StandardCharsets.UTF_8))); + 
sendFutures.add(sendMessageAsync(testTopic, 0, "key-2", + "value-2".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 1, "key-3", + "value-3".getBytes(StandardCharsets.UTF_8))); + sendFutures.add(sendMessageAsync(testTopic, 3, "key-4", + "value-4".getBytes(StandardCharsets.UTF_8))); + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + final Map expectedBlobsAndContent = new HashMap<>(); + expectedBlobsAndContent.put(getNativeKey(0, 0, compression, formatType), "value-0"); + expectedBlobsAndContent.put(getNativeKey(0, 1, compression, formatType), "value-1"); + expectedBlobsAndContent.put(getNativeKey(0, 2, compression, formatType), "value-2"); + expectedBlobsAndContent.put(getNativeKey(1, 0, compression, formatType), "value-3"); + expectedBlobsAndContent.put(getNativeKey(3, 0, compression, formatType), "value-4"); + + awaitAllBlobsWritten(expectedBlobsAndContent.keySet(), Duration.ofSeconds(45)); + + for (final Map.Entry entry : expectedBlobsAndContent.entrySet()) { + assertThat(bucketAccessor.readString(entry.getKey(), compression)) + .isEqualTo(entry.getValue()); + } + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + void groupByKey(final CompressionType compressionType) throws IOException { + final FormatType formatType = FormatType.CSV; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("file.compression.type", compressionType.name); + connectorConfig.put("file.name.template", "{{key}}" + compressionType.extension()); + createConnector(connectorConfig); + + + final Map> keysPerTopicPartition = new HashMap<>(); + keysPerTopicPartition.put(new TopicPartition(testTopic, 0), Arrays.asList("key-0", "key-1", "key-2", "key-3", "key-7")); + keysPerTopicPartition.put(new TopicPartition(testTopic, 1), Arrays.asList("key-4", "key-5", "key-6", "key-8")); + + final Map> expectedBlobs = new TreeMap<>(); + final List> sendFutures = new ArrayList<>(); + final Map lastValuePerKey = new HashMap<>(); + final int cntMax = 1000; + int cnt = 0; + outer : while (true) { + for (final Map.Entry> entry : keysPerTopicPartition.entrySet()) { + for (final String key : entry.getValue()) { + final String value = "value-" + cnt; + cnt += 1; + final byte[] keyBytes = key == null ? 
null : key.getBytes(StandardCharsets.UTF_8); + sendFutures.add(sendMessageAsync(entry.getKey().topic(), entry.getKey().partition(), key, + value.getBytes(StandardCharsets.UTF_8))); + expectedBlobs.put(getNativeKeyForKey(keyBytes, compressionType, formatType), Pair.of(keyBytes, String.format("%s,%s",key, value))); + lastValuePerKey.put(key, value); + if (cnt >= cntMax) { + break outer; + } + } + } + } + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + awaitAllBlobsWritten(expectedBlobs.keySet(), Duration.ofSeconds(45)); + + for (final K blobName : expectedBlobs.keySet()) { + final String blobContent = bucketAccessor.readAndDecodeLines(blobName, compressionType, 0, 1) + .stream() + .map(fields -> String.join(",", fields)) + .collect(Collectors.joining()); + + Pair keyValue = expectedBlobs.get(blobName); + assertThat(blobName).isEqualTo(getNativeKeyForKey(keyValue.getKey(), compressionType, formatType)); + assertThat(blobContent).isEqualTo(keyValue.getValue()); + } + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + void jsonlOutput(CompressionType compressionType) throws IOException { + final Map connectorConfig = basicConnectorConfig(); + final FormatType contentType = FormatType.JSONL; + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + connectorConfig.put("value.converter.schemas.enable", "false"); + connectorConfig.put("file.compression.type", compressionType.name); + connectorConfig.put("format.output.type", contentType.name); + createConnector(connectorConfig); + + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobsAndContent = new TreeMap<>(); + int cnt = 0; + for (int i = 0; i < 10; i++) { + for (int partition = 0; partition < 4; partition++) { + final String key = "key-" + cnt; + final String value = String.format("[{\"name\":\"user-%s\"}]", cnt); + cnt += 1; + + sendFutures.add(sendMessageAsync(testTopic, partition, key, + value.getBytes(StandardCharsets.UTF_8))); + expectedBlobsAndContent.computeIfAbsent(getNativeKey(partition, 0, compressionType, contentType), k -> new ArrayList<>()) + .add(String.format(VALUE_KEY_JSON_FMT, value, StringHelpers.quoted(key))); + } + } + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + awaitAllBlobsWritten(expectedBlobsAndContent.keySet(), Duration.ofSeconds(45)); + + for (final Map.Entry> entry : expectedBlobsAndContent.entrySet()) { + assertThat(bucketAccessor.readLines(entry.getKey(), compressionType)).containsExactlyElementsOf(entry.getValue()); + } + } + + private List removeCommaFromLastEntry(List strings) { + int pos = strings.size()-1; + String lastEntry = strings.get(pos); + lastEntry = lastEntry.substring(0, lastEntry.length() - 1); + strings.set(pos, lastEntry); + return strings; + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + void jsonOutput(CompressionType compressionType) throws IOException { + final Map connectorConfig = basicConnectorConfig(); + final FormatType contentType = FormatType.JSON; + connectorConfig.put("format.output.fields", "key,value"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + 
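+ // With schemas disabled the JsonConverter passes the JSON array produced below through as a
+ // schemaless value, which the writer wraps in the {"value":...,"key":...} envelope asserted on later.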
connectorConfig.put("value.converter.schemas.enable", "false"); + connectorConfig.put("file.compression.type", compressionType.name); + connectorConfig.put("format.output.type", contentType.name); + createConnector(connectorConfig); + + final int numEpochs = 10; + + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobsAndContent = new TreeMap<>(); + + int cnt = 0; + for (int i = 0; i < numEpochs; i++) { + for (int partition = 0; partition < 4; partition++) { + final String key = "key-" + cnt; + final String value = "[{" + "\"name\":\"user-" + cnt + "\"}]"; + + sendFutures.add(sendMessageAsync(testTopic, partition, key, + value.getBytes(StandardCharsets.UTF_8))); + expectedBlobsAndContent.computeIfAbsent(getNativeKey(partition, 0, compressionType, contentType), k -> new ArrayList<>()) + .add(String.format(VALUE_KEY_JSON_FMT, value, StringHelpers.quoted(key)) + ","); + cnt += 1; + } + } + + awaitFutures(sendFutures, Duration.ofSeconds(2)); + + awaitAllBlobsWritten(expectedBlobsAndContent.keySet(), Duration.ofHours(2)); + + for (final Map.Entry> entry : expectedBlobsAndContent.entrySet()) { + List lst = bucketAccessor.readLines(entry.getKey(), compressionType); + // remove first and last entries because they are "[" and "]" + lst.remove(0); + lst.remove(lst.size() - 1); + assertThat(lst).containsExactlyElementsOf(removeCommaFromLastEntry(entry.getValue())); + } + } + +// private static WireMockServer enableFaultyProxy() { +// final WireMockServer wireMockServer = new WireMockServer(WireMockConfiguration.options().dynamicPort()); +// wireMockServer.start(); +// wireMockServer.addStubMapping(WireMock.request(RequestMethod.ANY.getName(), UrlPattern.ANY) +// .willReturn(aResponse().proxiedFrom(gcsEndpoint)) +// .build()); +// final String urlPathPattern = "/upload/storage/v1/b/" + testBucketName + "/o"; +// wireMockServer.addStubMapping( +// WireMock.request(RequestMethod.POST.getName(), UrlPattern.fromOneOf(null, null, null, urlPathPattern)) +// .inScenario("temp-error") +// .willSetStateTo("Error") +// .willReturn(aResponse().withStatus(400)) +// .build()); +// wireMockServer.addStubMapping( +// WireMock.request(RequestMethod.POST.getName(), UrlPattern.fromOneOf(null, null, null, urlPathPattern)) +// .inScenario("temp-error") +// .whenScenarioStateIs("Error") +// .willReturn(aResponse().proxiedFrom(gcsEndpoint)) +// .build()); +// return wireMockServer; +// } + + private class Record { + String key; + byte[] value; + int partition; + + Record(String key, byte[] value, int partition) { + this.key = key; + this.value = value; + this.partition = partition; + } + + public void setKey(String key) { + this.key = key; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public void setPartition(int partition) { + this.partition = partition; + } + } + + /** + * Sends a batch of messages + * @param recordCount the number of records to send. + * @param partitionCount the number of partitions to spread them across. + * @param valueGenerator the value of the record. + * @param recGenerator the function to generate the expected contents ({@code }) based on the recorc number + * @param nativeKeyGenerator a function to generate the native Key based on the record number + * @param timeout the time to wait for all the messages to be sent to Kafka + * @return A map of native key to a list of {@code } + * @param The return record type. 
+ */ + private Map> sendMessages(int recordCount, int partitionCount, IntFunction valueGenerator, IntFunction recGenerator, + IntFunction nativeKeyGenerator, Duration timeout) { + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobsAndContent = new TreeMap<>(); + for (int recordNumber = 0; recordNumber < recordCount; recordNumber++) { + int partition = recordNumber % partitionCount; + final String key = "key-"+recordNumber; + final byte[] value = valueGenerator.apply(recordNumber); + sendFutures.add(sendMessageAsync(testTopic, partition, key, value)); + expectedBlobsAndContent.computeIfAbsent(nativeKeyGenerator.apply(recordNumber), k -> new ArrayList<>()) + .add(recGenerator.apply(recordNumber)); + } + awaitFutures(sendFutures, timeout); + return expectedBlobsAndContent; + } + + /** + * Create one key for each partition. + * @param partitionCount the number of partitions. + * @param compressionType the compression type. + * @param contentType the content format type. + * @return A native key + */ + private IntFunction partitionZeroOffsetKey(int partitionCount, CompressionType compressionType, FormatType contentType) { + return i -> getNativeKey(i % partitionCount, 0, compressionType, contentType); + } + + private static IntFunction byteValueGenerator = i -> ("value-"+i).getBytes(StandardCharsets.UTF_8); + + + @ParameterizedTest + @EnumSource(CompressionType.class) + void parquetOutput(CompressionType compressionType, @TempDir Path tempDir) throws IOException { + final FormatType contentType = FormatType.PARQUET; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value,offset,timestamp,headers"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", StringConverter.class.getName()); + //connectorConfig.put("value.converter", Converter.class.getName()); + connectorConfig.put("format.output.type", contentType.name); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + int partitionCount = 4; + final Map> expectedContents = sendMessages(40, partitionCount, byteValueGenerator, i -> i, + partitionZeroOffsetKey(partitionCount, compressionType, contentType), Duration.ofSeconds(5)); + + awaitAllBlobsWritten(expectedContents.keySet(), Duration.ofSeconds(45)); + + for (K key : expectedContents.keySet()) { + String blobName = key.toString(); + List records = ParquetTestDataFixture.readRecords(tempDir.resolve(Paths.get(blobName)), + bucketAccessor.readBytes(key)); + List expectedRecords = expectedContents.get(key); + assertThat(records).hasSize(expectedRecords.size()); + for (int i = 0; i < expectedRecords.size(); i++) { + GenericRecord record = records.get(i); + int counter = expectedRecords.get(i); + assertThat(record.get("key")).hasToString("key-" + counter); + assertThat(((ByteBuffer)record.get("value")).array()).isEqualTo(byteValueGenerator.apply(counter)); + assertThat(record.get("offset")).isEqualTo((long)i); + assertThat(record.get("timestamp")).isNotNull(); + assertThat(record.get("headers")).isNull(); + } + } + } + + private static IntFunction jsonValueGenerator = i -> String.format("{\"name\": \"name-%1$s\", \"value\": \"value-%1$s\"}", i).getBytes(StandardCharsets.UTF_8); + + @ParameterizedTest + @EnumSource(CompressionType.class) + void parquetJsonValueAsString(CompressionType compressionType, @TempDir Path tempDir) throws IOException { + final FormatType contentType = FormatType.PARQUET; + final Map 
connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value,offset,timestamp,headers"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("format.output.type", contentType.name); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + int partitionCount = 4; + final Map> expectedContents = sendMessages(40, partitionCount, jsonValueGenerator, i -> i, + partitionZeroOffsetKey(partitionCount, compressionType, contentType), Duration.ofSeconds(5)); + + awaitAllBlobsWritten(expectedContents.keySet(), Duration.ofSeconds(45)); + + for (K key : expectedContents.keySet()) { + String blobName = key.toString(); + List records = ParquetTestDataFixture.readRecords(tempDir.resolve(Paths.get(blobName)), + bucketAccessor.readBytes(key)); + List expectedRecords = expectedContents.get(key); + assertThat(records).hasSize(expectedRecords.size()); + for (int i = 0; i < expectedRecords.size(); i++) { + GenericRecord record = records.get(i); + int counter = expectedRecords.get(i); + assertThat(record.get("key")).hasToString("key-" + counter); + assertThat(((ByteBuffer) record.get("value")).array()).isEqualTo(jsonValueGenerator.apply(counter)); + assertThat(record.get("offset")).isEqualTo((long) i); + assertThat(record.get("timestamp")).isNotNull(); + assertThat(record.get("headers")).isNull(); + } + } + } + + private static IntFunction JsonFmt = i -> String.format("{\"schema\": %s, \"payload\": %s}", JsonTestDataFixture.SCHEMA_JSON, JsonTestDataFixture.generateJsonRec(i, "user-" + i)).getBytes(StandardCharsets.UTF_8); + +// @ParameterizedTest +// @EnumSource(CompressionType.class) +// @CsvSource({ "true, {\"value\": {\"name\": \"%s\"}} ", "false, {\"name\": \"%s\"}" }) + @Test + void parquetJsonValue(@TempDir Path tempDir/*final String envelopeEnabled, final String expectedOutput*/) + throws IOException { + final CompressionType compressionType = CompressionType.NONE; + final FormatType contentType = FormatType.PARQUET; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "value"); + connectorConfig.put("format.output.envelope", "true"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + connectorConfig.put("format.output.type", contentType.name); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + int partitionCount = 4; + final Map> expectedContents = sendMessages(40, partitionCount, JsonFmt, i -> i, + partitionZeroOffsetKey(partitionCount, compressionType, contentType), Duration.ofSeconds(5)); + + awaitAllBlobsWritten(expectedContents.keySet(), Duration.ofSeconds(45)); + + for (K key : expectedContents.keySet()) { + String blobName = key.toString(); + List records = ParquetTestDataFixture.readRecords(tempDir.resolve(Paths.get(blobName)), + bucketAccessor.readBytes(key)); + List expectedRecords = expectedContents.get(key); + assertThat(records).hasSize(expectedRecords.size()); + for (int i = 0; i < expectedRecords.size(); i++) { + GenericRecord record = records.get(i); + int counter = expectedRecords.get(i); + GenericRecord value = (GenericRecord) record.get("value"); + 
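+ // The output envelope stores the converted JSON payload as a nested record under "value", so the
+ // fixture's "id" and "message" fields can be asserted on directly.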
assertThat(value.get("id")).isEqualTo(counter); + assertThat(value.get("message").toString()).isEqualTo("user-" + counter); + assertThat(value.hasField("value")).isFalse(); + } + } + } + +// //final var jsonMessageSchema = "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"field\":\"name\"}]}"; +// // +// // final var jsonMessagePattern = "{\"schema\": %s, \"payload\": %s}"; +// +// final List> sendFutures = new ArrayList<>(); +// int cnt = 0; +// for (int i = 0; i < 10; i++) { +// for (int partition = 0; partition < 4; partition++) { +// final String key = "key-" + cnt; +// final String value = String.format(jsonMessagePattern, JsonTestDataFixture.SCHEMA_JSON, +// "{" + "\"name\":\"user-" + cnt + "\"}"); +// cnt += 1; +// +// sendFutures.add(sendMessageAsync(testTopic0, partition, key.getBytes(StandardCharsets.UTF_8), +// value.getBytes(StandardCharsets.UTF_8))); +// } +// } +// getProducer().flush(); +// for (final Future sendFuture : sendFutures) { +// sendFuture.get(); +// } +// +// final List expectedBlobs = Arrays.asList(getBlobName(0, 0, compression), getBlobName(1, 0, compression), +// getBlobName(2, 0, compression), getBlobName(3, 0, compression)); +// +// awaitAllBlobsWritten(expectedBlobs.size()); +// assertThat(testBucketAccessor.getBlobNames(gcsPrefix)).containsExactlyElementsOf(expectedBlobs); +// +// final Map> blobContents = new HashMap<>(); +// for (final String blobName : expectedBlobs) { +// final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)), +// testBucketAccessor.readBytes(blobName)); +// blobContents.put(blobName, records); +// } +// cnt = 0; +// for (int i = 0; i < 10; i++) { +// for (int partition = 0; partition < 4; partition++) { +// final var name = "user-" + cnt; +// final String blobName = getBlobName(partition, 0, compression); +// final var record = blobContents.get(blobName).get(i); +// final String expectedLine = String.format(expectedOutput, name); +// assertThat(record).hasToString(expectedLine); +// cnt += 1; +// } +// } +// } + + private static IntFunction EvolvedJsonFmt = i -> String.format("{\"schema\": %s, \"payload\": %s}", JsonTestDataFixture.EVOLVED_SCHEMA_JSON, JsonTestDataFixture.generateEvolvedJsonRec(i, "user-" + i)).getBytes(StandardCharsets.UTF_8); + + @Test + void parquetSchemaChanged(@TempDir Path tempDir) throws IOException { + final CompressionType compressionType = CompressionType.NONE; + final FormatType contentType = FormatType.PARQUET; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "value"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + connectorConfig.put("format.output.type", contentType.name); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + IntFunction alternatingSchema = i -> i % 2 == 1 ? EvolvedJsonFmt.apply(i) : JsonFmt.apply(i); + int partitionCount = 1; + + IntFunction nativeKeyGenerator = i -> getNativeKey(i % partitionCount, i, compressionType, contentType); + final Map> expectedContents = sendMessages(3, partitionCount, alternatingSchema, i -> i, + nativeKeyGenerator, Duration.ofSeconds(5)); + + // since each record changes the schema each record should be in its own partition. 
+ final List expectedKeys = new ArrayList<>(); + for (int i = 0; i < 3; i++) + { + expectedKeys.add(nativeKeyGenerator.apply(i)); + } + + + awaitAllBlobsWritten(expectedKeys, Duration.ofSeconds(45)); + + for (int i = 0; i < expectedKeys.size(); i++) { + K key = expectedKeys.get(i); + assertThat(expectedContents.get(key).size()).isEqualTo(1); + int counter = expectedContents.get(key).get(0); + List records = ParquetTestDataFixture.readRecords(tempDir.resolve(Paths.get(key.toString())), + bucketAccessor.readBytes(key)); + assertThat(records).size().isEqualTo(1); + boolean newSchema = (i % 2) == 1; + GenericRecord record = records.get(0); + + GenericRecord value = (GenericRecord) record.get("value"); + assertThat(value.get("id")).isEqualTo(counter); + assertThat(value.get("message").toString()).isEqualTo("user-" + counter); + assertThat(value.hasField("value")).isFalse(); + if (newSchema) { + assertThat(value.get("age")).isEqualTo(counter * 10); + } else { + assertThat(value.hasField("age")).isFalse(); + } + } + +// for (int i = 0; i < expectedKeys.size(); i++) { +// final List items = serializer.extractGenericRecord(bucketAccessor.readBytes(expectedKeys.get(i))); +// assertThat(items).size().isEqualTo(1); +// boolean newSchema = (i % 2) == 1; +// GenericRecord record = items.get(0); +// assertThat(record.get("id")).isEqualTo(i); +// assertThat(record.hasField("message")).isTrue(); +// if (newSchema) { +// assertThat(record.get("age")).isEqualTo(i*10); +// } else { +// assertThat(record.hasField("age")).isFalse(); +// } +// Map props = record.getSchema().getObjectProps(); +// assertThat(record.getSchema().getObjectProp("connect.version")).isEqualTo( newSchema ? 2 : 1); +// assertThat(record.getSchema().getObjectProp("connect.name")).isEqualTo("TestRecord"); +// } + +// // write one record with old schema, one with new, and one with old again. All written into partition 0 +// List avroRecords = serializer.sendRecords(3, 1, Duration.ofSeconds(3), (i) -> { +// // new schema every 3 records +// boolean newSchema = (i % 2) == 1; +// GenericRecord value = new GenericData.Record(newSchema ? 
AvroTestDataFixture.EVOLVED_SCHEMA : AvroTestDataFixture.DEFAULT_SCHEMA); +// value.put("message", new Utf8("user-" + i)); +// value.put("id", i); +// if (newSchema) { +// value.put("age", i * 10); +// } +// return value; +// } +// ); + +// final var jsonMessageSchema = "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"field\":\"name\"}]}"; +// final var jsonMessageNewSchema = "{\"type\":\"struct\",\"fields\":" +// + "[{\"type\":\"string\",\"field\":\"name\"}, " +// + "{\"type\":\"string\",\"field\":\"value\", \"default\": \"foo\"}]}"; +// final var jsonMessagePattern = "{\"schema\": %s, \"payload\": %s}"; + +// final List> sendFutures = new ArrayList<>(); +// final var expectedRecords = new ArrayList(); +// int cnt = 0; +// for (int i = 0; i < 10; i++) { +// for (int partition = 0; partition < 4; partition++) { +// final var key = "key-" + cnt; +// final String value; +// final String payload; +// if (i < 5) { // NOPMD literal +// payload = "{" + "\"name\": \"user-" + cnt + "\"}"; +// value = String.format(jsonMessagePattern, jsonMessageSchema, payload); +// } else { +// payload = "{" + "\"name\": \"user-" + cnt + "\", \"value\": \"value-" + cnt + "\"}"; +// value = String.format(jsonMessagePattern, jsonMessageNewSchema, payload); +// } +// expectedRecords.add(String.format("{\"value\": %s}", payload)); +// sendFutures.add(sendMessageAsync(testTopic0, partition, key.getBytes(StandardCharsets.UTF_8), +// value.getBytes(StandardCharsets.UTF_8))); +// cnt += 1; +// } +// } +// getProducer().flush(); +// for (final Future sendFuture : sendFutures) { +// sendFuture.get(); +// } +// +// final List expectedBlobs = Arrays.asList(getBlobName(0, 0, compression), getBlobName(0, 5, compression), +// getBlobName(1, 0, compression), getBlobName(1, 5, compression), getBlobName(2, 0, compression), +// getBlobName(2, 5, compression), getBlobName(3, 0, compression), getBlobName(3, 5, compression)); +// +// awaitAllBlobsWritten(expectedBlobs.size()); +// assertThat(testBucketAccessor.getBlobNames(gcsPrefix)).containsExactlyElementsOf(expectedBlobs); +// +// final var blobContents = new ArrayList(); +// for (final String blobName : expectedBlobs) { +// final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)), +// testBucketAccessor.readBytes(blobName)); +// blobContents.addAll(records.stream().map(GenericRecord::toString).collect(Collectors.toList())); +// } +// assertThat(blobContents).containsExactlyInAnyOrderElementsOf(expectedRecords); + } + +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkParquetIntegrationTest.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkParquetIntegrationTest.java new file mode 100644 index 000000000..4bf0b623e --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/AbstractSinkParquetIntegrationTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.integration.sink; + +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.OutputField; +import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; +import io.aiven.kafka.connect.common.config.OutputFieldType; +import io.aiven.kafka.connect.common.format.AvroTestDataFixture; +import io.aiven.kafka.connect.common.format.ParquetTestDataFixture; +import io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractSinkParquetIntegrationTest, N> extends AbstractSinkGenericRecordIntegrationTest { + + public AbstractSinkParquetIntegrationTest() { + super(FormatType.PARQUET); + } + + @Test + void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException { + CompressionType compressionType = CompressionType.NONE; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "key,value,offset,timestamp,headers"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + int recordCount = 40; + int partitionCount = 4; + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobs = new HashMap<>(); + for (int cnt = 0; cnt < recordCount; cnt++) { + int partition = cnt % partitionCount; + final String key = "key-" + cnt; + GenericRecord value = AvroTestDataFixture.generateAvroRecord(cnt); + sendFutures.add(sendMessageAsync(testTopic, 
partition, key, value)); + expectedBlobs.compute(getNativeKey(partition, 0, CompressionType.NONE, formatType), (k, v) -> v == null ? new ArrayList<>() : v) + .add(cnt); + } + + awaitFutures(sendFutures, Duration.ofSeconds(45)); + + awaitAllBlobsWritten(expectedBlobs.keySet(), Duration.ofSeconds(45)); + + for (final K nativeKey : expectedBlobs.keySet()) { + List records = ParquetTestDataFixture.readRecords(tmpDir, bucketAccessor.readBytes(nativeKey)); + List recordNumbers = expectedBlobs.get(nativeKey); + assertThat(records).hasSize(recordNumbers.size()); + for (int i = 0; i < recordNumbers.size(); i++) { + int recordNumber = recordNumbers.get(i); + GenericRecord record = records.get(i); + assertThat(record.get("key")).hasToString("key-" + recordNumber); + assertThat(record.get("value")).isNotNull(); + assertThat(record.get("offset")).isEqualTo((long) i); + assertThat(record.get("timestamp")).isNotNull(); + assertThat(record.get("headers")).isNull(); + GenericRecord valueRecord = (GenericRecord) record.get("value"); + assertThat(valueRecord.get("id")).isEqualTo(recordNumber); + assertThat(valueRecord.get("message")).hasToString("Hello, from Avro Test Data Fixture! object " + recordNumber); + } + } + } + + + @Test + void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException { + CompressionType compressionType = CompressionType.NONE; + final Map connectorConfig = basicConnectorConfig(); + connectorConfig.put("format.output.fields", "value"); + connectorConfig.put("format.output.fields.value.encoding", "none"); + connectorConfig.put("file.compression.type", compressionType.name); + createConnector(connectorConfig); + + int recordCount = 3; + final List> sendFutures = new ArrayList<>(); + final Map> expectedBlobs = new HashMap<>(); + for (int cnt = 0; cnt < recordCount; cnt++) { + boolean newSchema = (cnt % 2) == 1; + final String key = "key-" + cnt; + GenericRecord value = new GenericData.Record(newSchema ? AvroTestDataFixture.EVOLVED_SCHEMA : AvroTestDataFixture.DEFAULT_SCHEMA); + value.put("message", new Utf8("user-" + cnt)); + value.put("id", cnt); + if (newSchema) { + value.put("age", cnt * 10); + } + sendFutures.add(sendMessageAsync(testTopic, 0, key, value)); + expectedBlobs.compute(getNativeKey(0, cnt, CompressionType.NONE, formatType), (k, v) -> v == null ? 
new ArrayList<>() : v) + .add(cnt); + } + + awaitFutures(sendFutures, Duration.ofSeconds(45)); + + awaitAllBlobsWritten(expectedBlobs.keySet(), Duration.ofSeconds(45)); + + for (final K nativeKey : expectedBlobs.keySet()) { + List records = ParquetTestDataFixture.readRecords(tmpDir, bucketAccessor.readBytes(nativeKey)); + List recordNumbers = expectedBlobs.get(nativeKey); + assertThat(records).hasSize(recordNumbers.size()); + for (int i = 0; i < recordNumbers.size(); i++) { + int recordNumber = recordNumbers.get(i); + GenericRecord record = records.get(i); + assertThat(record.get("value")).isNotNull(); + GenericRecord valueRecord = (GenericRecord) record.get("value"); + assertThat(valueRecord.get("id")).isEqualTo(recordNumber); + assertThat(valueRecord.get("message")).hasToString("user-" + recordNumber); + boolean newSchema = (recordNumber % 2) == 1; + if (newSchema) { + assertThat(valueRecord.get("age")).isEqualTo(recordNumber * 10); + } else { + assertThat(valueRecord.hasField("age")).isFalse(); + } + } + } + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/BucketAccessor.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/BucketAccessor.java new file mode 100644 index 000000000..1397b9bec --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/BucketAccessor.java @@ -0,0 +1,191 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.integration.sink; + + +import io.aiven.kafka.connect.common.config.CompressionType; +import org.apache.commons.io.IOUtils; + + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public abstract class BucketAccessor> { + protected final String bucketName; + + /** + * Creates an accessor. + * @param storage + * @param bucketName + */ + protected BucketAccessor(final SinkStorage storage, final String bucketName) { + Objects.requireNonNull(storage, "storage cannot be null"); + Objects.requireNonNull(bucketName, "bucketName cannot be null"); + this.bucketName = bucketName; + } + + /** + * Gets the input stream for an object in this bucket. + * @param objectKey the key for the object + * @return the InputStream for the data in the object. + * @throws IOException if there is an error. + */ + protected abstract InputStream getInputStream(final K objectKey) throws IOException; + + /** + * Gets the list of keys for objects in this bucket. + * @return the list of keys for objects in this bucket. + * @throws IOException if there is an error. 
+ */
+ protected final List listKeys() throws IOException {
+ return listKeys(null);
+ }
+
+ /**
+ * Gets the list of keys for objects in this bucket.
+ * @param prefix the key prefix to filter on. May be {@code null}.
+ * @return the list of keys for objects in this bucket.
+ * @throws IOException if there is an error.
+ */
+ protected abstract List listKeys(String prefix) throws IOException;
+
+ /**
+ * Removes this bucket.
+ */
+ public abstract void removeBucket();
+
+ /**
+ * Creates the bucket.
+ */
+ public abstract void createBucket();
+
+ /**
+ * Checks whether the specified object exists within the bucket.
+ * @param objectKey the key for the object.
+ * @return {@code true} if the object exists, {@code false} otherwise.
+ * @throws IOException on error.
+ */
+ public boolean doesObjectExist(final K objectKey) throws IOException {
+ return listKeys().contains(objectKey);
+ }
+
+
+ /**
+ * Reads the data from an object and decodes the specified fields before returning the result.
+ * @param objectKey the object key to read.
+ * @param compression the compression that was used on the object.
+ * @param fieldsToDecode the fields to decode.
+ * @return a list of lists of strings.
+ * @throws IOException on error.
+ */
+ public final List> readAndDecodeLines(final K objectKey, final CompressionType compression,
+ final int... fieldsToDecode) throws IOException {
+ Objects.requireNonNull(objectKey, "objectKey cannot be null");
+ Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
+
+ return readLines(objectKey, compression).stream()
+ .map(l -> l.split(","))
+ .map(fields -> decodeRequiredFields(fields, fieldsToDecode))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Reads the bytes from an object key and decompresses if necessary.
+ * @param objectKey the key for the object to read.
+ * @param compression the compression that was used on the object.
+ * @return the bytes from the object.
+ * @throws IOException on error.
+ */
+ public final byte[] readBytes(final K objectKey, final CompressionType compression) throws IOException {
+ Objects.requireNonNull(objectKey, "objectKey cannot be null");
+ try (InputStream decompressedStream = compression.decompress(getInputStream(objectKey));
+ ByteArrayOutputStream decompressedBytes = new ByteArrayOutputStream()) {
+ IOUtils.copy(decompressedStream, decompressedBytes);
+ return decompressedBytes.toByteArray();
+ }
+ }
+
+ /**
+ * Reads the bytes from the object without any additional handling.
+ * @param objectKey the key for the object to read.
+ * @return the bytes from the object.
+ * @throws IOException on error.
+ */
+ public final byte[] readBytes(final K objectKey) throws IOException {
+ return readBytes(objectKey, CompressionType.NONE);
+ }
+
+ /**
+ * Reads the first string from an object.
+ * @param objectKey the key for the object to read.
+ * @param compression the compression that was used on the object.
+ * @return the string read from the object.
+ * @throws IOException on error.
+ */
+ public final String readString(final K objectKey, final CompressionType compression) throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(readBytes(objectKey, compression));
+ InputStreamReader reader = new InputStreamReader(bais, StandardCharsets.UTF_8);
+ BufferedReader bufferedReader = new BufferedReader(reader)) {
+ return bufferedReader.readLine();
+ }
+ }
+
+ /**
+ * Reads the contents of the object as a collection of lines.
+ * @param objectKey the key for the object to read.
+ * @param compression the compression that was used on the object + * @return lines read from the object stream. + * @throws IOException on error. + */ + public final List readLines(final K objectKey, final CompressionType compression) throws IOException { + final byte[] blobBytes = readBytes(objectKey, compression); + try (ByteArrayInputStream bais = new ByteArrayInputStream(blobBytes); + InputStreamReader reader = new InputStreamReader(bais, StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(reader)) { + return bufferedReader.lines().collect(Collectors.toList()); + } + } + + private List decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) { + Objects.requireNonNull(originalFields, "originalFields cannot be null"); + Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null"); + + final List result = Arrays.asList(originalFields); + for (final int fieldIdx : fieldsToDecode) { + result.set(fieldIdx, b64Decode(result.get(fieldIdx))); + } + return result; + } + + private String b64Decode(final String value) { + Objects.requireNonNull(value, "value cannot be null"); + + return new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8); + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/FaultyProxy.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/FaultyProxy.java new file mode 100644 index 000000000..8f410cdef --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/FaultyProxy.java @@ -0,0 +1,38 @@ +package io.aiven.kafka.connect.common.integration.sink; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.http.RequestMethod; +import com.github.tomakehurst.wiremock.matching.UrlPattern; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; + +public class FaultyProxy { + + private FaultyProxy() { + // do not instantiate + } + + public static WireMockServer createFaultyProxy(SinkStorage storage, String topicName) { + final WireMockServer wireMockServer = new WireMockServer(WireMockConfiguration.options().dynamicPort()); + String urlPathPattern = storage.getURLPathPattern(topicName); + wireMockServer.start(); + wireMockServer.addStubMapping(WireMock.request(RequestMethod.ANY.getName(), UrlPattern.ANY) + .willReturn(aResponse().proxiedFrom(storage.getEndpointURL())) + .build()); + wireMockServer.addStubMapping( + WireMock.request(RequestMethod.POST.getName(), UrlPattern.fromOneOf(null, null, null, urlPathPattern)) + .inScenario("temp-error") + .willSetStateTo("Error") + .willReturn(aResponse().withStatus(400)) + .build()); + wireMockServer.addStubMapping( + WireMock.request(RequestMethod.POST.getName(), UrlPattern.fromOneOf(null, null, null, urlPathPattern)) + .inScenario("temp-error") + .whenScenarioStateIs("Error") + .willReturn(aResponse().proxiedFrom(storage.getEndpointURL())) + .build()); + return wireMockServer; + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/RecordProducer.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/RecordProducer.java new file mode 100644 index 000000000..abb54d37e --- /dev/null +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/RecordProducer.java @@ -0,0 +1,37 @@ 
+package io.aiven.kafka.connect.common.integration.sink; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.concurrent.Future; + +public class RecordProducer { + + final KafkaProducer producer; + + RecordProducer(final KafkaProducer producer) { + this.producer = producer; + } + + /** + * Sends a message in an async manner. All messages are sent with a byte[] key and value type. + * @param topicName the topic to send the message on. + * @param partition the partition for the message. + * @param key the key for the message. + * @param value the value for the message. + * @return A future that will return the {@link RecordMetadata} for the message. + */ + public Future sendMessageAsync(final String topicName, final int partition, final K key, final V value) { + final ProducerRecord msg = new ProducerRecord<>(topicName, partition, key, value); + return producer.send(msg); + } + + public void close() { + producer.close(); + } + + public void flush() { + producer.flush(); + } +} diff --git a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/SinkStorage.java b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/SinkStorage.java index 46d6f8a70..72ef7b12d 100644 --- a/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/SinkStorage.java +++ b/commons/src/testFixtures/java/io/aiven/kafka/connect/common/integration/sink/SinkStorage.java @@ -19,6 +19,7 @@ import java.util.Map; import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; import io.aiven.kafka.connect.common.integration.StorageBase; import com.github.tomakehurst.wiremock.WireMockServer; @@ -36,22 +37,6 @@ * the native storage object type */ public interface SinkStorage, N> extends StorageBase { - /** - * Get the native key for an avro based blob. - * - * @param prefix - * the prefix for the storage location. - * @param topicName - * the topic name for the storage location. - * @param partition - * the partition for the storage location. - * @param startOffset - * the start offset for the storage location. - * @param compression - * the compression type for the data at the storage location. - * @return a native key for the specified avro file. - */ - K getAvroBlobName(String prefix, String topicName, int partition, int startOffset, CompressionType compression); /** * Get the native key for a standard blob. @@ -68,7 +53,7 @@ public interface SinkStorage, N> extends StorageBase, N> extends StorageBase, N> extends StorageBase, N> extends StorageBase createSinkProperties(String prefix, String connectorName); + Map createSinkProperties(String bucketName); /** * Get the URL of the sink storage endpoint. 
This is used in testing to create a proxy that will return a HTTP 500 @@ -137,11 +125,11 @@ public interface SinkStorage, N> extends StorageBase, N> extends StorageBase getBucketAccessor(String bucketName); + + /** + * Configures a WireMockServer that will throw an error + * @return + */ + WireMockServer enableFaultyProxy(); } diff --git a/gcs-sink-connector/build.gradle.kts b/gcs-sink-connector/build.gradle.kts index 92111fece..14b93dbcb 100644 --- a/gcs-sink-connector/build.gradle.kts +++ b/gcs-sink-connector/build.gradle.kts @@ -65,7 +65,7 @@ tasks.register("integrationTest") { val distTarTask = tasks["distTar"] as Tar val distributionFilePath = distTarTask.archiveFile.get().asFile.path systemProperty("integration-test.distribution.file.path", distributionFilePath) - systemProperty("fake-gcs-server-version", "1.45.2") + systemProperty("fake-gcs-server-version", "1.47.4") } idea { @@ -144,7 +144,8 @@ dependencies { integrationTestImplementation(testcontainers.junit.jupiter) integrationTestImplementation(testcontainers.kafka) // this is not Kafka version integrationTestImplementation(testinglibs.awaitility) - + integrationTestImplementation("io.aiven:testcontainers-fake-gcs-server:0.2.0") + integrationTestImplementation(logginglibs.slf4j.simple) integrationTestImplementation(apache.kafka.connect.transforms) // TODO: add avro-converter to ConnectRunner via plugin.path instead of on worker classpath integrationTestImplementation(confluent.kafka.connect.avro.converter) { diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroIntegrationTest.java index b94c58b95..71c24e7f4 100644 --- a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroIntegrationTest.java +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroIntegrationTest.java @@ -46,6 +46,7 @@ import org.apache.avro.util.Utf8; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroParquetIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroParquetIntegrationTest.java index 52a41ea73..eef261bef 100644 --- a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroParquetIntegrationTest.java +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroParquetIntegrationTest.java @@ -40,11 +40,13 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers +@Disabled final class AvroParquetIntegrationTest extends AbstractIntegrationTest { private static final String CONNECTOR_NAME = "aiven-gcs-sink-connector-parquet"; diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSAvroIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSAvroIntegrationTest.java new file mode 100644 index 000000000..6a566eb43 --- /dev/null +++ 
b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSAvroIntegrationTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.gcs; + +import com.google.cloud.storage.Blob; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.integration.sink.AbstractSinkAvroIntegrationTest; +import io.aiven.kafka.connect.common.integration.sink.AbstractSinkIntegrationTest; +import io.aiven.kafka.connect.common.integration.sink.SinkStorage; +import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.commons.io.IOUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") +@Testcontainers +final class GCSAvroIntegrationTest extends AbstractSinkAvroIntegrationTest { + + @Container + FakeGcsServerContainer gcsServerContainer = new FakeGcsServerContainer(FakeGcsServerContainer.DEFAULT_IMAGE_NAME); + + private GCSSinkStorage sinkStorage; + + @Override + protected SinkStorage getSinkStorage() { + if (sinkStorage == null) { + sinkStorage = new GCSSinkStorage(gcsServerContainer); + } + return sinkStorage; + } +} diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSIntegrationTest.java new file mode 100644 index 000000000..eac456861 --- /dev/null +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSIntegrationTest.java @@ -0,0 +1,33 @@ +package io.aiven.kafka.connect.gcs; + +import com.google.cloud.storage.Blob; + +import io.aiven.kafka.connect.common.config.FormatType; +import 
io.aiven.kafka.connect.common.integration.sink.AbstractSinkIntegrationTest; +import io.aiven.kafka.connect.common.integration.sink.RecordProducer; +import io.aiven.kafka.connect.common.integration.sink.SinkStorage; + +import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; + +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + + + +@Testcontainers +public class GCSIntegrationTest extends AbstractSinkIntegrationTest { + + @Container + FakeGcsServerContainer gcsServerContainer = new FakeGcsServerContainer(FakeGcsServerContainer.DEFAULT_IMAGE_NAME); + + GCSSinkStorage sinkStorage; + + @Override + protected SinkStorage getSinkStorage() { + if (sinkStorage == null) { + sinkStorage = new GCSSinkStorage(gcsServerContainer); + } + return sinkStorage; + } + +} diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSParquetIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSParquetIntegrationTest.java new file mode 100644 index 000000000..9a1d03bd9 --- /dev/null +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSParquetIntegrationTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.gcs; + +import com.google.cloud.storage.Blob; +import io.aiven.kafka.connect.common.integration.sink.AbstractSinkAvroIntegrationTest; +import io.aiven.kafka.connect.common.integration.sink.AbstractSinkParquetIntegrationTest; +import io.aiven.kafka.connect.common.integration.sink.SinkStorage; +import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") +@Testcontainers +final class GCSParquetIntegrationTest extends AbstractSinkParquetIntegrationTest { + + @Container + FakeGcsServerContainer gcsServerContainer = new FakeGcsServerContainer(FakeGcsServerContainer.DEFAULT_IMAGE_NAME); + + private GCSSinkStorage sinkStorage; + + @Override + protected SinkStorage getSinkStorage() { + if (sinkStorage == null) { + sinkStorage = new GCSSinkStorage(gcsServerContainer); + } + return sinkStorage; + } +} diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSSinkStorage.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSSinkStorage.java new file mode 100644 index 000000000..c478c44e7 --- /dev/null +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/GCSSinkStorage.java @@ -0,0 +1,196 @@ +package io.aiven.kafka.connect.gcs; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.google.api.gax.paging.Page; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.StorageRetryStrategy; +import io.aiven.kafka.connect.common.NativeInfo; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.integration.sink.BucketAccessor; +import io.aiven.kafka.connect.common.integration.sink.SinkStorage; +import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; +import org.apache.commons.io.function.IOSupplier; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.kafka.connect.connector.Connector; +import org.codehaus.plexus.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class GCSSinkStorage implements SinkStorage { + private final Storage storage; + private final FakeGcsServerContainer gcsServerContainer; + + GCSSinkStorage(FakeGcsServerContainer gcsServerContainer) { + this.gcsServerContainer = gcsServerContainer; + this.storage = StorageOptions.newBuilder().setHost(gcsServerContainer.url()) + .setProjectId("gcs-sink-connector") + .setCredentials(NoCredentials.getInstance()) + .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(1).build()) + .build().getService(); + } + + String formatSegment(FormatType formatType) { + return StringUtils.isEmpty(formatType.getFileNameSegment()) ? 
"" : "."+formatType.getFileNameSegment(); + } + + @Override + public String getNativeKey(String prefix, String topicName, int partition, int startOffset, CompressionType compression, FormatType formatType) { + return String.format("%s%s-%d-%d%s%s", Objects.toString(prefix, ""), topicName, partition, startOffset, formatSegment(formatType), compression.extension()); + } + + @Override + public String getKeyNativeKey(String prefix, String key, CompressionType compression, FormatType formatType) { + return String.format("%s%s%s%s", Objects.toString(prefix, ""), key, formatSegment(formatType), compression.extension()); + } + + @Override + public String getTimestampNativeKey(String prefix, String topicName, int partition, int startOffset, CompressionType compression, FormatType formatType) { + final ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + return String.format("%s%s-%d-%d-%s%s%s", Objects.toString(prefix, ""), topicName, partition, startOffset, + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")), formatSegment(formatType), compression.extension()); + } + + @Override + public Map createSinkProperties(String bucketName) { + final Map config = new HashMap<>(); +// if (gcsCredentialsPath != null) { +// config.put("gcs.credentials.path", gcsCredentialsPath); +// } +// if (gcsCredentialsJson != null) { +// config.put("gcs.credentials.json", gcsCredentialsJson); +// } +// if (useFakeGCS()) { +// config.put("gcs.endpoint", gcsEndpoint); +// } + config.put("gcs.endpoint", gcsServerContainer.url()); + config.put("gcs.bucket.name", bucketName); + return config; + } + + @Override + public String getEndpointURL() { + return gcsServerContainer.url(); + } + + @Override + public String getURLPathPattern(String bucketName) { + return String.format("/upload/storage/v1/b/%s/o", bucketName); + } + + @Override + public boolean enableProxy(Map config, WireMockServer wireMockServer) { + return false; + } + + @Override + public CompressionType getDefaultCompression() { + return null; + } + + @Override + public BucketAccessor getBucketAccessor(String bucketName) { + return new GCSBucketAccessor(this, bucketName); + } + + @Override + public WireMockServer enableFaultyProxy() { + return null; + } + + @Override + public Class getConnectorClass() { + return GcsSinkConnector.class; + } + + @Override + public void createStorage() { + + } + + @Override + public void removeStorage() { + + } + + @Override + public List> getNativeStorage() { + return List.of(); + } + + @Override + public IOSupplier getInputStream(String nativeKey) { + return null; + } + + @Override + public String defaultPrefix() { + return ""; + } + + public final class GCSBucketAccessor extends BucketAccessor { + + private final BucketInfo bucketInfo; + + public GCSBucketAccessor(final GCSSinkStorage storage, final String bucketName) { + super(storage, bucketName); + bucketInfo = BucketInfo.of(bucketName); + } + + private BlobId blobId(final String objectKey) { + return BlobId.of(bucketInfo.getName(), objectKey); + } + + private BlobInfo blobInfo(String objectKey) { + return BlobInfo.newBuilder(bucketInfo, objectKey).build(); + } + @Override + protected InputStream getInputStream(String objectKey) throws IOException { + return new ByteArrayInputStream(storage.readAllBytes(blobId(objectKey))); + } + + private List extractKeys(Page page) { + return StreamSupport.stream(page.iterateAll().spliterator(), false) + .map(BlobInfo::getName) + .sorted() + .collect(Collectors.toList()); + } + + @Override + protected List listKeys(String prefix) 
throws IOException { + return extractKeys(StringUtils.isEmpty(prefix) ? storage.list(bucketInfo.getName()) : storage.list(bucketInfo.getName(), Storage.BlobListOption.prefix(prefix))); + } + + @Override + public void removeBucket() { + Page page = storage.list(bucketInfo.getName()); + page.iterateAll().forEach(blob -> storage.delete(blob.getBlobId())); + if (!storage.delete(bucketInfo.getName())) { + throw new IllegalStateException(String.format("Bucket %s was not deleted", bucketInfo.getName())); + } + } + + @Override + public void createBucket() { + storage.create(bucketInfo); + } + } +} diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/IntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/IntegrationTest.java index f001a46e0..0be2c1f8e 100644 --- a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/IntegrationTest.java +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/IntegrationTest.java @@ -44,12 +44,14 @@ import com.github.tomakehurst.wiremock.http.RequestMethod; import com.github.tomakehurst.wiremock.matching.UrlPattern; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers +@Disabled final class IntegrationTest extends AbstractIntegrationTest { private static final String CONNECTOR_NAME = "aiven-gcs-sink-connector"; diff --git a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/ParquetIntegrationTest.java b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/ParquetIntegrationTest.java index 7fcd9cbf8..57f57294d 100644 --- a/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/ParquetIntegrationTest.java +++ b/gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/ParquetIntegrationTest.java @@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers +@Disabled final class ParquetIntegrationTest extends AbstractIntegrationTest { private static final String CONNECTOR_NAME = "aiven-gcs-sink-connector-plain-parquet"; diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 510e1cbd7..1b8fe0940 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -211,14 +211,16 @@ private static void addFileConfigGroup(final ConfigDef configDef) { configDef.define(FILE_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "", new ConfigDef.Validator() { @Override public void ensureValid(final String name, final Object value) { - // See https://cloud.google.com/storage/docs/naming - assert value instanceof String; + if (value == null) { + throw new ConfigException(name, value, + "Should not be null"); + } final String valueStr = (String) value; if (valueStr.length() > 1024) { // NOPMD avoid literal - throw new 
ConfigException(GCS_BUCKET_NAME_CONFIG, value, "cannot be longer than 1024 characters"); + throw new ConfigException(name, value, "cannot be longer than 1024 characters"); } if (valueStr.startsWith(".well-known/acme-challenge")) { - throw new ConfigException(GCS_BUCKET_NAME_CONFIG, value, + throw new ConfigException(name, value, "cannot start with '.well-known/acme-challenge'"); } } diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java index 222e75393..7aa3ae398 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java @@ -106,11 +106,13 @@ public void put(final Collection records) { for (final SinkRecord record : records) { recordGrouper.put(record); } + LOG.debug("Processed {} records into {} files", records.size(), recordGrouper.records().size()); } @Override public void flush(final Map currentOffsets) { try { + LOG.debug("Flushing {} files", recordGrouper.records().size()); recordGrouper.records().forEach(this::flushFile); } finally { recordGrouper.clear(); diff --git a/settings.gradle.kts b/settings.gradle.kts index 1d1a8826d..d2abb30d7 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -90,6 +90,7 @@ dependencyResolutionManagement { library("logback-classic", "ch.qos.logback:logback-classic:$logbackVersion") library("slf4j", "org.slf4j:slf4j-api:$slf4jVersion") library("slf4j-log4j12", "org.slf4j:slf4j-log4j12:$slf4jVersion") + library("slf4j-simple", "org.slf4j:slf4j-simple:2.0.17") } create("tools") { library(
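For reference, a minimal standalone sketch of the object-key layout implemented by GCSSinkStorage.getNativeKey and formatSegment above: the key is the configured prefix, then topic-partition-startOffset, then the format segment (only for formats such as Avro that define one) and finally the compression extension. The "backup/" prefix, the "avro" segment and the ".gz" extension below are illustrative assumptions rather than values taken from this change set.

// Illustrative sketch of the object-key layout used by GCSSinkStorage.getNativeKey.
// The prefix "backup/", the "avro" segment and the ".gz" extension are assumptions for
// demonstration only; real values come from the connector configuration,
// FormatType.getFileNameSegment() and CompressionType.extension().
import java.util.Objects;

public final class KeyLayoutSketch {

    // Mirrors GCSSinkStorage.formatSegment: an empty segment adds nothing, otherwise "." + segment.
    private static String formatSegment(final String fileNameSegment) {
        return fileNameSegment == null || fileNameSegment.isEmpty() ? "" : "." + fileNameSegment;
    }

    // Mirrors GCSSinkStorage.getNativeKey: prefix + topic-partition-startOffset + segment + extension.
    private static String nativeKey(final String prefix, final String topic, final int partition,
            final int startOffset, final String fileNameSegment, final String compressionExtension) {
        return String.format("%s%s-%d-%d%s%s", Objects.toString(prefix, ""), topic, partition, startOffset,
                formatSegment(fileNameSegment), compressionExtension);
    }

    public static void main(final String[] args) {
        System.out.println(nativeKey("backup/", "users", 0, 42, "avro", ".gz")); // backup/users-0-42.avro.gz
        System.out.println(nativeKey(null, "users", 0, 42, "", ""));             // users-0-42
    }
}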