Skip to content

[WIP][JENKINS-54003] Fix issue of creating the same agent name failed for Kafka agent #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ services:
- ./certs/docker.kafka.server.truststore.jks:/kafka.truststore.jks:ro
- ./demo/kafka_server_jaas.conf:/kafka_server_jaas.conf
environment:
# Kafka topic config
KAFKA_DELETE_TOPIC_ENABLE: "true"
# Zookeeper config
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# SSL config env
Expand Down
16 changes: 16 additions & 0 deletions kafka-client-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,20 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Jenkins-ClassFilter-Whitelisted>true</Jenkins-ClassFilter-Whitelisted>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.jenkins.plugins.remotingkafka;

import io.jenkins.plugins.remotingkafka.exception.RemotingKafkaConfigurationException;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ZookeeperManager {
private static final Logger LOGGER = Logger.getLogger(ZookeeperManager.class.getName());
private static final int SESSION_TIMEOUT = 15 * 1000; // 15 secs.
private static final int CONNECTION_TIMEOUT = 10 * 1000; // 10 secs.

private static volatile ZookeeperManager manager;

private ZkClient zkClient;

private ZkUtils zkUtils;

private Properties topicConfiguration;

private String zookeeperHost;

private ZookeeperManager() {
}

public static ZookeeperManager getInstance() {
if (manager == null) {
synchronized (ZookeeperManager.class) {
if (manager == null) {
manager = new ZookeeperManager();
}
}
}
return manager;
}

public void init(String zookeeperHost) throws RemotingKafkaConfigurationException {
if (this.zookeeperHost == null) {
this.zookeeperHost = zookeeperHost;
try {
zkClient = new ZkClient(zookeeperHost, SESSION_TIMEOUT, CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
topicConfiguration = new Properties();
} catch (Exception ex) {
LOGGER.log(Level.SEVERE, ex.toString(), ex);
throw new RemotingKafkaConfigurationException(ex.toString(), ex);
}
}
}

public void createTopic(String topic, int noOfPartitions, int noOfReplication) {
if (!AdminUtils.topicExists(zkUtils, topic)) {
AdminUtils.createTopic(zkUtils, topic, noOfPartitions, noOfReplication, topicConfiguration, null);
}
}

public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@ public class KafkaComputerLauncher extends ComputerLauncher {

@CheckForNull
private transient volatile ExecutorService launcherExecutorService;

private String kafkaUsername;

private String sslTruststoreLocation;

private String sslKeystoreLocation;

private boolean enableSSL;
private ZookeeperManager zookeeperManager;

@DataBoundConstructor
public KafkaComputerLauncher(String kafkaUsername, String sslTruststoreLocation, String sslKeystoreLocation,
Expand All @@ -57,6 +54,7 @@ public KafkaComputerLauncher(String kafkaUsername, String sslTruststoreLocation,
this.sslTruststoreLocation = sslTruststoreLocation;
this.sslKeystoreLocation = sslKeystoreLocation;
this.enableSSL = Boolean.valueOf(enableSSL);
zookeeperManager = ZookeeperManager.getInstance();
}

@Override
Expand All @@ -76,8 +74,12 @@ public synchronized void launch(SlaveComputer computer, final TaskListener liste
public Boolean call() throws Exception {
Boolean rval = Boolean.FALSE;
String topic = KafkaConfigs.getConnectionTopic(computer.getName(), retrieveJenkinsURL());
KafkaUtils.createTopic(topic, GlobalKafkaConfiguration.get().getZookeeperURL(),
4, 1);
try {
zookeeperManager.init(GlobalKafkaConfiguration.get().getZookeeperURL());
} catch (RemotingKafkaConfigurationException ex) {
ex.printStackTrace();
}
zookeeperManager.createTopic(topic, 4, 1);
if (!isValidAgent(computer.getName(), listener)) {
return Boolean.FALSE;
}
Expand Down Expand Up @@ -138,7 +140,7 @@ public void onClosed(Channel channel, IOException cause) {
}

@Override
public void afterDisconnect(SlaveComputer slaveComputer, final TaskListener listener) {
public void afterDisconnect(SlaveComputer computer, final TaskListener listener) {
ExecutorService srv = launcherExecutorService;
if (srv != null) {
// If the service is still running, shut it down and interrupt the operations if any
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
org.I0Itec.zkclient.ZkClient
org.I0Itec.zkclient.ZkConnection
org.apache.zookeeper.ZooKeeper
org.apache.zookeeper.ClientCnxn
org.apache.zookeeper.ZooKeeper$ZKWatchManager
org.apache.zookeeper
org[.]apache[.]zookeeper[.].*