diff --git a/pom.xml b/pom.xml
index 116150a..9316adc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
com.flipkart.databuilderframework
- 0.5.8
+ 0.5.8-PATCH-SNAPSHOT
4.0.0
databuilderframework
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderExecutionListener.java b/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderExecutionListener.java
index 66e683b..9061c2d 100644
--- a/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderExecutionListener.java
+++ b/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderExecutionListener.java
@@ -41,4 +41,12 @@ default void postProcessing(DataFlowInstance dataFlowInstance,
default boolean shouldThrowException() {
return false;
}
+
+ default boolean shouldThrowExceptionInBeforeExecute() {
+ return false;
+ }
+
+ default boolean shouldThrowExceptionInAfterExecute() {
+ return false;
+ }
}
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderFrameworkException.java b/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderFrameworkException.java
index 8374772..41cf02d 100644
--- a/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderFrameworkException.java
+++ b/src/main/java/com/flipkart/databuilderframework/engine/DataBuilderFrameworkException.java
@@ -22,7 +22,9 @@ public static enum ErrorCode {
NO_BUILDER_FOUND_FOR_NAME,
INSTANTIATION_FAILURE,
BUILDER_RESOLUTION_CONFLICT_FOR_DATA,
- BUILDER_EXECUTION_ERROR
+ BUILDER_EXECUTION_ERROR,
+ BUILDER_PRE_EXECUTION_ERROR,
+ BUILDER_POST_EXECUTION_ERROR
}
private final ErrorCode errorCode;
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/MultiThreadedDataFlowExecutor.java b/src/main/java/com/flipkart/databuilderframework/engine/MultiThreadedDataFlowExecutor.java
index 002bd4d..ac4aeda 100644
--- a/src/main/java/com/flipkart/databuilderframework/engine/MultiThreadedDataFlowExecutor.java
+++ b/src/main/java/com/flipkart/databuilderframework/engine/MultiThreadedDataFlowExecutor.java
@@ -1,5 +1,6 @@
package com.flipkart.databuilderframework.engine;
+import com.flipkart.databuilderframework.engine.util.DataBuilderExceptionUtil;
import com.flipkart.databuilderframework.model.*;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -224,7 +225,7 @@ public DataContainer call() throws Exception {
try {
listener.beforeExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData);
} catch (Throwable t) {
- logger.error("Error running pre-execution execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInBeforeExecute(listener, logger, t);
}
}
try {
@@ -236,7 +237,7 @@ public DataContainer call() throws Exception {
try {
listener.afterExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData, response);
} catch (Throwable t) {
- logger.error("Error running post-execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInAfterExecute(listener, logger, t);
}
}
if(null != response) {
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/OptimizedMultiThreadedDataFlowExecutor.java b/src/main/java/com/flipkart/databuilderframework/engine/OptimizedMultiThreadedDataFlowExecutor.java
index be07d25..8c21852 100644
--- a/src/main/java/com/flipkart/databuilderframework/engine/OptimizedMultiThreadedDataFlowExecutor.java
+++ b/src/main/java/com/flipkart/databuilderframework/engine/OptimizedMultiThreadedDataFlowExecutor.java
@@ -1,5 +1,6 @@
package com.flipkart.databuilderframework.engine;
+import com.flipkart.databuilderframework.engine.util.DataBuilderExceptionUtil;
import com.flipkart.databuilderframework.model.*;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -243,7 +244,7 @@ public DataContainer call() throws Exception {
try {
listener.beforeExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData);
} catch (Throwable t) {
- logger.error("Error running pre-execution execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInBeforeExecute(listener, logger, t);
}
}
try {
@@ -255,7 +256,7 @@ public DataContainer call() throws Exception {
try {
listener.afterExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData, response);
} catch (Throwable t) {
- logger.error("Error running post-execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInAfterExecute(listener, logger, t);
}
}
if(null != response) {
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/SimpleDataFlowExecutor.java b/src/main/java/com/flipkart/databuilderframework/engine/SimpleDataFlowExecutor.java
index 37ee2fe..410e879 100644
--- a/src/main/java/com/flipkart/databuilderframework/engine/SimpleDataFlowExecutor.java
+++ b/src/main/java/com/flipkart/databuilderframework/engine/SimpleDataFlowExecutor.java
@@ -1,5 +1,6 @@
package com.flipkart.databuilderframework.engine;
+import com.flipkart.databuilderframework.engine.util.DataBuilderExceptionUtil;
import com.flipkart.databuilderframework.model.*;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -67,7 +68,7 @@ protected DataExecutionResponse run(DataBuilderContext dataBuilderContext,
try {
listener.beforeExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData);
} catch (Throwable t) {
- logger.error("Error running pre-execution execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInBeforeExecute(listener, logger, t);
}
}
try {
@@ -86,13 +87,12 @@ protected DataExecutionResponse run(DataBuilderContext dataBuilderContext,
newlyGeneratedData.add(response.getData());
}
}
- //logger.debug("Ran " + builderMeta.getName());
processedBuilders.add(builderMeta);
for (DataBuilderExecutionListener listener : dataBuilderExecutionListener) {
try {
listener.afterExecute(dataBuilderContext, dataFlowInstance, builderMeta, dataDelta, responseData, response);
} catch (Throwable t) {
- logger.error("Error running post-execution listener: ", t);
+ DataBuilderExceptionUtil.handleExceptionInAfterExecute(listener, logger, t);
}
}
diff --git a/src/main/java/com/flipkart/databuilderframework/engine/util/DataBuilderExceptionUtil.java b/src/main/java/com/flipkart/databuilderframework/engine/util/DataBuilderExceptionUtil.java
new file mode 100644
index 0000000..efcebab
--- /dev/null
+++ b/src/main/java/com/flipkart/databuilderframework/engine/util/DataBuilderExceptionUtil.java
@@ -0,0 +1,32 @@
+package com.flipkart.databuilderframework.engine.util;
+
+import com.flipkart.databuilderframework.engine.DataBuilderExecutionListener;
+import com.flipkart.databuilderframework.engine.DataBuilderFrameworkException;
+import lombok.experimental.UtilityClass;
+import org.slf4j.Logger;
+
+@UtilityClass
+public class DataBuilderExceptionUtil {
+
+ public static void handleExceptionInBeforeExecute(final DataBuilderExecutionListener listener,
+ final Logger logger,
+ final Throwable t) throws DataBuilderFrameworkException {
+ final String errorMessage = "Error running pre-execution execution listener: ";
+ logger.error(errorMessage, t);
+ if (listener.shouldThrowExceptionInBeforeExecute()) {
+ throw new DataBuilderFrameworkException(DataBuilderFrameworkException.ErrorCode.BUILDER_PRE_EXECUTION_ERROR,
+ errorMessage + t.getMessage(), t);
+ }
+ }
+
+ public static void handleExceptionInAfterExecute(final DataBuilderExecutionListener listener,
+ final Logger logger,
+ final Throwable t) throws DataBuilderFrameworkException {
+ final String errorMessage = "Error running post-execution execution listener: ";
+ logger.error(errorMessage, t);
+ if (listener.shouldThrowExceptionInAfterExecute()) {
+ throw new DataBuilderFrameworkException(DataBuilderFrameworkException.ErrorCode.BUILDER_POST_EXECUTION_ERROR,
+ errorMessage + t.getMessage(), t);
+ }
+ }
+}
diff --git a/src/test/java/com/flipkart/databuilderframework/MultiThreadedDataFlowExecutorTest.java b/src/test/java/com/flipkart/databuilderframework/MultiThreadedDataFlowExecutorTest.java
index a029aec..92e414e 100644
--- a/src/test/java/com/flipkart/databuilderframework/MultiThreadedDataFlowExecutorTest.java
+++ b/src/test/java/com/flipkart/databuilderframework/MultiThreadedDataFlowExecutorTest.java
@@ -173,6 +173,99 @@ public void postProcessing(DataFlowInstance dataFlowInstance,
}
}
+ private static class TestListenerBeforeExecutionErrorWithExceptionThrown implements DataBuilderExecutionListener {
+
+ @Override
+ public void preProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta) {
+ log.info("Being called for: " + dataFlowInstance.getId());
+ }
+
+ @Override
+ public void beforeExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses) throws Exception {
+ throw new Exception("Blah blah");
+ }
+
+ @Override
+ public void afterExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses, Data currentResponse) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void afterException(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta,
+ Map prevResponses, Throwable frameworkException) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void postProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta, DataExecutionResponse response,
+ Throwable frameworkException) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+ @Override
+ public boolean shouldThrowExceptionInBeforeExecute() {
+ return true;
+ }
+ }
+
+ private static class TestListenerAfterExecutionErrorWithExceptionThrown implements DataBuilderExecutionListener {
+
+ @Override
+ public void preProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+
+ @Override
+ public void beforeExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void afterExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses, Data currentResponse) throws Exception {
+ throw new Exception("Blah blah");
+ }
+
+ @Override
+ public void afterException(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta,
+ Map prevResponses, Throwable frameworkException) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void postProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta, DataExecutionResponse response,
+ Throwable frameworkException) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+ @Override
+ public boolean shouldThrowExceptionInAfterExecute() {
+ return true;
+ }
+ }
+
private DataBuilderMetadataManager dataBuilderMetadataManager = new DataBuilderMetadataManager();
private DataFlowExecutor executor = new MultiThreadedDataFlowExecutor(
new InstantiatingDataBuilderFactory(dataBuilderMetadataManager),
@@ -196,30 +289,22 @@ public void setup() throws Exception {
.withAnnotatedDataBuilder(TestBuilderError.class)
.withTargetData("Y")
.build();
- executor.registerExecutionListener(new TestListener());
- executor.registerExecutionListener(new TestListenerBeforeExecutionError());
- executor.registerExecutionListener(new TestListenerAfterExecutionError());
- executor.registerExecutionListener(new TestListenerAfterExceptionError());
dataFlowValidationError = new DataFlowBuilder()
.withAnnotatedDataBuilder(TestBuilderDataValidationError.class)
.withTargetData("Y")
.build();
- executor.registerExecutionListener(new TestListener());
- executor.registerExecutionListener(new TestListenerBeforeExecutionError());
- executor.registerExecutionListener(new TestListenerAfterExecutionError());
- executor.registerExecutionListener(new TestListenerAfterExceptionError());
dataFlowValidationErrorWithPartialData = new DataFlowBuilder()
.withAnnotatedDataBuilder(TestBuilderA.class)
.withAnnotatedDataBuilder(TestBuilderDataValidationError.class)
.withTargetData("Y")
.build();
+
executor.registerExecutionListener(new TestListener());
executor.registerExecutionListener(new TestListenerBeforeExecutionError());
executor.registerExecutionListener(new TestListenerAfterExecutionError());
executor.registerExecutionListener(new TestListenerAfterExceptionError());
-
}
@Test
@@ -351,4 +436,40 @@ public void testRunValidationErrorWithPartialData() throws Exception {
fail("Should have thrown exception");
}
}
+
+ @Test
+ public void testRunSingleStepWithExceptionThrownInBeforeExecuteInExecutionListener() throws Exception {
+ DataFlowInstance dataFlowInstance = new DataFlowInstance();
+ dataFlowInstance.setId("testflow");
+ dataFlowInstance.setDataFlow(dataFlow);
+ executor.registerExecutionListener(new TestListenerBeforeExecutionErrorWithExceptionThrown());
+
+ DataDelta dataDelta = new DataDelta(Lists.newArrayList(
+ new TestDataA("Hello"), new TestDataB("World"),
+ new TestDataD("this"), new TestDataG("Hmmm")));
+ try {
+ executor.run(dataFlowInstance, dataDelta);
+ fail("It should not come here.");
+ } catch (DataBuilderFrameworkException exception) {
+ Assert.assertEquals(DataBuilderFrameworkException.ErrorCode.BUILDER_EXECUTION_ERROR, exception.getErrorCode());
+ }
+ }
+
+ @Test
+ public void testRunSingleStepWithExceptionThrownInAfterExecuteInExecutionListener() throws Exception {
+ DataFlowInstance dataFlowInstance = new DataFlowInstance();
+ dataFlowInstance.setId("testflow");
+ dataFlowInstance.setDataFlow(dataFlow);
+ executor.registerExecutionListener(new TestListenerAfterExecutionErrorWithExceptionThrown());
+
+ DataDelta dataDelta = new DataDelta(Lists.newArrayList(
+ new TestDataA("Hello"), new TestDataB("World"),
+ new TestDataD("this"), new TestDataG("Hmmm")));
+ try {
+ executor.run(dataFlowInstance, dataDelta);
+ fail("It should not come here.");
+ } catch (DataBuilderFrameworkException exception) {
+ Assert.assertEquals(DataBuilderFrameworkException.ErrorCode.BUILDER_EXECUTION_ERROR, exception.getErrorCode());
+ }
+ }
}
diff --git a/src/test/java/com/flipkart/databuilderframework/SimpleDataFlowExecutorTest.java b/src/test/java/com/flipkart/databuilderframework/SimpleDataFlowExecutorTest.java
new file mode 100644
index 0000000..0528361
--- /dev/null
+++ b/src/test/java/com/flipkart/databuilderframework/SimpleDataFlowExecutorTest.java
@@ -0,0 +1,170 @@
+package com.flipkart.databuilderframework;
+
+import com.flipkart.databuilderframework.engine.DataBuilderContext;
+import com.flipkart.databuilderframework.engine.DataBuilderExecutionListener;
+import com.flipkart.databuilderframework.engine.DataBuilderFrameworkException;
+import com.flipkart.databuilderframework.engine.DataFlowBuilder;
+import com.flipkart.databuilderframework.engine.DataFlowExecutor;
+import com.flipkart.databuilderframework.engine.SimpleDataFlowExecutor;
+import com.flipkart.databuilderframework.engine.impl.MixedDataBuilderFactory;
+import com.flipkart.databuilderframework.model.Data;
+import com.flipkart.databuilderframework.model.DataBuilderMeta;
+import com.flipkart.databuilderframework.model.DataDelta;
+import com.flipkart.databuilderframework.model.DataExecutionResponse;
+import com.flipkart.databuilderframework.model.DataFlow;
+import com.flipkart.databuilderframework.model.DataFlowInstance;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+@Slf4j
+public class SimpleDataFlowExecutorTest {
+
+ private DataFlowExecutor executor = new SimpleDataFlowExecutor(new MixedDataBuilderFactory());
+ private DataFlow dataFlow = new DataFlow();
+
+ @Before
+ public void setup() throws Exception {
+ dataFlow = new DataFlowBuilder()
+ .withAnnotatedDataBuilder(TestBuilderA.class)
+ .withAnnotatedDataBuilder(TestBuilderB.class)
+ .withAnnotatedDataBuilder(TestBuilderC.class)
+ .withTargetData("F")
+ .build();
+ }
+
+ private static class TestListenerBeforeExecutionErrorWithExceptionThrown implements DataBuilderExecutionListener {
+
+ @Override
+ public void preProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta) {
+ log.info("Being called for: " + dataFlowInstance.getId());
+ }
+
+ @Override
+ public void beforeExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses) throws Exception {
+ throw new Exception("Blah blah");
+ }
+
+ @Override
+ public void afterExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses, Data currentResponse) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void afterException(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta,
+ Map prevResponses, Throwable frameworkException) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void postProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta, DataExecutionResponse response,
+ Throwable frameworkException) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+ @Override
+ public boolean shouldThrowExceptionInBeforeExecute() {
+ return true;
+ }
+ }
+
+ private static class TestListenerAfterExecutionErrorWithExceptionThrown implements DataBuilderExecutionListener {
+
+ @Override
+ public void preProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+
+ @Override
+ public void beforeExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void afterExecute(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta, Map prevResponses, Data currentResponse) throws Exception {
+ throw new Exception("Blah blah");
+ }
+
+ @Override
+ public void afterException(DataBuilderContext builderContext,
+ DataFlowInstance dataFlowInstance,
+ DataBuilderMeta builderToBeApplied,
+ DataDelta dataDelta,
+ Map prevResponses, Throwable frameworkException) {
+ log.info("{} called for: {}", builderToBeApplied.getName(), dataFlowInstance.getId());
+ }
+
+ @Override
+ public void postProcessing(DataFlowInstance dataFlowInstance,
+ DataDelta dataDelta, DataExecutionResponse response,
+ Throwable frameworkException) {
+ log.info("Being called for: {}", dataFlowInstance.getId());
+ }
+
+ @Override
+ public boolean shouldThrowExceptionInAfterExecute() {
+ return true;
+ }
+ }
+
+ @Test
+ public void testRunSingleStepWithExceptionThrownInBeforeExecuteInExecutionListener() throws Exception {
+ DataFlowInstance dataFlowInstance = new DataFlowInstance();
+ dataFlowInstance.setId("testflow");
+ dataFlowInstance.setDataFlow(dataFlow);
+
+ executor.registerExecutionListener(new TestListenerBeforeExecutionErrorWithExceptionThrown());
+
+ DataDelta dataDelta = new DataDelta(Lists.newArrayList(
+ new TestDataA("Hello"), new TestDataB("World"), new TestDataD("this")));
+ try {
+ executor.run(dataFlowInstance, dataDelta);
+ fail("It should not come here.");
+ } catch (DataBuilderFrameworkException exception) {
+ Assert.assertEquals(DataBuilderFrameworkException.ErrorCode.BUILDER_PRE_EXECUTION_ERROR, exception.getErrorCode());
+ }
+ }
+
+ @Test
+ public void testRunSingleStepWithExceptionThrownInAfterExecuteInExecutionListener() throws Exception {
+ DataFlowInstance dataFlowInstance = new DataFlowInstance();
+ dataFlowInstance.setId("testflow");
+ dataFlowInstance.setDataFlow(dataFlow);
+
+ executor.registerExecutionListener(new TestListenerAfterExecutionErrorWithExceptionThrown());
+
+ DataDelta dataDelta = new DataDelta(Lists.newArrayList(
+ new TestDataA("Hello"), new TestDataB("World"), new TestDataD("this")));
+ try {
+ executor.run(dataFlowInstance, dataDelta);
+ fail("It should not come here.");
+ } catch (DataBuilderFrameworkException exception) {
+ Assert.assertEquals(DataBuilderFrameworkException.ErrorCode.BUILDER_EXECUTION_ERROR, exception.getErrorCode());
+ }
+ }
+}