Skip to content

Commit 41e35db

Browse files
authored
Add azure sink documentation (#560)
Added documentation for Azure sink and cleaned up Azure source.
1 parent e55948d commit 41e35db

File tree

22 files changed

+750
-385
lines changed

22 files changed

+750
-385
lines changed

azure-sink-connector/build.gradle.kts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ idea {
7171

7272
dependencies {
7373
compileOnly(apache.kafka.connect.api)
74+
compileOnly(project(":site"))
75+
compileOnly(apache.velocity.engine.core)
76+
compileOnly(apache.velocity.tools)
7477

7578
implementation(project(":commons"))
7679

@@ -189,7 +192,7 @@ publishing {
189192
licenses {
190193
license {
191194
name = "Apache 2.0"
192-
url = "http://www.apache.org/licenses/LICENSE-2.0"
195+
url = "https://www.apache.org/licenses/LICENSE-2.0"
193196
distribution = "repo"
194197
}
195198
}
@@ -249,3 +252,47 @@ signing {
249252
}
250253
signatureTypes = ASCSignatureProvider()
251254
}
255+
256+
/** ******************************* */
257+
/* Documentation building section */
258+
/** ******************************* */
259+
tasks.register("buildDocs") {
260+
dependsOn("buildConfigMd")
261+
dependsOn("buildConfigYml")
262+
}
263+
264+
tasks.register<JavaExec>("buildConfigMd") {
265+
mainClass = "io.aiven.kafka.connect.tools.ConfigDoc"
266+
classpath =
267+
sourceSets.main
268+
.get()
269+
.compileClasspath
270+
.plus(files(tasks.jar))
271+
.plus(sourceSets.main.get().runtimeClasspath)
272+
args =
273+
listOf(
274+
"io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig",
275+
"configDef",
276+
"src/templates/configData.md.vm",
277+
"build/site/markdown/azure-sink-connector/AzureSinkConfig.md")
278+
}
279+
280+
tasks.register<JavaExec>("buildConfigYml") {
281+
mainClass = "io.aiven.kafka.connect.tools.ConfigDoc"
282+
classpath =
283+
sourceSets.main
284+
.get()
285+
.compileClasspath
286+
.plus(files(tasks.jar))
287+
.plus(sourceSets.main.get().runtimeClasspath)
288+
args =
289+
listOf(
290+
"io.aiven.kafka.connect.azure.sink.AzureBlobSinkConfig",
291+
"configDef",
292+
"src/templates/configData.yml.vm",
293+
"build/site/azure-sink-connector/AzureSinkConfig.yml")
294+
}
295+
296+
/** ****************************** */
297+
/* End of documentation section */
298+
/** ****************************** */

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
import org.apache.kafka.clients.producer.RecordMetadata;
3636

3737
import io.aiven.kafka.connect.common.config.CompressionType;
38+
import io.aiven.kafka.connect.common.config.FileNameFragment;
39+
import io.aiven.kafka.connect.common.config.FormatType;
40+
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
41+
import io.aiven.kafka.connect.common.config.OutputFieldType;
42+
import io.aiven.kafka.connect.common.config.OutputFormatFragment;
3843

3944
import org.apache.avro.Schema;
4045
import org.apache.avro.file.DataFileReader;
@@ -95,8 +100,9 @@ private void produceRecords(final int recordCountPerPartition) throws ExecutionE
95100
@Test
96101
void avroOutput() throws ExecutionException, InterruptedException, IOException {
97102
final Map<String, String> connectorConfig = basicConnectorConfig();
98-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
99-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
103+
OutputFormatFragment.setter(connectorConfig)
104+
.withFormatType(FormatType.AVRO)
105+
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
100106
createConnector(connectorConfig);
101107

102108
final int recordCountPerPartition = 10;
@@ -161,10 +167,11 @@ private byte[] getBlobBytes(final byte[] blobBytes, final String compression) th
161167
void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String compression)
162168
throws ExecutionException, InterruptedException, IOException {
163169
final Map<String, String> connectorConfig = basicConnectorConfig();
164-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
165-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
166-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
167-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
170+
OutputFormatFragment.setter(connectorConfig)
171+
.withFormatType(FormatType.AVRO)
172+
.withOutputFields(OutputFieldType.VALUE)
173+
.envelopeEnabled(false);
174+
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression));
168175
connectorConfig.put("avro.codec", avroCodec);
169176
createConnector(connectorConfig);
170177

@@ -223,10 +230,12 @@ void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String co
223230
@Test
224231
void schemaChanged() throws ExecutionException, InterruptedException, IOException {
225232
final Map<String, String> connectorConfig = basicConnectorConfig();
226-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
227-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
228-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
229-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
233+
OutputFormatFragment.setter(connectorConfig)
234+
.withFormatType(FormatType.AVRO)
235+
.withOutputFields(OutputFieldType.VALUE)
236+
.envelopeEnabled(false)
237+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
238+
230239
createConnector(connectorConfig);
231240

232241
final Schema evolvedAvroInputDataSchema = new Schema.Parser()
@@ -279,10 +288,12 @@ void schemaChanged() throws ExecutionException, InterruptedException, IOExceptio
279288
void jsonlOutput() throws ExecutionException, InterruptedException {
280289
final Map<String, String> connectorConfig = basicConnectorConfig();
281290
final String compression = "none";
282-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
283-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
284-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
285-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "jsonl");
291+
OutputFormatFragment.setter(connectorConfig)
292+
.withFormatType(FormatType.JSONL)
293+
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
294+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
295+
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);
296+
286297
createConnector(connectorConfig);
287298

288299
final int recordCountPerPartition = 10;
@@ -334,12 +345,12 @@ private Map<String, String> basicConnectorConfig() {
334345
return config;
335346
}
336347

337-
protected String getAvroBlobName(final int partition, final int startOffset, final String compression) {
348+
String getAvroBlobName(final int partition, final int startOffset, final String compression) {
338349
return super.getBaseBlobName(partition, startOffset) + ".avro"
339350
+ CompressionType.forName(compression).extension();
340351
}
341352

342-
protected String getAvroBlobName(final int partition, final int startOffset) {
353+
String getAvroBlobName(final int partition, final int startOffset) {
343354
return super.getBaseBlobName(partition, startOffset) + ".avro";
344355
}
345356
}

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@
3333
import org.apache.kafka.clients.producer.ProducerConfig;
3434
import org.apache.kafka.clients.producer.RecordMetadata;
3535

36+
import io.aiven.kafka.connect.common.config.CompressionType;
37+
import io.aiven.kafka.connect.common.config.FileNameFragment;
38+
import io.aiven.kafka.connect.common.config.FormatType;
39+
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
40+
import io.aiven.kafka.connect.common.config.OutputFieldType;
41+
import io.aiven.kafka.connect.common.config.OutputFormatFragment;
3642
import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
3743

3844
import org.apache.avro.Schema;
@@ -66,8 +72,10 @@ void setUp() throws ExecutionException, InterruptedException {
6672
void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
6773
final var compression = "none";
6874
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
69-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers");
70-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
75+
OutputFormatFragment.setter(connectorConfig)
76+
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE, OutputFieldType.OFFSET,
77+
OutputFieldType.TIMESTAMP, OutputFieldType.HEADERS)
78+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
7179
createConnector(connectorConfig);
7280

7381
final Schema valueSchema = SchemaBuilder.record("value")
@@ -134,8 +142,9 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
134142
void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
135143
final String compression = "none";
136144
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
137-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
138-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
145+
OutputFormatFragment.setter(connectorConfig)
146+
.withOutputFields(OutputFieldType.VALUE)
147+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
139148
createConnector(connectorConfig);
140149

141150
final Schema valueSchema = SchemaBuilder.record("value")
@@ -198,8 +207,9 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
198207
void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, InterruptedException, IOException {
199208
final String compression = "none";
200209
final Map<String, String> connectorConfig = basicConnectorConfig(compression);
201-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
202-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
210+
OutputFormatFragment.setter(connectorConfig)
211+
.withOutputFields(OutputFieldType.VALUE)
212+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
203213
createConnector(connectorConfig);
204214

205215
final Schema valueSchema = SchemaBuilder.record("value")
@@ -290,8 +300,9 @@ private Map<String, String> basicConnectorConfig(final String compression) {
290300
config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, testContainerName);
291301
config.put(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, azurePrefix);
292302
config.put("topics", testTopic0 + "," + testTopic1);
293-
config.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
294-
config.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "parquet");
303+
304+
FileNameFragment.setter(config).fileCompression(CompressionType.forName(compression));
305+
OutputFormatFragment.setter(config).withFormatType(FormatType.PARQUET);
295306
return config;
296307
}
297308
}

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/IntegrationTest.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
import org.apache.kafka.common.TopicPartition;
3737

3838
import io.aiven.kafka.connect.common.config.CompressionType;
39+
import io.aiven.kafka.connect.common.config.FileNameFragment;
40+
import io.aiven.kafka.connect.common.config.FormatType;
41+
import io.aiven.kafka.connect.common.config.OutputFieldEncodingType;
42+
import io.aiven.kafka.connect.common.config.OutputFieldType;
43+
import io.aiven.kafka.connect.common.config.OutputFormatFragment;
3944

4045
import org.junit.jupiter.api.BeforeEach;
4146
import org.junit.jupiter.api.Test;
@@ -63,8 +68,8 @@ void setUp() throws ExecutionException, InterruptedException {
6368
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
6469
void basicTest(final String compression) throws ExecutionException, InterruptedException {
6570
final Map<String, String> connectorConfig = basicConnectorConfig();
66-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
67-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
71+
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
72+
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.forName(compression));
6873
createConnector(connectorConfig);
6974

7075
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
@@ -118,10 +123,11 @@ void basicTest(final String compression) throws ExecutionException, InterruptedE
118123
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
119124
void groupByTimestampVariable(final String compression) throws ExecutionException, InterruptedException {
120125
final Map<String, String> connectorConfig = basicConnectorConfig();
121-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
122-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
123-
connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}-"
124-
+ "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}");
126+
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
127+
FileNameFragment.setter(connectorConfig)
128+
.fileCompression(CompressionType.forName(compression))
129+
.template("{{topic}}-{{partition}}-{{start_offset}}-"
130+
+ "{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}");
125131
createConnector(connectorConfig);
126132

127133
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
@@ -176,10 +182,12 @@ private String getTimestampBlobName(final int partition, final int startOffset)
176182
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
177183
void oneFilePerRecordWithPlainValues(final String compression) throws ExecutionException, InterruptedException {
178184
final Map<String, String> connectorConfig = basicConnectorConfig();
179-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
180-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
181-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
182-
connectorConfig.put(AzureBlobSinkConfig.FILE_MAX_RECORDS, "1");
185+
FileNameFragment.setter(connectorConfig)
186+
.maxRecordsPerFile(1)
187+
.fileCompression(CompressionType.forName(compression));
188+
OutputFormatFragment.setter(connectorConfig)
189+
.withOutputFields(OutputFieldType.VALUE)
190+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
183191
createConnector(connectorConfig);
184192

185193
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
@@ -226,9 +234,10 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted
226234
final Map<String, String> connectorConfig = basicConnectorConfig();
227235
final CompressionType compressionType = CompressionType.forName(compression);
228236
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
229-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
230-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
231-
connectorConfig.put(AzureBlobSinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}" + compressionType.extension());
237+
OutputFormatFragment.setter(connectorConfig).withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE);
238+
FileNameFragment.setter(connectorConfig)
239+
.fileCompression(CompressionType.forName(compression))
240+
.template("{{key}}" + compressionType.extension());
232241
createConnector(connectorConfig);
233242

234243
final Map<TopicPartition, List<String>> keysPerTopicPartition = new HashMap<>();
@@ -292,14 +301,14 @@ void groupByKey(final String compression) throws ExecutionException, Interrupted
292301
void jsonlOutput() throws ExecutionException, InterruptedException {
293302
final Map<String, String> connectorConfig = basicConnectorConfig();
294303
final String compression = "none";
295-
final String contentType = "jsonl";
296-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
297-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
298304
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
299305
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
300306
connectorConfig.put("value.converter.schemas.enable", "false");
301-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
302-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType);
307+
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);
308+
OutputFormatFragment.setter(connectorConfig)
309+
.withFormatType(FormatType.JSONL)
310+
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
311+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
303312
createConnector(connectorConfig);
304313

305314
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
@@ -353,16 +362,16 @@ void jsonlOutput() throws ExecutionException, InterruptedException {
353362
void jsonOutput() throws ExecutionException, InterruptedException {
354363
final Map<String, String> connectorConfig = basicConnectorConfig();
355364
final String compression = "none";
356-
final String contentType = "json";
357365
connectorConfig.put("azure.storage.connection.string",
358366
azureEndpoint != null ? azureEndpoint : azureConnectionString); // NOPMD
359-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
360-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
361367
connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
362368
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
363369
connectorConfig.put("value.converter.schemas.enable", "false");
364-
connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
365-
connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, contentType);
370+
FileNameFragment.setter(connectorConfig).fileCompression(CompressionType.NONE);
371+
OutputFormatFragment.setter(connectorConfig)
372+
.withFormatType(FormatType.JSON)
373+
.withOutputFields(OutputFieldType.KEY, OutputFieldType.VALUE)
374+
.withOutputFieldEncodingType(OutputFieldEncodingType.NONE);
366375
createConnector(connectorConfig);
367376

368377
final int numEpochs = 10;

0 commit comments

Comments
 (0)