diff --git a/bin/produce-ad-event-data.sh b/bin/produce-ad-event-data.sh
new file mode 100755
index 00000000..097fb3f1
--- /dev/null
+++ b/bin/produce-ad-event-data.sh
@@ -0,0 +1,63 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script will prepare topics and start AdEventProducer which generates impression and click events
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+BASE_DIR="$(dirname "${DIR}")"
+AD_PRODUCER_PATH="${BASE_DIR}/target/classes/samza/examples/join/AdEventProducer"
+SAMZA_LIB="${BASE_DIR}/deploy/samza/lib/"
+KAFKA_TOPICS="${BASE_DIR}/deploy/kafka/bin/kafka-topics.sh"
+ZOOKEEPER=localhost:2181
+KAFKA_BROKER=localhost:9092
+# Required topics as "name:partition-count" pairs separated by ';'.
+echo Checking if required topics exist in kafka...
+TOPICSRAW="ad-impression:1;ad-click:1;ad-imp-metadata:4;ad-clk-metadata:4;ad-event-error:1;ad-join:1;ad-imp-store-changelog:4;ad-clk-store-changelog:4"
+IFS=';' read -r -a TOPICS <<< "$TOPICSRAW"
+for i in "${!TOPICS[@]}"
+do
+  IFS=':' read -r -a TOPIC <<< "${TOPICS[$i]}"
+  TOPIC_NAME=${TOPIC[0]}
+  PARTITION_NUMBER=${TOPIC[1]}
+  EXIST=$("${KAFKA_TOPICS}" --describe --topic "${TOPIC_NAME}" --zookeeper "${ZOOKEEPER}")
+  if [ -z "${EXIST}" ]
+  then
+    echo -e topic "${TOPIC_NAME}" doesn\'t exists. Creating topic...
+    "${KAFKA_TOPICS}" --create --topic "${TOPIC_NAME}" --zookeeper "${ZOOKEEPER}" --partitions "${PARTITION_NUMBER}" --replication 1
+  else
+    echo Topic "${TOPIC_NAME}" already exists.
+    read -r -a TOPIC_DESCRIPTION <<< "${EXIST}"   # read consumes only the first line of the description
+    for DESC in "${TOPIC_DESCRIPTION[@]}"
+    do
+      IFS=':' read -r -a KV <<< "${DESC}"
+      if [ "${KV[0]}" == 'PartitionCount' ]
+      then
+        if [ "${KV[1]}" != "${PARTITION_NUMBER}" ]
+        then
+          echo Number of partitions for topic "${TOPIC_NAME}" is wrong. It should be ${PARTITION_NUMBER} instead of ${KV[1]}. Exiting. >&2
+          exit 1   # a wrong partition count is a failure, so callers must see a non-zero status
+        fi
+      fi
+    done
+  fi
+done
+# Run the producer from the Samza lib directory so that the "*" classpath picks up every jar there.
+cd "${SAMZA_LIB}"
+
+echo -e "\nStarting AdEventProducer...\n"
+java -cp "*" samza.examples.join.AdEventProducer
\ No newline at end of file
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index f57fee2a..4981d7eb 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -51,6 +51,16 @@
configtrue
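+    <file>
+      <source>${basedir}/src/main/config/ad-event-feed.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/ad-event-join.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>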
diff --git a/src/main/config/ad-event-feed.properties b/src/main/config/ad-event-feed.properties
new file mode 100644
index 00000000..ecc9c927
--- /dev/null
+++ b/src/main/config/ad-event-feed.properties
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=ad-event-feed
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+# Yarn
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.join.task.AdEventFeedStreamTask
+task.inputs=kafka.ad-impression, kafka.ad-click
+
+# Serializers
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.streams.ad-impression.samza.msg.serde=string
+systems.kafka.streams.ad-click.samza.msg.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.samza.offset.default=oldest
\ No newline at end of file
diff --git a/src/main/config/ad-event-join.properties b/src/main/config/ad-event-join.properties
new file mode 100644
index 00000000..7c4fd56f
--- /dev/null
+++ b/src/main/config/ad-event-join.properties
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=ad-event-join
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+# Yarn
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.join.task.AdEventJoinStreamTask
+task.inputs=kafka.ad-imp-metadata, kafka.ad-clk-metadata
+# Call window method every 5 minutes
+task.window.ms=300000
+
+# Serializers
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.samza.offset.default=oldest
+
+# Key/value storages
+stores.imp-meta-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.imp-meta-store.changelog=kafka.ad-imp-store-changelog
+stores.imp-meta-store.changelog.replication.factor=1
+stores.imp-meta-store.key.serde=string
+stores.imp-meta-store.msg.serde=json
+
+
+stores.clk-meta-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.clk-meta-store.changelog=kafka.ad-clk-store-changelog
+stores.clk-meta-store.changelog.replication.factor=1
+stores.clk-meta-store.key.serde=string
+stores.clk-meta-store.msg.serde=json
+
+# Metrics
+metrics.reporters=jmx
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
\ No newline at end of file
diff --git a/src/main/java/samza/examples/join/AdEventParser.java b/src/main/java/samza/examples/join/AdEventParser.java
new file mode 100644
index 00000000..6a1f4125
--- /dev/null
+++ b/src/main/java/samza/examples/join/AdEventParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.join;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AdEventParser {
+
+ /**
+ * Parse raw ad event that should have "key=value" pairs separated with space character into a map.
+ * Example argument:
+ * {@code impression-id=1 type=click advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T00:00:00.000"}
+ * Return value is a map that contains given key-value pairs
+ *
+ * @param rawAdEvent raw ad event String that should have key=value pairs separated with one space character
+ * @return event map
+ * @throws ParseException
+ */
+ public static synchronized Map parseAdEvent(String rawAdEvent) throws ParseException{
+ Map adEvent = new HashMap<>();
+ String[] fields = rawAdEvent.split(" ");
+ for(String field : fields){
+ String[] keyValuePair = field.split("=");
+ if(keyValuePair.length == 2)
+ adEvent.put(keyValuePair[0], keyValuePair[1]);
+ else
+ throw new ParseException("Error while parsing. Messages should have only 'key=value' pairs separated by one space characters with no space and '=' characters in keys and values", -1);
+ }
+ return adEvent;
+ }
+}
diff --git a/src/main/java/samza/examples/join/AdEventProducer.java b/src/main/java/samza/examples/join/AdEventProducer.java
new file mode 100644
index 00000000..5a4b5646
--- /dev/null
+++ b/src/main/java/samza/examples/join/AdEventProducer.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.join;
+
+import kafka.api.PartitionFetchInfo;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchRequest;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.log4j.*;
+import org.apache.samza.serializers.StringSerde;
+
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Date;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.concurrent.*;
+
+/**
+ * Generates dummy AdImpression and AdClick events and sends it to kafka broker on localhost, port 9092.
+ *
+ * AdEvent info contains impression-id, type (impression or click), advertiser-id, ip, agent, and timestamp.
+ * All info is dummy.
+ */
+public class AdEventProducer {
+
+ private static final Logger logger = LogManager.getLogger(AdEventProducer.class);
+
+ public static final String IMPRESSION_ID = "impression-id";
+ public static final String TYPE = "type";
+ public static final String TYPE_CLICK = "click";
+ public static final String TYPE_IMPRESSION = "impression";
+ public static final String ADVERTISER_ID = "advertiser-id";
+ public static final String IP = "ip";
+ public static final String AGENT = "agent";
+ public static final String TIMESTAMP = "timestamp";
+
+ private static int newImpressionId = 0;
+ private static int numOfAdImpPartitions = 0;
+ private static int numOfAdClkPartitions = 0;
+
+ private static KafkaProducer producer;
+
+ private static String[] agents;
+
+  static {
+    // Browser names used as the "agent" field of generated impressions
+    agents = new String[] {
+      "Edge",
+      "Safari",
+      "Firefox",
+      "InternetExplorer",
+      "Chrome", "Opera" };
+  }
+
+ public static void main(String args[]) {
+
+ // Configuring logger
+ LogManager.getRootLogger().removeAllAppenders();
+ ConsoleAppender console = new ConsoleAppender();
+ final String PATTERN = "%d [%p|%c|%C{1}] %m%n";
+ console.setLayout(new PatternLayout(PATTERN));
+ console.setThreshold(Level.INFO);
+ console.activateOptions();
+ Logger.getRootLogger().addAppender(console);
+
+ findLatestImpressionId();
+
+ int lambda = 6, generatedNumber;
+ long milliseconds;
+ final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ while(true) {
+ generatedNumber = generatePoissonRandomNumber(lambda);
+ milliseconds = (long)(generatedNumber * (Math.random() * 10 + 30) + 20);
+
+ try {
+ Thread.sleep(milliseconds);
+
+ final String impression = generateImpression();
+ final int impressionId = newImpressionId;
+
+ logger.info("Attempting to send an ad impression with id " + impressionId + " to kafka...");
+ producer.send(new ProducerRecord("ad-impression", impressionId % numOfAdImpPartitions, Integer.toString(impressionId), impression), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ if(recordMetadata != null) {
+ logger.info("KafkaProducer has sent ad impression with id " + impressionId + ". Offset: " + recordMetadata.offset());
+ } else {
+ logger.info("KafkaProducer has failed to sent ad impression with id " + impressionId);
+ logger.info("Record metadata is null");
+ }
+ if(e != null) {
+ logger.info("Exception occurred: ");
+ e.printStackTrace();
+ }
+ // Call a task to send a click event for this impression with possibility of 10%
+ if(recordMetadata != null && Math.random() < 0.1) {
+ // Starting task with random delay between 2 and 20 seconds
+ long delay = Math.round(2000 + Math.random() * 18000);
+ logger.info("Scheduling task for sending ad click for the impression with id " + impressionId + " in " + delay + " milliseconds.");
+ scheduledExecutor.schedule(adClickSender(impression), delay, TimeUnit.MILLISECONDS);
+ }
+ }
+ });
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+  /** Builds the next impression event (advertiser id 1..100, random agent, current time) and advances the impression-id counter. */
+  private static String generateImpression() {
+    SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(IMPRESSION_ID).append('=').append(++newImpressionId);
+    stringBuilder.append(' ').append(TYPE).append('=').append(TYPE_IMPRESSION);
+    // (int)(random * n) is uniform over 0..n-1; Math.round(random * k) would make both end values half as likely
+    stringBuilder.append(' ').append(ADVERTISER_ID).append('=').append(1 + (int)(Math.random() * 100));
+    stringBuilder.append(' ').append(IP).append('=').append(generateRandomIpAddress());
+    stringBuilder.append(' ').append(AGENT).append('=').append(agents[(int)(Math.random() * agents.length)]);
+    stringBuilder.append(' ').append(TIMESTAMP).append('=').append(dateFormatter.format(new Date()));
+
+    return stringBuilder.toString();
+  }
+
+ private static Runnable adClickSender(String impressionRaw) {
+ Map impression = null;
+ try {
+ impression = AdEventParser.parseAdEvent(impressionRaw);
+ } catch (ParseException parEx) {
+ parEx.printStackTrace();
+ }
+ final Map fImpression = new HashMap<>(impression);
+
+ Runnable adClickSenderRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String impressionId = fImpression.get(IMPRESSION_ID);
+ logger.info("Attempting to send an ad click with impression id " + impressionId + " to kafka...");
+
+ String clickEvent = generateClick(fImpression);
+
+ RecordMetadata recordMetadata = producer.send(new ProducerRecord("ad-click",
+ Integer.parseInt(impressionId)%numOfAdClkPartitions,
+ impressionId,
+ clickEvent)).get();
+
+ if(recordMetadata != null) {
+ logger.info("KafkaProducer has sent ad click with impression id " + impressionId + " and offset " + recordMetadata.offset());
+ } else {
+ logger.info("KafkaProducer has failed to sent ad click with impression id " + impressionId);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ return adClickSenderRunnable;
+ }
+  /** Derives the click event for an impression: same id, advertiser, ip and agent, with type "click" and the current time as timestamp. */
+  private static String generateClick(Map forImpression) {
+    String now = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(new Date());
+    String[] keys = {IMPRESSION_ID, TYPE, ADVERTISER_ID, IP, AGENT, TIMESTAMP};
+    StringBuilder click = new StringBuilder();
+    for (String key : keys) {
+      // type and timestamp are set here; every other field is copied from the impression
+      Object value = TYPE.equals(key) ? TYPE_CLICK : TIMESTAMP.equals(key) ? now : forImpression.get(key);
+      click.append(click.length() > 0 ? " " : "").append(key + "=" + value);
+    }
+
+    return click.toString();
+  }
+
+  /**
+   * @return ip address in format "n.n.n.*" where each n is a random number between 0 and 255 (inclusive)
+   */
+  private static String generateRandomIpAddress(){
+    // Longest possible result is "255.255.255.*": 13 characters
+    StringBuilder sb = new StringBuilder(13);
+    for( int i = 0 ; i < 3 ; i++ ) {
+      // Math.random() is in [0, 1), so scaling by 256 and truncating covers 0..255
+      int ipNumberSegment = (int)(Math.random()*256);
+      sb.append(ipNumberSegment).append(".");
+    }
+    sb.append("*");
+    return sb.toString();
+  }
+
+ private static int generatePoissonRandomNumber(int lambda) {
+ double l = Math.exp(-lambda);
+ int k = 0;
+ double p = 1;
+ do{
+ k++;
+ p *= Math.random();
+ } while (p > l);
+ return k - 1;
+ }
+
+  /**
+   * Checking for the latest impression present in kafka so we can set id for the new impression after latest id.
+   * Required in case that user start producer multiple times so we don't start producing from id = 0 every time.
+   * Also creates the shared producer and records the partition counts of the "ad-impression" and "ad-click" topics.
+   */
+  private static void findLatestImpressionId() {
+    producer = new KafkaProducer<String, String>(getProducerProperties());
+    logger.info("Checking kafka for the latest ad-impression...");
+
+    // Fetching partition metadata
+    numOfAdImpPartitions = producer.partitionsFor("ad-impression").size();
+    numOfAdClkPartitions = producer.partitionsFor("ad-click").size();
+
+    // Sending request for every partition's latest offset
+    Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo = new HashMap<>();
+    for (int i = 0 ; i < numOfAdImpPartitions ; i++)
+      offsetRequestInfo.put(new TopicAndPartition("ad-impression", i), new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+    OffsetRequest offsetRequest = new OffsetRequest(offsetRequestInfo, (short)0, kafka.api.OffsetRequest.DefaultClientId());
+    int latestImpressionId = 0;
+    SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 30000, 100000, "ad-event-consumer");
+    try {
+      OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
+
+      // Making fetch requests for last message in every non-empty partition to see the latest impression-id.
+      // Empty partitions are left out: their "last message" offset would be -1, which is not a valid fetch offset.
+      long[] adImpressionPartitionOffsets = new long[numOfAdImpPartitions];
+      Map<TopicAndPartition, PartitionFetchInfo> fetchRequestInfo = new HashMap<>(numOfAdImpPartitions);
+      for (int i = 0 ; i < numOfAdImpPartitions ; i++) {
+        adImpressionPartitionOffsets[i] = offsetResponse.offsets("ad-impression", i)[0];
+        if (adImpressionPartitionOffsets[i] > 0)
+          fetchRequestInfo.put(new TopicAndPartition("ad-impression", i), new PartitionFetchInfo(adImpressionPartitionOffsets[i] - 1, 1024 * 1024));
+      }
+
+      // Fetching messages with latest offset in every partition
+      if (!fetchRequestInfo.isEmpty()) {
+        FetchResponse fetchResponse = consumer.fetch(new FetchRequest(kafka.api.FetchRequest.DefaultCorrelationId(), consumer.clientId(), 10000, 1000, fetchRequestInfo));
+        StringSerde stringSerde = new StringSerde("UTF-8");
+        for (int i = 0; i < numOfAdImpPartitions; i++) {
+          if (adImpressionPartitionOffsets[i] <= 0)
+            continue;
+          Iterator<MessageAndOffset> iter = fetchResponse.messageSet("ad-impression", i).iterator();
+          MessageAndOffset last = null;
+          while (iter.hasNext())
+            last = iter.next();
+          ByteBuffer keyBuffer = last == null ? null : last.message().key();
+          // A partition that returned nothing, or a last message without a key, contributes no id
+          if (keyBuffer != null) {
+            String impressionId = stringSerde.fromBytes(Utils.toArray(keyBuffer));
+            latestImpressionId = Math.max(latestImpressionId, Integer.parseInt(impressionId));
+          }
+        }
+      }
+    } finally {
+      // The consumer is only needed for this lookup; close it on every exit path
+      consumer.close();
+    }
+
+    // Setting next impression-id
+    newImpressionId = latestImpressionId;
+    if(newImpressionId == 0) {
+      logger.info("No ad-impressions found in kafka.");
+    } else {
+      logger.info("Starting producer. Latest ad-impression found with id: " + newImpressionId);
+    }
+  }
+
+ private static Map getProducerProperties() {
+ Map props = new HashMap<>();
+
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("client.id", "dummy-ad-producer");
+ props.put("key.serializer", StringSerializer.class.getCanonicalName());
+ props.put("value.serializer", StringSerializer.class.getCanonicalName());
+
+ return props;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/join/task/AdEventFeedStreamTask.java b/src/main/java/samza/examples/join/task/AdEventFeedStreamTask.java
new file mode 100644
index 00000000..0823fb59
--- /dev/null
+++ b/src/main/java/samza/examples/join/task/AdEventFeedStreamTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.join.task;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.join.AdEventParser;
+import samza.examples.join.AdEventProducer;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AdEventFeedStreamTask implements StreamTask {
+
+ private final int NUM_OF_PARTITIONS = 4;
+ private final SystemStream ERROR_STREAM = new SystemStream("kafka", "ad-event-error");
+ private final SystemStream IMP_META_STREAM = new SystemStream("kafka", "ad-imp-metadata");
+ private final SystemStream CLK_META_STREAM = new SystemStream("kafka", "ad-clk-metadata");
+
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ String rawAdEvent = (String)envelope.getMessage();
+ if (envelope.getKey() == null) {
+ OutgoingMessageEnvelope ome = buildErrorEnvelope(ERROR_STREAM, rawAdEvent, new Exception("Envelope key cannot be null!"));
+ collector.send(ome);
+ return;
+ }
+
+ try {
+ Map adEvent = AdEventParser.parseAdEvent(rawAdEvent);
+ adEvent.put("log-line", rawAdEvent);
+ SystemStream outgoingStream = null;
+ String adEventType = adEvent.get(AdEventProducer.TYPE);
+ if(adEventType.equals(AdEventProducer.TYPE_IMPRESSION))
+ outgoingStream = IMP_META_STREAM;
+ else if(adEventType.equals(AdEventProducer.TYPE_CLICK))
+ outgoingStream = CLK_META_STREAM;
+
+ if(outgoingStream != null) {
+ OutgoingMessageEnvelope outgoingEnvelope = new OutgoingMessageEnvelope(outgoingStream
+ , envelope.getKey().hashCode()%NUM_OF_PARTITIONS
+ , envelope.getKey()
+ , adEvent);
+ collector.send(outgoingEnvelope);
+ } else {
+ OutgoingMessageEnvelope ome = buildErrorEnvelope(ERROR_STREAM, rawAdEvent, new Exception("Ad event type ('" + adEventType + "') unknown"));
+ collector.send(ome);
+ }
+ } catch (ParseException pe) {
+ OutgoingMessageEnvelope ome = buildErrorEnvelope(ERROR_STREAM, rawAdEvent, pe);
+ collector.send(ome);
+ }
+ }
+
+ private OutgoingMessageEnvelope buildErrorEnvelope(SystemStream stream, String rawEvent, Throwable error) {
+ Map errorMessage = new HashMap<>();
+ errorMessage.put("error", error.getClass().getCanonicalName());
+ errorMessage.put("error-message", error.getMessage());
+ errorMessage.put("raw-event", rawEvent);
+ return new OutgoingMessageEnvelope(stream, errorMessage);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/join/task/AdEventJoinStreamTask.java b/src/main/java/samza/examples/join/task/AdEventJoinStreamTask.java
new file mode 100644
index 00000000..9fd94bbf
--- /dev/null
+++ b/src/main/java/samza/examples/join/task/AdEventJoinStreamTask.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.join.task;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.*;
+import samza.examples.join.AdEventProducer;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+/** Joins impression and click events that share a key; unmatched events wait in key-value stores until their partner arrives or window() evicts them. */
+public class AdEventJoinStreamTask implements StreamTask, InitableTask, WindowableTask {
+
+  private static final SystemStream ERROR_STREAM = new SystemStream("kafka", "ad-event-error");
+  private static final SystemStream JOIN_STREAM = new SystemStream("kafka", "ad-join");
+  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  // Maximum age, relative to the most recently processed event, an unmatched event may reach before window() deletes it
+  private static final long RETENTION_MILLIS = 5 * 60 * 1000L;
+  private KeyValueStore<String, Map<String, String>> impMetaStore;
+  private KeyValueStore<String, Map<String, String>> clkMetaStore;
+  private String lastTimestamp = null;
+
+  @Override
+  public void init(Config config, TaskContext taskContext) throws Exception {
+    impMetaStore = (KeyValueStore<String, Map<String, String>>) taskContext.getStore("imp-meta-store");
+    clkMetaStore = (KeyValueStore<String, Map<String, String>>) taskContext.getStore("clk-meta-store");
+  }
+
+  /** Emits a join event when the partner of the incoming event is already stored, otherwise stores the event; invalid envelopes go to the error stream. */
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    if (envelope.getKey() == null) {
+      collector.send(buildErrorEnvelope(ERROR_STREAM, new Exception("Envelope key cannot be null!")));
+      return;
+    }
+    if (!(envelope.getMessage() instanceof Map)) {
+      collector.send(buildErrorEnvelope(ERROR_STREAM, new Exception("Envelope message is not a map. Map is required")));
+      return; // without a map there is nothing to join; falling through would dereference null
+    }
+    Map<String, String> adEvent = (Map<String, String>) envelope.getMessage();
+    String key = (String) envelope.getKey();
+    String type = adEvent.get(AdEventProducer.TYPE);
+    // Keep the previous reference time when an event carries no timestamp, so it is never reset to null
+    if (adEvent.get(AdEventProducer.TIMESTAMP) != null)
+      lastTimestamp = adEvent.get(AdEventProducer.TIMESTAMP);
+    try {
+      // Constant-first equals: an event without a type is ignored instead of causing a NullPointerException
+      if (AdEventProducer.TYPE_IMPRESSION.equals(type)) {
+        Map<String, String> clkEvent = clkMetaStore.get(key);
+        if (clkEvent != null) {
+          collector.send(new OutgoingMessageEnvelope(JOIN_STREAM, buildJoinEvent(adEvent, clkEvent)));
+          clkMetaStore.delete(key);
+        } else
+          impMetaStore.put(key, adEvent);
+      } else if (AdEventProducer.TYPE_CLICK.equals(type)) {
+        Map<String, String> impEvent = impMetaStore.get(key);
+        if (impEvent != null) {
+          collector.send(new OutgoingMessageEnvelope(JOIN_STREAM, buildJoinEvent(impEvent, adEvent)));
+          impMetaStore.delete(key);
+        } else
+          clkMetaStore.put(key, adEvent);
+      }
+    } catch (ParseException pe) {
+      collector.send(buildErrorEnvelope(ERROR_STREAM, pe));
+    }
+  }
+
+  /** When window occurs, all ad events older than the latest processed event by more than 5 minutes (RETENTION_MILLIS) are deleted from both key-value stores. */
+  @Override
+  public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception {
+    if (lastTimestamp == null)
+      return; // no event processed yet, so there is no reference time to measure age against
+    SimpleDateFormat dateFormatter = new SimpleDateFormat(TIMESTAMP_FORMAT);
+    long oldestAllowed = dateFormatter.parse(lastTimestamp).getTime() - RETENTION_MILLIS;
+    evictOlderThan(impMetaStore, oldestAllowed, dateFormatter);
+    evictOlderThan(clkMetaStore, oldestAllowed, dateFormatter);
+  }
+
+  /** Deletes every entry of the store whose event timestamp lies before oldestAllowed (epoch milliseconds). */
+  private void evictOlderThan(KeyValueStore<String, Map<String, String>> store, long oldestAllowed, SimpleDateFormat dateFormatter) throws ParseException {
+    KeyValueIterator<String, Map<String, String>> iterator = store.all();
+    try {
+      while (iterator.hasNext()) {
+        Entry<String, Map<String, String>> entry = iterator.next();
+        if (dateFormatter.parse(entry.getValue().get(AdEventProducer.TIMESTAMP)).getTime() < oldestAllowed)
+          store.delete(entry.getKey());
+      }
+    } finally {
+      iterator.close(); // close the iterator even when a stored timestamp fails to parse
+    }
+  }
+
+  /**
+   * Builds joined event from given impression and click events.
+   * It holds the impression id, both log-lines, the join time and the time passed between showing the ad and the click.
+   * @param impEvent impression event
+   * @param clkEvent click event
+   * @return joined data
+   * @throws ParseException if either event's timestamp does not match TIMESTAMP_FORMAT
+   */
+  private Map<String, String> buildJoinEvent(Map<String, String> impEvent, Map<String, String> clkEvent) throws ParseException {
+    Map<String, String> joinEvent = new HashMap<>();
+    SimpleDateFormat dateFormatter = new SimpleDateFormat(TIMESTAMP_FORMAT);
+    Date impTimestamp = dateFormatter.parse(impEvent.get(AdEventProducer.TIMESTAMP));
+    Date clkTimestamp = dateFormatter.parse(clkEvent.get(AdEventProducer.TIMESTAMP));
+    joinEvent.put(AdEventProducer.IMPRESSION_ID, impEvent.get(AdEventProducer.IMPRESSION_ID));
+    joinEvent.put(AdEventProducer.TIMESTAMP, dateFormatter.format(new Date()));
+    joinEvent.put("passed-time-millis", Long.toString(clkTimestamp.getTime() - impTimestamp.getTime()));
+    joinEvent.put("imp-log-line", impEvent.get("log-line"));
+    joinEvent.put("clk-log-line", clkEvent.get("log-line"));
+    return joinEvent;
+  }
+
+  /** Wraps the error's class name, message and full stack trace into a message for the given stream. */
+  private OutgoingMessageEnvelope buildErrorEnvelope(SystemStream stream, Throwable error) {
+    Map<String, String> errorMessage = new HashMap<>();
+    errorMessage.put("error", error.getClass().getCanonicalName());
+    errorMessage.put("error-message", error.getMessage());
+    errorMessage.put("stack-trace", ExceptionUtils.getFullStackTrace(error));
+    return new OutgoingMessageEnvelope(stream, errorMessage);
+  }
+}
\ No newline at end of file