diff --git a/docker-compose.yml b/docker-compose.yml index 87e0274..cef56f2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,8 @@ services: - ./certs/docker.kafka.server.truststore.jks:/kafka.truststore.jks:ro - ./demo/kafka_server_jaas.conf:/kafka_server_jaas.conf environment: + # Kafka topic config + KAFKA_DELETE_TOPIC_ENABLE: "true" # Zookeeper config KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # SSL config env diff --git a/kafka-client-lib/pom.xml b/kafka-client-lib/pom.xml index 9583b74..772f14e 100644 --- a/kafka-client-lib/pom.xml +++ b/kafka-client-lib/pom.xml @@ -48,4 +48,20 @@ + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + + + + + + + diff --git a/kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/ZookeeperManager.java b/kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/ZookeeperManager.java new file mode 100644 index 0000000..d977ca2 --- /dev/null +++ b/kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/ZookeeperManager.java @@ -0,0 +1,68 @@ +package io.jenkins.plugins.remotingkafka; + +import io.jenkins.plugins.remotingkafka.exception.RemotingKafkaConfigurationException; +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ZookeeperManager { + private static final Logger LOGGER = Logger.getLogger(ZookeeperManager.class.getName()); + private static final int SESSION_TIMEOUT = 15 * 1000; // 15 secs. + private static final int CONNECTION_TIMEOUT = 10 * 1000; // 10 secs. + + private static volatile ZookeeperManager manager; + + private ZkClient zkClient; + + private ZkUtils zkUtils; + + private Properties topicConfiguration; + + private String zookeeperHost; + + private ZookeeperManager() { + } + + public static ZookeeperManager getInstance() { + if (manager == null) { + synchronized (ZookeeperManager.class) { + if (manager == null) { + manager = new ZookeeperManager(); + } + } + } + return manager; + } + + public void init(String zookeeperHost) throws RemotingKafkaConfigurationException { + if (this.zookeeperHost == null) { + this.zookeeperHost = zookeeperHost; + try { + zkClient = new ZkClient(zookeeperHost, SESSION_TIMEOUT, CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$); + zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false); + topicConfiguration = new Properties(); + } catch (Exception ex) { + LOGGER.log(Level.SEVERE, ex.toString(), ex); + throw new RemotingKafkaConfigurationException(ex.toString(), ex); + } + } + } + + public void createTopic(String topic, int noOfPartitions, int noOfReplication) { + if (!AdminUtils.topicExists(zkUtils, topic)) { + AdminUtils.createTopic(zkUtils, topic, noOfPartitions, noOfReplication, topicConfiguration, null); + } + } + + public void close() { + if (zkClient != null) { + zkClient.close(); + } + } +} diff --git a/plugin/src/main/java/io/jenkins/plugins/remotingkafka/KafkaComputerLauncher.java b/plugin/src/main/java/io/jenkins/plugins/remotingkafka/KafkaComputerLauncher.java index 4f2dc89..83f0b0f 100644 --- a/plugin/src/main/java/io/jenkins/plugins/remotingkafka/KafkaComputerLauncher.java +++ b/plugin/src/main/java/io/jenkins/plugins/remotingkafka/KafkaComputerLauncher.java @@ -41,14 +41,11 @@ public class KafkaComputerLauncher extends ComputerLauncher { @CheckForNull private transient volatile ExecutorService launcherExecutorService; - private String kafkaUsername; - private String sslTruststoreLocation; - private String sslKeystoreLocation; - private boolean enableSSL; + private ZookeeperManager zookeeperManager; @DataBoundConstructor public KafkaComputerLauncher(String kafkaUsername, String sslTruststoreLocation, String sslKeystoreLocation, @@ -57,6 +54,7 @@ public KafkaComputerLauncher(String kafkaUsername, String sslTruststoreLocation, this.sslTruststoreLocation = sslTruststoreLocation; this.sslKeystoreLocation = sslKeystoreLocation; this.enableSSL = Boolean.valueOf(enableSSL); + zookeeperManager = ZookeeperManager.getInstance(); } @Override @@ -76,8 +74,12 @@ public synchronized void launch(SlaveComputer computer, final TaskListener liste public Boolean call() throws Exception { Boolean rval = Boolean.FALSE; String topic = KafkaConfigs.getConnectionTopic(computer.getName(), retrieveJenkinsURL()); - KafkaUtils.createTopic(topic, GlobalKafkaConfiguration.get().getZookeeperURL(), - 4, 1); + try { + zookeeperManager.init(GlobalKafkaConfiguration.get().getZookeeperURL()); + } catch (RemotingKafkaConfigurationException ex) { + ex.printStackTrace(); + } + zookeeperManager.createTopic(topic, 4, 1); if (!isValidAgent(computer.getName(), listener)) { return Boolean.FALSE; } @@ -138,7 +140,7 @@ public void onClosed(Channel channel, IOException cause) { } @Override - public void afterDisconnect(SlaveComputer slaveComputer, final TaskListener listener) { + public void afterDisconnect(SlaveComputer computer, final TaskListener listener) { ExecutorService srv = launcherExecutorService; if (srv != null) { // If the service is still running, shut it down and interrupt the operations if any diff --git a/plugin/src/main/resources/META-INF/hudson.remoting.ClassFilter b/plugin/src/main/resources/META-INF/hudson.remoting.ClassFilter new file mode 100644 index 0000000..d9d9cff --- /dev/null +++ b/plugin/src/main/resources/META-INF/hudson.remoting.ClassFilter @@ -0,0 +1,7 @@ +org.I0Itec.zkclient.ZkClient +org.I0Itec.zkclient.ZkConnection +org.apache.zookeeper.ZooKeeper +org.apache.zookeeper.ClientCnxn +org.apache.zookeeper.ZooKeeper$ZKWatchManager +org.apache.zookeeper +org[.]apache[.]zookeeper[.].*