Skip to content

Commit 134c95d

Browse files
Transformation to copy headers to structs. (#54)
* Added tranformation to copy headers to fields. Fixes #47 * Added support to pull in header values. Fixes #47
1 parent e1d8c55 commit 134c95d

File tree

8 files changed

+952
-1
lines changed

8 files changed

+952
-1
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<groupId>com.github.jcustenborder.kafka.connect</groupId>
2525
<artifactId>kafka-connect-parent</artifactId>
26-
<version>2.1.1-cp1</version>
26+
<version>2.2.1-cp1</version>
2727
</parent>
2828
<artifactId>kafka-connect-transform-common</artifactId>
2929
<version>0.1.0-SNAPSHOT</version>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.google.common.base.MoreObjects;
19+
import com.google.common.base.Preconditions;
20+
import com.google.common.collect.ComparisonChain;
21+
import com.google.common.collect.ImmutableMap;
22+
import com.google.common.collect.Ordering;
23+
import org.apache.kafka.connect.connector.ConnectRecord;
24+
import org.apache.kafka.connect.data.Date;
25+
import org.apache.kafka.connect.data.Decimal;
26+
import org.apache.kafka.connect.data.Schema;
27+
import org.apache.kafka.connect.data.Struct;
28+
import org.apache.kafka.connect.data.Time;
29+
import org.apache.kafka.connect.data.Timestamp;
30+
import org.apache.kafka.connect.data.Values;
31+
import org.apache.kafka.connect.header.Header;
32+
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
import java.util.Objects;
36+
37+
abstract class ConversionHandler {
38+
final Schema headerSchema;
39+
final String header;
40+
final String field;
41+
42+
protected ConversionHandler(Schema headerSchema, String header, String field) {
43+
this.headerSchema = headerSchema;
44+
this.header = header;
45+
this.field = field;
46+
}
47+
48+
abstract Object convert(Header header);
49+
50+
public void convert(ConnectRecord record, Struct struct) {
51+
final Header header = record.headers().lastWithName(this.header);
52+
Object fieldValue;
53+
if (null != header) {
54+
fieldValue = convert(header);
55+
} else {
56+
fieldValue = null;
57+
}
58+
59+
struct.put(this.field, fieldValue);
60+
}
61+
62+
static class StringConversionHandler extends ConversionHandler {
63+
public StringConversionHandler(Schema headerSchema, String header, String field) {
64+
super(headerSchema, header, field);
65+
}
66+
67+
@Override
68+
Object convert(Header header) {
69+
return Values.convertToString(header.schema(), header.value());
70+
}
71+
}
72+
73+
static class BooleanConversionHandler extends ConversionHandler {
74+
public BooleanConversionHandler(Schema headerSchema, String header, String field) {
75+
super(headerSchema, header, field);
76+
}
77+
78+
@Override
79+
Object convert(Header header) {
80+
return Values.convertToBoolean(header.schema(), header.value());
81+
}
82+
}
83+
84+
static class Float32ConversionHandler extends ConversionHandler {
85+
public Float32ConversionHandler(Schema headerSchema, String header, String field) {
86+
super(headerSchema, header, field);
87+
}
88+
89+
@Override
90+
Object convert(Header header) {
91+
return Values.convertToFloat(header.schema(), header.value());
92+
}
93+
}
94+
95+
static class Float64ConversionHandler extends ConversionHandler {
96+
public Float64ConversionHandler(Schema headerSchema, String header, String field) {
97+
super(headerSchema, header, field);
98+
}
99+
100+
@Override
101+
Object convert(Header header) {
102+
return Values.convertToDouble(header.schema(), header.value());
103+
}
104+
}
105+
106+
static class Int8ConversionHandler extends ConversionHandler {
107+
public Int8ConversionHandler(Schema headerSchema, String header, String field) {
108+
super(headerSchema, header, field);
109+
}
110+
111+
@Override
112+
Object convert(Header header) {
113+
return Values.convertToByte(header.schema(), header.value());
114+
}
115+
}
116+
117+
static class Int16ConversionHandler extends ConversionHandler {
118+
public Int16ConversionHandler(Schema headerSchema, String header, String field) {
119+
super(headerSchema, header, field);
120+
}
121+
122+
@Override
123+
Object convert(Header header) {
124+
return Values.convertToShort(header.schema(), header.value());
125+
}
126+
}
127+
128+
static class Int32ConversionHandler extends ConversionHandler {
129+
public Int32ConversionHandler(Schema headerSchema, String header, String field) {
130+
super(headerSchema, header, field);
131+
}
132+
133+
@Override
134+
Object convert(Header header) {
135+
return Values.convertToInteger(header.schema(), header.value());
136+
}
137+
}
138+
139+
static class Int64ConversionHandler extends ConversionHandler {
140+
public Int64ConversionHandler(Schema headerSchema, String header, String field) {
141+
super(headerSchema, header, field);
142+
}
143+
144+
@Override
145+
Object convert(Header header) {
146+
return Values.convertToLong(header.schema(), header.value());
147+
}
148+
}
149+
150+
static class DecimalConversionHandler extends ConversionHandler {
151+
private final int scale;
152+
153+
public DecimalConversionHandler(Schema headerSchema, String header, String field) {
154+
super(headerSchema, header, field);
155+
String scaleText = null != headerSchema.parameters() ? headerSchema.parameters().get(Decimal.SCALE_FIELD) : null;
156+
Preconditions.checkNotNull(scaleText, "schema parameters must contain a '%s' parameter.", Decimal.SCALE_FIELD);
157+
scale = Integer.parseInt(scaleText);
158+
}
159+
160+
@Override
161+
Object convert(Header header) {
162+
return Values.convertToDecimal(header.schema(), header.value(), scale);
163+
}
164+
}
165+
166+
static class TimestampConversionHandler extends ConversionHandler {
167+
168+
public TimestampConversionHandler(Schema headerSchema, String header, String field) {
169+
super(headerSchema, header, field);
170+
}
171+
172+
@Override
173+
Object convert(Header header) {
174+
return Values.convertToTimestamp(header.schema(), header.value());
175+
}
176+
}
177+
178+
static class TimeConversionHandler extends ConversionHandler {
179+
180+
public TimeConversionHandler(Schema headerSchema, String header, String field) {
181+
super(headerSchema, header, field);
182+
}
183+
184+
@Override
185+
Object convert(Header header) {
186+
return Values.convertToTime(header.schema(), header.value());
187+
}
188+
}
189+
190+
static class DateConversionHandler extends ConversionHandler {
191+
192+
public DateConversionHandler(Schema headerSchema, String header, String field) {
193+
super(headerSchema, header, field);
194+
}
195+
196+
@Override
197+
Object convert(Header header) {
198+
return Values.convertToDate(header.schema(), header.value());
199+
}
200+
}
201+
202+
203+
static class SchemaKey implements Comparable<SchemaKey> {
204+
final String name;
205+
final Schema.Type type;
206+
207+
208+
SchemaKey(String name, Schema.Type type) {
209+
this.name = name;
210+
this.type = type;
211+
}
212+
213+
public static SchemaKey of(Schema schema) {
214+
return new SchemaKey(schema.name(), schema.type());
215+
}
216+
217+
@Override
218+
public int hashCode() {
219+
return Objects.hash(this.type, this.name);
220+
}
221+
222+
@Override
223+
public int compareTo(SchemaKey that) {
224+
return ComparisonChain.start()
225+
.compare(this.type, that.type)
226+
.compare(this.name, that.name, Ordering.natural().nullsLast())
227+
.result();
228+
}
229+
230+
@Override
231+
public boolean equals(Object obj) {
232+
if (obj instanceof SchemaKey) {
233+
return 0 == compareTo((SchemaKey) obj);
234+
} else {
235+
return false;
236+
}
237+
}
238+
239+
@Override
240+
public String toString() {
241+
return MoreObjects.toStringHelper(this)
242+
.omitNullValues()
243+
.add("type", this.type)
244+
.add("name", this.name)
245+
.toString();
246+
}
247+
}
248+
249+
interface ConversionHandlerFactory {
250+
ConversionHandler create(Schema schema, String header, String field);
251+
}
252+
253+
static final Map<SchemaKey, ConversionHandlerFactory> CONVERSION_HANDLER_FACTORIES;
254+
255+
static {
256+
Map<SchemaKey, ConversionHandlerFactory> handlerFactories = new HashMap<>();
257+
handlerFactories.put(SchemaKey.of(Schema.STRING_SCHEMA), StringConversionHandler::new);
258+
handlerFactories.put(SchemaKey.of(Schema.BOOLEAN_SCHEMA), BooleanConversionHandler::new);
259+
handlerFactories.put(SchemaKey.of(Schema.FLOAT32_SCHEMA), Float32ConversionHandler::new);
260+
handlerFactories.put(SchemaKey.of(Schema.FLOAT64_SCHEMA), Float64ConversionHandler::new);
261+
handlerFactories.put(SchemaKey.of(Schema.INT8_SCHEMA), Int8ConversionHandler::new);
262+
handlerFactories.put(SchemaKey.of(Schema.INT16_SCHEMA), Int16ConversionHandler::new);
263+
handlerFactories.put(SchemaKey.of(Schema.INT32_SCHEMA), Int32ConversionHandler::new);
264+
handlerFactories.put(SchemaKey.of(Schema.INT64_SCHEMA), Int64ConversionHandler::new);
265+
handlerFactories.put(SchemaKey.of(Decimal.schema(1)), DecimalConversionHandler::new);
266+
handlerFactories.put(SchemaKey.of(Timestamp.SCHEMA), TimestampConversionHandler::new);
267+
handlerFactories.put(SchemaKey.of(Time.SCHEMA), TimeConversionHandler::new);
268+
handlerFactories.put(SchemaKey.of(Date.SCHEMA), DateConversionHandler::new);
269+
270+
271+
CONVERSION_HANDLER_FACTORIES = ImmutableMap.copyOf(handlerFactories);
272+
}
273+
274+
275+
public static ConversionHandler of(Schema headerSchema, String header, String field) {
276+
SchemaKey key = SchemaKey.of(headerSchema);
277+
ConversionHandlerFactory factory = CONVERSION_HANDLER_FACTORIES.get(key);
278+
if (null == factory) {
279+
throw new UnsupportedOperationException(
280+
String.format("%s is not supported", key)
281+
);
282+
}
283+
return factory.create(headerSchema, header, field);
284+
}
285+
}

0 commit comments

Comments
 (0)