Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion azure-sink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ idea {

dependencies {
compileOnly(apache.kafka.connect.api)
compileOnly(project(":site"))
compileOnly(apache.velocity.engine.core)
compileOnly(apache.velocity.tools)

implementation(project(":commons"))

Expand Down Expand Up @@ -189,7 +192,7 @@ publishing {
licenses {
license {
name = "Apache 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0"
url = "https://www.apache.org/licenses/LICENSE-2.0"
distribution = "repo"
}
}
Expand Down Expand Up @@ -249,3 +252,47 @@ signing {
}
signatureTypes = ASCSignatureProvider()
}

/** ******************************* */
/* Documentation building section */
/** ******************************* */
// Aggregate entry point for documentation generation: it performs no work of
// its own and only depends on the two ConfigDoc-based tasks named below.
tasks.register("buildDocs") {
    dependsOn("buildConfigMd", "buildConfigYml")
}

// Launches io.aiven.kafka.connect.tools.ConfigDoc in a separate JVM, passing
// the Azure sink config class name, "configDef", the Markdown template path
// and the Markdown output path (presumably in the order ConfigDoc expects:
// class, accessor, template, destination — confirm against ConfigDoc).
tasks.register<JavaExec>("buildConfigMd") {
    val mainSources = sourceSets.main.get()

    mainClass = "io.aiven.kafka.connect.tools.ConfigDoc"

    // Classpath order: compile classpath, then this module's jar, then the
    // runtime classpath of the main source set.
    classpath = mainSources.compileClasspath + files(tasks.jar) + mainSources.runtimeClasspath

    args =
        listOf(
            "io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig",
            "configDef",
            "src/templates/configData.md.vm",
            "build/site/markdown/azure-sink-connector/AzureSinkConfig.md",
        )
}

// Launches io.aiven.kafka.connect.tools.ConfigDoc in a separate JVM, passing
// the Azure sink config class name, "configDef", the YAML template path and
// the YAML output path (presumably in the order ConfigDoc expects: class,
// accessor, template, destination — confirm against ConfigDoc).
tasks.register<JavaExec>("buildConfigYml") {
    val mainSources = sourceSets.main.get()

    mainClass = "io.aiven.kafka.connect.tools.ConfigDoc"

    // Classpath order: compile classpath, then this module's jar, then the
    // runtime classpath of the main source set.
    classpath = mainSources.compileClasspath + files(tasks.jar) + mainSources.runtimeClasspath

    args =
        listOf(
            "io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig",
            "configDef",
            "src/templates/configData.yml.vm",
            "build/site/azure-sink-connector/AzureSinkConfig.yml",
        )
}

/** ****************************** */
/* End of documentation section */
/** ****************************** */
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
import org.apache.kafka.clients.producer.RecordMetadata;

import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.FileNameFragment;
import io.aiven.kafka.connect.common.config.FormatType;
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.OutputFormatFragment;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
Expand Down Expand Up @@ -95,8 +100,9 @@ private void produceRecords(final int recordCountPerPartition) throws ExecutionE
@Test
void avroOutput() throws ExecutionException, InterruptedException, IOException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.AVRO)
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
createConnector(connectorConfig);

final int recordCountPerPartition = 10;
Expand Down Expand Up @@ -161,10 +167,11 @@ private byte[] getBlobBytes(final byte[] blobBytes, final String compression) th
void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String compression)
throws ExecutionException, InterruptedException, IOException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.AVRO)
.withOutputFields(OutputFieldType.VALUE)
.envelopeEnabled(false);
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression));
connectorConfig.put("avro.codec", avroCodec);
createConnector(connectorConfig);

Expand Down Expand Up @@ -223,10 +230,12 @@ void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String co
@Test
void schemaChanged() throws ExecutionException, InterruptedException, IOException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.AVRO)
.withOutputFields(OutputFieldType.VALUE)
.envelopeEnabled(false)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);

createConnector(connectorConfig);

final Schema evolvedAvroInputDataSchema = new Schema.Parser()
Expand Down Expand Up @@ -279,10 +288,12 @@ void schemaChanged() throws ExecutionException, InterruptedException, IOExceptio
void jsonlOutput() throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
final String compression = "none";
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "jsonl");
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.JSONL)
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);

createConnector(connectorConfig);

final int recordCountPerPartition = 10;
Expand Down Expand Up @@ -334,12 +345,12 @@ private Map<String, String> basicConnectorConfig() {
return config;
}

protected String getAvroBlobName(final int partition, final int startOffset, final String compression) {
String getAvroBlobName(final int partition, final int startOffset, final String compression) {
return super.getBaseBlobName(partition, startOffset) + ".avro"
+ CompressionType.forName(compression).extension();
}

protected String getAvroBlobName(final int partition, final int startOffset) {
String getAvroBlobName(final int partition, final int startOffset) {
return super.getBaseBlobName(partition, startOffset) + ".avro";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;

import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.FileNameFragment;
import io.aiven.kafka.connect.common.config.FormatType;
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.OutputFormatFragment;
import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -66,8 +72,10 @@ void setUp() throws ExecutionException, InterruptedException {
void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
final var compression = "none";
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
OutputFormatFragment.setter(connectorConfig)
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE, OutputFieldType.OFFSET,
OutputFieldType.TIMESTAMP, OutputFieldType.HEADERS)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final Schema valueSchema = SchemaBuilder.record("value")
Expand Down Expand Up @@ -134,8 +142,9 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
final String compression = "none";
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
OutputFormatFragment.setter(connectorConfig)
.withOutputFields(OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final Schema valueSchema = SchemaBuilder.record("value")
Expand Down Expand Up @@ -198,8 +207,9 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
final String compression = "none";
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
OutputFormatFragment.setter(connectorConfig)
.withOutputFields(OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final Schema valueSchema = SchemaBuilder.record("value")
Expand Down Expand Up @@ -290,8 +300,9 @@ private Map<String, String> basicConnectorConfig(final String compression) {
config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, testContainerName);
config.put(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, azurePrefix);
config.put("topics", testTopic0 + "," + testTopic1);
config.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
config.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "parquet");

FileNameFragment.setter(config).fileCompression(CompressionType.forName(compression));
OutputFormatFragment.setter(config).withFormatType(FormatType.PARQUET);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
import org.apache.kafka.common.TopicPartition;

import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.FileNameFragment;
import io.aiven.kafka.connect.common.config.FormatType;
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.OutputFormatFragment;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -63,8 +68,8 @@ void setUp() throws ExecutionException, InterruptedException {
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void basicTest(final String compression) throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression));
createConnector(connectorConfig);

final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
Expand Down Expand Up @@ -118,10 +123,11 @@ void basicTest(final String compression) throws ExecutionException, InterruptedE
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void groupByTimestampVariable(final String compression) throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}");
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
FileNameFragment.setter(connectorConfig)
.fileCompression(CompressionType.forName(compression))
.template("{{topic}}-{{partition}}-{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}");
createConnector(connectorConfig);

final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
Expand Down Expand Up @@ -176,10 +182,12 @@ private String getTimestampBlobName(final int partition, final int startOffset)
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void oneFilePerRecordWithPlainValues(final String compression) throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
connectorConfig.put(AzureBlobSinkConfig.FILE_MAX_RECORDS, "1");
FileNameFragment.setter(connectorConfig)
.maxRecordsPerFile(1)
.fileCompression(CompressionType.forName(compression));
OutputFormatFragment.setter(connectorConfig)
.withOutputFields(OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
Expand Down Expand Up @@ -226,9 +234,10 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted
final Map<String, String> connectorConfig = basicConnectorConfig();
final CompressionType compressionType = CompressionType.forName(compression);
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}" + compressionType.extension());
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
FileNameFragment.setter(connectorConfig)
.fileCompression(CompressionType.forName(compression))
.template("{{key}}" + compressionType.extension());
createConnector(connectorConfig);

final Map<TopicPartition, List<String>> keysPerTopicPartition = new HashMap<>();
Expand Down Expand Up @@ -292,14 +301,14 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted
void jsonlOutput() throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
final String compression = "none";
final String contentType = "jsonl";
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter.schemas.enable", "false");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType);
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.JSONL)
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
Expand Down Expand Up @@ -353,16 +362,16 @@ void jsonlOutput() throws ExecutionException, InterruptedException {
void jsonOutput() throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = basicConnectorConfig();
final String compression = "none";
final String contentType = "json";
connectorConfig.put("azure.storage.connection.string",
azureEndpoint != null ? azureEndpoint : azureConnectionString); // NOPMD
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter.schemas.enable", "false");
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType);
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);
OutputFormatFragment.setter(connectorConfig)
.withFormatType(FormatType.JSON)
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
createConnector(connectorConfig);

final int numEpochs = 10;
Expand Down
Loading
Loading