Commit c2d344d

Feat common source integration tests (#546)
Add common source integration tests. Adjust sink tests to account for new dependencies in cloud commons.
2 parents: a77fce1 + b4bf82a

76 files changed: +5279 / −4468 lines


azure-sink-connector/build.gradle.kts

Lines changed: 5 additions & 1 deletion
@@ -74,7 +74,7 @@ dependencies {
 
     implementation(project(":commons"))
 
-    implementation("com.azure:azure-storage-blob:12.30.0")
+    implementation(azure.storage.blob)
 
     implementation(tools.spotbugs.annotations)
     implementation(logginglibs.slf4j)
@@ -134,9 +134,11 @@ dependencies {
 
     testRuntimeOnly(logginglibs.slf4j.log4j12)
 
+    integrationTestImplementation(tools.spotbugs.annotations)
     integrationTestImplementation(testinglibs.wiremock)
     integrationTestImplementation(testcontainers.junit.jupiter)
     integrationTestImplementation(testcontainers.kafka) // this is not Kafka version
+    integrationTestImplementation(testcontainers.azure)
     integrationTestImplementation(testinglibs.awaitility)
 
     integrationTestImplementation(apache.kafka.connect.transforms)
@@ -147,6 +149,8 @@ dependencies {
 
     // Make test utils from "test" available in "integration-test"
     integrationTestImplementation(sourceSets["test"].output)
+    // integrationTestImplementation(testFixtures(project(":azure-commons")))
+    integrationTestImplementation(testFixtures(project(":commons")))
 }
 
 tasks.named<Pmd>("pmdIntegrationTest") {

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AbstractIntegrationTest.java

Lines changed: 22 additions & 34 deletions
@@ -26,44 +26,41 @@
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
+import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase;
+import io.aiven.commons.kafka.testkit.KafkaManager;
 import io.aiven.kafka.connect.azure.sink.testutils.AzureBlobAccessor;
 import io.aiven.kafka.connect.common.config.CompressionType;
 
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.github.dockerjava.api.model.Ulimit;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.testcontainers.containers.FixedHostPortGenericContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 @SuppressWarnings({ "deprecation", "PMD.TestClassWithoutTestCases" })
 @Testcontainers
-class AbstractIntegrationTest<K, V> {
+class AbstractIntegrationTest<K, V> extends KafkaIntegrationTestBase {
     protected final String testTopic0;
     protected final String testTopic1;
 
-    private AdminClient adminClient;
-    private ConnectRunner connectRunner;
+    private KafkaManager kafkaManager;
+
     private KafkaProducer<K, V> producer;
 
     protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000;
@@ -85,6 +82,7 @@ class AbstractIntegrationTest<K, V> {
     private static final String AZURE_ENDPOINT = "http://127.0.0.1:10000";
     private static final String ACCOUNT_NAME = "devstoreaccount1";
     private static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    private static final Set<String> CONNECTOR_NAMES = new HashSet<>();
 
     @Container
     private static final GenericContainer<?> AZURITE_CONTAINER = new FixedHostPortGenericContainer<>( // NOPMD
@@ -94,15 +92,9 @@ class AbstractIntegrationTest<K, V> {
             .withFixedExposedPort(AZURE_TABLE_PORT, AZURE_TABLE_PORT)
             .withCommand("azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0")
             .withReuse(true);
-    @Container
-    protected static final KafkaContainer KAFKA = new KafkaContainer("7.1.0")
-            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
-            .withNetwork(Network.newNetwork())
-            .withExposedPorts(KafkaContainer.KAFKA_PORT, 9092)
-            .withCreateContainerCmdModifier(
-                    cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L))));
 
     protected AbstractIntegrationTest() {
+        super();
         testTopic0 = "test-topic-0-" + UUID.randomUUID();
         testTopic1 = "test-topic-1-" + UUID.randomUUID();
     }
@@ -144,13 +136,16 @@ static void setUpAll() throws IOException, InterruptedException {
         assert process.waitFor() == 0;
     }
 
+    @BeforeEach
+    void setupKafka() throws IOException {
+        kafkaManager = setupKafka(true, AzureBlobSinkConnector.class);
+    }
+
     @AfterEach
     void tearDown() {
-        connectRunner.stop();
-        adminClient.close();
         producer.close();
         testBlobAccessor.clear(azurePrefix);
-        connectRunner.awaitStop();
+        CONNECTOR_NAMES.forEach(kafkaManager::deleteConnector);
     }
 
     protected static boolean useFakeAzure() {
@@ -188,27 +183,20 @@ protected Future<RecordMetadata> sendMessageAsync(final String topicName, final
         return producer.send(msg);
     }
 
-    protected ConnectRunner getConnectRunner() {
-        return connectRunner;
-    }
-
     protected void startConnectRunner(final Map<String, Object> testSpecificProducerProperties)
             throws ExecutionException, InterruptedException {
         testBlobAccessor.clear(azurePrefix);
 
-        final Properties adminClientConfig = new Properties();
-        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
-        adminClient = AdminClient.create(adminClientConfig);
+        kafkaManager.createTopics(Arrays.asList(testTopic0, testTopic1));
 
         final Map<String, Object> producerProps = new HashMap<>(testSpecificProducerProperties);
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaManager.bootstrapServers());
        producer = new KafkaProducer<>(producerProps);
 
-        final NewTopic newTopic0 = new NewTopic(testTopic0, 4, (short) 1);
-        final NewTopic newTopic1 = new NewTopic(testTopic1, 4, (short) 1);
-        adminClient.createTopics(Arrays.asList(newTopic0, newTopic1)).all().get();
+    }
 
-        connectRunner = new ConnectRunner(pluginDir, KAFKA.getBootstrapServers(), OFFSET_FLUSH_INTERVAL_MS);
-        connectRunner.start();
+    protected void createConnector(final Map<String, String> connectorConfig) {
+        CONNECTOR_NAMES.add(connectorConfig.get("name"));
+        kafkaManager.configureConnector(connectorConfig.get("name"), connectorConfig);
     }
 }
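Taken together, these changes mean a concrete sink test no longer touches AdminClient, KafkaContainer, or ConnectRunner directly: topic creation, the Connect runtime, and the Schema Registry endpoint all come from the KafkaManager provided by KafkaIntegrationTestBase. Below is a minimal sketch of a subclass written against the helpers visible in this diff (startConnectRunner, createConnector, getKafkaManager); the class name, serializers, and connector name are illustrative placeholders, not part of the commit.

// Illustrative only: a minimal sink test following the pattern introduced in this commit.
// The class name, serializers, and connector name below are placeholders.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

final class MinimalSinkIntegrationTest extends AbstractIntegrationTest<byte[], byte[]> {

    @BeforeEach
    void setUp() throws ExecutionException, InterruptedException {
        final Map<String, Object> producerProps = new HashMap<>();
        // Bootstrap servers now come from the shared KafkaManager, not a per-class KafkaContainer.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Creates the test topics and the producer; the Connect runtime was started by setupKafka().
        startConnectRunner(producerProps);
    }

    @Test
    void writesRecordsToBlobStorage() throws ExecutionException, InterruptedException {
        final Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "minimal-sink-connector"); // placeholder connector name
        connectorConfig.put("connector.class", AzureBlobSinkConnector.class.getName());
        connectorConfig.put("tasks.max", "1");
        // Registers the connector through the KafkaManager and records its name so
        // tearDown() in AbstractIntegrationTest can delete it afterwards.
        createConnector(connectorConfig);

        // Produce records with sendMessageAsync(...) and assert on testBlobAccessor here.
    }
}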

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroIntegrationTest.java

Lines changed: 8 additions & 12 deletions
@@ -50,30 +50,26 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
 @Testcontainers
 final class AvroIntegrationTest extends AbstractIntegrationTest<String, GenericRecord> {
     private static final String CONNECTOR_NAME = "aiven-azure-sink-connector-avro";
 
-    @Container
-    private final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(KAFKA);
-
     private final Schema avroInputDataSchema = new Schema.Parser().parse(
             "{\"type\":\"record\",\"name\":\"input_data\"," + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
 
     @BeforeEach
     void setUp() throws ExecutionException, InterruptedException {
         testBlobAccessor.clear(azurePrefix);
         final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put("schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        producerProps.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         startConnectRunner(producerProps);
     }
 
@@ -101,7 +97,7 @@ void avroOutput() throws ExecutionException, InterruptedException, IOException {
         final Map<String, String> connectorConfig = basicConnectorConfig();
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -170,7 +166,7 @@ void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final String co
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
         connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
         connectorConfig.put("avro.codec", avroCodec);
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -231,7 +227,7 @@ void schemaChanged() throws ExecutionException, InterruptedException, IOExceptio
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "avro");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final Schema evolvedAvroInputDataSchema = new Schema.Parser()
                 .parse("{\"type\":\"record\",\"name\":\"input_data\","
@@ -287,7 +283,7 @@ void jsonlOutput() throws ExecutionException, InterruptedException {
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
         connectorConfig.put(AzureBlobSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "jsonl");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -323,9 +319,9 @@ private Map<String, String> basicConnectorConfig() {
         config.put(AzureBlobSinkConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put("connector.class", AzureBlobSinkConnector.class.getName());
         config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("tasks.max", "1");
         if (useFakeAzure()) {
             config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONNECTION_STRING_CONFIG, azureEndpoint);

azure-sink-connector/src/integration-test/java/io/aiven/kafka/connect/azure/sink/AvroParquetIntegrationTest.java

Lines changed: 12 additions & 14 deletions
@@ -33,34 +33,32 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
+import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 @Testcontainers
 final class AvroParquetIntegrationTest extends AbstractIntegrationTest<String, GenericRecord> {
 
     private static final String CONNECTOR_NAME = "aiven-azure-sink-connector-parquet";
 
-    @Container
-    private final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(KAFKA);
-
     @BeforeEach
     void setUp() throws ExecutionException, InterruptedException {
         testBlobAccessor.clear(azurePrefix);
         final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put("schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        producerProps.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         startConnectRunner(producerProps);
     }
 
@@ -70,7 +68,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -109,7 +107,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
 
         final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBlobAccessor.readBytes(blobName));
             blobContents.put(blobName, records);
         }
@@ -138,7 +136,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -177,7 +175,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
 
         final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBlobAccessor.readBytes(blobName));
             blobContents.put(blobName, records);
         }
@@ -202,7 +200,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
         connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);
 
         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -268,7 +266,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr
 
         final var blobContents = new ArrayList<String>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBlobAccessor.readBytes(blobName));
             blobContents.addAll(records.stream().map(r -> r.get("value").toString()).collect(Collectors.toList()));
         }
@@ -280,9 +278,9 @@ private Map<String, String> basicConnectorConfig(final String compression) {
         config.put(AzureBlobSinkConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put("connector.class", AzureBlobSinkConnector.class.getName());
         config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("tasks.max", "1");
         if (useFakeAzure()) {
             config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONNECTION_STRING_CONFIG, azureEndpoint);
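Beyond the connector wiring, the one substantive change in this file is that Parquet read-back goes through ParquetTestDataFixture from the commons test fixtures instead of the connector-local ParquetUtils. The sketch below mirrors the read-back pattern used in the tests above; it assumes readRecords keeps the (Path, byte[]) -> List<GenericRecord> shape visible in the diff and throws at most IOException, and the field name "value" is taken from the test code. Everything else is illustrative.

// Illustrative only: mirrors the ParquetTestDataFixture.readRecords(...) calls in the diff above.
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;

import org.apache.avro.generic.GenericRecord;

final class ParquetReadbackSketch {

    private ParquetReadbackSketch() {
    }

    // Extracts the "value" field of every record stored in a downloaded Parquet blob.
    // tmpDir is a scratch directory (the tests use @TempDir); blobBytes is the blob's raw content.
    static List<String> valueFields(final Path tmpDir, final String blobName, final byte[] blobBytes)
            throws IOException {
        final List<GenericRecord> records = ParquetTestDataFixture.readRecords(
                tmpDir.resolve(Paths.get(blobName)), blobBytes);
        return records.stream().map(r -> r.get("value").toString()).collect(Collectors.toList());
    }
}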
