Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
22b4522
upgraded protobuf & nifi libraries
benyaa Aug 8, 2019
65640a5
1. Changed name to AbstractProtobufProcessor so it would fit the conv…
benyaa Aug 8, 2019
4331d3a
Updated tests to use properties by their variables.
benyaa Aug 8, 2019
6b8ab36
Added docs and added expression evaluation to compile schema
benyaa Aug 8, 2019
749ed5d
Renamed processors to match NiFi conventinos
benSolar Mar 31, 2020
8ee2570
added validation for data arrival
benyaa Mar 31, 2020
fc78e25
Merge branch 'master' into develop
benyaa Mar 31, 2020
a27324e
Merge pull request #4 from benyaa/develop
whiver May 13, 2020
c27c6f1
removed unnecessary schema recompile(since we do it as soon as it is …
benyaa Dec 15, 2021
87df7ae
removed unnecessary parameters
benyaa Dec 15, 2021
0e77cff
Fixed error handling to not drop flowfile content on error
benyaa Dec 15, 2021
8bce8c4
added new processor to convert protobuf to avro format
benyaa Dec 15, 2021
f80158f
added new processor to list
benyaa Dec 15, 2021
021d37d
removed unused imports
benyaa Dec 15, 2021
fd71a54
switched to using a StreamDemarcator so not all messages would be loa…
benyaa Dec 16, 2021
9dbeb05
added option to encode json array to demarcated protobuf messages
benyaa Dec 21, 2021
a5f7d35
fixed error handling in ConvertProtobufToAvro
benyaa Dec 23, 2021
744110d
fixed thread synchronization
benyaa Dec 23, 2021
08db46e
added basic support for avro to protobuf
benyaa Jul 18, 2022
2fa74b9
changed ConvertAvroToProtobuf to ConvertRecordToProtobuf to support a…
benyaa Jul 19, 2022
2420c84
added javadoc
benyaa Jul 19, 2022
5d635d7
added support for single-field MESSAGE fields
benyaa Jul 20, 2022
1d02858
Merge branch 'whiver:develop' into develop
benyaa Jul 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ and everything should be fine ! :)
See the installation section to learn how to integrate this processor in Apache NiFi.
This projects add 2 different new processors in NiFi:

- `ProtobufDecoder`, which **decodes** a Protobuf-encoded payload to different kind of structured formats ;
- `ProtobufEncoder`, which **encodes** a payload in a structured format using a Protobuf schema.
- `DecodeProtobuf`, which **decodes** a Protobuf-encoded payload to different kind of structured formats.
- `EncodeProtobuf`, which **encodes** a payload in a structured format using a Protobuf schema.
- `ConvertProtobufToAvro` which decodes a Protobuf-encoded payload into Avro.

### Specifying the schema file
In both processors, you have to specify a schema file to use for data encoding/decoding. You can do so either
Expand Down
38 changes: 32 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<nifi.version>1.4.0</nifi.version>
<nifi.version>1.16.3</nifi.version>
</properties>

<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.0</version>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.5.0</version>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand All @@ -52,7 +52,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>${nifi.version}</version>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand All @@ -66,19 +66,45 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-protobuf</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi.version}</version>
<type>nar</type>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.2.0</version>
<version>1.3.1</version>
<extensions>true</extensions>
</plugin>
<plugin>
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/github/whiver/nifi/mapper/JSONMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ public class JSONMapper {
/**
* Format a Protocol Buffers Message to a JSON string
* @param data The Message to be formatted
* @param preserveFieldNames whether to preserve original field names
* @return A JSON String representing the data
* @throws InvalidProtocolBufferException Thrown in case of invalid Message data
*/
public static String toJSON(Message data) throws InvalidProtocolBufferException {
public static String toJSON(Message data, boolean preserveFieldNames) throws InvalidProtocolBufferException {
JsonFormat.Printer printer = JsonFormat.printer();
if (preserveFieldNames) {
printer = printer.preservingProtoFieldNames();
}
return printer.print(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@
import com.github.whiver.nifi.exception.SchemaLoadingException;
import com.github.whiver.nifi.parser.SchemaParser;
import com.google.protobuf.Descriptors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
Expand All @@ -41,7 +49,16 @@
import java.io.IOException;
import java.util.*;

public abstract class ProtobufProcessor extends AbstractProcessor {
import static org.apache.nifi.components.Validator.VALID;


@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes({
@ReadsAttribute(attribute = "protobuf.schemaPath", description = "will use this attribute as default for schema path."),
@ReadsAttribute(attribute = "protobuf.messageType", description = "will use this attribute as default for message type.")
})
@Tags({"Protobuf", "decoder", "Google Protocol Buffer"})
public abstract class AbstractProtobufProcessor extends AbstractProcessor {
/**
* NiFi properties of the processor, that can be configured using the Web UI
*/
Expand All @@ -56,26 +73,39 @@ public abstract class ProtobufProcessor extends AbstractProcessor {
* The compiled descriptor used to parse incoming binaries in case where the schema has been specified in the
* processor level (protobuf.schema property)
*/
protected DynamicSchema schema;

/**
* Reflects the value of the COMPILE_SCHEMA property, so that it can be used by the onPropertyModified method
*/
private boolean compileSchema;

protected static DynamicSchema schema = null;

/* PROPERTIES */

static final PropertyDescriptor PROTOBUF_SCHEMA = new PropertyDescriptor.Builder()
.name("protobuf.schemaPath")
.displayName("Schema path")
.displayName("Schema Path")
.required(false)
.description("Path to the Protocol Buffers schema to use to encode or decode the data. If set, this schema will " +
"be used when the flowfile protobuf.schemaPath is missing.")
.expressionLanguageSupported(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.createURLorFileValidator())
.build();

static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder()
.name("demarcator")
.displayName("Demarcator")
.required(false)
.description("This property is used to produce/consume messages separated by a demarcator")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();

static final PropertyDescriptor PROTOBUF_MESSAGE_TYPE = new PropertyDescriptor.Builder()
.name("protobuf.messageType")
.displayName("Message Type")
.required(false)
.description("Path to the Protocol Buffers message type to use to encode or decode the data. If set, this message type will " +
"be used when the flowfile protobuf.messageType is missing.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(VALID)
.build();

static final PropertyDescriptor COMPILE_SCHEMA = new PropertyDescriptor.Builder()
.name("protobuf.compileSchema")
.displayName("Compile schema")
Expand All @@ -85,6 +115,17 @@ public abstract class ProtobufProcessor extends AbstractProcessor {
"processing the data. It is useful if the given schema file is in .proto format. Try to always use " +
"precompiled .desc schema whenever possible, since it is more performant.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("max_message_size")
.displayName("Max Message Size")
.required(true)
.description("Maximum size of a single message when using a demarcator.")
.defaultValue("1 MB")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();


Expand All @@ -106,9 +147,10 @@ public abstract class ProtobufProcessor extends AbstractProcessor {
.build();

@Override
public void init(final ProcessorInitializationContext context){
public void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PROTOBUF_SCHEMA);
properties.add(PROTOBUF_MESSAGE_TYPE);
properties.add(COMPILE_SCHEMA);
this.properties = Collections.unmodifiableList(properties);

Expand All @@ -117,33 +159,25 @@ public void init(final ProcessorInitializationContext context){
relationships.add(INVALID_SCHEMA);
relationships.add(ERROR);
this.relationships = Collections.unmodifiableSet(relationships);

this.compileSchema = false;
}



/**
* Compile the given schema file when the protobuf.schemaPath property is given
*
* @see AbstractProcessor
*/
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);

if (descriptor == PROTOBUF_SCHEMA) {

// If the property is unset, just delete the existing descriptor
if (newValue == null || newValue.isEmpty()) {
this.schema = null;
return;
}

if (!newValue.equals(oldValue)) {
@OnScheduled
public void setUpSchema(final ProcessContext processContext) {
String protoSchemaPath = processContext.getProperty(PROTOBUF_SCHEMA).evaluateAttributeExpressions().getValue();
// If the property is unset, just delete the existing descriptor
if (protoSchemaPath == null || protoSchemaPath.isEmpty()) {
this.schema = null;
} else {
try {
this.schema = SchemaParser.parseSchema(newValue, this.compileSchema);
this.schema = SchemaParser.parseSchema(protoSchemaPath, processContext.getProperty(COMPILE_SCHEMA).evaluateAttributeExpressions().asBoolean());
} catch (FileNotFoundException e) {
getLogger().error("File " + newValue + " not found on the disk.", e);
getLogger().error("File " + protoSchemaPath + " not found on the disk.", e);
} catch (Descriptors.DescriptorValidationException e) {
getLogger().error("Invalid schema file: " + e.getMessage(), e);
} catch (IOException e) {
Expand All @@ -154,9 +188,6 @@ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, S
getLogger().error("Unable to compile schema: " + e.getMessage(), e);
}
}
} else if (descriptor == COMPILE_SCHEMA) {
this.compileSchema = Boolean.parseBoolean(newValue);
}
}

@Override
Expand Down
Loading