diff --git a/README.md b/README.md index 802c2d7..4dbc526 100644 --- a/README.md +++ b/README.md @@ -45,8 +45,9 @@ and everything should be fine ! :) See the installation section to learn how to integrate this processor in Apache NiFi. This projects add 2 different new processors in NiFi: -- `ProtobufDecoder`, which **decodes** a Protobuf-encoded payload to different kind of structured formats ; -- `ProtobufEncoder`, which **encodes** a payload in a structured format using a Protobuf schema. +- `DecodeProtobuf`, which **decodes** a Protobuf-encoded payload to different kind of structured formats. +- `EncodeProtobuf`, which **encodes** a payload in a structured format using a Protobuf schema. +- `ConvertProtobufToAvro` which decodes a Protobuf-encoded payload into Avro. ### Specifying the schema file In both processors, you have to specify a schema file to use for data encoding/decoding. You can do so either diff --git a/pom.xml b/pom.xml index 004c0af..e31dc14 100644 --- a/pom.xml +++ b/pom.xml @@ -20,24 +20,24 @@ UTF-8 1.7 1.7 - 1.4.0 + 1.16.3 com.google.protobuf protobuf-java - 3.5.0 + 3.9.0 com.github.os72 protobuf-dynamic - 0.9.3 + 1.0.0 com.google.protobuf protobuf-java-util - 3.5.0 + 3.9.0 org.apache.nifi @@ -52,7 +52,7 @@ org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.15.3 org.apache.nifi @@ -66,11 +66,37 @@ 4.12 test + + org.apache.nifi + nifi-record-serialization-service-api + ${nifi.version} + + + org.apache.nifi + nifi-record + ${nifi.version} + com.github.os72 protoc-jar 3.5.0 + + org.apache.avro + avro-protobuf + 1.11.0 + + + org.apache.avro + avro + 1.11.0 + + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi.version} + nar + @@ -78,7 +104,7 @@ org.apache.nifi nifi-nar-maven-plugin - 1.2.0 + 1.3.1 true diff --git a/src/main/java/com/github/whiver/nifi/mapper/JSONMapper.java b/src/main/java/com/github/whiver/nifi/mapper/JSONMapper.java index 4439548..4127830 100644 --- a/src/main/java/com/github/whiver/nifi/mapper/JSONMapper.java +++ b/src/main/java/com/github/whiver/nifi/mapper/JSONMapper.java @@ -38,11 +38,15 @@ public class JSONMapper { /** * Format a Protocol Buffers Message to a JSON string * @param data The Message to be formatted + * @param preserveFieldNames whether to preserve original field names * @return A JSON String representing the data * @throws InvalidProtocolBufferException Thrown in case of invalid Message data */ - public static String toJSON(Message data) throws InvalidProtocolBufferException { + public static String toJSON(Message data, boolean preserveFieldNames) throws InvalidProtocolBufferException { JsonFormat.Printer printer = JsonFormat.printer(); + if (preserveFieldNames) { + printer = printer.preservingProtoFieldNames(); + } return printer.print(data); } diff --git a/src/main/java/com/github/whiver/nifi/processor/ProtobufProcessor.java b/src/main/java/com/github/whiver/nifi/processor/AbstractProtobufProcessor.java similarity index 62% rename from src/main/java/com/github/whiver/nifi/processor/ProtobufProcessor.java rename to src/main/java/com/github/whiver/nifi/processor/AbstractProtobufProcessor.java index c065546..e97d03e 100644 --- a/src/main/java/com/github/whiver/nifi/processor/ProtobufProcessor.java +++ b/src/main/java/com/github/whiver/nifi/processor/AbstractProtobufProcessor.java @@ -31,8 +31,16 @@ import com.github.whiver.nifi.exception.SchemaLoadingException; import com.github.whiver.nifi.parser.SchemaParser; import com.google.protobuf.Descriptors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; @@ -41,7 +49,16 @@ import java.io.IOException; import java.util.*; -public abstract class ProtobufProcessor extends AbstractProcessor { +import static org.apache.nifi.components.Validator.VALID; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttributes({ + @ReadsAttribute(attribute = "protobuf.schemaPath", description = "will use this attribute as default for schema path."), + @ReadsAttribute(attribute = "protobuf.messageType", description = "will use this attribute as default for message type.") +}) +@Tags({"Protobuf", "decoder", "Google Protocol Buffer"}) +public abstract class AbstractProtobufProcessor extends AbstractProcessor { /** * NiFi properties of the processor, that can be configured using the Web UI */ @@ -56,26 +73,39 @@ public abstract class ProtobufProcessor extends AbstractProcessor { * The compiled descriptor used to parse incoming binaries in case where the schema has been specified in the * processor level (protobuf.schema property) */ - protected DynamicSchema schema; - - /** - * Reflects the value of the COMPILE_SCHEMA property, so that it can be used by the onPropertyModified method - */ - private boolean compileSchema; - + protected static DynamicSchema schema = null; /* PROPERTIES */ static final PropertyDescriptor PROTOBUF_SCHEMA = new PropertyDescriptor.Builder() .name("protobuf.schemaPath") - .displayName("Schema path") + .displayName("Schema Path") .required(false) .description("Path to the Protocol Buffers schema to use to encode or decode the data. If set, this schema will " + "be used when the flowfile protobuf.schemaPath is missing.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.createURLorFileValidator()) .build(); + static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder() + .name("demarcator") + .displayName("Demarcator") + .required(false) + .description("This property is used to produce/consume messages separated by a demarcator") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(Validator.VALID) + .build(); + + static final PropertyDescriptor PROTOBUF_MESSAGE_TYPE = new PropertyDescriptor.Builder() + .name("protobuf.messageType") + .displayName("Message Type") + .required(false) + .description("Path to the Protocol Buffers message type to use to encode or decode the data. If set, this message type will " + + "be used when the flowfile protobuf.messageType is missing.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(VALID) + .build(); + static final PropertyDescriptor COMPILE_SCHEMA = new PropertyDescriptor.Builder() .name("protobuf.compileSchema") .displayName("Compile schema") @@ -85,6 +115,17 @@ public abstract class ProtobufProcessor extends AbstractProcessor { "processing the data. It is useful if the given schema file is in .proto format. Try to always use " + "precompiled .desc schema whenever possible, since it is more performant.") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("max_message_size") + .displayName("Max Message Size") + .required(true) + .description("Maximum size of a single message when using a demarcator.") + .defaultValue("1 MB") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .build(); @@ -106,9 +147,10 @@ public abstract class ProtobufProcessor extends AbstractProcessor { .build(); @Override - public void init(final ProcessorInitializationContext context){ + public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(PROTOBUF_SCHEMA); + properties.add(PROTOBUF_MESSAGE_TYPE); properties.add(COMPILE_SCHEMA); this.properties = Collections.unmodifiableList(properties); @@ -117,33 +159,25 @@ public void init(final ProcessorInitializationContext context){ relationships.add(INVALID_SCHEMA); relationships.add(ERROR); this.relationships = Collections.unmodifiableSet(relationships); - - this.compileSchema = false; } + + /** * Compile the given schema file when the protobuf.schemaPath property is given * - * @see AbstractProcessor */ - @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - super.onPropertyModified(descriptor, oldValue, newValue); - - if (descriptor == PROTOBUF_SCHEMA) { - - // If the property is unset, just delete the existing descriptor - if (newValue == null || newValue.isEmpty()) { - this.schema = null; - return; - } - - if (!newValue.equals(oldValue)) { + @OnScheduled + public void setUpSchema(final ProcessContext processContext) { + String protoSchemaPath = processContext.getProperty(PROTOBUF_SCHEMA).evaluateAttributeExpressions().getValue(); + // If the property is unset, just delete the existing descriptor + if (protoSchemaPath == null || protoSchemaPath.isEmpty()) { this.schema = null; + } else { try { - this.schema = SchemaParser.parseSchema(newValue, this.compileSchema); + this.schema = SchemaParser.parseSchema(protoSchemaPath, processContext.getProperty(COMPILE_SCHEMA).evaluateAttributeExpressions().asBoolean()); } catch (FileNotFoundException e) { - getLogger().error("File " + newValue + " not found on the disk.", e); + getLogger().error("File " + protoSchemaPath + " not found on the disk.", e); } catch (Descriptors.DescriptorValidationException e) { getLogger().error("Invalid schema file: " + e.getMessage(), e); } catch (IOException e) { @@ -154,9 +188,6 @@ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, S getLogger().error("Unable to compile schema: " + e.getMessage(), e); } } - } else if (descriptor == COMPILE_SCHEMA) { - this.compileSchema = Boolean.parseBoolean(newValue); - } } @Override diff --git a/src/main/java/com/github/whiver/nifi/processor/ConvertProtobufToAvro.java b/src/main/java/com/github/whiver/nifi/processor/ConvertProtobufToAvro.java new file mode 100644 index 0000000..9a1ed6f --- /dev/null +++ b/src/main/java/com/github/whiver/nifi/processor/ConvertProtobufToAvro.java @@ -0,0 +1,168 @@ +/* + * MIT License + * + * NiFi Protobuf Processor + * Copyright (c) 2017 William Hiver + * https://github.com/whiver/nifi-protobuf-processor + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.whiver.nifi.processor; + +import com.google.protobuf.DynamicMessage; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.protobuf.ProtobufData; +import org.apache.avro.protobuf.ProtobufDatumWriter; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + + +/** + * Processor to convert protobuf format to avro + */ +@SideEffectFree +@SeeAlso(value = {EncodeProtobuf.class, DecodeProtobuf.class, ConvertRecordToProtobuf.class}) +@CapabilityDescription("Decodes incoming data using a Google Protocol Buffer Schema and converts into Avro.") +public class ConvertProtobufToAvro extends AbstractProtobufProcessor { + private DatumWriter datumWriter; + + static final PropertyDescriptor PROTOBUF_MESSAGE_TYPE = new PropertyDescriptor.Builder() + .name("protobuf.messageType") + .displayName("Message Type") + .required(true) + .description("Path to the Protocol Buffers message type to use to encode or decode the data. If set, this message type will " + + "be used when the flowfile protobuf.messageType is missing.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder() + .name("demarcator") + .displayName("Demarcator") + .required(true) + .defaultValue("|||") + .description("This property is used to produce/consume messages separated by a demarcator") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(Validator.VALID) + .build(); + + protected Schema avroSchema; + protected String messageType; + + @Override + public List getSupportedPropertyDescriptors() { + List list = new LinkedList<>(super.getSupportedPropertyDescriptors()); + list.remove(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE); + list.remove(AbstractProtobufProcessor.DEMARCATOR); + list.add(DEMARCATOR); + list.add(PROTOBUF_MESSAGE_TYPE); + list.add(MAX_MESSAGE_SIZE); + return list; + } + + @OnScheduled + @Override + public void setUpSchema(ProcessContext processContext) { + super.setUpSchema(processContext); + messageType = processContext.getProperty(PROTOBUF_MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); + avroSchema = ProtobufData.get().getSchema(this.schema.getMessageDescriptor(messageType)); + datumWriter = new ProtobufDatumWriter<>(avroSchema); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { + final AtomicReference error = new AtomicReference<>(); + + FlowFile flowfile = session.get(); + + if (flowfile == null) { + return; + } + + String demarcator = processContext.getProperty(DEMARCATOR).evaluateAttributeExpressions(flowfile).getValue(); + int maxMessageSize = processContext.getProperty(MAX_MESSAGE_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue(); + String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); + + if (protobufSchema == null && this.schema == null) { + getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + + " property, either at processor or flowfile level.."); + session.transfer(flowfile, INVALID_SCHEMA); + } else { + + // Write the results back out ot flow file + FlowFile outputFlowfile; + try { + outputFlowfile = processBatch(session, error, flowfile, demarcator, maxMessageSize); + outputFlowfile = session.putAttribute(outputFlowfile, CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + session.transfer(outputFlowfile, SUCCESS); + } catch (RuntimeException e) { + session.transfer(flowfile, error.get()); + } + } + } + + private FlowFile processBatch(ProcessSession session, + AtomicReference error, + FlowFile flowfile, + String demarcator, + int maxMessageSize) { + return session.write(flowfile, (in, out) -> { + try { + byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + StreamDemarcator streamDemarcator = new StreamDemarcator(in, demarcatorBytes, maxMessageSize); + DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.create(avroSchema, out); + byte[] message; + while ((message = streamDemarcator.nextToken()) != null) { + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(this.schema.getMessageDescriptor(messageType), message); + dataFileWriter.append(dynamicMessage); + } + dataFileWriter.flush(); + dataFileWriter.close(); + + } catch (Exception e) { + getLogger().error("encountered error while processing batch:", e); + error.set(ERROR); + throw new RuntimeException(e); + } + }); + } +} diff --git a/src/main/java/com/github/whiver/nifi/processor/ConvertRecordToProtobuf.java b/src/main/java/com/github/whiver/nifi/processor/ConvertRecordToProtobuf.java new file mode 100644 index 0000000..2e3f37d --- /dev/null +++ b/src/main/java/com/github/whiver/nifi/processor/ConvertRecordToProtobuf.java @@ -0,0 +1,398 @@ +/* + * MIT License + * + * NiFi Protobuf Processor + * Copyright (c) 2017 William Hiver + * https://github.com/whiver/nifi-protobuf-processor + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.whiver.nifi.processor; + +import com.google.common.base.CaseFormat; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + + +/** + * Processor to convert protobuf format to avro + */ +@SideEffectFree +@SeeAlso(value = {EncodeProtobuf.class, DecodeProtobuf.class, ConvertProtobufToAvro.class}) +@CapabilityDescription("Decodes incoming data using RecordReader and converts it into a Google Protocol Buffer Schema.") +public class ConvertRecordToProtobuf extends AbstractProtobufProcessor { + + static final PropertyDescriptor PROTOBUF_MESSAGE_TYPE = new PropertyDescriptor.Builder() + .name("protobuf.messageType") + .displayName("Message Type") + .required(true) + .description("Path to the Protocol Buffers message type to use to encode or decode the data. If set, this message type will " + + "be used when the flowfile protobuf.messageType is missing.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder() + .name("demarcator") + .displayName("Demarcator") + .required(true) + .defaultValue("|||") + .description("This property is used to produce/consume messages separated by a demarcator") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(Validator.VALID) + .build(); + + protected String messageType; + + @Override + public List getSupportedPropertyDescriptors() { + List list = new LinkedList<>(super.getSupportedPropertyDescriptors()); + list.add(RECORD_READER); + list.remove(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE); + list.remove(AbstractProtobufProcessor.DEMARCATOR); + list.add(DEMARCATOR); + list.add(PROTOBUF_MESSAGE_TYPE); + return list; + } + + @OnScheduled + @Override + public void setUpSchema(ProcessContext processContext) { + super.setUpSchema(processContext); + messageType = processContext.getProperty(PROTOBUF_MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { + final AtomicReference error = new AtomicReference<>(); + + FlowFile flowfile = session.get(); + + if (flowfile == null) { + return; + } + + String demarcator = processContext.getProperty(DEMARCATOR).evaluateAttributeExpressions(flowfile).getValue(); + String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); + + if (protobufSchema == null && this.schema == null) { + getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + + " property, either at processor or flowfile level.."); + session.transfer(flowfile, INVALID_SCHEMA); + } else { + + // Write the results back out ot flow file + FlowFile outputFlowfile; + try { + outputFlowfile = processBatch(processContext, session, error, flowfile, demarcator); + outputFlowfile = session.putAttribute(outputFlowfile, CoreAttributes.MIME_TYPE.key(), "application/protobuf-binary"); + session.transfer(outputFlowfile, SUCCESS); + } catch (RuntimeException e) { + session.transfer(flowfile, error.get()); + } + } + } + + private FlowFile processBatch(ProcessContext context, + ProcessSession session, + AtomicReference error, + FlowFile flowfile, + String demarcator) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + return session.write(flowfile, (in, out) -> { + try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, getLogger())) { + byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + Descriptors.Descriptor messageDescriptor = schema.getMessageDescriptor(messageType); + Record record; + boolean firstTime = true; + while ((record = reader.nextRecord()) != null) { + if (!firstTime) { // add demarcator + out.write(demarcatorBytes); + } + firstTime = false; + getLogger().debug("converting record to dynamic message"); + DynamicMessage dynamicMessage = recordDataToDynamicMessage(messageDescriptor, record); + out.write(dynamicMessage.toByteArray()); + } + } catch (Exception e) { + getLogger().error("encountered error while processing batch:", e); + error.set(ERROR); + throw new RuntimeException(e); + } + }); + } + + /** + * Builds a protobuf DynamicMessage using a NiFi {@link Record} + * + * @param messageDescriptor the message descriptor + * @param record the record + * @return a {@link DynamicMessage} + */ + private DynamicMessage recordDataToDynamicMessage(Descriptors.Descriptor messageDescriptor, Record record) { + DynamicMessage.Builder dynamicMessage = DynamicMessage + .newBuilder(messageDescriptor); + for (Descriptors.FieldDescriptor field : messageDescriptor.getFields()) { + String fieldName = field.getName(); + getLogger().debug("attempting to extract value for field {}", fieldName); + AtomicReference nameRepresentedByRecord = new AtomicReference<>(); + Object value = attemptToExtractWithDifferentNames((name) -> getValue(record, name, field), fieldName, nameRepresentedByRecord); + if (value != null) { + if (field.getType().equals(Descriptors.FieldDescriptor.Type.ENUM)) { + setFieldForEnum(dynamicMessage, field, fieldName, value); + } else if (field.getType().equals(Descriptors.FieldDescriptor.Type.MESSAGE)) { + handleMessageType(record, dynamicMessage, field, fieldName, value, nameRepresentedByRecord); + } else { + getLogger().debug("setting value {} for {}", value, fieldName); + dynamicMessage.setField(field, value); + } + } + } + return dynamicMessage.build(); + } + + private void handleMessageType(Record record, DynamicMessage.Builder dynamicMessage, + Descriptors.FieldDescriptor field, + String fieldName, Object value, AtomicReference nameRepresentedByRecord) { + // MESSAGE type is represented as MapRecord by NiFi + if (value instanceof MapRecord) { + dynamicMessage.setField(field, buildMessage((MapRecord) value, field)); + } else if (field.getMessageType().getFields().size() == 1) { + // Since the NiFi record is not recognizing this field as a MESSAGE field but the flat out field, + // and since the MESSAGE has only one field, it is possible to address this one field as the intended field + // by the NiFi record. + handleSingleFieldMessageField(record, dynamicMessage, field, nameRepresentedByRecord); + } else { + throw new IllegalStateException("Field " + fieldName + " should be of type MESSAGE but is not!"); + } + } + + private void handleSingleFieldMessageField(Record record, + DynamicMessage.Builder dynamicMessage, + Descriptors.FieldDescriptor field, + AtomicReference nameRepresentedByRecord) { + Object value; + Descriptors.FieldDescriptor innerField = field.getMessageType().getFields().get(0); + String innerFieldName = innerField.getName(); + Optional currentRecordField = record + .getSchema() + .getField(nameRepresentedByRecord.get()); + if (currentRecordField.isPresent()) { + value = getValue(record, nameRepresentedByRecord.get(), innerField); + HashMap map = new HashMap<>(); + map.put(innerFieldName, value); + MapRecord mapRecord = + new MapRecord(new SimpleRecordSchema(Collections.singletonList(currentRecordField.get())), + map); + dynamicMessage.setField(field, buildMessage(mapRecord, field)); + } else { + throw new IllegalStateException("Field value was extracted but field doesn't exist!"); + } + } + + private void setFieldForEnum(DynamicMessage.Builder dynamicMessage, Descriptors.FieldDescriptor field, String fieldName, Object value) { + getLogger().debug("encountered enum for {}", fieldName); + if (value instanceof Number) { + getLogger().debug("value is number for {}, will convert to enum", fieldName); + dynamicMessage.setField(field, field.getEnumType() + .findValueByNumber(((Number) value).intValue())); + } else { + dynamicMessage.setField(field, field.getEnumType() + .findValueByName(value.toString())); + } + } + + @Nonnull + private Message buildMessage(MapRecord mapRecord, Descriptors.FieldDescriptor field) { + DynamicMessage.Builder message = DynamicMessage.newBuilder(field.getMessageType()); + for (Descriptors.FieldDescriptor fieldDescriptor : field.getMessageType().getFields()) { + AtomicReference name = new AtomicReference<>(); + Object value = attemptToExtractWithDifferentNames(currentFieldName -> getValue(mapRecord, currentFieldName, fieldDescriptor), + fieldDescriptor.getName(), name); + if (value != null) { + if (fieldDescriptor.getJavaType().equals(Descriptors.FieldDescriptor.JavaType.MESSAGE)) { + value = buildMessage((MapRecord) value, fieldDescriptor); + } + message.setField(fieldDescriptor, value); + } + } + + return message.build(); + } + + /** + * Attempts to extract the value with different formats for the name(UpperCamelCase, lowerCamelCase, lower_underscore). + * The function will try every format until it yields a non-null result. + * Then, returns the value. + * + * @param extractValue the function to attempt to extract the value + * @param fieldName the original field name + * @return the value(if successful, should not be null) + */ + @Nullable + private T attemptToExtractWithDifferentNames(Function extractValue, String fieldName) { + return attemptToExtractWithDifferentNames(extractValue, fieldName, null); + } + + /** + * Attempts to extract the value with different formats for the name(UpperCamelCase, lowerCamelCase, lower_underscore). + * The function will try every format until it yields a non-null result. + * Then, returns the value. + * + * @param extractValue the function to attempt to extract the value + * @param fieldName the original field name + * @param finalName an atomic reference to hold the last field name attempt. If the function succeeded, + * it will hold the name used to extract the value. + * @return the value(if successful, should not be null) + */ + @Nullable + private T attemptToExtractWithDifferentNames(Function extractValue, String fieldName, + AtomicReference finalName) { + T value = extractValue.apply(fieldName); + String originalFieldName = fieldName; + + if (value == null) { + fieldName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, originalFieldName); + value = extractValue.apply(fieldName); + } + + if (value == null) { + fieldName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, originalFieldName); + value = extractValue.apply(fieldName); + } + + if (value == null) { + fieldName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, originalFieldName); + value = extractValue.apply(fieldName); + } + + if (value == null) { + fieldName = CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, originalFieldName); + value = extractValue.apply(fieldName); + } + + + if (value == null) { + fieldName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, originalFieldName); + value = extractValue.apply(fieldName); + } + + if (value == null) { + fieldName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, originalFieldName); + value = extractValue.apply(fieldName); + } + + if (finalName != null) { + finalName.set(fieldName); + } + + return value; + } + + /** + * Get value according to target type. It checks what kind of type it should be and converts accordingly. + * + * @param record the nifi {@link Record} + * @param fieldName the field name + * @param fieldDescriptor the field descriptor + * @return the value + */ + private Object getValue(Record record, String fieldName, Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case DOUBLE: + return record.getAsDouble(fieldName); + case INT: + return record.getAsInt(fieldName); + case FLOAT: + return record.getAsFloat(fieldName); + case LONG: + return record.getAsLong(fieldName); + case STRING: + return record.getAsString(fieldName); + case BOOLEAN: + Optional recordField = record.getSchema().getField(fieldName); + if (recordField.isPresent()) { + RecordFieldType fieldType = recordField.get().getDataType().getFieldType(); + if (fieldType.equals(RecordFieldType.INT)) { + return record.getAsInt(fieldName) == 1; + } else if (fieldType.equals(RecordFieldType.STRING)) { + String value = record.getAsString(fieldName).toLowerCase(Locale.ROOT); + if (value.equals("true")) { + return true; + } else if (value.equals("false")) { + return false; + } + } + } + return record.getAsBoolean(fieldName); + case ENUM: + Optional field = record.getSchema().getField(fieldName); + if (field.isPresent()) { + if (field.get().getDataType().getFieldType().equals(RecordFieldType.INT)) { + return record.getAsInt(fieldName); + } + } + default: + return record.getValue(fieldName); + } + } +} diff --git a/src/main/java/com/github/whiver/nifi/processor/DecodeProtobuf.java b/src/main/java/com/github/whiver/nifi/processor/DecodeProtobuf.java new file mode 100644 index 0000000..3e4d760 --- /dev/null +++ b/src/main/java/com/github/whiver/nifi/processor/DecodeProtobuf.java @@ -0,0 +1,197 @@ +/* + * MIT License + * + * NiFi Protobuf Processor + * Copyright (c) 2017 William Hiver + * https://github.com/whiver/nifi-protobuf-processor + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.whiver.nifi.processor; + +import com.github.whiver.nifi.exception.MessageDecodingException; +import com.github.whiver.nifi.exception.SchemaCompilationException; +import com.github.whiver.nifi.exception.SchemaLoadingException; +import com.github.whiver.nifi.exception.UnknownMessageTypeException; +import com.github.whiver.nifi.service.ProtobufService; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + + +@SideEffectFree +@SeeAlso(EncodeProtobuf.class) +@CapabilityDescription("Decodes incoming data using a Google Protocol Buffer Schema.") +public class DecodeProtobuf extends AbstractProtobufProcessor { + + static final PropertyDescriptor PRESERVE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("preserve_field_names") + .displayName("Preserve Original Proto Field Names") + .required(true) + .defaultValue("false") + .description("Whether to preserve original proto field names. If not, fields like field_name would be converted to camel case fieldName.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + @Override + public List getSupportedPropertyDescriptors() { + List list = new LinkedList<>(super.getSupportedPropertyDescriptors()); + list.add(DEMARCATOR); + list.add(MAX_MESSAGE_SIZE); + list.add(PRESERVE_FIELD_NAMES); + return list; + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { + final AtomicReference error = new AtomicReference<>(); + + FlowFile flowfile = session.get(); + + if (flowfile == null) { + return; + } + + String demarcator = processContext.getProperty(DEMARCATOR).evaluateAttributeExpressions(flowfile).getValue(); + + String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); + + String messageTypeValue = flowfile.getAttribute(PROTOBUF_MESSAGE_TYPE.getName()); + int maxDataSize = processContext.getProperty(MAX_MESSAGE_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue(); + final String messageType = messageTypeValue != null ? messageTypeValue : processContext.getProperty(PROTOBUF_MESSAGE_TYPE).evaluateAttributeExpressions(flowfile).getValue(); + final boolean preserveFieldNames = processContext.getProperty(PRESERVE_FIELD_NAMES).evaluateAttributeExpressions(flowfile).asBoolean(); + + if (protobufSchema == null && this.schema == null) { + getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + + " property, either at processor or flowfile level.."); + session.transfer(flowfile, INVALID_SCHEMA); + } else if (messageType == null) { + getLogger().error("Unable to find the message type in protobuf.messageType, unable to decode data."); + session.transfer(flowfile, ERROR); + } else { + + // Write the results back out ot flow file + FlowFile outputFlowfile; + + try { + if (demarcator == null || demarcator.isEmpty()) { + outputFlowfile = processSingleFlowFile(session, error, flowfile, messageType, preserveFieldNames); + } else { + outputFlowfile = processBatch(session, error, flowfile, messageType, demarcator, preserveFieldNames, maxDataSize); + } + outputFlowfile = session.putAttribute(outputFlowfile, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.transfer(outputFlowfile, SUCCESS); + } catch (Exception e) { + session.transfer(flowfile, error.get()); + } + } + } + + private FlowFile processBatch(ProcessSession session, + AtomicReference error, + FlowFile flowfile, + String messageType, + String demarcator, + boolean preserveFieldNames, + int maxDataSize) throws Exception { + return session.write(flowfile, (in, out) -> { + try { + byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + StreamDemarcator streamDemarcator = new StreamDemarcator(in, demarcatorBytes, maxDataSize); + out.write('['); + byte[] message = streamDemarcator.nextToken(); + while (message != null) { + processMessage(messageType, preserveFieldNames, message, out); + message = streamDemarcator.nextToken(); + if (message != null) { + out.write(','); + } + } + out.write(']'); + } catch (Exception e) { + getLogger().error("encountered error while processing batch:", e); + error.set(ERROR); + throw new RuntimeException(e); + } + }); + } + + private FlowFile processSingleFlowFile(ProcessSession session, + AtomicReference error, + FlowFile flowfile, + String messageType, + boolean preserveFieldNames) { + return session.write(flowfile, (InputStream in, OutputStream out) -> { + try { + processMessage(messageType, preserveFieldNames, in, out); + } catch (DescriptorValidationException e) { + getLogger().error("Invalid schema file: " + e.getMessage(), e); + error.set(INVALID_SCHEMA); + throw new RuntimeException(e); + } catch (SchemaLoadingException | SchemaCompilationException e) { + getLogger().error(e.getMessage(), e); + error.set(INVALID_SCHEMA); + throw new RuntimeException(e); + } catch (UnknownMessageTypeException | MessageDecodingException e) { + getLogger().error(e.getMessage()); + error.set(ERROR); + throw new RuntimeException(e); + } catch (InvalidProtocolBufferException e) { + getLogger().error("Unable to encode message into JSON: " + e.getMessage(), e); + error.set(ERROR); + throw new RuntimeException(e); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + } + + private void processMessage(String messageType, boolean preserveFieldNames, InputStream in, OutputStream out) throws IOException, DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException, InterruptedException, SchemaCompilationException { + out.write(ProtobufService.decodeProtobuf(this.schema, messageType, in, preserveFieldNames).getBytes()); + } + + private void processMessage(String messageType, boolean preserveFieldNames, byte[] in, OutputStream out) throws IOException, DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException, InterruptedException, SchemaCompilationException { + out.write(ProtobufService.decodeProtobuf(this.schema, messageType, in, preserveFieldNames).getBytes()); + } +} diff --git a/src/main/java/com/github/whiver/nifi/processor/EncodeProtobuf.java b/src/main/java/com/github/whiver/nifi/processor/EncodeProtobuf.java new file mode 100644 index 0000000..4977c39 --- /dev/null +++ b/src/main/java/com/github/whiver/nifi/processor/EncodeProtobuf.java @@ -0,0 +1,161 @@ +/* + * MIT License + * + * NiFi Protobuf Processor + * Copyright (c) 2017 William Hiver + * https://github.com/whiver/nifi-protobuf-processor + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.whiver.nifi.processor; + + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.whiver.nifi.exception.MessageEncodingException; +import com.github.whiver.nifi.exception.SchemaCompilationException; +import com.github.whiver.nifi.exception.SchemaLoadingException; +import com.github.whiver.nifi.exception.UnknownMessageTypeException; +import com.github.whiver.nifi.service.ProtobufService; +import com.google.protobuf.Descriptors; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +@SideEffectFree +@SeeAlso(DecodeProtobuf.class) +@CapabilityDescription("Decodes incoming data using a Google Protocol Buffer Schema.") +public class EncodeProtobuf extends AbstractProtobufProcessor { + private static final JsonFactory jsonFactory = new JsonFactory(); + + @Override + public List getSupportedPropertyDescriptors() { + List list = new LinkedList<>(super.getSupportedPropertyDescriptors()); + list.add(DEMARCATOR); + return list; + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { + final AtomicReference error = new AtomicReference<>(); + + final FlowFile flowfile = session.get(); + + if (flowfile == null) { + return; + } + + // We check if the protobuf.schemaPath property is defined in the flowfile + String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); + + boolean compileSchema = processContext.getProperty(COMPILE_SCHEMA).evaluateAttributeExpressions().asBoolean(); + + String messageTypeValue = flowfile.getAttribute(PROTOBUF_MESSAGE_TYPE.getName()); + final String messageType = messageTypeValue != null ? messageTypeValue : processContext.getProperty(PROTOBUF_MESSAGE_TYPE).evaluateAttributeExpressions(flowfile).getValue(); + String demarcator = processContext.getProperty(DEMARCATOR).evaluateAttributeExpressions(flowfile).getValue(); + + + if (protobufSchema == null && this.schema == null) { + getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + + " property, either at processor or flowfile level.."); + session.transfer(flowfile, INVALID_SCHEMA); + } else if (messageType == null) { + getLogger().error("Unable to find the message type in protobuf.messageType, unable to decode data."); + session.transfer(flowfile, ERROR); + } else { + + FlowFile outputFlowfile = null; + try { + outputFlowfile = session.write(flowfile, (InputStream in, OutputStream out) -> { + try { + // If the protobufSchema property is defined, we use the schema from the flowfile instead of the + // processor-wide one + + if (demarcator != null) { + byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + JsonParser parser = jsonFactory.createParser(in); + ObjectMapper mapper = new ObjectMapper(jsonFactory); + if (parser.nextToken() != JsonToken.START_ARRAY) { + parseOneObject(protobufSchema, compileSchema, messageType, in, out); + } + parser.nextToken(); + while (parser.currentToken() == JsonToken.START_OBJECT) { + ProtobufService.encodeProtobuf(this.schema, messageType, + new ByteArrayInputStream(mapper.readTree(parser).toString().getBytes()), out); + if (parser.nextToken() == JsonToken.START_OBJECT) { + out.write(demarcatorBytes); + } + } + } else { + parseOneObject(protobufSchema, compileSchema, messageType, in, out); + } + } catch (Descriptors.DescriptorValidationException e) { + getLogger().error("Invalid schema file: " + e.getMessage(), e); + error.set(INVALID_SCHEMA); + throw new RuntimeException(e); + } catch (SchemaLoadingException | SchemaCompilationException e) { + getLogger().error(e.getMessage(), e); + error.set(INVALID_SCHEMA); + throw new RuntimeException(e); + } catch (InterruptedException e) { + getLogger().error("Unable to compile schema: " + e.getMessage(), e); + error.set(ERROR); + throw new RuntimeException(e); + } catch (Exception e) { + getLogger().error(e.getMessage(), e); + error.set(ERROR); + throw new RuntimeException(e); + } + }); + session.transfer(outputFlowfile, SUCCESS); + } catch (RuntimeException e) { + session.transfer(flowfile, error.get()); + } + } + } + + private void parseOneObject(String protobufSchema, boolean compileSchema, String messageType, InputStream in, OutputStream out) throws Descriptors.DescriptorValidationException, IOException, MessageEncodingException, UnknownMessageTypeException, SchemaLoadingException, SchemaCompilationException, InterruptedException { + if (protobufSchema == null) { + ProtobufService.encodeProtobuf(this.schema, messageType, in, out); + } else { + ProtobufService.encodeProtobuf(protobufSchema, compileSchema, messageType, in, out); + } + } +} diff --git a/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java b/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java deleted file mode 100644 index 701d175..0000000 --- a/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * MIT License - * - * NiFi Protobuf Processor - * Copyright (c) 2017 William Hiver - * https://github.com/whiver/nifi-protobuf-processor - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.github.whiver.nifi.processor; - -import com.github.whiver.nifi.exception.MessageDecodingException; -import com.github.whiver.nifi.exception.SchemaCompilationException; -import com.github.whiver.nifi.exception.SchemaLoadingException; -import com.github.whiver.nifi.exception.UnknownMessageTypeException; -import com.github.whiver.nifi.service.ProtobufService; -import com.google.protobuf.Descriptors.DescriptorValidationException; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicReference; - - -@SideEffectFree -@Tags({"Protobuf", "decoder", "Google Protocol Buffer"}) -@CapabilityDescription("Decode incoming data encoded using a Google Protocol Buffer Schema.") -public class ProtobufDecoder extends ProtobufProcessor { - - @Override - public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { - final AtomicReference error = new AtomicReference<>(); - - final FlowFile flowfile = session.get(); - - String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); - boolean compileSchema = processContext.getProperty(COMPILE_SCHEMA.getName()).asBoolean(); - String messageType = flowfile.getAttribute("protobuf.messageType"); - - if (protobufSchema == null && this.schema == null) { - getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + - " property, either at processor or flowfile level.."); - session.transfer(flowfile, INVALID_SCHEMA); - } else if (messageType == null) { - getLogger().error("Unable to find the message type in protobuf.messageType, unable to decode data."); - session.transfer(flowfile, ERROR); - } else { - - // Write the results back out ot flow file - FlowFile outputFlowfile = session.write(flowfile, (InputStream in, OutputStream out) -> { - try { - if (protobufSchema == null) { - out.write(ProtobufService.decodeProtobuf(this.schema, messageType, in).getBytes()); - } else { - out.write(ProtobufService.decodeProtobuf(protobufSchema, compileSchema, messageType, in).getBytes()); - } - } catch (DescriptorValidationException e) { - getLogger().error("Invalid schema file: " + e.getMessage(), e); - error.set(INVALID_SCHEMA); - } catch (SchemaLoadingException | SchemaCompilationException e) { - getLogger().error(e.getMessage(), e); - error.set(INVALID_SCHEMA); - } catch (UnknownMessageTypeException | MessageDecodingException e) { - getLogger().error(e.getMessage()); - error.set(ERROR); - } catch (InvalidProtocolBufferException e) { - getLogger().error("Unable to encode message into JSON: " + e.getMessage(), e); - error.set(ERROR); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - - if (error.get() != null) { - session.transfer(flowfile, error.get()); - } else { - session.transfer(outputFlowfile, SUCCESS); - } - } - } -} diff --git a/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java b/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java deleted file mode 100644 index ff2ef93..0000000 --- a/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * MIT License - * - * NiFi Protobuf Processor - * Copyright (c) 2017 William Hiver - * https://github.com/whiver/nifi-protobuf-processor - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.github.whiver.nifi.processor; - - -import com.github.whiver.nifi.exception.SchemaCompilationException; -import com.github.whiver.nifi.exception.SchemaLoadingException; -import com.github.whiver.nifi.service.ProtobufService; -import com.google.protobuf.Descriptors; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicReference; - -@SideEffectFree -@Tags({"Protobuf", "decoder", "Google Protocol Buffer"}) -@CapabilityDescription("Decode incoming data encoded using a Google Protocol Buffer Schema.") -public class ProtobufEncoder extends ProtobufProcessor { - - @Override - public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException { - final AtomicReference error = new AtomicReference<>(); - - final FlowFile flowfile = session.get(); - - // We check if the protobuf.schemaPath property is defined in the flowfile - String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); - boolean compileSchema = processContext.getProperty(COMPILE_SCHEMA.getName()).asBoolean(); - - String messageType = flowfile.getAttribute("protobuf.messageType"); - - - if (protobufSchema == null && this.schema == null) { - getLogger().error("No schema path given, please fill in the " + PROTOBUF_SCHEMA.getName() + - " property, either at processor or flowfile level.."); - session.transfer(flowfile, INVALID_SCHEMA); - } else if (messageType == null) { - getLogger().error("Unable to find the message type in protobuf.messageType, unable to decode data."); - session.transfer(flowfile, ERROR); - } else { - - FlowFile outputFlowfile = session.write(flowfile, (InputStream in, OutputStream out) -> { - try { - // If the protobufSchema property is defined, we use the schema from the flowfile instead of the - // processor-wide one - if (protobufSchema == null) { - ProtobufService.encodeProtobuf(this.schema, messageType, in, out); - } else { - ProtobufService.encodeProtobuf(protobufSchema, compileSchema, messageType, in, out); - } - } catch (Descriptors.DescriptorValidationException e) { - getLogger().error("Invalid schema file: " + e.getMessage(), e); - error.set(INVALID_SCHEMA); - } catch (SchemaLoadingException | SchemaCompilationException e) { - getLogger().error(e.getMessage(), e); - error.set(INVALID_SCHEMA); - } catch (InterruptedException e) { - getLogger().error("Unable to compile schema: " + e.getMessage(), e); - error.set(ERROR); - } catch (Exception e) { - getLogger().error(e.getMessage(), e); - error.set(ERROR); - } - }); - - if (error.get() != null) { - session.transfer(flowfile, error.get()); - } else { - session.transfer(outputFlowfile, SUCCESS); - } - } - } -} diff --git a/src/main/java/com/github/whiver/nifi/service/ProtobufService.java b/src/main/java/com/github/whiver/nifi/service/ProtobufService.java index 61519a4..002b85b 100644 --- a/src/main/java/com/github/whiver/nifi/service/ProtobufService.java +++ b/src/main/java/com/github/whiver/nifi/service/ProtobufService.java @@ -40,17 +40,19 @@ public class ProtobufService { /** * Handle all the logic leading to the decoding of a Protobuf-encoded binary given a schema file path. - * @param schema Schema used to decode the binary data - * @param messageType Type of Protobuf Message - * @param encodedData Encoded data source - * @return A JSON representation of the data, contained in a Java String - * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON - * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid - * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema - * @throws MessageDecodingException Thrown when an error occurs during the binary decoding - * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + * + * @param schema Schema used to decode the binary data + * @param messageType Type of Protobuf Message + * @param encodedData Encoded data source + * @param preserveFieldNames whether to preserve original field names + * @return A JSON representation of the data, contained in a Java String + * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws MessageDecodingException Thrown when an error occurs during the binary decoding + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file */ - public static String decodeProtobuf(DynamicSchema schema, String messageType, InputStream encodedData) throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException { + public static String decodeProtobuf(DynamicSchema schema, String messageType, InputStream encodedData, boolean preserveFieldNames) throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException { Descriptors.Descriptor descriptor; DynamicMessage message; @@ -66,38 +68,94 @@ public static String decodeProtobuf(DynamicSchema schema, String messageType, In throw new MessageDecodingException(e); } - return JSONMapper.toJSON(message); + return JSONMapper.toJSON(message, preserveFieldNames); } + /** * Handle all the logic leading to the decoding of a Protobuf-encoded binary given a schema file path. - * @param pathToSchema Path to the .desc schema file on disk - * @param compileSchema true if the given schema is still in raw .proto format - * @param messageType Type of Protobuf Message - * @param encodedData Encoded data source - * @return A JSON representation of the data, contained in a Java String - * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON - * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid - * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema - * @throws MessageDecodingException Thrown when an error occurs during the binary decoding - * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + * + * @param schema Schema used to decode the binary data + * @param messageType Type of Protobuf Message + * @param encodedData Encoded data source + * @param preserveFieldNames whether to preserve original field names + * @return A JSON representation of the data, contained in a Java String + * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws MessageDecodingException Thrown when an error occurs during the binary decoding + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file */ - public static String decodeProtobuf(String pathToSchema, boolean compileSchema, String messageType, InputStream encodedData) throws IOException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException, InterruptedException, SchemaCompilationException { - return decodeProtobuf(SchemaParser.parseSchema(pathToSchema, compileSchema), messageType, encodedData); + public static String decodeProtobuf(DynamicSchema schema, String messageType, byte[] encodedData, boolean preserveFieldNames) throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException { + Descriptors.Descriptor descriptor; + DynamicMessage message; + + descriptor = schema.getMessageDescriptor(messageType); + + if (descriptor == null) { + throw new UnknownMessageTypeException(messageType); + } + + try { + message = DynamicMessage.parseFrom(descriptor, encodedData); + } catch (IOException e) { + throw new MessageDecodingException(e); + } + + return JSONMapper.toJSON(message, preserveFieldNames); + } + + /** + * Handle all the logic leading to the decoding of a Protobuf-encoded binary given a schema file path. + * + * @param pathToSchema Path to the .desc schema file on disk + * @param compileSchema true if the given schema is still in raw .proto format + * @param messageType Type of Protobuf Message + * @param preserveFieldNames whether to preserve original field names + * @param encodedData Encoded data source + * @return A JSON representation of the data, contained in a Java String + * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws MessageDecodingException Thrown when an error occurs during the binary decoding + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + */ + public static String decodeProtobuf(String pathToSchema, boolean compileSchema, String messageType, InputStream encodedData, boolean preserveFieldNames) throws IOException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException, InterruptedException, SchemaCompilationException { + return decodeProtobuf(SchemaParser.parseSchema(pathToSchema, compileSchema), messageType, encodedData, preserveFieldNames); + } + + /** + * Handle all the logic leading to the decoding of a Protobuf-encoded binary given a schema file path. + * + * @param pathToSchema Path to the .desc schema file on disk + * @param compileSchema true if the given schema is still in raw .proto format + * @param messageType Type of Protobuf Message + * @param preserveFieldNames whether to preserve original field names + * @param encodedData Encoded data source + * @return A JSON representation of the data, contained in a Java String + * @throws InvalidProtocolBufferException Thrown when an error occurs during the encoding of the decoded data into JSON + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws MessageDecodingException Thrown when an error occurs during the binary decoding + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + */ + public static String decodeProtobuf(String pathToSchema, boolean compileSchema, String messageType, byte[] encodedData, boolean preserveFieldNames) throws IOException, Descriptors.DescriptorValidationException, UnknownMessageTypeException, MessageDecodingException, SchemaLoadingException, InterruptedException, SchemaCompilationException { + return decodeProtobuf(SchemaParser.parseSchema(pathToSchema, compileSchema), messageType, encodedData, preserveFieldNames); } /** * Handle all the logic leading to the encoding of a Protobuf-encoded binary given a schema file path and a JSON * data file. - * @param schema Schema object to use to encode binary data - * @param messageType Type of Protobuf Message - * @param jsonData Data to encode, structured in a JSON format - * @param binaryOutput The stream where to output the encoded data - * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid - * @throws IOException Thrown when an errors occurs while parsing the JSON data - * @throws MessageEncodingException Thrown when an error occurs during the binary encoding - * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema - * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + * + * @param schema Schema object to use to encode binary data + * @param messageType Type of Protobuf Message + * @param jsonData Data to encode, structured in a JSON format + * @param binaryOutput The stream where to output the encoded data + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws IOException Thrown when an errors occurs while parsing the JSON data + * @throws MessageEncodingException Thrown when an error occurs during the binary encoding + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file */ public static void encodeProtobuf(DynamicSchema schema, String messageType, InputStream jsonData, OutputStream binaryOutput) throws Descriptors.DescriptorValidationException, IOException, MessageEncodingException, UnknownMessageTypeException, SchemaLoadingException { Descriptors.Descriptor descriptor; @@ -128,16 +186,17 @@ public static void encodeProtobuf(DynamicSchema schema, String messageType, Inpu /** * Handle all the logic leading to the encoding of a Protobuf-encoded binary given a schema file path and a JSON * data file. + * * @param pathToSchema Path to the .desc schema file on disk * @param compileSchema true if the given schema is still in raw .proto format * @param messageType Type of Protobuf Message * @param jsonData Data to encode, structured in a JSON format * @param binaryOutput The stream where to output the encoded data - * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid - * @throws IOException Thrown when an errors occurs while parsing the JSON data - * @throws MessageEncodingException Thrown when an error occurs during the binary encoding - * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema - * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file + * @throws Descriptors.DescriptorValidationException Thrown when the schema is invalid + * @throws IOException Thrown when an errors occurs while parsing the JSON data + * @throws MessageEncodingException Thrown when an error occurs during the binary encoding + * @throws UnknownMessageTypeException Thrown when the given message type is not contained in the schema + * @throws SchemaLoadingException Thrown when an error occurs while reading the schema file */ public static void encodeProtobuf(String pathToSchema, boolean compileSchema, String messageType, InputStream jsonData, OutputStream binaryOutput) throws Descriptors.DescriptorValidationException, IOException, MessageEncodingException, UnknownMessageTypeException, SchemaLoadingException, SchemaCompilationException, InterruptedException { encodeProtobuf(SchemaParser.parseSchema(pathToSchema, compileSchema), messageType, jsonData, binaryOutput); diff --git a/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index aea1beb..b094398 100644 --- a/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -1,2 +1,4 @@ -com.github.whiver.nifi.processor.ProtobufDecoder -com.github.whiver.nifi.processor.ProtobufEncoder \ No newline at end of file +com.github.whiver.nifi.processor.DecodeProtobuf +com.github.whiver.nifi.processor.EncodeProtobuf +com.github.whiver.nifi.processor.ConvertProtobufToAvro +com.github.whiver.nifi.processor.ConvertRecordToProtobuf \ No newline at end of file diff --git a/src/test/java/com/github/whiver/nifi/processor/ProtobufDecoderTest.java b/src/test/java/com/github/whiver/nifi/processor/DecodeProtobufTest.java similarity index 66% rename from src/test/java/com/github/whiver/nifi/processor/ProtobufDecoderTest.java rename to src/test/java/com/github/whiver/nifi/processor/DecodeProtobufTest.java index 94e4148..77502e4 100644 --- a/src/test/java/com/github/whiver/nifi/processor/ProtobufDecoderTest.java +++ b/src/test/java/com/github/whiver/nifi/processor/DecodeProtobufTest.java @@ -43,7 +43,7 @@ /** * A test class mocking a NiFi flow */ -public class ProtobufDecoderTest { +public class DecodeProtobufTest { private final String[] validTestFiles = {"AddressBook_basic", "AddressBook_several"}; /** @@ -52,16 +52,16 @@ public class ProtobufDecoderTest { */ @Test public void onTriggerDecodeValidFiles() throws IOException { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); // AddressBook test HashMap addressBookProperties = new HashMap<>(); - addressBookProperties.put("protobuf.schemaPath", ProtobufDecoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); - addressBookProperties.put("protobuf.messageType", "AddressBook"); + addressBookProperties.put(AbstractProtobufProcessor.PROTOBUF_SCHEMA.getName(), DecodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); + addressBookProperties.put(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE.getName(), "AddressBook"); // AddressBook test for (String filename: validTestFiles) { - InputStream jsonFile = ProtobufDecoderTest.class.getResourceAsStream("/data/" + filename + ".data"); + InputStream jsonFile = DecodeProtobufTest.class.getResourceAsStream("/data/" + filename + ".data"); addressBookProperties.put("testfile", filename); runner.enqueue(jsonFile, addressBookProperties); } @@ -74,7 +74,7 @@ public void onTriggerDecodeValidFiles() throws IOException { runner.assertQueueEmpty(); // Check if the data was processed without failure - List results = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS); + List results = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS); Assert.assertEquals("All flowfiles should be returned to success", validTestFiles.length, results.size()); // Check if the content of the flowfile is as expected @@ -93,21 +93,21 @@ public void onTriggerDecodeValidFiles() throws IOException { */ @Test public void onTriggerDecodeValidFilesWithSchemaAtProcessorLevel() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufDecoderTest.class.getResource("/schemas/Person.desc").getPath()); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "false"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, DecodeProtobufTest.class.getResource("/schemas/Person.desc").getPath()); - InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data"); + InputStream dataFile = DecodeProtobufTest.class.getResourceAsStream("/data/Person.data"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.messageType", "Person"); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE.getName(), "Person"); runner.enqueue(dataFile, personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); ObjectMapper mapper = new ObjectMapper(); JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json")); @@ -121,21 +121,21 @@ public void onTriggerDecodeValidFilesWithSchemaAtProcessorLevel() throws Excepti */ @Test public void onTriggerCompileFlowfileSchemaAndDecodeValidFiles() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true"); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "true"); - InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data"); + InputStream dataFile = DecodeProtobufTest.class.getResourceAsStream("/data/Person.data"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.schemaPath", ProtobufDecoderTest.class.getResource("/schemas/Person.proto").getPath()); - personProperties.put("protobuf.messageType", "Person"); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_SCHEMA.getName(), DecodeProtobufTest.class.getResource("/schemas/Person.proto").getPath()); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE.getName(), "Person"); runner.enqueue(dataFile, personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); ObjectMapper mapper = new ObjectMapper(); JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json")); @@ -149,21 +149,22 @@ public void onTriggerCompileFlowfileSchemaAndDecodeValidFiles() throws Exception */ @Test public void onTriggerCompileProcessorSchemaAndDecodeValidFiles() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufDecoderTest.class.getResource("/schemas/Person.proto").getPath()); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "true"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, DecodeProtobufTest.class.getResource("/schemas/Person.proto").getPath()); + //runner.setProperty(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE, "Person"); - InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data"); + InputStream dataFile = DecodeProtobufTest.class.getResourceAsStream("/data/Person.data"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.messageType", "Person"); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE.getName(), "Person"); runner.enqueue(dataFile, personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); ObjectMapper mapper = new ObjectMapper(); JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json")); @@ -177,21 +178,21 @@ public void onTriggerCompileProcessorSchemaAndDecodeValidFiles() throws Exceptio */ @Test public void onTriggerUsePerFlowfileSchemaIfAvailable() throws IOException { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); - runner.setProperty("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); + runner.setProperty("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); - InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data"); + InputStream dataFile = DecodeProtobufTest.class.getResourceAsStream("/data/Person.data"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.schemaPath", ProtobufDecoderTest.class.getResource("/schemas/Person.desc").getPath()); - personProperties.put("protobuf.messageType", "Person"); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_SCHEMA.getName(), DecodeProtobufTest.class.getResource("/schemas/Person.desc").getPath()); + personProperties.put(AbstractProtobufProcessor.PROTOBUF_MESSAGE_TYPE.getName(), "Person"); runner.enqueue(dataFile, personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); ObjectMapper mapper = new ObjectMapper(); JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json")); @@ -205,7 +206,7 @@ public void onTriggerUsePerFlowfileSchemaIfAvailable() throws IOException { */ @Test public void onPropertyModified() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder()); + TestRunner runner = TestRunners.newTestRunner(new DecodeProtobuf()); HashMap addressBookProperties = new HashMap<>(); addressBookProperties.put("protobuf.messageType", "AddressBook"); @@ -215,19 +216,19 @@ public void onPropertyModified() throws Exception { First try to decode using a schema set in a processor property */ - runner.setProperty("protobuf.schemaPath", ProtobufDecoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA.getName(), DecodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); runner.assertValid(); - runner.enqueue(ProtobufDecoderTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); + runner.enqueue(DecodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); // Finally check the content - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); ObjectMapper mapper = new ObjectMapper(); JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/AddressBook_basic.json")); JsonNode given = mapper.readTree(runner.getContentAsByteArray(result)); @@ -239,17 +240,17 @@ public void onPropertyModified() throws Exception { */ runner.clearTransferState(); - runner.removeProperty(runner.getProcessor().getPropertyDescriptor("protobuf.schemaPath")); - Assert.assertFalse("The schema property should now be null", runner.getProcessContext().getProperty("protobuf.schemaPath").isSet()); + runner.removeProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA); + Assert.assertFalse("The schema property should now be null", runner.getProcessContext().getProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA).isSet()); runner.assertValid(); - runner.enqueue(ProtobufDecoderTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); + runner.enqueue(DecodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.INVALID_SCHEMA); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.INVALID_SCHEMA); /* @@ -257,19 +258,19 @@ public void onPropertyModified() throws Exception { */ runner.clearTransferState(); - runner.setProperty("protobuf.schemaPath", ProtobufDecoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, DecodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); runner.assertValid(); - runner.enqueue(ProtobufDecoderTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); + runner.enqueue(DecodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.data"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); // Finally check the content - result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); + result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); expected = mapper.readTree(this.getClass().getResourceAsStream("/data/AddressBook_basic.json")); given = mapper.readTree(runner.getContentAsByteArray(result)); Assert.assertEquals("The parsing result of AddressBook_basic.data is not as expected", expected, given); diff --git a/src/test/java/com/github/whiver/nifi/processor/ProtobufEncoderTest.java b/src/test/java/com/github/whiver/nifi/processor/EncodeProtobufTest.java similarity index 60% rename from src/test/java/com/github/whiver/nifi/processor/ProtobufEncoderTest.java rename to src/test/java/com/github/whiver/nifi/processor/EncodeProtobufTest.java index a4c67af..121fcfd 100644 --- a/src/test/java/com/github/whiver/nifi/processor/ProtobufEncoderTest.java +++ b/src/test/java/com/github/whiver/nifi/processor/EncodeProtobufTest.java @@ -37,7 +37,7 @@ import java.util.HashMap; import java.util.List; -public class ProtobufEncoderTest { +public class EncodeProtobufTest { private final String[] validTestFiles = {"AddressBook_basic", "AddressBook_several"}; @@ -46,9 +46,9 @@ public class ProtobufEncoderTest { */ @Test public void initDefaultPropertiesValues() { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - Assert.assertFalse("Default value for COMPILE_SCHEMA should be false", runner.getProcessContext().getProperty(ProtobufProcessor.COMPILE_SCHEMA.getName()).asBoolean()); - Assert.assertNull("Default value for PROTOBUF_SCHEMA should be null", runner.getProcessContext().getProperty(ProtobufProcessor.PROTOBUF_SCHEMA).getValue()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + Assert.assertFalse("Default value for COMPILE_SCHEMA should be false", runner.getProcessContext().getProperty(AbstractProtobufProcessor.COMPILE_SCHEMA.getName()).asBoolean()); + Assert.assertNull("Default value for PROTOBUF_SCHEMA should be null", runner.getProcessContext().getProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA).getValue()); } /** @@ -57,15 +57,15 @@ public void initDefaultPropertiesValues() { */ @Test public void onTriggerEncodeValidFiles() throws IOException { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); HashMap adressBookProperties = new HashMap<>(); - adressBookProperties.put("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + adressBookProperties.put("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); adressBookProperties.put("protobuf.messageType", "AddressBook"); // AddressBook test for (String filename: validTestFiles) { - InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/" + filename + ".json"); + InputStream jsonFile = EncodeProtobufTest.class.getResourceAsStream("/data/" + filename + ".json"); adressBookProperties.put("testfile", filename); runner.enqueue(jsonFile, adressBookProperties); } @@ -78,12 +78,12 @@ public void onTriggerEncodeValidFiles() throws IOException { runner.assertQueueEmpty(); // Check if the data was processed without failure - List results = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS); + List results = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS); Assert.assertEquals("All flowfiles should be returned to success", validTestFiles.length, results.size()); // Check if the content of the flowfile is as expected for (MockFlowFile result: results) { - result.assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/" + result.getAttribute("testfile") + ".data")); + result.assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/" + result.getAttribute("testfile") + ".data")); } } @@ -93,11 +93,11 @@ public void onTriggerEncodeValidFiles() throws IOException { */ @Test public void onTriggerEncodeValidFilesWithSchemaAtProcessorLevel() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.desc").getPath()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "false"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, EncodeProtobufTest.class.getResource("/schemas/Person.desc").getPath()); - InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"); + InputStream jsonFile = EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"); HashMap personProperties = new HashMap<>(); personProperties.put("protobuf.messageType", "Person"); runner.enqueue(jsonFile, personProperties); @@ -106,10 +106,10 @@ public void onTriggerEncodeValidFilesWithSchemaAtProcessorLevel() throws Excepti runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS); - List results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS); + runner.assertAllFlowFilesTransferred(EncodeProtobuf.SUCCESS); + List results = runner.getFlowFilesForRelationship(EncodeProtobuf.SUCCESS); Assert.assertEquals("The Person flowfile should be returned to success", 1, results.size()); - results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data")); + results.get(0).assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/Person.data")); } /** @@ -118,12 +118,12 @@ public void onTriggerEncodeValidFilesWithSchemaAtProcessorLevel() throws Excepti */ @Test public void onTriggerCompileFlowfileSchemaAndEncodeValidFiles() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true"); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "true"); - InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"); + InputStream jsonFile = EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/Person.proto").getPath()); + personProperties.put("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/Person.proto").getPath()); personProperties.put("protobuf.messageType", "Person"); runner.enqueue(jsonFile, personProperties); @@ -131,10 +131,10 @@ public void onTriggerCompileFlowfileSchemaAndEncodeValidFiles() throws Exception runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS); - List results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS); + runner.assertAllFlowFilesTransferred(EncodeProtobuf.SUCCESS); + List results = runner.getFlowFilesForRelationship(EncodeProtobuf.SUCCESS); Assert.assertEquals("The Person flowfile should be returned to success", 1, results.size()); - results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data")); + results.get(0).assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/Person.data")); } /** @@ -143,11 +143,11 @@ public void onTriggerCompileFlowfileSchemaAndEncodeValidFiles() throws Exception */ @Test public void onTriggerCompileProcessorSchemaAndEncodeValidFiles() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.proto").getPath()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "true"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, EncodeProtobufTest.class.getResource("/schemas/Person.proto").getPath()); - InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"); + InputStream jsonFile = EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"); HashMap personProperties = new HashMap<>(); personProperties.put("protobuf.messageType", "Person"); runner.enqueue(jsonFile, personProperties); @@ -156,10 +156,10 @@ public void onTriggerCompileProcessorSchemaAndEncodeValidFiles() throws Exceptio runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS); - List results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS); + runner.assertAllFlowFilesTransferred(EncodeProtobuf.SUCCESS); + List results = runner.getFlowFilesForRelationship(EncodeProtobuf.SUCCESS); Assert.assertEquals("The Person flowfile should be returned to success", 1, results.size()); - results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data")); + results.get(0).assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/Person.data")); } /** @@ -168,12 +168,12 @@ public void onTriggerCompileProcessorSchemaAndEncodeValidFiles() throws Exceptio */ @Test public void onTriggerUsePerFlowfileSchemaIfAvailable() throws IOException { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - runner.setProperty("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + runner.setProperty("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); - InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"); + InputStream jsonFile = EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"); HashMap personProperties = new HashMap<>(); - personProperties.put("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/Person.desc").getPath()); + personProperties.put("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/Person.desc").getPath()); personProperties.put("protobuf.messageType", "Person"); runner.enqueue(jsonFile, personProperties); @@ -181,10 +181,10 @@ public void onTriggerUsePerFlowfileSchemaIfAvailable() throws IOException { runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS); - List results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS); + runner.assertAllFlowFilesTransferred(EncodeProtobuf.SUCCESS); + List results = runner.getFlowFilesForRelationship(EncodeProtobuf.SUCCESS); Assert.assertEquals("The encoder should use the schema from flowfile instead of processor if given", 1, results.size()); - results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data")); + results.get(0).assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/Person.data")); } /** @@ -193,7 +193,7 @@ public void onTriggerUsePerFlowfileSchemaIfAvailable() throws IOException { */ @Test public void onPropertyModified() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); HashMap addressBookProperties = new HashMap<>(); addressBookProperties.put("protobuf.messageType", "AddressBook"); @@ -203,20 +203,20 @@ public void onPropertyModified() throws Exception { First try to decode using a schema set in a processor property */ - runner.setProperty("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + runner.setProperty("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); runner.assertValid(); - runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); + runner.enqueue(EncodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); // Finally check the content - MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); - result.assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.data")); + MockFlowFile result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); + result.assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.data")); /* @@ -228,13 +228,13 @@ public void onPropertyModified() throws Exception { Assert.assertFalse("The schema property should now be null", runner.getProcessContext().getProperty("protobuf.schemaPath").isSet()); runner.assertValid(); - runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); + runner.enqueue(EncodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.INVALID_SCHEMA); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.INVALID_SCHEMA); /* @@ -242,20 +242,20 @@ public void onPropertyModified() throws Exception { */ runner.clearTransferState(); - runner.setProperty("protobuf.schemaPath", ProtobufEncoderTest.class.getResource("/schemas/AddressBook.desc").getPath()); + runner.setProperty("protobuf.schemaPath", EncodeProtobufTest.class.getResource("/schemas/AddressBook.desc").getPath()); runner.assertValid(); - runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); + runner.enqueue(EncodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.json"), addressBookProperties); runner.run(1); // Check if the flowfile has been successfully processed runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS); + runner.assertAllFlowFilesTransferred(DecodeProtobuf.SUCCESS); // Finally check the content - result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0); - result.assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.data")); + result = runner.getFlowFilesForRelationship(DecodeProtobuf.SUCCESS).get(0); + result.assertContentEquals(EncodeProtobufTest.class.getResourceAsStream("/data/AddressBook_basic.data")); } @@ -264,29 +264,29 @@ public void onPropertyModified() throws Exception { */ @Test public void onPropertyModifiedEncodeFileUsingSchemaAtProcessorLevel() { - TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder()); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.proto").getPath()); + TestRunner runner = TestRunners.newTestRunner(new EncodeProtobuf()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "true"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, EncodeProtobufTest.class.getResource("/schemas/Person.proto").getPath()); HashMap personProperties = new HashMap<>(); personProperties.put("protobuf.messageType", "Person"); - runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"), personProperties); + runner.enqueue(EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"), personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false"); - runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.desc").getPath()); + runner.setProperty(AbstractProtobufProcessor.COMPILE_SCHEMA, "false"); + runner.setProperty(AbstractProtobufProcessor.PROTOBUF_SCHEMA, EncodeProtobufTest.class.getResource("/schemas/Person.desc").getPath()); - runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"), personProperties); + runner.enqueue(EncodeProtobufTest.class.getResourceAsStream("/data/Person.json"), personProperties); runner.assertValid(); runner.run(1); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS); - List results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS); + runner.assertAllFlowFilesTransferred(EncodeProtobuf.SUCCESS); + List results = runner.getFlowFilesForRelationship(EncodeProtobuf.SUCCESS); Assert.assertEquals("The 2 flowfiles should be returned to success", 2, results.size()); } } \ No newline at end of file