From 5894de5e05514b06a1840ab1f8756107229c08c5 Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 11 Jul 2023 20:02:57 +0530 Subject: [PATCH 01/13] feat: add inputSchemaType to Message --- .../firehose/consumer/kafka/FirehoseKafkaConsumer.java | 3 ++- .../java/com/gotocompany/firehose/message/Message.java | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java index ec3763e01..256cdbd5c 100644 --- a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java +++ b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java @@ -57,9 +57,10 @@ public List readMessages() { List messages = new ArrayList<>(); for (ConsumerRecord record : records) { - messages.add(new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis())); + messages.add(new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis(), this.consumerConfig.getInputSchemaType())); firehoseInstrumentation.logDebug("Pulled record: {}", record); } + return messages; } diff --git a/src/main/java/com/gotocompany/firehose/message/Message.java b/src/main/java/com/gotocompany/firehose/message/Message.java index 0ba03f14f..5f9dbb7c7 100644 --- a/src/main/java/com/gotocompany/firehose/message/Message.java +++ b/src/main/java/com/gotocompany/firehose/message/Message.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.message; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DefaultException; import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; @@ -30,6 +31,8 @@ public class Message { @Setter private ErrorInfo errorInfo; + private InputSchemaType inputSchemaType; + public void setDefaultErrorIfNotPresent() { if (errorInfo == null) { errorInfo = new ErrorInfo(new DefaultException("DEFAULT"), ErrorType.DEFAULT_ERROR); @@ -65,8 +68,8 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo * @param timestamp * @param consumeTimestamp */ - public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp) { - this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null); + public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp, InputSchemaType inputSchemaType) { + this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null, inputSchemaType); } public Message(Message message, ErrorInfo errorInfo) { @@ -78,7 +81,8 @@ public Message(Message message, ErrorInfo errorInfo) { message.getHeaders(), message.getTimestamp(), message.getConsumeTimestamp(), - errorInfo); + errorInfo, + message.getInputSchemaType()); } /** From c274db8da23c814b6ff73cf5b380f2ff83f9dc54 Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 11 Jul 2023 21:44:40 +0530 Subject: [PATCH 02/13] feat: serde for json input message type for http sink --- .../firehose/serializer/MessageToJson.java | 9 +++++++++ .../serializer/MessageToTemplatizedJson.java | 20 +++++++++++++++---- .../sink/http/factory/SerializerFactory.java | 5 +++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java index 69daeaa38..d10d0ad35 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.exception.DeserializerException; import com.google.gson.ExclusionStrategy; @@ -17,6 +18,7 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.HashMap; @@ -55,6 +57,13 @@ public String serialize(Message message) throws DeserializerException { JSONObject jsonObject = new JSONObject(); jsonObject.put("topic", message.getTopic()); + if (message.getInputSchemaType() == InputSchemaType.JSON) { + JSONParser parser = new JSONParser(); + JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + jsonObject.put("logMessage", json); + return jsonObject.toJSONString(); + } + if (message.getLogKey() != null && message.getLogKey().length != 0) { DynamicMessage key = protoParser.parse(message.getLogKey()); jsonObject.put("logKey", this.gson.toJson(convertDynamicMessageToJson(key))); diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java index ee42fb0e5..27ac68d71 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.exception.ConfigurationException; @@ -12,9 +13,11 @@ import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import com.gotocompany.stencil.Parser; +import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,9 +80,18 @@ public String serialize(Message message) throws DeserializerException { try { String jsonMessage; String jsonString; - // only supports messages not keys - DynamicMessage msg = protoParser.parse(message.getLogMessage()); - jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg); + + if (message.getInputSchemaType() == InputSchemaType.JSON) { + JSONParser parser = new JSONParser(); + JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + jsonMessage = json.toJSONString(); + } else { + // only supports messages not keys + DynamicMessage msg = protoParser.parse(message.getLogMessage()); + jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg); + } + + String finalMessage = httpSinkJsonBodyTemplate; for (String path : pathsToReplace) { if (path.equals(ALL_FIELDS_FROM_TEMPLATE)) { @@ -91,7 +103,7 @@ public String serialize(Message message) throws DeserializerException { finalMessage = finalMessage.replace(path, jsonString); } return finalMessage; - } catch (InvalidProtocolBufferException | PathNotFoundException e) { + } catch (InvalidProtocolBufferException | ParseException | PathNotFoundException e) { throw new DeserializerException(e.getMessage()); } } diff --git a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java index 923d08471..cb6c6f325 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java @@ -27,6 +27,11 @@ public MessageSerializer build() { if (isProtoSchemaEmpty() || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) { firehoseInstrumentation.logDebug("Serializer type: JsonWrappedProtoByte"); // Fallback to json wrapped proto byte + + // todo(sushmith): + // here output is proto, but input expected is also proto. + // need to have json as input and proto as output. + // this is currently not possible because of the way we are using the parser. return new JsonWrappedProtoByte(); } From 4e5ef8692f9d8a19dd3164756a7afe78b872d3eb Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 11 Jul 2023 22:23:21 +0530 Subject: [PATCH 03/13] feat: add setter for inputSchemaType --- .../consumer/kafka/FirehoseKafkaConsumer.java | 4 ++- .../gotocompany/firehose/message/Message.java | 25 +++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java index 256cdbd5c..ce103fcfa 100644 --- a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java +++ b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java @@ -57,7 +57,9 @@ public List readMessages() { List messages = new ArrayList<>(); for (ConsumerRecord record : records) { - messages.add(new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis(), this.consumerConfig.getInputSchemaType())); + Message msg = new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis()); + msg.setInputSchemaType(consumerConfig.getInputSchemaType()); + messages.add(msg); firehoseInstrumentation.logDebug("Pulled record: {}", record); } diff --git a/src/main/java/com/gotocompany/firehose/message/Message.java b/src/main/java/com/gotocompany/firehose/message/Message.java index 5f9dbb7c7..04623e85b 100644 --- a/src/main/java/com/gotocompany/firehose/message/Message.java +++ b/src/main/java/com/gotocompany/firehose/message/Message.java @@ -30,7 +30,7 @@ public class Message { private long consumeTimestamp; @Setter private ErrorInfo errorInfo; - + @Setter private InputSchemaType inputSchemaType; public void setDefaultErrorIfNotPresent() { @@ -57,7 +57,7 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo } /** - * Instantiates a new Message without providing errorType. + * Instantiates a new Message without providing errorType and inputSchemaType. * * @param logKey * @param logMessage @@ -68,8 +68,8 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo * @param timestamp * @param consumeTimestamp */ - public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp, InputSchemaType inputSchemaType) { - this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null, inputSchemaType); + public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp) { + this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null, InputSchemaType.PROTOBUF); } public Message(Message message, ErrorInfo errorInfo) { @@ -82,7 +82,22 @@ public Message(Message message, ErrorInfo errorInfo) { message.getTimestamp(), message.getConsumeTimestamp(), errorInfo, - message.getInputSchemaType()); + message.getInputSchemaType() + ); + } + + public Message(Message message, ErrorInfo errorInfo, InputSchemaType inputSchemaType) { + this(message.getLogKey(), + message.getLogMessage(), + message.getTopic(), + message.getPartition(), + message.getOffset(), + message.getHeaders(), + message.getTimestamp(), + message.getConsumeTimestamp(), + errorInfo, + inputSchemaType + ); } /** From f9be379ecbb8935a1857670f5add04c4edb8e548 Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 11 Jul 2023 22:23:41 +0530 Subject: [PATCH 04/13] test: fix tests --- .../firehose/sink/dlq/LogDlqWriterTest.java | 11 ++++---- .../blobstorage/BlobStorageDlqWriterTest.java | 25 ++++++++++--------- .../sinkdecorator/SinkWithDlqTest.java | 7 +++--- .../SinkWithFailHandlerTest.java | 5 ++-- .../sinkdecorator/SinkWithRetryTest.java | 6 +++-- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java index 5381b9162..03995c36d 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java @@ -2,6 +2,7 @@ import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.dlq.log.LogDlqWriter; @@ -35,7 +36,7 @@ public void setUp() throws Exception { @Test public void shouldWriteMessagesToLog() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); @@ -51,7 +52,7 @@ public void shouldWriteMessagesToLog() throws IOException { @Test public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String value = new String(message.getLogMessage()); ErrorInfo errorInfo = message.getErrorInfo(); @@ -66,7 +67,7 @@ public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); ErrorInfo errorInfo = message.getErrorInfo(); @@ -81,7 +82,7 @@ public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null, InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); @@ -95,7 +96,7 @@ public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java index c55d6eed2..1c8c256eb 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java @@ -2,6 +2,7 @@ import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage; @@ -37,12 +38,12 @@ public void setUp() throws Exception { @Test public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); @@ -58,12 +59,12 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept @Test public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new NullPointerException(), ErrorType.SINK_UNKNOWN_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new NullPointerException(), ErrorType.SINK_UNKNOWN_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.SINK_UNKNOWN_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.SINK_UNKNOWN_ERROR), InputSchemaType.PROTOBUF); List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); @@ -79,12 +80,12 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl @Test public void shouldThrowIOExceptionWhenWriteFileThrowIOException() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); doThrow(new BlobStorageException("", "", new IOException())).when(blobStorage).store(anyString(), any(byte[].class)); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java index f6c0f8e4d..59a27e11b 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java @@ -2,6 +2,7 @@ import com.gotocompany.firehose.config.DlqConfig; import com.gotocompany.firehose.config.ErrorConfig; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.error.ErrorHandler; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; @@ -180,9 +181,9 @@ public void shouldNotThrowIOExceptionWhenFailOnMaxRetryAttemptDisabled() throws @Test public void shouldCommitOffsetsOfDlqMessagesWhenSinkManageOffset() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); ArrayList messages = new ArrayList<>(); messages.add(message1); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java index 2f7fd3ef6..8530f5e0f 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.sinkdecorator; import com.gotocompany.firehose.config.ErrorConfig; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.error.ErrorHandler; import com.gotocompany.firehose.exception.SinkException; import com.gotocompany.firehose.message.Message; @@ -37,7 +38,7 @@ public void shouldThrowIOExceptionWhenMessageContainsConfiguredError() throws IO List messages = new LinkedList<>(); messages.add(new Message("".getBytes(), "".getBytes(), "basic", 1, 1, null, 0, 0, - new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR))); + new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF)); when(sink.pushMessage(anyList())).thenReturn(messages); @@ -54,7 +55,7 @@ public void shouldNotThrowIOExceptionWhenConfigIsNotSet() throws IOException { List messages = new LinkedList<>(); messages.add(new Message("".getBytes(), "".getBytes(), "basic", 1, 1, null, 0, 0, - new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR))); + new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF)); when(sink.pushMessage(anyList())).thenReturn(messages); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java index 44d23f96c..37d9a42a1 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java @@ -231,11 +231,13 @@ public void shouldThrowIOExceptionWhenExceedMaximumRetryAttempts() throws IOExce @Test public void shouldRetryMessagesWhenErrorTypesConfigured() throws IOException { - Message messageWithError = new Message("key".getBytes(), "value".getBytes(), "topic", 1, 1, null, 0, 0, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + InputSchemaType inputSchemaType = InputSchemaType.PROTOBUF; + ErrorInfo errorInfo = new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR); + Message messageWithError = new Message("key".getBytes(), "value".getBytes(), "topic", 1, 1, null, 0, 0, errorInfo, inputSchemaType); ArrayList messages = new ArrayList<>(); messages.add(messageWithError); - messages.add(new Message(message, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR))); + messages.add(new Message(message, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR), inputSchemaType)); when(sinkDecorator.pushMessage(anyList())).thenReturn(messages).thenReturn(new LinkedList<>()); HashSet errorTypes = new HashSet<>(); From ca37f752a5238bbff4cc48175dc27948c6e373c4 Mon Sep 17 00:00:00 2001 From: sushmith Date: Wed, 12 Jul 2023 10:48:36 +0530 Subject: [PATCH 05/13] refactor: reuse jsonParser --- .../firehose/serializer/MessageToTemplatizedJson.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java index 27ac68d71..e751dbb58 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java @@ -82,8 +82,7 @@ public String serialize(Message message) throws DeserializerException { String jsonString; if (message.getInputSchemaType() == InputSchemaType.JSON) { - JSONParser parser = new JSONParser(); - JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + JSONObject json = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); jsonMessage = json.toJSONString(); } else { // only supports messages not keys From 086057129223e04a72324d2acf0dfa4cc7f06aef Mon Sep 17 00:00:00 2001 From: sushmith Date: Thu, 3 Aug 2023 12:39:10 +0530 Subject: [PATCH 06/13] fix: add key to message --- .../com/gotocompany/firehose/serializer/MessageToJson.java | 6 +++++- .../firehose/serializer/MessageToTemplatizedJson.java | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java index d10d0ad35..8131f4b13 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java @@ -60,7 +60,11 @@ public String serialize(Message message) throws DeserializerException { if (message.getInputSchemaType() == InputSchemaType.JSON) { JSONParser parser = new JSONParser(); JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); - jsonObject.put("logMessage", json); + jsonObject.put("logMessage", gson.toJson(json)); + if (message.getLogKey() != null && message.getLogKey().length != 0) { + jsonObject.put("logKey", new String(message.getLogKey(), StandardCharsets.UTF_8)); + } + return jsonObject.toJSONString(); } diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java index e751dbb58..2391cd495 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java @@ -90,7 +90,6 @@ public String serialize(Message message) throws DeserializerException { jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg); } - String finalMessage = httpSinkJsonBodyTemplate; for (String path : pathsToReplace) { if (path.equals(ALL_FIELDS_FROM_TEMPLATE)) { From 80398d50325285c86eb5282230d26d2592b6c96b Mon Sep 17 00:00:00 2001 From: sushmith Date: Thu, 3 Aug 2023 12:39:21 +0530 Subject: [PATCH 07/13] test: add tests --- .../serializer/MessageToJsonTest.java | 53 +++++++++++++++ .../MessageToTemplatizedJsonTest.java | 67 ++++++++++++++++++- 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java index 4d03e9cb3..0025b1023 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java @@ -1,5 +1,6 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.consumer.TestAggregatedSupplyMessage; @@ -9,6 +10,8 @@ import org.junit.Before; import org.junit.Test; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Base64; import static org.junit.Assert.assertEquals; @@ -16,6 +19,8 @@ public class MessageToJsonTest { private String logMessage; private String logKey; + private String logMessageJSONString; + private String logKeyJSONString; private Parser protoParser; @Before @@ -24,6 +29,20 @@ public void setUp() { protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; + + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; + logKeyJSONString = "sample-key1"; + } + + public byte[] stringToByteArray(String inputString) { + return StandardCharsets.UTF_8.encode(inputString).array(); } @Test @@ -42,6 +61,23 @@ public void shouldProperlySerializeEsbMessage() throws DeserializerException { + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"}"); } + + @Test + public void shouldProperlySerializeJsonInputMessage() throws DeserializerException { + MessageToJson messageToJson = new MessageToJson(protoParser, false, true); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedOutput = "{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + + "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + + "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\"," + + "\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\",\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"," + + "\"topic\":\"sample-topic\",\"logKey\":\"sample-key1\"}"; + + String actualOutput = messageToJson.serialize(message); + assertEquals(expectedOutput, actualOutput); + } + @Test public void shouldSerializeWhenKeyIsMissing() throws DeserializerException { MessageToJson messageToJson = new MessageToJson(protoParser, false, true); @@ -55,6 +91,21 @@ public void shouldSerializeWhenKeyIsMissing() throws DeserializerException { + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}", actualOutput); } + @Test + public void shouldSerializeJSONInputMessageWhenKeyIsMissing() throws DeserializerException { + MessageToJson messageToJson = new MessageToJson(protoParser, false, true); + Message message = new Message(null, logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String actualOutput = messageToJson.serialize(message); + assertEquals("{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + + "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + + "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\",\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\"," + + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}", actualOutput); + + + } + @Test public void shouldSerializeWhenKeyIsEmptyWithTimestampsAsSimpleDateFormatWhenFlagIsEnabled() throws DeserializerException { MessageToJson messageToJson = new MessageToJson(protoParser, false, true); @@ -113,3 +164,5 @@ public void shouldReturnTheTimestampFieldsInISOFormatIfSimpleDateFormatIsDisable + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}]", actualOutput); } } + + diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java index 5aaedd5a2..13c7fdbee 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java @@ -1,8 +1,7 @@ package com.gotocompany.firehose.serializer; - - +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.ConfigurationException; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; @@ -41,12 +40,23 @@ public class MessageToTemplatizedJsonTest { private String logMessage; private String logKey; + private String logMessageJSONString; + private String logKeyJSONString; @Before public void setup() { initMocks(this); logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; + logKeyJSONString = "sample-key1"; } @Test @@ -64,6 +74,23 @@ public void shouldProperlySerializeMessageToTemplateWithSingleUnknownField() { Assert.assertEquals(expectedMessage, serializedMessage); } + @Test + public void shouldProperlySerializeJsonInputMessageToTemplateWithSingleKnownField() throws DeserializerException { + String template = "{\"test\":\"$.vehicleType\"}"; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedMessage = "{\"test\":\"BIKE\"}"; + + String serializedMessage = messageToTemplatizedJson.serialize(message); + + Assert.assertEquals(expectedMessage, serializedMessage); + } + @Test public void shouldProperlySerializeMessageToTemplateWithAsItIs() { String template = "\"$._all_\""; @@ -86,6 +113,26 @@ public void shouldProperlySerializeMessageToTemplateWithAsItIs() { Assert.assertEquals(expectedMessage, serializedMessage); } + @Test + public void shouldProperlySerializeJsonInputMessageToTemplateAsItIs() throws DeserializerException { + String template = "\"$._all_\""; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedMessage = "{\"uniqueDrivers\":\"3\"," + + "\"windowStartTime\":\"Mar 20, 2017 10:54:00 AM\"," + + "\"windowEndTime\":\"Mar 20, 2017 10:55:00 AM\",\"s2IdLevel\":13," + + "\"vehicleType\":\"BIKE\",\"s2Id\":\"3344472187078705152\"}"; + + String serializedMessage = messageToTemplatizedJson.serialize(message); + + Assert.assertEquals(expectedMessage, serializedMessage); + } + @Test public void shouldThrowIfNoPathsFoundInTheProto() { expectedException.expect(DeserializerException.class); @@ -102,6 +149,22 @@ public void shouldThrowIfNoPathsFoundInTheProto() { messageToTemplatizedJson.serialize(message); } + @Test + public void shouldThrowIfNoPathsFoundInTheJSON() { + expectedException.expect(DeserializerException.class); + expectedException.expectMessage("No results for path: $['invalidPath']"); + + String template = "{\"test\":\"$.invalidPath\"}"; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + messageToTemplatizedJson.serialize(message); + } + @Test public void shouldFailForNonJsonTemplate() { expectedException.expect(ConfigurationException.class); From ef7cfc985d6c2f6427c8d633afdfcffbe44af9f7 Mon Sep 17 00:00:00 2001 From: sushmith Date: Thu, 3 Aug 2023 13:55:56 +0530 Subject: [PATCH 08/13] style: fix style errors --- .../serializer/MessageToJsonTest.java | 26 +++++++++---------- .../MessageToTemplatizedJsonTest.java | 24 ++++++++--------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java index 0025b1023..c4eff504d 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java @@ -30,14 +30,14 @@ public void setUp() { logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; - logMessageJSONString = "{\n" + - " \"uniqueDrivers\": \"3\",\n" + - " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + - " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + - " \"s2IdLevel\": 13,\n" + - " \"vehicleType\": \"BIKE\",\n" + - " \"s2Id\": \"3344472187078705152\"\n" + - " }"; + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; logKeyJSONString = "sample-key1"; } @@ -68,11 +68,11 @@ public void shouldProperlySerializeJsonInputMessage() throws DeserializerExcepti Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); message.setInputSchemaType(InputSchemaType.JSON); - String expectedOutput = "{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + - "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + - "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\"," + - "\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\",\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"," + - "\"topic\":\"sample-topic\",\"logKey\":\"sample-key1\"}"; + String expectedOutput = "{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + + "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + + "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\"," + + "\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\",\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"," + + "\"topic\":\"sample-topic\",\"logKey\":\"sample-key1\"}"; String actualOutput = messageToJson.serialize(message); assertEquals(expectedOutput, actualOutput); diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java index 13c7fdbee..484164eb3 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java @@ -48,14 +48,14 @@ public void setup() { initMocks(this); logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; - logMessageJSONString = "{\n" + - " \"uniqueDrivers\": \"3\",\n" + - " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + - " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + - " \"s2IdLevel\": 13,\n" + - " \"vehicleType\": \"BIKE\",\n" + - " \"s2Id\": \"3344472187078705152\"\n" + - " }"; + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; logKeyJSONString = "sample-key1"; } @@ -123,10 +123,10 @@ public void shouldProperlySerializeJsonInputMessageToTemplateAsItIs() throws Des Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); message.setInputSchemaType(InputSchemaType.JSON); - String expectedMessage = "{\"uniqueDrivers\":\"3\"," + - "\"windowStartTime\":\"Mar 20, 2017 10:54:00 AM\"," + - "\"windowEndTime\":\"Mar 20, 2017 10:55:00 AM\",\"s2IdLevel\":13," + - "\"vehicleType\":\"BIKE\",\"s2Id\":\"3344472187078705152\"}"; + String expectedMessage = "{\"uniqueDrivers\":\"3\"," + + "\"windowStartTime\":\"Mar 20, 2017 10:54:00 AM\"," + + "\"windowEndTime\":\"Mar 20, 2017 10:55:00 AM\",\"s2IdLevel\":13," + + "\"vehicleType\":\"BIKE\",\"s2Id\":\"3344472187078705152\"}"; String serializedMessage = messageToTemplatizedJson.serialize(message); From fa741f583d0224b94a508656e8359afcd32bc6f3 Mon Sep 17 00:00:00 2001 From: sushmith Date: Fri, 4 Aug 2023 11:47:24 +0530 Subject: [PATCH 09/13] style: remove unused import --- .../com/gotocompany/firehose/serializer/MessageToJsonTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java index c4eff504d..517dd375e 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java @@ -10,7 +10,6 @@ import org.junit.Before; import org.junit.Test; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Base64; From 31310c3b2cf5b4ee57b0d9320841c2414092448a Mon Sep 17 00:00:00 2001 From: sushmith Date: Fri, 4 Aug 2023 11:57:45 +0530 Subject: [PATCH 10/13] chore: set `INPUT_SCHEMA_DATA_TYPE` for local env --- env/local.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/env/local.properties b/env/local.properties index 1908b7b76..805d1ee47 100644 --- a/env/local.properties +++ b/env/local.properties @@ -19,6 +19,7 @@ INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage # APPLICATION_THREAD_COUNT=2 # TRACE_JAEGAR_ENABLE=true # LOG_LEVEL=info +# INPUT_SCHEMA_DATA_TYPE=protobuf # # ############################################# @@ -122,7 +123,6 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id # SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret # SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info # -# ############################################# # ## INFLUX SINK From 47b56d4f7bc1935770ddd51434122876922a439a Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 5 Sep 2023 15:42:26 +0530 Subject: [PATCH 11/13] fix: uri parser for json input --- .../sink/http/request/uri/UriParser.java | 88 ++++++++++++++++--- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java index c957d12e9..46174d9cb 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java @@ -1,6 +1,9 @@ package com.gotocompany.firehose.sink.http.request.uri; +import com.google.gson.JsonObject; +import com.gotocompany.firehose.config.enums.InputSchemaType; +import com.gotocompany.firehose.exception.JsonParseException; import com.gotocompany.firehose.message.Message; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -8,7 +11,11 @@ import com.gotocompany.stencil.Parser; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -18,15 +25,38 @@ public class UriParser { private Parser protoParser; private String parserMode; + private JSONParser jsonParser; + public UriParser(Parser protoParser, String parserMode) { this.protoParser = protoParser; this.parserMode = parserMode; + this.jsonParser = new JSONParser(); + } + + public UriParser(Parser protoParser, String parserMode, JSONParser jsonParser) { + this.protoParser = protoParser; + this.parserMode = parserMode; + this.jsonParser = jsonParser; } public String parse(Message message, String serviceUrl) { - DynamicMessage parsedMessage = parseEsbMessage(message); - return parseServiceUrl(parsedMessage, serviceUrl); + +// if (message.getInputSchemaType() == InputSchemaType.JSON) { +// JSONObject jsonObject = parseJsonMessage(message); +// +// } +// create jonobject if input is json +// { +// } +// DynamicMessage parsedMessage = parseEsbMessage(message); + +// -- > mpve this to directlydo it inside parseServiceURl + + +// take call inside parseService Url +// overload parseServiceUrl function to take jsonObject + return parseServiceUrl(message, serviceUrl); } @@ -40,7 +70,18 @@ private DynamicMessage parseEsbMessage(Message message) { return parsedMessage; } - private String parseServiceUrl(DynamicMessage data, String serviceUrl) { + private JSONObject parseJsonMessage(Message message) { + JSONObject jsonObject; + try { + jsonObject = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + } catch (ParseException e) { + throw new JsonParseException(e.getMessage(), e.getCause()); + } + return jsonObject; + } + + + private String parseServiceUrl(Message message, String serviceUrl) { if (StringUtils.isEmpty(serviceUrl)) { throw new IllegalArgumentException("Service URL '" + serviceUrl + "' is invalid"); } @@ -55,16 +96,43 @@ private String parseServiceUrl(DynamicMessage data, String serviceUrl) { String urlPattern = urlStrings[0]; String urlVariables = StringUtils.join(Arrays.copyOfRange(urlStrings, 1, urlStrings.length), ","); - String renderedUrl = renderStringUrl(data, urlPattern, urlVariables); - return StringUtils.isEmpty(urlVariables) - ? urlPattern - : renderedUrl; + + if (StringUtils.isEmpty(urlVariables)) { + return urlPattern; + } + + String renderedUrl; + if (message.getInputSchemaType() == InputSchemaType.JSON) { + JSONObject json = parseJsonMessage(message); + renderedUrl = renderStringUrl(json, urlPattern, urlVariables); + } else { + // InputSchemaType.PROTOBUF + DynamicMessage data = parseEsbMessage(message); + renderedUrl = renderStringUrl(data, urlPattern, urlVariables); + } + + return renderedUrl; } - private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) { - if (StringUtils.isEmpty(patternVariables)) { - return pattern; + private String renderStringUrl(JSONObject jsonObject, String pattern, String patternVariables) { + List patternVariablesList = Arrays.asList(patternVariables.split(",")); + Object[] patternVariableData = patternVariablesList + .stream() + .map(field -> getDataByFieldName(jsonObject, field)) + .toArray(); + return String.format(pattern, patternVariableData); + } + + private Object getDataByFieldName(JSONObject jsonObject, String fieldName) { + if (!jsonObject.containsKey(fieldName)) { + throw new IllegalArgumentException("Invalid json field name: " + fieldName); } + + return jsonObject.get(fieldName); + } + + + private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) { List patternVariableFieldNumbers = Arrays.asList(patternVariables.split(",")); Object[] patternVariableData = patternVariableFieldNumbers .stream() From cea3c26643393f9fc8cb9c39dd17add129227795 Mon Sep 17 00:00:00 2001 From: sushmith Date: Tue, 5 Sep 2023 15:43:15 +0530 Subject: [PATCH 12/13] chore: remove comments --- .../sink/http/request/uri/UriParser.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java index 46174d9cb..0da069510 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java @@ -41,23 +41,7 @@ public UriParser(Parser protoParser, String parserMode, JSONParser jsonParser) { } public String parse(Message message, String serviceUrl) { - -// if (message.getInputSchemaType() == InputSchemaType.JSON) { -// JSONObject jsonObject = parseJsonMessage(message); -// -// } -// create jonobject if input is json -// { -// } -// DynamicMessage parsedMessage = parseEsbMessage(message); - -// -- > mpve this to directlydo it inside parseServiceURl - - -// take call inside parseService Url -// overload parseServiceUrl function to take jsonObject return parseServiceUrl(message, serviceUrl); - } private DynamicMessage parseEsbMessage(Message message) { From 7cd36ebdfb5dc8a9d86535440c57f297314cea70 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Wed, 6 Sep 2023 14:11:19 +0530 Subject: [PATCH 13/13] fix: json input for http sink --- .../firehose/serializer/MessageToTemplatizedJson.java | 5 ++++- .../firehose/sink/http/factory/SerializerFactory.java | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java index 2391cd495..dbef17c88 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java @@ -82,7 +82,9 @@ public String serialize(Message message) throws DeserializerException { String jsonString; if (message.getInputSchemaType() == InputSchemaType.JSON) { + System.out.println(new String(message.getLogMessage())); JSONObject json = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + System.out.println(json.toJSONString()); jsonMessage = json.toJSONString(); } else { // only supports messages not keys @@ -101,7 +103,8 @@ public String serialize(Message message) throws DeserializerException { finalMessage = finalMessage.replace(path, jsonString); } return finalMessage; - } catch (InvalidProtocolBufferException | ParseException | PathNotFoundException e) { + } catch (InvalidProtocolBufferException | ParseException | PathNotFoundException e) { + e.printStackTrace(); throw new DeserializerException(e.getMessage()); } } diff --git a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java index cb6c6f325..7f5331d1f 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java @@ -2,6 +2,7 @@ import com.gotocompany.firehose.config.HttpSinkConfig; import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.serializer.JsonWrappedProtoByte; import com.gotocompany.firehose.serializer.MessageSerializer; @@ -24,7 +25,7 @@ public class SerializerFactory { public MessageSerializer build() { FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, SerializerFactory.class); - if (isProtoSchemaEmpty() || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) { + if ( (httpSinkConfig.getInputSchemaType() == InputSchemaType.PROTOBUF && isProtoSchemaEmpty()) || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) { firehoseInstrumentation.logDebug("Serializer type: JsonWrappedProtoByte"); // Fallback to json wrapped proto byte @@ -34,7 +35,6 @@ public MessageSerializer build() { // this is currently not possible because of the way we are using the parser. return new JsonWrappedProtoByte(); } - if (httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.JSON) { Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()); if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) {