Skip to content

Commit 672cc57

Browse files
wicknicksewencp
authored andcommitted
KAFKA-7503: Connect integration test harness
Expose a programmatic way to bring up a Kafka and Zk cluster through Java API to facilitate integration tests for framework level changes in Kafka Connect. The Kafka classes would be similar to KafkaEmbedded in streams. The new classes would reuse the kafka.server.KafkaServer classes from :core, and provide a simple interface to bring up brokers in integration tests. Signed-off-by: Arjun Satish <arjunconfluent.io> Author: Arjun Satish <[email protected]> Author: Arjun Satish <[email protected]> Reviewers: Randall Hauch <[email protected]>, Konstantine Karantasis <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #5516 from wicknicks/connect-integration-test (cherry picked from commit 69d8d2e) Signed-off-by: Ewen Cheslack-Postava <[email protected]>
1 parent f25a9c4 commit 672cc57

24 files changed

+1617
-105
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,8 @@ project(':connect:runtime') {
14091409
testCompile libs.powermockEasymock
14101410

14111411
testCompile project(':clients').sourceSets.test.output
1412+
testCompile project(':core')
1413+
testCompile project(':core').sourceSets.test.output
14121414

14131415
testRuntime libs.slf4jlog4j
14141416
}

checkstyle/import-control.xml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,6 @@
335335
</subpackage>
336336
</subpackage>
337337

338-
339-
340338
<subpackage name="cli">
341339
<allow pkg="org.apache.kafka.connect.runtime" />
342340
<allow pkg="org.apache.kafka.connect.storage" />
@@ -354,6 +352,18 @@
354352
<allow pkg="org.reflections.vfs" />
355353
<!-- for annotations to avoid code duplication -->
356354
<allow pkg="com.fasterxml.jackson.annotation" />
355+
<allow pkg="com.fasterxml.jackson.databind" />
356+
<subpackage name="clusters">
357+
<allow pkg="kafka.server" />
358+
<allow pkg="kafka.zk" />
359+
<allow pkg="kafka.utils" />
360+
<allow class="javax.servlet.http.HttpServletResponse" />
361+
</subpackage>
362+
</subpackage>
363+
364+
<subpackage name="integration">
365+
<allow pkg="org.apache.kafka.connect.util.clusters" />
366+
<allow pkg="org.apache.kafka.connect" />
357367
</subpackage>
358368

359369
<subpackage name="json">

connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.utils.Time;
2121
import org.apache.kafka.common.utils.Utils;
2222
import org.apache.kafka.connect.runtime.Connect;
23+
import org.apache.kafka.connect.runtime.HerderProvider;
2324
import org.apache.kafka.connect.runtime.Worker;
2425
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
2526
import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -38,6 +39,7 @@
3839
import org.slf4j.LoggerFactory;
3940

4041
import java.net.URI;
42+
import java.util.Arrays;
4143
import java.util.Collections;
4244
import java.util.Map;
4345

@@ -53,62 +55,26 @@
5355
public class ConnectDistributed {
5456
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
5557

56-
public static void main(String[] args) throws Exception {
57-
if (args.length < 1) {
58+
private final Time time = Time.SYSTEM;
59+
private final long initStart = time.hiResClockMs();
60+
61+
public static void main(String[] args) {
62+
63+
if (args.length < 1 || Arrays.asList(args).contains("--help")) {
5864
log.info("Usage: ConnectDistributed worker.properties");
5965
Exit.exit(1);
6066
}
6167

6268
try {
63-
Time time = Time.SYSTEM;
64-
log.info("Kafka Connect distributed worker initializing ...");
65-
long initStart = time.hiResClockMs();
6669
WorkerInfo initInfo = new WorkerInfo();
6770
initInfo.logAll();
6871

6972
String workerPropsFile = args[0];
7073
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
71-
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
72-
73-
log.info("Scanning for plugin classes. This might take a moment ...");
74-
Plugins plugins = new Plugins(workerProps);
75-
plugins.compareAndSwapWithDelegatingLoader();
76-
DistributedConfig config = new DistributedConfig(workerProps);
77-
78-
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
79-
log.debug("Kafka cluster ID: {}", kafkaClusterId);
80-
81-
RestServer rest = new RestServer(config);
82-
URI advertisedUrl = rest.advertisedUrl();
83-
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
84-
85-
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
86-
offsetBackingStore.configure(config);
87-
88-
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
89-
WorkerConfigTransformer configTransformer = worker.configTransformer();
90-
91-
Converter internalValueConverter = worker.getInternalValueConverter();
92-
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
93-
statusBackingStore.configure(config);
94-
95-
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
96-
internalValueConverter,
97-
config,
98-
configTransformer);
99-
100-
DistributedHerder herder = new DistributedHerder(config, time, worker,
101-
kafkaClusterId, statusBackingStore, configBackingStore,
102-
advertisedUrl.toString());
103-
final Connect connect = new Connect(herder, rest);
104-
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
105-
try {
106-
connect.start();
107-
} catch (Exception e) {
108-
log.error("Failed to start Connect", e);
109-
connect.stop();
110-
Exit.exit(3);
111-
}
74+
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
75+
76+
ConnectDistributed connectDistributed = new ConnectDistributed();
77+
Connect connect = connectDistributed.startConnect(workerProps);
11278

11379
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
11480
connect.awaitStop();
@@ -118,4 +84,55 @@ public static void main(String[] args) throws Exception {
11884
Exit.exit(2);
11985
}
12086
}
87+
88+
public Connect startConnect(Map<String, String> workerProps) {
89+
log.info("Scanning for plugin classes. This might take a moment ...");
90+
Plugins plugins = new Plugins(workerProps);
91+
plugins.compareAndSwapWithDelegatingLoader();
92+
DistributedConfig config = new DistributedConfig(workerProps);
93+
94+
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
95+
log.debug("Kafka cluster ID: {}", kafkaClusterId);
96+
97+
RestServer rest = new RestServer(config);
98+
HerderProvider provider = new HerderProvider();
99+
rest.start(provider, plugins);
100+
101+
URI advertisedUrl = rest.advertisedUrl();
102+
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
103+
104+
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
105+
offsetBackingStore.configure(config);
106+
107+
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
108+
WorkerConfigTransformer configTransformer = worker.configTransformer();
109+
110+
Converter internalValueConverter = worker.getInternalValueConverter();
111+
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
112+
statusBackingStore.configure(config);
113+
114+
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
115+
internalValueConverter,
116+
config,
117+
configTransformer);
118+
119+
DistributedHerder herder = new DistributedHerder(config, time, worker,
120+
kafkaClusterId, statusBackingStore, configBackingStore,
121+
advertisedUrl.toString());
122+
123+
final Connect connect = new Connect(herder, rest);
124+
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
125+
try {
126+
connect.start();
127+
// herder has initialized now, and ready to be used by the RestServer.
128+
provider.setHerder(herder);
129+
} catch (Exception e) {
130+
log.error("Failed to start Connect", e);
131+
connect.stop();
132+
Exit.exit(3);
133+
}
134+
135+
return connect;
136+
}
137+
121138
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23+
import java.net.URI;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.atomic.AtomicBoolean;
2526

@@ -50,7 +51,6 @@ public void start() {
5051
Runtime.getRuntime().addShutdownHook(shutdownHook);
5152

5253
herder.start();
53-
rest.start(herder);
5454

5555
log.info("Kafka Connect started");
5656
} finally {
@@ -82,6 +82,11 @@ public void awaitStop() {
8282
}
8383
}
8484

85+
// Visible for testing
86+
public URI restUrl() {
87+
return rest.serverUrl();
88+
}
89+
8590
private class ShutdownHook extends Thread {
8691
@Override
8792
public void run() {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.connect.runtime;
18+
19+
import org.apache.kafka.connect.errors.ConnectException;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* A supplier for {@link Herder}s.
26+
*/
27+
public class HerderProvider {
28+
29+
private final CountDownLatch initialized = new CountDownLatch(1);
30+
volatile Herder herder = null;
31+
32+
public HerderProvider() {
33+
}
34+
35+
/**
36+
* Create a herder provider with a herder.
37+
* @param herder the herder that will be supplied to threads waiting on this provider
38+
*/
39+
public HerderProvider(Herder herder) {
40+
this.herder = herder;
41+
initialized.countDown();
42+
}
43+
44+
/**
45+
* @return the contained herder.
46+
* @throws ConnectException if a herder was not available within a duration of calling this method
47+
*/
48+
public Herder get() {
49+
try {
50+
// wait for herder to be initialized
51+
if (!initialized.await(1, TimeUnit.MINUTES)) {
52+
throw new ConnectException("Timed out waiting for herder to be initialized.");
53+
}
54+
} catch (InterruptedException e) {
55+
throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
56+
}
57+
return herder;
58+
}
59+
60+
/**
61+
* @param herder set a herder, and signal to all threads waiting on get().
62+
*/
63+
public void setHerder(Herder herder) {
64+
this.herder = herder;
65+
initialized.countDown();
66+
}
67+
68+
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.kafka.connect.health.ConnectorState;
2323
import org.apache.kafka.connect.health.ConnectorType;
2424
import org.apache.kafka.connect.health.TaskState;
25-
import org.apache.kafka.connect.runtime.Herder;
25+
import org.apache.kafka.connect.runtime.HerderProvider;
2626
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
2727
import org.apache.kafka.connect.util.Callback;
2828

@@ -34,16 +34,16 @@
3434

3535
public class ConnectClusterStateImpl implements ConnectClusterState {
3636

37-
private Herder herder;
37+
private HerderProvider herderProvider;
3838

39-
public ConnectClusterStateImpl(Herder herder) {
40-
this.herder = herder;
39+
public ConnectClusterStateImpl(HerderProvider herderProvider) {
40+
this.herderProvider = herderProvider;
4141
}
4242

4343
@Override
4444
public Collection<String> connectors() {
4545
final Collection<String> connectors = new ArrayList<>();
46-
herder.connectors(new Callback<java.util.Collection<String>>() {
46+
herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
4747
@Override
4848
public void onCompletion(Throwable error, Collection<String> result) {
4949
connectors.addAll(result);
@@ -55,7 +55,7 @@ public void onCompletion(Throwable error, Collection<String> result) {
5555
@Override
5656
public ConnectorHealth connectorHealth(String connName) {
5757

58-
ConnectorStateInfo state = herder.connectorStatus(connName);
58+
ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
5959
ConnectorState connectorState = new ConnectorState(
6060
state.connector().state(),
6161
state.connector().workerId(),

0 commit comments

Comments
 (0)