
Commit 577eeba (parent: 2f631c5)

Initial (#1)

* Initial commit.
* Added support for schemaless records as well.
* Added documentation and examples.
* Bump to 0.11.0.0-cp3 to address issues with plugin isolation.
* Changed version.
* Added documentation and examples.
* Example

File tree: 12 files changed (+773, -2 lines)


.gitignore (+3)

```diff
@@ -20,3 +20,6 @@
 
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
+.okhttpcache
+.idea
+target
```

Jenkinsfile (+5)

```groovy
#!groovy
@Library('jenkins-pipeline') import com.github.jcustenborder.jenkins.pipeline.KafkaConnectPipeline

def pipe = new KafkaConnectPipeline()
pipe.execute()
```

README.md (+44, -2)

Replaces the previous two-line stub ("# kafka-connect-transform-common" / "Common Transforms for Kafka Connect.") with:

# Introduction

This project provides common transformation functionality for Kafka Connect.

# Configuration

## PatternRename$Key

This transformation is used to rename fields in the key of an input struct based on a regular expression and a replacement string.

```properties
transforms=key
transforms.key.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key

# Set these required values
transforms.key.field.pattern=
transforms.key.field.replacement=
```

| Name                | Description                                                         | Type   | Default            | Valid Values                                                                                                          | Importance |
|---------------------|---------------------------------------------------------------------|--------|--------------------|-----------------------------------------------------------------------------------------------------------------------|------------|
| field.pattern       | Regular expression used to match field names.                       | string |                    |                                                                                                                       | high       |
| field.replacement   | Replacement string applied to the matched portion of a field name.  | string |                    |                                                                                                                       | high       |
| field.pattern.flags | `java.util.regex.Pattern` flags used when compiling the pattern.    | list   | [CASE_INSENSITIVE] | [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] | low        |
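For example, a hypothetical configuration (the `source_` prefix and field names are invented for illustration) that strips a `source_` prefix from every key field, so that `source_id` becomes `id`:

```properties
transforms=key
transforms.key.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key
transforms.key.field.pattern=^source_
transforms.key.field.replacement=
```

Fields whose names do not match the pattern are passed through unchanged.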
## PatternRename$Value

This transformation is used to rename fields in the value of an input struct based on a regular expression and a replacement string.

```properties
transforms=value
transforms.value.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value

# Set these required values
transforms.value.field.pattern=
transforms.value.field.replacement=
```

| Name                | Description                                                         | Type   | Default            | Valid Values                                                                                                          | Importance |
|---------------------|---------------------------------------------------------------------|--------|--------------------|-----------------------------------------------------------------------------------------------------------------------|------------|
| field.pattern       | Regular expression used to match field names.                       | string |                    |                                                                                                                       | high       |
| field.replacement   | Replacement string applied to the matched portion of a field name.  | string |                    |                                                                                                                       | high       |
| field.pattern.flags | `java.util.regex.Pattern` flags used when compiling the pattern.    | list   | [CASE_INSENSITIVE] | [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] | low        |
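The `field.pattern.flags` values name constants from `java.util.regex.Pattern`. As a sketch, assuming the usual comma-separated Kafka Connect list syntax, a value transform that treats its pattern as literal text rather than a regular expression might look like this:

```properties
transforms=value
transforms.value.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value
transforms.value.field.pattern=user.
transforms.value.field.replacement=user_
# LITERAL disables regex metacharacters, so "user." matches only the literal text "user."
transforms.value.field.pattern.flags=LITERAL
```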

config/PatternRename.conf (whitespace-only changes)

docs/introduction.rst (+21)

```rst
=================
Common Transforms
=================

The Common Transforms project is a collection of common transformations that can be used universally.

.. toctree::
    :maxdepth: 1
    :caption: Transformations:
    :hidden:
    :glob:

    transformations/*


.. toctree::
    :maxdepth: 0
    :caption: Schemas:
    :hidden:

    schemas
```

pom.xml (+72)

Note that the original commit carried the project name and scm/issue URLs over from kafka-connect-transform-cef; they are corrected to kafka-connect-transform-common here.

```xml
<?xml version="1.0"?>
<!--

  Copyright © 2017 Jeremy Custenborder ([email protected])

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.github.jcustenborder.kafka.connect</groupId>
    <artifactId>kafka-connect-parent</artifactId>
    <version>0.11.0.0-cp3</version>
  </parent>
  <artifactId>kafka-connect-transform-common</artifactId>
  <version>0.1.0-SNAPSHOT</version>
  <name>kafka-connect-transform-common</name>
  <url>https://github.com/jcustenborder/kafka-connect-transform-common</url>
  <inceptionYear>2017</inceptionYear>
  <licenses>
    <license>
      <name>Apache License 2.0</name>
      <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
      <distribution>repo</distribution>
    </license>
  </licenses>
  <developers>
    <developer>
      <name>Jeremy Custenborder</name>
      <email>[email protected]</email>
      <url>https://github.com/jcustenborder</url>
      <roles>
        <role>maintainer</role>
      </roles>
    </developer>
  </developers>
  <scm>
    <connection>scm:git:https://github.com/jcustenborder/kafka-connect-transform-common.git</connection>
    <developerConnection>scm:git:git@github.com:jcustenborder/kafka-connect-transform-common.git</developerConnection>
    <url>https://github.com/jcustenborder/kafka-connect-transform-common</url>
  </scm>
  <issueManagement>
    <system>github</system>
    <url>https://github.com/jcustenborder/kafka-connect-transform-common/issues</url>
  </issueManagement>
  <dependencies>
    <dependency>
      <groupId>org.reflections</groupId>
      <artifactId>reflections</artifactId>
      <version>0.9.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.github.jcustenborder.kafka.connect</groupId>
      <artifactId>connect-utils-testing-data</artifactId>
      <version>[0.3.33,0.3.1000)</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
```
src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java (+234)

```java
/**
 * Copyright © 2017 Jeremy Custenborder ([email protected])
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;

public abstract class PatternRename<R extends ConnectRecord<R>> implements Transformation<R> {
  private static final Logger log = LoggerFactory.getLogger(PatternRename.class);

  @Override
  public ConfigDef config() {
    return PatternRenameConfig.config();
  }

  PatternRenameConfig config;

  @Override
  public void configure(Map<String, ?> settings) {
    this.config = new PatternRenameConfig(settings);
  }

  @Override
  public void close() {

  }

  // Rebuilds the schema and struct, renaming every field whose name matches the pattern.
  SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
    final Schema inputSchema = schemaAndValue.schema();
    final Struct inputStruct = (Struct) schemaAndValue.value();
    final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
    outputSchemaBuilder.name(inputSchema.name());
    outputSchemaBuilder.doc(inputSchema.doc());
    if (null != inputSchema.defaultValue()) {
      outputSchemaBuilder.defaultValue(inputSchema.defaultValue());
    }
    if (null != inputSchema.parameters() && !inputSchema.parameters().isEmpty()) {
      outputSchemaBuilder.parameters(inputSchema.parameters());
    }
    if (inputSchema.isOptional()) {
      outputSchemaBuilder.optional();
    }
    Map<String, String> fieldMappings = new HashMap<>(inputSchema.fields().size());
    for (final Field inputField : inputSchema.fields()) {
      log.trace("process() - Processing field '{}'", inputField.name());
      final Matcher fieldMatcher = this.config.pattern.matcher(inputField.name());
      final String outputFieldName;
      if (fieldMatcher.find()) {
        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
      } else {
        outputFieldName = inputField.name();
      }
      log.trace("process() - Mapping field '{}' to '{}'", inputField.name(), outputFieldName);
      fieldMappings.put(inputField.name(), outputFieldName);
      outputSchemaBuilder.field(outputFieldName, inputField.schema());
    }
    final Schema outputSchema = outputSchemaBuilder.build();
    final Struct outputStruct = new Struct(outputSchema);
    for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
      final String inputField = entry.getKey(), outputField = entry.getValue();
      log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
      final Object value = inputStruct.get(inputField);
      outputStruct.put(outputField, value);
    }
    return new SchemaAndValue(outputSchema, outputStruct);
  }

  // Handles schemaless records: renames matching keys of the value map, preserving entry order.
  SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
    @SuppressWarnings("unchecked")
    final Map<String, Object> inputMap = (Map<String, Object>) schemaAndValue.value();
    final Map<String, Object> outputMap = new LinkedHashMap<>(inputMap.size());

    for (final String inputFieldName : inputMap.keySet()) {
      log.trace("process() - Processing field '{}'", inputFieldName);
      final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
      final String outputFieldName;
      if (fieldMatcher.find()) {
        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
      } else {
        outputFieldName = inputFieldName;
      }
      final Object value = inputMap.get(inputFieldName);
      outputMap.put(outputFieldName, value);
    }

    return new SchemaAndValue(null, outputMap);
  }

  // Dispatches based on whether the record carries a schema (Struct) or is schemaless (Map).
  SchemaAndValue process(R record, SchemaAndValue schemaAndValue) {
    final SchemaAndValue result;
    if (schemaAndValue.value() instanceof Struct) {
      result = processStruct(record, schemaAndValue);
    } else if (schemaAndValue.value() instanceof Map) {
      result = processMap(record, schemaAndValue);
    } else {
      throw new UnsupportedOperationException("Only Struct and Map values are supported.");
    }
    return result;
  }
@Title("PatternRename(Key)")
196+
@Description("This transformation is used to rename fields in the key of an input struct based on a regular expression and a replacement string.")
197+
@DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")
198+
public static class Key<R extends ConnectRecord<R>> extends PatternRename<R> {
199+
200+
@Override
201+
public R apply(R r) {
202+
final SchemaAndValue transformed = process(r, new SchemaAndValue(r.keySchema(), r.key()));
203+
204+
return r.newRecord(
205+
r.topic(),
206+
r.kafkaPartition(),
207+
transformed.schema(),
208+
transformed.value(),
209+
r.valueSchema(),
210+
r.value(),
211+
r.timestamp()
212+
);
213+
}
214+
}
215+
216+
@Title("PatternRename(Value)")
217+
@Description("This transformation is used to rename fields in the value of an input struct based on a regular expression and a replacement string.")
218+
public static class Value<R extends ConnectRecord<R>> extends PatternRename<R> {
219+
@Override
220+
public R apply(R r) {
221+
final SchemaAndValue transformed = process(r, new SchemaAndValue(r.valueSchema(), r.value()));
222+
223+
return r.newRecord(
224+
r.topic(),
225+
r.kafkaPartition(),
226+
r.keySchema(),
227+
r.key(),
228+
transformed.schema(),
229+
transformed.value(),
230+
r.timestamp()
231+
);
232+
}
233+
}
234+
}
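To exercise the transform outside of a running Connect worker, here is a minimal sketch. It is not part of this commit: the example class, topic, and field names are invented, and it assumes this project and the Kafka Connect runtime are on the classpath.

```java
import com.github.jcustenborder.kafka.connect.transform.common.PatternRename;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.HashMap;
import java.util.Map;

public class PatternRenameExample {
  public static void main(String[] args) {
    // Configure the Value variant to strip a "source_" prefix from field names.
    Map<String, Object> settings = new HashMap<>();
    settings.put("field.pattern", "^source_");
    settings.put("field.replacement", "");

    PatternRename.Value<SinkRecord> transform = new PatternRename.Value<>();
    transform.configure(settings);

    // Build a record whose value schema contains a prefixed field.
    Schema valueSchema = SchemaBuilder.struct()
        .name("example.Value")
        .field("source_firstName", Schema.STRING_SCHEMA)
        .build();
    Struct value = new Struct(valueSchema).put("source_firstName", "Jane");

    SinkRecord input = new SinkRecord("topic", 0, null, null, valueSchema, value, 1L);
    SinkRecord output = transform.apply(input);

    // The transformed value schema now exposes the field as "firstName".
    Struct outputValue = (Struct) output.value();
    System.out.println(outputValue.getString("firstName")); // prints "Jane"

    transform.close();
  }
}
```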
