@@ -33,6 +33,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase;
import io.aiven.commons.kafka.testkit.KafkaManager;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;

@@ -31,6 +31,7 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;

@@ -57,6 +57,7 @@ class AzureBlobClientTest {

private BlobContainerAsyncClient containerClient;
private BlobAsyncClient blobClient;

Contributor review comment: can remove if you want :D

@BeforeEach
public void setup() {
this.config = mock(AzureBlobSourceConfig.class);
@@ -117,7 +117,7 @@ public static Collection<String> names() {
}

/**
* Gets the file name extension associated with the compression.
* Gets the file name extension associated with the compression. Includes the '.' separator.
*
* @return the file name extension.
*/
@@ -34,15 +34,15 @@

public enum FormatType {
/** Handles in Avro format */
AVRO("avro", (stream, fields, config, envelope) -> new AvroOutputWriter(fields, stream, config, envelope)),
AVRO("avro", "avro", (stream, fields, config, envelope) -> new AvroOutputWriter(fields, stream, config, envelope)),
/** Handles in CSV format */
CSV("csv", (stream, fields, config, envelope) -> new PlainOutputWriter(fields, stream)),
CSV("csv", "", (stream, fields, config, envelope) -> new PlainOutputWriter(fields, stream)),
/** Handles in JSON format */
JSON("json", (stream, fields, config, envelope) -> new JsonOutputWriter(fields, stream, envelope)),
JSON("json", "", (stream, fields, config, envelope) -> new JsonOutputWriter(fields, stream, envelope)),
/** Handles in JSONL format */
JSONL("jsonl", (stream, fields, config, envelope) -> new JsonLinesOutputWriter(fields, stream, envelope)),
JSONL("jsonl", "", (stream, fields, config, envelope) -> new JsonLinesOutputWriter(fields, stream, envelope)),
/** Handles Parquet format */
PARQUET("parquet", (stream, fields, config, envelope) -> new ParquetOutputWriter(fields, stream, config, envelope));
PARQUET("parquet", "", (stream, fields, config, envelope) -> new ParquetOutputWriter(fields, stream, config, envelope));

/**
* A list of supported format types for display.
@@ -57,6 +57,11 @@ public enum FormatType {
*/
public final String name;

/**
* The file name segment to be added to generated file names; may be an empty string. Does not include a separator.
*/
final String fileNameSegment;

/** The writer constructor for this type */
private final WriterConstructor writerConstructor;

@@ -68,11 +73,16 @@ public enum FormatType {
* @param fileNameSegment
* the file name segment to include in file names; may be an empty string.
* @param writerConstructor
* the writer constructor for this type.
*/
FormatType(final String name, final WriterConstructor writerConstructor) {
FormatType(final String name, final String fileNameSegment, final WriterConstructor writerConstructor) {
this.name = name;
this.fileNameSegment = fileNameSegment;
this.writerConstructor = writerConstructor;
}

/** @return the file name segment for this format type; may be an empty string. */
public String getFileNameSegment() {
return fileNameSegment;
}

/**
* Gets the format type by name.
*
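A minimal sketch (not part of this change set) of how the new fileNameSegment could be combined with CompressionType.extension(), which now documents that it includes the '.' separator. The buildFileName helper and its parameters are hypothetical.

```java
// Hypothetical helper; FormatType is the enum shown above, and the compression
// extension (e.g. ".gz") is assumed to already contain its '.' separator.
static String buildFileName(final String prefix, final FormatType format, final String compressionExtension) {
    final StringBuilder name = new StringBuilder(prefix);
    if (!format.getFileNameSegment().isEmpty()) {
        name.append('.').append(format.getFileNameSegment()); // e.g. ".avro"
    }
    return name.append(compressionExtension).toString();      // e.g. ".gz"
}

// buildFileName("topic-0-0001", FormatType.AVRO, ".gz")  -> "topic-0-0001.avro.gz"
// buildFileName("topic-0-0001", FormatType.JSONL, ".gz") -> "topic-0-0001.gz"
```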
@@ -27,8 +27,11 @@

import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.templating.Template;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SchemaBasedTopicPartitionKeyRecordGrouper extends TopicPartitionKeyRecordGrouper {
private static final Logger LOG = LoggerFactory.getLogger(SchemaBasedTopicPartitionKeyRecordGrouper.class);

private final SchemaBasedRotator schemaBasedRotator = new SchemaBasedRotator();

@@ -39,6 +42,7 @@ final class SchemaBasedTopicPartitionKeyRecordGrouper extends TopicPartitionKeyR

@Override
protected String resolveRecordKeyFor(final SinkRecord record) {
LOG.debug("resolveRecordKeyFor()");
if (schemaBasedRotator.rotate(record)) {
return generateNewRecordKey(record);
} else {
@@ -53,7 +57,7 @@ public void clear() {
}

private static final class SchemaBasedRotator implements Rotator<SinkRecord> {

private static final Logger LOG = LoggerFactory.getLogger(SchemaBasedRotator.class);
private final Map<TopicPartitionKey, KeyValueSchema> keyValueSchemas = new HashMap<>();

@Override
@@ -64,7 +68,8 @@ public boolean rotate(final SinkRecord record) {
final var key = recordKey(record);
final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
final var keyValueVersion = keyValueSchemas.computeIfAbsent(tpk,
ignored -> new KeyValueSchema(record.keySchema(), record.valueSchema()));
ignored -> {
LOG.debug("Creating new KeyValueSchema");
return new KeyValueSchema(record.keySchema(), record.valueSchema());
});

final var schemaChanged = !keyValueVersion.keySchema.equals(record.keySchema())
|| !keyValueVersion.valueSchema.equals(record.valueSchema());
if (schemaChanged) {
@@ -27,10 +27,13 @@

import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.templating.Template;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SchemaBasedTopicPartitionRecordGrouper extends TopicPartitionRecordGrouper {

private final SchemaBasedRotator schemaBasedRotator = new SchemaBasedRotator();
private static final Logger LOG = LoggerFactory.getLogger(SchemaBasedTopicPartitionRecordGrouper.class);

SchemaBasedTopicPartitionRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
final TimestampSource tsSource) {
@@ -39,6 +42,7 @@ final class SchemaBasedTopicPartitionRecordGrouper extends TopicPartitionRecordG

@Override
protected String resolveRecordKeyFor(final SinkRecord record) {
LOG.debug("Checking schema based rotator");
if (schemaBasedRotator.rotate(record)) {
return generateNewRecordKey(record);
} else {
@@ -61,12 +65,15 @@ public boolean rotate(final SinkRecord record) {
if (Objects.isNull(record.valueSchema()) || Objects.isNull(record.keySchema())) {
throw new SchemaProjectorException("Record must have schemas for key and value");
}
final KeyValueSchema recordSchemas = new KeyValueSchema(record.keySchema(), record.valueSchema());
final var topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
final var keyValueVersion = keyValueSchemas.computeIfAbsent(topicPartition,
ignored -> new KeyValueSchema(record.keySchema(), record.valueSchema()));
final var schemaChanged = !keyValueVersion.keySchema.equals(record.keySchema())
|| !keyValueVersion.valueSchema.equals(record.valueSchema());
final KeyValueSchema topicPartitionSchemas = keyValueSchemas.computeIfAbsent(topicPartition,
ignored -> recordSchemas);
LOG.debug("comparing keys {} to {}", topicPartitionSchemas, recordSchemas);

final var schemaChanged = ! topicPartitionSchemas.equals(recordSchemas);
if (schemaChanged) {
LOG.debug("Schema change detected for topic partition {}", topicPartition);
keyValueSchemas.put(topicPartition, new KeyValueSchema(record.keySchema(), record.valueSchema()));
}
return schemaChanged;
@@ -99,6 +106,11 @@ public boolean equals(final Object other) {
public int hashCode() {
return Objects.hash(keySchema, valueSchema);
}

@Override
public String toString() {
return String.format("KeyValueSchema{keySchema=%s, valueSchema=%s}", keySchema, valueSchema);
}
}

void clear() {
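A condensed, standalone sketch of the rewritten rotation check above. The class name is illustrative, and KeyValueSchema is assumed to be the nested value type shown in this diff, with the equals()/hashCode()/toString() implementations it defines.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative only: cache the first key/value schema pair seen per TopicPartition
// and report a rotation whenever a later record carries a different pair.
final class SchemaRotationSketch {
    private final Map<TopicPartition, KeyValueSchema> keyValueSchemas = new HashMap<>();

    boolean rotate(final SinkRecord record) {
        final KeyValueSchema recordSchemas = new KeyValueSchema(record.keySchema(), record.valueSchema());
        final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
        final KeyValueSchema cached = keyValueSchemas.computeIfAbsent(topicPartition, ignored -> recordSchemas);
        final boolean schemaChanged = !cached.equals(recordSchemas); // relies on KeyValueSchema.equals()
        if (schemaChanged) {
            keyValueSchemas.put(topicPartition, recordSchemas);      // remember the new pair
        }
        return schemaChanged;                                        // true -> caller starts a new file
    }
}
```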
@@ -33,9 +33,13 @@
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicPartitionKeyRecordGrouper implements RecordGrouper {

private static final Logger LOG = LoggerFactory.getLogger(TopicPartitionKeyRecordGrouper.class);

private final Template filenameTemplate;

private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
@@ -68,12 +72,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
LOG.debug("put({}}", recordKey);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
}

protected String resolveRecordKeyFor(final SinkRecord record) {
final var key = recordKey(record);

LOG.debug("resolveRecordKeyFor(): {}", key);
final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
key);
final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
@@ -32,6 +32,8 @@
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link RecordGrouper} that groups records by topic and partition.
@@ -44,6 +46,7 @@
* The class supports limited and unlimited number of records in files.
*/
class TopicPartitionRecordGrouper implements RecordGrouper {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());

private final Template filenameTemplate;

@@ -87,6 +90,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
LOG.debug("{} key is {}", record.kafkaOffset(), recordKey);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
}

@@ -570,7 +570,7 @@ public interface BackoffConfig {
/**
* Gets the abort timer rule flag. If there is no timer that may expire and shorten the time for the delay then
* this value should be {@code false} otherwise if the delay time will exceed the maximum time remaining no
* delay is executed. By default, the false is {@code true}.
* delay is executed. By default, the flag is {@code true}.
*
* @return The abort time rule flag.
*/
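To make the rule concrete, a rough sketch of how a retry loop might honour it; none of these names come from BackoffConfig, and the parameters are assumptions.

```java
// Illustrative only: skip the delay entirely when it would outlive the timer.
static void delayWithAbortRule(final long delayMs, final long timeRemainingMs, final boolean abortTimerRule)
        throws InterruptedException {
    if (abortTimerRule && delayMs > timeRemainingMs) {
        return; // the delay would exceed the time left, so no delay is executed
    }
    Thread.sleep(delayMs);
}
```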
@@ -233,7 +233,7 @@ public void stopConnectCluster() {
* @param connectorName
* The name for the connector.
* @param connectorConfig
* the map of data items for the configuraiton of the connector.
* the map of data items for the configuration of the connector.
* @return the result of the cluster configuration call.
*/
public String configureConnector(final String connectorName, final Map<String, String> connectorConfig) {
@@ -291,7 +291,7 @@ public void restartConnector(final String connectorName) {
*/
public Map<String, String> getWorkerProperties(final Class<? extends Connector> connectorClass) {
final Map<String, String> workerProperties = new HashMap<>();
workerProperties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getName());
workerProperties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getCanonicalName());
workerProperties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getCanonicalName());
workerProperties.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG,
Long.toString(offsetFlushInterval.toMillis()));
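For a top-level class such as ByteArrayConverter, getName() and getCanonicalName() return the same string, so the change above is for consistency with the VALUE_CONVERTER line below it; the two only differ for nested classes, as this illustrative snippet shows.

```java
// Illustrative only: the '$' vs '.' difference appears for nested classes.
class Outer {
    static class Inner {
    }
}
// Outer.Inner.class.getName()          ends with "Outer$Inner"
// Outer.Inner.class.getCanonicalName() ends with "Outer.Inner"
```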
@@ -0,0 +1,10 @@
package io.aiven.kafka.connect.common;

/** String helper utilities. */
public final class StringHelpers {
    private StringHelpers() {
        // do not instantiate.
    }

    /** @return {@code s} wrapped in literal double quotes. */
    public static String quoted(final String s) {
        return "\"" + s + "\"";
    }
}
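A quick usage sketch of the new helper:

```java
// Wraps a value in literal double quotes, e.g. when assembling JSON fragments in tests.
final String quotedTopic = StringHelpers.quoted("my-topic"); // -> "\"my-topic\""
```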
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntFunction;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
@@ -129,10 +130,11 @@ public static List<GenericRecord> generateAvroRecords(final int numRecs) {
*
* @param numRecs
the number of records to generate
* @param recordCreator A function to convert the record number into a generic record.
* @return A byte array containing the specified number of records.
*/
public static List<GenericRecord> generateAvroRecords(final int numRecs,
final Function<Integer, GenericRecord> recordCreator) {
final IntFunction<GenericRecord> recordCreator) {
return generateAvroRecords(0, numRecs, recordCreator);
}

@@ -158,11 +160,12 @@ public static List<GenericRecord> generateAvroRecords(final int messageId, final
* the messageId to start with.
* @param numOfRecs
* the number of records to write.
* @param recordCreator The creator of the generic records
* @return A byte array containing the specified number of records.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public static List<GenericRecord> generateAvroRecords(final int messageId, final int numOfRecs,
final Function<Integer, GenericRecord> recordCreator) {
final IntFunction<GenericRecord> recordCreator) {
// Create Avro records
final List<GenericRecord> avroRecords = new ArrayList<>();
final int limit = messageId + numOfRecs;
@@ -196,6 +199,9 @@ public static GenericRecord generateAvroRecord(final int messageId, final Schema
final GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("message", "Hello, from Avro Test Data Fixture! object " + messageId);
avroRecord.put("id", messageId);
if (schema.getField("age") != null) {
avroRecord.put("age", messageId);
}
return avroRecord;
}

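A hypothetical call site for the new IntFunction overload; the schema literal below is an assumption, not the fixture's built-in schema.

```java
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Illustrative only: build ten records whose fields are derived from the record number.
final Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"int\"},"
                + "{\"name\":\"message\",\"type\":\"string\"}]}");
final List<GenericRecord> records = AvroTestDataFixture.generateAvroRecords(10, id -> {
    final GenericRecord rec = new GenericData.Record(schema);
    rec.put("id", id);
    rec.put("message", "custom message " + id);
    return rec;
});
```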
@@ -39,17 +39,30 @@
*/
final public class JsonTestDataFixture {


private final static String MSG_FORMAT = "{\"id\" : %s, \"message\" : \"%s\", \"value\" : \"value%s\"}%n";

/**
* A schema with fields {@code id} of type {@code int32} and {@code message} of type {@code string}.
*/
public static final String SCHEMA_JSON = "{\n \"type\": \"struct\", \"name\": \"TestRecord\",\n "
+ " \"fields\": [\n {\"field\": \"message\", \"type\": \"string\"},\n"
+ " {\"field\": \"id\", \"type\": \"int32\"}\n ]\n}";


/**
 * A variant of {@link #SCHEMA_JSON} that names its fields with {@code name} keys and carries the
 * {@code connect.version} and {@code connect.name} attributes.
 */
public static final String CONNECT_EXTRA_SCHEMA_JSON = "{\n \"type\": \"struct\",\n \"name\": \"TestRecord\",\n"
+ " \"fields\": [\n {\"name\": \"message\", \"type\": \"string\"},\n"
+ " {\"name\": \"id\", \"type\": \"int32\"}\n ],\n"
+ " \"connect.version\":1, \"connect.name\": \"TestRecord\"}\n";

/**
* Message format for records matching {@link #EVOLVED_SCHEMA_JSON}: {@link #MSG_FORMAT} with an added {@code age} field.
*/
private static final String EVOLVED_MSG_FORMAT = "{\"id\" : %s, \"message\" : \"%s\", \"value\" : \"value%s\", \"age\" : %s}%n";

/**
* A schema with the fields from {@link #SCHEMA_JSON} plus an added {@code age} field of type {@code int32} with a default of {@code 0}.
*/
public static final String EVOLVED_SCHEMA_JSON = "{\n \"type\": \"struct\",\n \"name\": \"TestRecord\",\n"
+ " \"fields\": [\n {\"field\": \"message\", \"type\": \"string\"},\n"
+ " {\"field\": \"id\", \"type\": \"int32\"},\n"
@@ -83,7 +96,9 @@ public static String generateJsonRecs(final int recordCount) {
}

/**
* Generates a single JSON record
* Generates a single JSON record with the structure
* {@code {"id":[messageId], "message":"[msg]", "value":"value[messagId]"}\n}.
* The produced record has the "value" field that is not included in the schema.
*
* @param messageId
* the id for the record
@@ -95,6 +110,21 @@ public static String generateJsonRec(final int messageId, final String msg) {
return String.format(MSG_FORMAT, messageId, msg, messageId);
}

/**
* Generates a single evolved JSON record with the structure
* {@code {"id":[messageId], "message":"[msg]", "value":"value[messageId]", "age":[messageId]*10}\n}.
* The produced record contains a "value" field that is not included in the schema.
*
* @param messageId
* the id for the record
* @param msg
* the message for the record
* @return a standard JSON test record.
*/
public static String generateEvolvedJsonRec(final int messageId, final String msg) {
return String.format(EVOLVED_MSG_FORMAT, messageId, msg, messageId, messageId * 10);
}

/**
* Creates Json test data.
*
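A usage sketch of the evolved-record generator; the expected output shown in the comment is derived from EVOLVED_MSG_FORMAT above.

```java
// Illustrative only: id = 3, message = "Hello" -> age = 30 (messageId * 10).
final String rec = JsonTestDataFixture.generateEvolvedJsonRec(3, "Hello");
// rec -> {"id" : 3, "message" : "Hello", "value" : "value3", "age" : 30}
//        followed by the platform line separator contributed by %n.
```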