Commit 25513f0 (parent: 1b406cc)

Initial (#1)

* Initial commit.
* Refactored to allow the same code path to support clustered and non-clustered deployments.
* Added Jenkinsfile.
* Removed a class that was not used.
* Moved to a combined session object.
* Added more SSL options.
* Updated README.

19 files changed (+1370, −0 lines)

.gitignore (+3)

@@ -20,3 +20,6 @@

 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
+.okhttpcache
+.idea
+target

Jenkinsfile (+5)

#!groovy
@Library('jenkins-pipeline') import com.github.jcustenborder.jenkins.pipeline.KafkaConnectPipeline

def pipe = new KafkaConnectPipeline()
pipe.execute()

README.md (+213)
# Introduction

# Sink Connectors

## RedisSinkConnector

Sink connector for writing data to Redis.
### Important

This connector expects to receive data with a key of bytes and a value of bytes. If your data is structured, you need to use a Transformation to convert it from structured data, such as a Struct, into an array of bytes for the key and value.
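If the records on the topic are already raw bytes, one workable approach (the one used by `config/sink.properties` in this commit) is to configure the key and value converters to `ByteArrayConverter`:

```properties
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```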
### Note

This connector supports deletes. It will issue a delete to the Redis cluster for any key that does not have a corresponding value. In Kafka, a record that contains a key and a null value (a tombstone) is considered a delete.
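Conceptually, assuming the connector stores each record under its key, the behavior above maps to the following Redis operations (a hypothetical illustration, not necessarily the connector's literal commands):

```bash
# A record ("user:42", <value bytes>) results in a write, roughly:
redis-cli SET "user:42" "<value bytes>"
# A tombstone ("user:42", null) results in a delete, roughly:
redis-cli DEL "user:42"
```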
### Configuration

##### `redis.hosts`
*Importance:* High

*Type:* List

*Default Value:* [localhost:6379]
##### `redis.client.mode`
*Importance:* Medium

*Type:* String

*Default Value:* Standalone

*Validator:* ValidEnum{enum=ClientMode, allowed=[Standalone, Cluster]}
##### `redis.database`
*Importance:* Medium

*Type:* Int

*Default Value:* 1

##### `redis.operation.timeout.ms`
*Importance:* Medium

*Type:* Long

*Default Value:* 10000

*Validator:* [100,...]
##### `redis.password`
*Importance:* Medium

*Type:* Password

*Default Value:* [hidden]

##### `redis.ssl.enabled`
*Importance:* Medium

*Type:* Boolean

*Default Value:* false

##### `redis.ssl.keystore.password`
*Importance:* Medium

*Type:* Password

*Default Value:* [hidden]

##### `redis.ssl.keystore.path`
*Importance:* Medium

*Type:* String

##### `redis.ssl.truststore.password`
*Importance:* Medium

*Type:* Password

*Default Value:* [hidden]

##### `redis.ssl.truststore.path`
*Importance:* Medium

*Type:* String
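A minimal sketch of enabling TLS with the options above (paths and passwords are placeholders):

```properties
# Placeholder paths and passwords; the property names come from the reference above.
redis.ssl.enabled=true
redis.ssl.keystore.path=/etc/kafka-connect/redis.keystore.jks
redis.ssl.keystore.password=changeit
redis.ssl.truststore.path=/etc/kafka-connect/redis.truststore.jks
redis.ssl.truststore.password=changeit
```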
##### `redis.auto.reconnect.enabled`
*Importance:* Low

*Type:* Boolean

*Default Value:* true

##### `redis.request.queue.size`
*Importance:* Low

*Type:* Int

*Default Value:* 2147483647

##### `redis.socket.connect.timeout.ms`
*Importance:* Low

*Type:* Int

*Default Value:* 10000

##### `redis.socket.keep.alive.enabled`
*Importance:* Low

*Type:* Boolean

*Default Value:* false

##### `redis.socket.tcp.no.delay.enabled`
*Importance:* Low

*Type:* Boolean

*Default Value:* true

##### `redis.ssl.provider`
*Importance:* Low

*Type:* String

*Default Value:* JDK

*Validator:* ValidEnum{enum=RedisSslProvider, allowed=[OPENSSL, JDK]}
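As an illustration, a hedged sketch tuning connection behavior with the low-importance options above (the values are arbitrary):

```properties
# Illustrative values; the property names come from the reference above.
redis.auto.reconnect.enabled=true
redis.socket.connect.timeout.ms=5000
redis.socket.tcp.no.delay.enabled=true
```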
#### Examples

##### Standalone Example

This configuration is typically used with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers).

```properties
name=RedisSinkConnector1
connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
tasks.max=1
topics=< Required Configuration >
```
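Assuming the connector properties above are saved to `config/sink.properties`, a standalone worker can then be started with the worker properties first, as `bin/debug.sh` in this repository does:

```bash
connect-standalone config/connect-avro-docker.properties config/sink.properties
```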
##### Distributed Example

This configuration is typically used with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers).
Write the following JSON to `connector.json`, configure all of the required values, and use the command below to
post the configuration to one of the distributed connect worker(s).

```json
{
  "config" : {
    "name" : "RedisSinkConnector1",
    "connector.class" : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >"
  }
}
```
Use curl to post the configuration to one of the Kafka Connect workers. Change `http://localhost:8083/` to the endpoint of
one of your Kafka Connect worker(s).

Create a new instance.
```bash
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
```

Update an existing instance.
```bash
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RedisSinkConnector1/config
```
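Either way, the standard Kafka Connect REST API can confirm the result:

```bash
# List all connectors on the worker.
curl -s http://localhost:8083/connectors
# Check the status of this connector and its tasks.
curl -s http://localhost:8083/connectors/RedisSinkConnector1/status
```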

bin/debug.sh (+24)

#!/usr/bin/env bash
#
# Copyright © 2017 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default to not suspending the JVM while waiting for a debugger to attach.
: ${SUSPEND:='n'}

set -e

# Build the connector, then run it in a standalone worker with remote
# debugging enabled via the Kafka run scripts (KAFKA_DEBUG).
mvn clean package
export KAFKA_DEBUG='y'
connect-standalone config/connect-avro-docker.properties config/sink.properties
config/connect-avro-docker.properties (+32)

#
# Copyright © 2017 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
rest.port=10000
plugin.path=target/kafka-connect-target/usr/share/kafka-connect
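Note that `plugin.path` points at the Maven build output, so the connector must be packaged before the worker starts (as `bin/debug.sh` does):

```bash
# Populates target/kafka-connect-target/usr/share/kafka-connect
mvn clean package
```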

config/sink.properties (+22)

#
# Copyright © 2017 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=sink
topics=twitter
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
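For a quick smoke test of this configuration, keyed string records can be produced to the `twitter` topic with the console producer, and the result inspected with `redis-cli` (a sketch; assumes the ports published by the Docker Compose file below):

```bash
# Produce records as "key:value" pairs.
kafka-console-producer --broker-list localhost:9092 --topic twitter \
  --property parse.key=true --property key.separator=:
# Inspect what the sink wrote to Redis.
redis-cli -h localhost -p 6379 KEYS '*'
```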

docker-compose.yml (+50)

#
# Copyright © 2017 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

version: "2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:4.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      zk_id: "1"
    ports:
      - "2181:2181"
  kafka:
    hostname: kafka
    image: confluentinc/cp-kafka:4.0.0
    links:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:4.0.0
    links:
      - kafka
      - zookeeper
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
  redis:
    image: redis
    ports:
      - "6379:6379"

docs/connectors.rst (+25)

===============
Redis Connector
===============

The Redis Connector provides a Kafka Connect :term:`Source Connector` and :term:`Sink Connector`
that can read data from and write data to `Redis <https://redis.io/>`_.

.. toctree::
    :maxdepth: 1
    :caption: Source Connectors:
    :hidden:
    :glob:

    sources/*


.. toctree::
    :maxdepth: 1
    :caption: Sink Connectors:
    :hidden:
    :glob:

    sinks/*
