Skip to content

Commit 171fdea

Browse files
Extract XPath (#86)
* Skeleton ExtractXPath SMT * Added dependencies for ExtractXPath SMT * Added test for XPath extracting an element from a SOAP Envelope * Added handling for byte[] payloads * Added handling for full payload (no need to wrap in a field) * Added extra logging
1 parent 9126c92 commit 171fdea

File tree

6 files changed

+590
-0
lines changed

6 files changed

+590
-0
lines changed

pom.xml

+10
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@
7676
<version>[0.3.33,0.3.1000)</version>
7777
<scope>test</scope>
7878
</dependency>
79+
<dependency>
80+
<groupId>org.apache.ws.commons</groupId>
81+
<artifactId>ws-commons-util</artifactId>
82+
<version>1.0.1</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>xerces</groupId>
86+
<artifactId>xercesImpl</artifactId>
87+
<version>2.12.1</version>
88+
</dependency>
7989
</dependencies>
8090
<build>
8191
<plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.Description;
19+
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
20+
import com.github.jcustenborder.kafka.connect.utils.config.Title;
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.data.Field;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaAndValue;
26+
import org.apache.kafka.connect.data.SchemaBuilder;
27+
import org.apache.kafka.connect.data.Struct;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import java.io.StringWriter;
31+
import java.io.PrintWriter;
32+
33+
import java.util.Map;
34+
import java.util.LinkedHashMap;
35+
import java.io.InputStream;
36+
import java.io.ByteArrayInputStream;
37+
38+
import javax.xml.parsers.DocumentBuilder;
39+
import javax.xml.parsers.DocumentBuilderFactory;
40+
import javax.xml.xpath.XPath;
41+
import javax.xml.xpath.XPathExpression;
42+
import javax.xml.xpath.XPathFactory;
43+
import javax.xml.xpath.XPathConstants;
44+
import org.w3c.dom.Document;
45+
import org.w3c.dom.Node;
46+
import org.w3c.dom.bootstrap.DOMImplementationRegistry;
47+
import org.w3c.dom.ls.DOMImplementationLS;
48+
import org.w3c.dom.ls.LSSerializer;
49+
50+
import org.apache.ws.commons.util.NamespaceContextImpl;
51+
52+
public abstract class ExtractXPath<R extends ConnectRecord<R>> extends BaseTransformation<R> {
53+
private static final Logger log = LoggerFactory.getLogger(ExtractXPath.class);
54+
55+
56+
ExtractXPathConfig config;
57+
public ExtractXPathConfig theConfig() {
58+
return this.config;
59+
}
60+
61+
DocumentBuilder builder;
62+
XPath xpath;
63+
XPathExpression xpathE;
64+
LSSerializer writer;
65+
66+
@Override
67+
public ConfigDef config() {
68+
return ExtractXPathConfig.config();
69+
}
70+
71+
@Override
72+
public void close() {
73+
74+
}
75+
76+
@Override
77+
public void configure(Map<String, ?> settings) {
78+
this.config = new ExtractXPathConfig(settings);
79+
try {
80+
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
81+
factory.setNamespaceAware(config.namespaceAware);
82+
builder = factory.newDocumentBuilder();
83+
xpath = XPathFactory.newInstance().newXPath();
84+
if (config.namespaceAware) {
85+
NamespaceContextImpl nsContext = new NamespaceContextImpl();
86+
for (int i = 0; i < config.prefixes.size(); i++) {
87+
String prefix = config.prefixes.get(i);
88+
String ns = config.namespaces.get(i);
89+
log.debug("Adding prefix {} for namespace {}", prefix, ns);
90+
nsContext.startPrefixMapping(prefix, ns);
91+
}
92+
xpath.setNamespaceContext(nsContext);
93+
}
94+
xpathE = xpath.compile(config.xpath);
95+
96+
DOMImplementationRegistry registry = DOMImplementationRegistry.newInstance();
97+
DOMImplementationLS impl = (DOMImplementationLS) registry.getDOMImplementation("LS");
98+
writer = impl.createLSSerializer();
99+
} catch (Exception e) {
100+
log.error("Unable to create transformer {} {}", e.getMessage(), e.toString());
101+
}
102+
}
103+
104+
/**
105+
*
106+
*/
107+
private Object extractXPathString(Object inData) {
108+
log.trace("extractXPathString Data type {}", inData.getClass());
109+
InputStream in = null;
110+
try {
111+
112+
if (inData instanceof String) {
113+
log.trace("Handling XML String");
114+
String inFieldData = (String) inData;
115+
in = new ByteArrayInputStream(inFieldData.getBytes());
116+
Document doc = builder.parse(in);
117+
Node node = (Node) xpathE.evaluate(doc, XPathConstants.NODE);
118+
String output = writer.writeToString(node);
119+
return output;
120+
} else if (inData instanceof byte[]) {
121+
byte[] inFieldData = (byte[]) inData;
122+
log.trace("Handling byte array, length {}", inFieldData.length);
123+
in = new ByteArrayInputStream(inFieldData);
124+
Document doc = builder.parse(in);
125+
Node node = (Node) xpathE.evaluate(doc, XPathConstants.NODE);
126+
String output = writer.writeToString(node);
127+
return output.getBytes();
128+
} else {
129+
log.error("Expected a String or byte[], got a {}", inData.getClass().getName());
130+
}
131+
} catch (Exception e) {
132+
StringWriter sw = new StringWriter();
133+
PrintWriter pw = new PrintWriter(sw);
134+
e.printStackTrace(pw);
135+
log.error("Unable to evaluate XPath {} {}", e.getMessage(), sw.toString());
136+
}
137+
return null;
138+
}
139+
140+
@Override
141+
protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
142+
Schema outPutSchema = inputSchema;
143+
SchemaAndValue retVal = null;
144+
log.trace("process() - Processing bytes Input: {}", new String(input));
145+
Object outputValue = extractXPathString(input);
146+
log.trace("process() - Output: {}", new String((byte[]) outputValue));
147+
retVal = new SchemaAndValue(outPutSchema, outputValue);
148+
return retVal;
149+
}
150+
151+
@Override
152+
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inputStruct) {
153+
Schema outPutSchema = inputSchema;
154+
SchemaAndValue retVal = null;
155+
if (!config.outputField.equals(config.inputField)) {
156+
// Adding a new field, need to build a new schema
157+
final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
158+
outputSchemaBuilder.name(inputSchema.name());
159+
outputSchemaBuilder.doc(inputSchema.doc());
160+
if (null != inputSchema.defaultValue()) {
161+
outputSchemaBuilder.defaultValue(inputSchema.defaultValue());
162+
}
163+
if (null != inputSchema.parameters() && !inputSchema.parameters().isEmpty()) {
164+
outputSchemaBuilder.parameters(inputSchema.parameters());
165+
}
166+
if (inputSchema.isOptional()) {
167+
outputSchemaBuilder.optional();
168+
}
169+
for (final Field inputField : inputSchema.fields()) {
170+
final String inputFieldName = inputField.name();
171+
log.trace("process() - Schema has field '{}'", inputFieldName);
172+
outputSchemaBuilder.field(inputFieldName, inputField.schema());
173+
if (inputFieldName.equals(config.inputField)) {
174+
log.trace("process() - Adding field '{}'", config.outputField);
175+
outputSchemaBuilder.field(config.outputField, inputField.schema());
176+
}
177+
}
178+
final Schema newSchema = outputSchemaBuilder.build();
179+
final Struct outputStruct = new Struct(newSchema);
180+
for (final Field inputField : inputSchema.fields()) {
181+
final String inputFieldName = inputField.name();
182+
final Object value = inputStruct.get(inputFieldName);
183+
outputStruct.put(inputFieldName, value);
184+
if (inputFieldName.equals(config.inputField)) {
185+
Object extractedValue = extractXPathString(value);
186+
outputStruct.put(config.outputField, extractedValue);
187+
}
188+
}
189+
retVal = new SchemaAndValue(newSchema, outputStruct);
190+
} else {
191+
Struct outputStruct = inputStruct;
192+
Object toReplace = inputStruct.get(config.inputField);
193+
if (toReplace != null && toReplace instanceof String) {
194+
String inputFieldName = config.inputField;
195+
String replacedField = (String) toReplace;
196+
log.trace("process() - Processing struct field '{}' value '{}'", inputFieldName, toReplace);
197+
Object extractedValue = extractXPathString(replacedField);
198+
if (config.outputField.equals(config.inputField)) {
199+
log.debug("process() - Replaced struct field '{}' with '{}'", inputFieldName, extractedValue);
200+
} else {
201+
log.debug("process() - Added struct field '{}' with '{}'", config.outputField, extractedValue);
202+
}
203+
outputStruct.put(config.outputField, extractedValue);
204+
retVal = new SchemaAndValue(outPutSchema, outputStruct);
205+
}
206+
}
207+
return retVal;
208+
}
209+
210+
@Override
211+
protected SchemaAndValue processMap(R record, Map<String, Object> input) {
212+
Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
213+
for (final String inputFieldName : input.keySet()) {
214+
outputMap.put(inputFieldName, input.get(inputFieldName));
215+
log.trace("process() - Processing map field '{}' value '{}'", inputFieldName, input.get(inputFieldName));
216+
if (inputFieldName.equals(config.inputField)) {
217+
String fieldToMatch = (String) input.get(inputFieldName);
218+
Object replacedValue = extractXPathString(fieldToMatch);
219+
outputMap.put(config.outputField, replacedValue);
220+
if (config.outputField.equals(config.inputField)) {
221+
log.debug("process() - Replaced map field '{}' with '{}'", inputFieldName, replacedValue);
222+
} else {
223+
log.debug("process() - Added map field '{}' with '{}'", config.outputField, replacedValue);
224+
}
225+
}
226+
}
227+
return new SchemaAndValue(null, outputMap);
228+
}
229+
230+
@Title("ExtractXPathConfig(Key)")
231+
@Description("This transformation is used to take XML data and apply an XPath to " +
232+
"it, returning a new XML document.")
233+
@DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")
234+
public static class Key<R extends ConnectRecord<R>> extends ExtractXPath<R> {
235+
236+
@Override
237+
public R apply(R r) {
238+
final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
239+
240+
return r.newRecord(
241+
r.topic(),
242+
r.kafkaPartition(),
243+
transformed.schema(),
244+
transformed.value(),
245+
r.valueSchema(),
246+
r.value(),
247+
r.timestamp()
248+
);
249+
}
250+
}
251+
252+
@Title("ExtractXPathConfig(Value)")
253+
@Description("This transformation is used to take XML data and apply an XPath to " +
254+
"it, returning a new XML document.")
255+
public static class Value<R extends ConnectRecord<R>> extends ExtractXPath<R> {
256+
257+
@Override
258+
public R apply(R r) {
259+
final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
260+
261+
return r.newRecord(
262+
r.topic(),
263+
r.kafkaPartition(),
264+
r.keySchema(),
265+
r.key(),
266+
transformed.schema(),
267+
transformed.value(),
268+
r.timestamp()
269+
);
270+
}
271+
}
272+
273+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import org.apache.kafka.common.config.AbstractConfig;
19+
import org.apache.kafka.common.config.ConfigDef;
20+
21+
import java.util.List;
22+
import java.util.ArrayList;
23+
import java.util.Map;
24+
import java.util.Arrays;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class ExtractXPathConfig extends AbstractConfig {
30+
private static final Logger log = LoggerFactory.getLogger(ExtractXPathConfig.class);
31+
32+
public final String inputField;
33+
public final String outputField;
34+
public final String xpath;
35+
public final boolean namespaceAware;
36+
public final List<String> prefixes;
37+
public final List<String> namespaces;
38+
39+
public static final String IN_FIELD_CONFIG = "input.field";
40+
public static final String IN_FIELD_DOC = "The input field containing the XML Document.";
41+
public static final String OUT_FIELD_CONFIG = "output.field";
42+
public static final String OUT_FIELD_DOC = "The output field where the XML element matching the XPath will be placed.";
43+
public static final String NS_PREFIX_CONFIG = "ns.prefix";
44+
public static final String NS_PREFIX_DOC = "A comma separated list of Namespace prefixes";
45+
public static final String NS_LIST_CONFIG = "ns.namespace";
46+
public static final String NS_LIST_DOC = "A comma separated list of Namespaces corresponding to the prefixes";
47+
public static final String XPATH_CONFIG = "xpath";
48+
public static final String XPATH_DOC = "The XPath to apply to extract an element from the Document";
49+
50+
51+
52+
public ExtractXPathConfig(Map<String, ?> settings) {
53+
super(config(), settings);
54+
this.inputField = getString(IN_FIELD_CONFIG);
55+
this.outputField = getString(OUT_FIELD_CONFIG);
56+
this.xpath = getString(XPATH_CONFIG);
57+
String prefixString = getString(NS_PREFIX_CONFIG);
58+
String namespaceString = getString(NS_LIST_CONFIG);
59+
if (prefixString == null || prefixString.trim().length() == 0) {
60+
this.namespaceAware = false;
61+
prefixes = new ArrayList<String>();
62+
namespaces = new ArrayList<String>();
63+
} else {
64+
this.namespaceAware = true;
65+
prefixes = Arrays.asList(prefixString.split(","));
66+
namespaces = Arrays.asList(namespaceString.split(","));
67+
if (namespaces.size() != prefixes.size()) {
68+
log.warn("The list of namespaces and corresponding prefixes are not the same length.");
69+
}
70+
}
71+
}
72+
73+
public static ConfigDef config() {
74+
return new ConfigDef()
75+
.define(IN_FIELD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, IN_FIELD_DOC)
76+
.define(OUT_FIELD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, OUT_FIELD_DOC)
77+
.define(XPATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, XPATH_DOC)
78+
.define(NS_LIST_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, NS_LIST_DOC)
79+
.define(NS_PREFIX_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, NS_PREFIX_DOC);
80+
}
81+
82+
}

0 commit comments

Comments
 (0)