Skip to content

Commit bf89bbb

Browse files
Issue 18 (#25)
* Added support to retry connection to Redis when building the initial connection. Added integration tests. Fixes #18. * Fixed license headers.
1 parent 040421a commit bf89bbb

File tree

8 files changed

+395
-160
lines changed

8 files changed

+395
-160
lines changed

pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
<parent>
2424
<groupId>com.github.jcustenborder.kafka.connect</groupId>
2525
<artifactId>kafka-connect-parent</artifactId>
26-
<version>2.1.1-cp1</version>
26+
<version>2.2.1-cp1</version>
2727
</parent>
2828
<artifactId>kafka-connect-redis</artifactId>
2929
<version>0.0.2-SNAPSHOT</version>
3030
<name>kafka-connect-redis</name>
31-
<description>A Kafka Connect connector receiving data from redis.</description>
31+
<description>A Kafka Connect plugin for interacting with Redis.</description>
3232
<url>https://github.com/jcustenborder/kafka-connect-redis</url>
3333
<inceptionYear>2017</inceptionYear>
3434
<licenses>
@@ -61,7 +61,7 @@
6161
<dependency>
6262
<groupId>io.lettuce</groupId>
6363
<artifactId>lettuce-core</artifactId>
64-
<version>5.1.6.RELEASE</version>
64+
<version>5.2.1.RELEASE</version>
6565
</dependency>
6666
</dependencies>
6767
<build>

src/main/java/com/github/jcustenborder/kafka/connect/redis/RedisConnectorConfig.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,13 @@ class RedisConnectorConfig extends AbstractConfig {
6969
static final String SSL_TRUSTSTORE_PATH_DOC = "The path to the SSL truststore.";
7070
public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "redis.ssl.truststore.password";
7171
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the SSL truststore.";
72-
72+
73+
public final static String CONNECTION_ATTEMPTS_CONF = "redis.connection.attempts";
74+
public final static String CONNECTION_ATTEMPTS_DOC = "The number of attempt when connecting to redis.";
75+
76+
public final static String CONNECTION_RETRY_DELAY_MS_CONF = "redis.connection.retry.delay.ms";
77+
public final static String CONNECTION_RETRY_DELAY_MS_DOC = "The amount of milliseconds to wait between redis connection attempts.";
78+
7379
public final ClientMode clientMode;
7480
public final List<HostAndPort> hosts;
7581

@@ -88,7 +94,8 @@ class RedisConnectorConfig extends AbstractConfig {
8894
public final String keystorePassword;
8995
public final File truststorePath;
9096
public final String truststorePassword;
91-
97+
public final int retryDelay;
98+
public final int maxAttempts;
9299

93100

94101
public RedisConnectorConfig(ConfigDef config, Map<?, ?> originals) {
@@ -112,6 +119,8 @@ public RedisConnectorConfig(ConfigDef config, Map<?, ?> originals) {
112119
final String trustPassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONFIG).value();
113120
this.keystorePassword = Strings.isNullOrEmpty(keystorePassword) ? null : keystorePassword;
114121
this.truststorePassword = Strings.isNullOrEmpty(trustPassword) ? null : trustPassword;
122+
this.maxAttempts = getInt(CONNECTION_ATTEMPTS_CONF);
123+
this.retryDelay = getInt(CONNECTION_RETRY_DELAY_MS_CONF);
115124
}
116125

117126
public static ConfigDef config() {
@@ -208,6 +217,20 @@ public static ConfigDef config() {
208217
.defaultValue("")
209218
.importance(ConfigDef.Importance.MEDIUM)
210219
.build()
220+
).define(
221+
ConfigKeyBuilder.of(CONNECTION_ATTEMPTS_CONF, ConfigDef.Type.INT)
222+
.documentation(CONNECTION_ATTEMPTS_DOC)
223+
.defaultValue(3)
224+
.importance(ConfigDef.Importance.MEDIUM)
225+
.validator(ConfigDef.Range.atLeast(1))
226+
.build()
227+
).define(
228+
ConfigKeyBuilder.of(CONNECTION_RETRY_DELAY_MS_CONF, ConfigDef.Type.INT)
229+
.documentation(CONNECTION_RETRY_DELAY_MS_DOC)
230+
.defaultValue(2000)
231+
.validator(ConfigDef.Range.atLeast(100))
232+
.importance(ConfigDef.Importance.MEDIUM)
233+
.build()
211234
);
212235
}
213236

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.redis;
17+
18+
interface RedisSessionFactory {
19+
RedisSession create(RedisConnectorConfig config);
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.redis;
17+
18+
import io.lettuce.core.AbstractRedisClient;
19+
import io.lettuce.core.ClientOptions;
20+
import io.lettuce.core.RedisClient;
21+
import io.lettuce.core.RedisConnectionException;
22+
import io.lettuce.core.SocketOptions;
23+
import io.lettuce.core.SslOptions;
24+
import io.lettuce.core.api.StatefulConnection;
25+
import io.lettuce.core.api.StatefulRedisConnection;
26+
import io.lettuce.core.cluster.ClusterClientOptions;
27+
import io.lettuce.core.cluster.RedisClusterClient;
28+
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
29+
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
30+
import io.lettuce.core.codec.ByteArrayCodec;
31+
import io.lettuce.core.codec.RedisCodec;
32+
import org.apache.kafka.common.utils.Time;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.time.Duration;
37+
38+
class RedisSessionFactoryImpl implements RedisSessionFactory {
39+
Time time = Time.SYSTEM;
40+
41+
private static final Logger log = LoggerFactory.getLogger(RedisSessionFactoryImpl.class);
42+
43+
@Override
44+
public RedisSession create(RedisConnectorConfig config) {
45+
int attempts = 0;
46+
RedisSession result;
47+
48+
while (true) {
49+
attempts++;
50+
try {
51+
log.info("Creating Redis session. Attempt {} of {}", attempts, config.maxAttempts);
52+
result = RedisSessionImpl.create(config);
53+
break;
54+
} catch (RedisConnectionException ex) {
55+
if (attempts == config.maxAttempts) {
56+
throw ex;
57+
} else {
58+
log.warn("Exception thrown connecting to redis. Waiting {} ms to try again.", config.retryDelay);
59+
this.time.sleep(config.retryDelay);
60+
}
61+
}
62+
}
63+
64+
return result;
65+
}
66+
67+
private static class RedisSessionImpl implements RedisSession {
68+
private static final Logger log = LoggerFactory.getLogger(RedisSessionImpl.class);
69+
70+
private final AbstractRedisClient client;
71+
private final StatefulConnection connection;
72+
private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;
73+
private final RedisConnectorConfig config;
74+
75+
RedisSessionImpl(AbstractRedisClient client, StatefulConnection connection, RedisClusterAsyncCommands<byte[], byte[]> asyncCommands, RedisConnectorConfig config) {
76+
this.client = client;
77+
this.connection = connection;
78+
this.asyncCommands = asyncCommands;
79+
this.config = config;
80+
}
81+
82+
public AbstractRedisClient client() {
83+
return this.client;
84+
}
85+
86+
public StatefulConnection connection() {
87+
return this.connection;
88+
}
89+
90+
public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
91+
return this.asyncCommands;
92+
}
93+
94+
public static RedisSessionImpl create(RedisConnectorConfig config) {
95+
RedisSessionImpl result;
96+
final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();
97+
98+
final SslOptions sslOptions;
99+
100+
if (config.sslEnabled) {
101+
SslOptions.Builder builder = SslOptions.builder();
102+
switch (config.sslProvider) {
103+
case JDK:
104+
builder.jdkSslProvider();
105+
break;
106+
case OPENSSL:
107+
builder.openSslProvider();
108+
break;
109+
default:
110+
throw new UnsupportedOperationException(
111+
String.format(
112+
"%s is not a supported value for %s.",
113+
config.sslProvider,
114+
RedisConnectorConfig.SSL_PROVIDER_CONFIG
115+
)
116+
);
117+
}
118+
if (null != config.keystorePath) {
119+
if (null != config.keystorePassword) {
120+
builder.keystore(config.keystorePath, config.keystorePassword.toCharArray());
121+
} else {
122+
builder.keystore(config.keystorePath);
123+
}
124+
}
125+
if (null != config.truststorePath) {
126+
if (null != config.truststorePassword) {
127+
builder.truststore(config.truststorePath, config.keystorePassword);
128+
} else {
129+
builder.truststore(config.truststorePath);
130+
}
131+
}
132+
sslOptions = builder.build();
133+
} else {
134+
sslOptions = null;
135+
}
136+
137+
final SocketOptions socketOptions = SocketOptions.builder()
138+
.tcpNoDelay(config.tcpNoDelay)
139+
.connectTimeout(Duration.ofMillis(config.connectTimeout))
140+
.keepAlive(config.keepAliveEnabled)
141+
.build();
142+
143+
144+
if (RedisConnectorConfig.ClientMode.Cluster == config.clientMode) {
145+
ClusterClientOptions.Builder clientOptions = ClusterClientOptions.builder()
146+
.requestQueueSize(config.requestQueueSize)
147+
.autoReconnect(config.autoReconnectEnabled);
148+
if (config.sslEnabled) {
149+
clientOptions.sslOptions(sslOptions);
150+
}
151+
final RedisClusterClient client = RedisClusterClient.create(config.redisURIs());
152+
client.setOptions(clientOptions.build());
153+
154+
final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
155+
result = new RedisSessionImpl(client, connection, connection.async(), config);
156+
} else if (RedisConnectorConfig.ClientMode.Standalone == config.clientMode) {
157+
final ClientOptions.Builder clientOptions = ClientOptions.builder()
158+
.socketOptions(socketOptions)
159+
.requestQueueSize(config.requestQueueSize)
160+
.autoReconnect(config.autoReconnectEnabled);
161+
if (config.sslEnabled) {
162+
clientOptions.sslOptions(sslOptions);
163+
}
164+
final RedisClient client = RedisClient.create(config.redisURIs().get(0));
165+
client.setOptions(clientOptions.build());
166+
final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
167+
result = new RedisSessionImpl(client, connection, connection.async(), config);
168+
} else {
169+
throw new UnsupportedOperationException(
170+
String.format("%s is not supported", config.clientMode)
171+
);
172+
}
173+
174+
return result;
175+
}
176+
177+
178+
@Override
179+
public void close() throws Exception {
180+
this.connection.close();
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)