Skip to content

Commit 12f3084

Browse files
Added a transformation to change a topic name to be all lowercase. Fixes #92. (#93)
* Added a transformation to change a topic name to be all lowercase. Fixes #92. * Bumped to use 3.3.1-1 * Added xml-apis.
1 parent 44adce9 commit 12f3084

File tree

4 files changed

+134
-11
lines changed

4 files changed

+134
-11
lines changed

pom.xml

+23-11
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
limitations under the License.
1717
1818
-->
19-
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
20-
xmlns="http://maven.apache.org/POM/4.0.0"
21-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
19+
<project
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
21+
xmlns="http://maven.apache.org/POM/4.0.0"
22+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
2223
<modelVersion>4.0.0</modelVersion>
2324
<parent>
2425
<groupId>com.github.jcustenborder.kafka.connect</groupId>
2526
<artifactId>kafka-connect-parent</artifactId>
26-
<version>2.6.1</version>
27+
<version>3.3.1-1</version>
2728
</parent>
2829
<artifactId>kafka-connect-transform-common</artifactId>
2930
<version>0.1.0-SNAPSHOT</version>
@@ -49,8 +50,11 @@
4950
</developer>
5051
</developers>
5152
<scm>
52-
<connection>scm:git:https://github.com/jcustenborder/kafka-connect-transform-common.git</connection>
53-
<developerConnection>scm:git:[email protected]:jcustenborder/kafka-connect-transform-common.git</developerConnection>
53+
<connection>scm:git:https://github.com/jcustenborder/kafka-connect-transform-common.git
54+
</connection>
55+
<developerConnection>
56+
scm:git:[email protected]:jcustenborder/kafka-connect-transform-common.git
57+
</developerConnection>
5458
<url>https://github.com/jcustenborder/kafka-connect-transform-common</url>
5559
</scm>
5660
<issueManagement>
@@ -61,8 +65,10 @@
6165
<dependency>
6266
<groupId>org.apache.kafka</groupId>
6367
<artifactId>connect-json</artifactId>
64-
<version>${kafka.version}</version>
65-
<scope>provided</scope>
68+
</dependency>
69+
<dependency>
70+
<groupId>com.github.jcustenborder.kafka.connect</groupId>
71+
<artifactId>connect-utils-jackson</artifactId>
6672
</dependency>
6773
<dependency>
6874
<groupId>org.reflections</groupId>
@@ -73,7 +79,7 @@
7379
<dependency>
7480
<groupId>com.github.jcustenborder.kafka.connect</groupId>
7581
<artifactId>connect-utils-testing-data</artifactId>
76-
<version>[0.3.33,0.3.1000)</version>
82+
<version>${connect-utils.version}</version>
7783
<scope>test</scope>
7884
</dependency>
7985
<dependency>
@@ -85,7 +91,12 @@
8591
<groupId>xerces</groupId>
8692
<artifactId>xercesImpl</artifactId>
8793
<version>2.12.1</version>
88-
</dependency>
94+
</dependency>
95+
<dependency>
96+
<groupId>xml-apis</groupId>
97+
<artifactId>xml-apis</artifactId>
98+
<version>1.4.01</version>
99+
</dependency>
89100
</dependencies>
90101
<build>
91102
<plugins>
@@ -94,7 +105,8 @@
94105
<artifactId>kafka-connect-maven-plugin</artifactId>
95106
<configuration>
96107
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
97-
<documentationUrl>https://jcustenborder.github.io/kafka-connect-documentation/</documentationUrl>
108+
<documentationUrl>https://jcustenborder.github.io/kafka-connect-documentation/
109+
</documentationUrl>
98110
<componentTypes>
99111
<componentType>transform</componentType>
100112
</componentTypes>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.Description;
19+
import com.github.jcustenborder.kafka.connect.utils.config.Title;
20+
import org.apache.kafka.common.config.ConfigDef;
21+
import org.apache.kafka.common.utils.SystemTime;
22+
import org.apache.kafka.common.utils.Time;
23+
import org.apache.kafka.connect.connector.ConnectRecord;
24+
import org.apache.kafka.connect.transforms.Transformation;
25+
26+
import java.util.Map;
27+
28+
@Title("LowerCaseTopic")
29+
@Description("This transformation is used to change a topic name to be all lower case.")
30+
public class LowerCaseTopic<R extends ConnectRecord<R>> implements Transformation<R> {
31+
Time time = SystemTime.SYSTEM;
32+
33+
@Override
34+
public R apply(R record) {
35+
return record.newRecord(
36+
record.topic().toLowerCase(),
37+
record.kafkaPartition(),
38+
record.keySchema(),
39+
record.key(),
40+
record.valueSchema(),
41+
record.value(),
42+
record.timestamp(),
43+
record.headers()
44+
);
45+
}
46+
47+
@Override
48+
public ConfigDef config() {
49+
return new ConfigDef();
50+
}
51+
52+
@Override
53+
public void close() {
54+
55+
}
56+
57+
@Override
58+
public void configure(Map<String, ?> map) {
59+
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.github.jcustenborder.kafka.connect.transform.common;
2+
3+
import org.apache.kafka.common.record.TimestampType;
4+
import org.apache.kafka.common.utils.Time;
5+
import org.apache.kafka.connect.sink.SinkRecord;
6+
import org.junit.jupiter.api.Test;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.times;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
13+
14+
public class LowerCaseTopicTest {
15+
16+
@Test
17+
public void test() {
18+
final SinkRecord input = new SinkRecord(
19+
"TeSt",
20+
1,
21+
null,
22+
"",
23+
null,
24+
"",
25+
1234123L,
26+
12341312L,
27+
TimestampType.NO_TIMESTAMP_TYPE
28+
);
29+
LowerCaseTopic<SinkRecord> transform = new LowerCaseTopic<>();
30+
final SinkRecord actual = transform.apply(input);
31+
assertEquals("test", actual.topic(), "Topic should match.");
32+
}
33+
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"input" : {
3+
"topic" : "TestTopic",
4+
"kafkaPartition" : 1,
5+
"key" : "",
6+
"value" : "",
7+
"timestamp" : 12341312,
8+
"timestampType" : "NO_TIMESTAMP_TYPE",
9+
"offset" : 1234123,
10+
"headers" : [ ]
11+
},
12+
"description" : "This example will change the topic name to be all lower case.",
13+
"name" : "Example",
14+
"config" : { }
15+
}

0 commit comments

Comments
 (0)