Skip to content

Commit c4f3ff4

Browse files
Added ExtractTimestamp transformation. (#5)
* Moved to a common base transformation that supports both schema and schemaless data. * Added a transformation that pulls the timestamp from a field within the value. Closes #4. * Added missing license headers and documentation.
1 parent eb5fb37 commit c4f3ff4

File tree

8 files changed

+435
-99
lines changed

8 files changed

+435
-99
lines changed

pom.xml

+17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,21 @@
11
<?xml version="1.0"?>
2+
<!--
3+
4+
Copyright © 2017 Jeremy Custenborder ([email protected])
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
219
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
320
xmlns="http://maven.apache.org/POM/4.0.0"
421
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import org.apache.kafka.connect.connector.ConnectRecord;
19+
import org.apache.kafka.connect.data.SchemaAndValue;
20+
import org.apache.kafka.connect.data.Struct;
21+
import org.apache.kafka.connect.transforms.Transformation;
22+
23+
import java.util.Map;
24+
25+
public abstract class BaseTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
26+
27+
protected abstract SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue);
28+
29+
protected abstract SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue);
30+
31+
protected SchemaAndValue process(R record, SchemaAndValue schemaAndValue) {
32+
final SchemaAndValue result;
33+
if (schemaAndValue.value() instanceof Struct) {
34+
result = processStruct(record, schemaAndValue);
35+
} else if (schemaAndValue.value() instanceof Map) {
36+
result = processMap(record, schemaAndValue);
37+
} else {
38+
throw new UnsupportedOperationException();
39+
}
40+
return result;
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.Description;
19+
import com.github.jcustenborder.kafka.connect.utils.config.Title;
20+
import com.google.common.base.Preconditions;
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.data.Field;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaAndValue;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.data.Timestamp;
28+
import org.apache.kafka.connect.errors.DataException;
29+
import org.apache.kafka.connect.transforms.Transformation;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.util.Date;
34+
import java.util.Map;
35+
36+
public abstract class ExtractTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {
37+
private static final Logger log = LoggerFactory.getLogger(ExtractTimestamp.class);
38+
public ExtractTimestampConfig config;
39+
40+
protected long process(SchemaAndValue schemaAndValue) {
41+
final long result;
42+
if (schemaAndValue.value() instanceof Struct) {
43+
result = processStruct(schemaAndValue);
44+
} else if (schemaAndValue.value() instanceof Map) {
45+
result = processMap(schemaAndValue);
46+
} else {
47+
throw new UnsupportedOperationException();
48+
}
49+
return result;
50+
}
51+
52+
private long processMap(SchemaAndValue schemaAndValue) {
53+
Preconditions.checkState(schemaAndValue.value() instanceof Map, "value must be a map.");
54+
final Map<String, Object> input = (Map<String, Object>) schemaAndValue.value();
55+
final Object inputValue = input.get(this.config.fieldName);
56+
final long result;
57+
58+
if (inputValue instanceof Date) {
59+
final Date inputDate = (Date) inputValue;
60+
result = inputDate.getTime();
61+
} else if (inputValue instanceof Long) {
62+
result = (long) inputValue;
63+
} else if (null == inputValue) {
64+
throw new DataException(
65+
String.format("Field '%s' cannot be null.", this.config.fieldName)
66+
);
67+
} else {
68+
throw new DataException(
69+
String.format("Cannot convert %s to timestamp.", inputValue.getClass().getName())
70+
);
71+
}
72+
73+
return result;
74+
}
75+
76+
private long processStruct(SchemaAndValue schemaAndValue) {
77+
final Struct inputStruct = (Struct) schemaAndValue.value();
78+
final Field inputField = schemaAndValue.schema().field(this.config.fieldName);
79+
80+
if (null == inputField) {
81+
throw new DataException(
82+
String.format("Schema does not have field '{}'", this.config.fieldName)
83+
);
84+
}
85+
86+
final Schema fieldSchema = inputField.schema();
87+
final long result;
88+
if (Schema.Type.INT64 == fieldSchema.type()) {
89+
final Object fieldValue = inputStruct.get(inputField);
90+
91+
if (null == fieldValue) {
92+
throw new DataException(
93+
String.format("Field '%s' cannot be null.", this.config.fieldName)
94+
);
95+
}
96+
97+
if (Timestamp.LOGICAL_NAME.equals(fieldSchema.name())) {
98+
final Date date = (Date) fieldValue;
99+
result = date.getTime();
100+
} else {
101+
final long timestamp = (long) fieldValue;
102+
result = timestamp;
103+
}
104+
} else {
105+
throw new DataException(
106+
String.format("Schema '{}' is not supported.", inputField.schema())
107+
);
108+
}
109+
110+
return result;
111+
}
112+
113+
114+
@Override
115+
public ConfigDef config() {
116+
return ExtractTimestampConfig.config();
117+
}
118+
119+
@Override
120+
public void close() {
121+
122+
}
123+
124+
@Override
125+
public void configure(Map<String, ?> settings) {
126+
this.config = new ExtractTimestampConfig(settings);
127+
}
128+
129+
130+
@Title("ExtractTimestamp(Value)")
131+
@Description("This transformation is used to use a field from the input data to override the timestamp for the record.")
132+
public static class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
133+
134+
@Override
135+
public R apply(R r) {
136+
final long timestamp = process(new SchemaAndValue(r.valueSchema(), r.value()));
137+
return r.newRecord(
138+
r.topic(),
139+
r.kafkaPartition(),
140+
r.keySchema(),
141+
r.key(),
142+
r.valueSchema(),
143+
r.value(),
144+
timestamp
145+
);
146+
}
147+
}
148+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import org.apache.kafka.common.config.AbstractConfig;
19+
import org.apache.kafka.common.config.ConfigDef;
20+
21+
import java.util.Map;
22+
23+
class ExtractTimestampConfig extends AbstractConfig {
24+
public final String fieldName;
25+
26+
public static final String FIELD_NAME_CONFIG = "field.name";
27+
public static final String FIELD_NAME_DOC = "The field to pull the timestamp from. This must be an int64 or a timestamp.";
28+
29+
public ExtractTimestampConfig(Map<?, ?> originals) {
30+
super(config(), originals);
31+
this.fieldName = getString(FIELD_NAME_CONFIG);
32+
}
33+
34+
public static ConfigDef config() {
35+
return new ConfigDef()
36+
.define(FIELD_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, FIELD_NAME_DOC);
37+
}
38+
}

src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java

+5-82
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.kafka.connect.data.SchemaAndValue;
2626
import org.apache.kafka.connect.data.SchemaBuilder;
2727
import org.apache.kafka.connect.data.Struct;
28-
import org.apache.kafka.connect.transforms.Transformation;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

@@ -34,7 +33,7 @@
3433
import java.util.Map;
3534
import java.util.regex.Matcher;
3635

37-
public abstract class PatternRename<R extends ConnectRecord<R>> implements Transformation<R> {
36+
public abstract class PatternRename<R extends ConnectRecord<R>> extends BaseTransformation<R> {
3837
private static final Logger log = LoggerFactory.getLogger(PatternRename.class);
3938

4039
@Override
@@ -54,7 +53,8 @@ public void close() {
5453

5554
}
5655

57-
SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
56+
@Override
57+
protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
5858
final Schema inputSchema = schemaAndValue.schema();
5959
final Struct inputStruct = (Struct) schemaAndValue.value();
6060
final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
@@ -94,7 +94,8 @@ SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
9494
return new SchemaAndValue(outputSchema, outputStruct);
9595
}
9696

97-
SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
97+
@Override
98+
protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
9899
final Map<String, Object> inputMap = (Map<String, Object>) schemaAndValue.value();
99100
final Map<String, Object> outputMap = new LinkedHashMap<>(inputMap.size());
100101

@@ -114,84 +115,6 @@ SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
114115
return new SchemaAndValue(null, outputMap);
115116
}
116117

117-
SchemaAndValue process(R record, SchemaAndValue schemaAndValue) {
118-
final SchemaAndValue result;
119-
if (schemaAndValue.value() instanceof Struct) {
120-
result = processStruct(record, schemaAndValue);
121-
} else if (schemaAndValue.value() instanceof Map) {
122-
result = processMap(record, schemaAndValue);
123-
} else {
124-
throw new UnsupportedOperationException();
125-
}
126-
return result;
127-
}
128-
129-
130-
// R process(R record, boolean isKey) {
131-
// final Schema inputSchema = isKey ? record.keySchema() : record.valueSchema();
132-
// final Struct inputStruct = (Struct) (isKey ? record.key() : record.value());
133-
// final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
134-
// outputSchemaBuilder.name(inputSchema.name());
135-
// outputSchemaBuilder.doc(inputSchema.doc());
136-
// if (null != inputSchema.defaultValue()) {
137-
// outputSchemaBuilder.defaultValue(inputSchema.defaultValue());
138-
// }
139-
// if (null != inputSchema.parameters() && !inputSchema.parameters().isEmpty()) {
140-
// outputSchemaBuilder.parameters(inputSchema.parameters());
141-
// }
142-
// if (inputSchema.isOptional()) {
143-
// outputSchemaBuilder.optional();
144-
// }
145-
// Map<String, String> fieldMappings = new HashMap<>(inputSchema.fields().size());
146-
// for (final Field inputField : inputSchema.fields()) {
147-
// log.trace("process() - Processing field '{}'", inputField.name());
148-
// final Matcher fieldMatcher = this.config.pattern.matcher(inputField.name());
149-
// final String outputFieldName;
150-
// if (fieldMatcher.find()) {
151-
// outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
152-
// } else {
153-
// outputFieldName = inputField.name();
154-
// }
155-
// log.trace("process() - Mapping field '{}' to '{}'", inputField.name(), outputFieldName);
156-
// fieldMappings.put(inputField.name(), outputFieldName);
157-
// outputSchemaBuilder.field(outputFieldName, inputField.schema());
158-
// }
159-
// final Schema outputSchema = outputSchemaBuilder.build();
160-
// final Struct outputStruct = new Struct(outputSchema);
161-
// for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
162-
// final String inputField = entry.getKey(), outputField = entry.getValue();
163-
// log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
164-
// final Object value = inputStruct.get(inputField);
165-
// outputStruct.put(outputField, value);
166-
// }
167-
//
168-
// final R result;
169-
// if (isKey) {
170-
// result = record.newRecord(
171-
// record.topic(),
172-
// record.kafkaPartition(),
173-
// outputSchema,
174-
// outputStruct,
175-
// record.valueSchema(),
176-
// record.value(),
177-
// record.timestamp()
178-
// );
179-
// } else {
180-
// result = record.newRecord(
181-
// record.topic(),
182-
// record.kafkaPartition(),
183-
// record.keySchema(),
184-
// record.key(),
185-
// outputSchema,
186-
// outputStruct,
187-
// record.timestamp()
188-
// );
189-
// }
190-
// return result;
191-
//
192-
//
193-
// }
194-
195118
@Title("PatternRename(Key)")
196119
@Description("This transformation is used to rename fields in the key of an input struct based on a regular expression and a replacement string.")
197120
@DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")

0 commit comments

Comments
 (0)