diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index b1ebde9ab1fe..0fd790eba31b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,12 +39,6 @@ public interface SparkPipelineOptions
   String getSparkMaster();
   void setSparkMaster(String master);
 
-  @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
-      + "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-  void setTimeout(Long timeoutMillis);
-
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(1000)
   Long getBatchIntervalMillis();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 6bbef39e63c0..e800071edf12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -137,11 +137,8 @@ public EvaluationResult run(Pipeline pipeline) {
       // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
       return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
-          pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
+          pipeline, jssc) : contextFactory.getCtxt();
     } else {
-      if (mOptions.getTimeout() > 0) {
-        LOG.info("Timeout is ignored by the SparkRunner in batch.");
-      }
       JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
       EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
       SparkPipelineTranslator translator = new TransformTranslator.Translator();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index aaf757318fff..4cc68fab92bf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -45,6 +45,7 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.StreamingContextState;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
@@ -57,7 +58,6 @@ public class EvaluationContext implements EvaluationResult {
   private JavaStreamingContext jssc;
   private final SparkRuntimeContext runtime;
   private final Pipeline pipeline;
-  private long timeout;
   private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
   private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
   private final Set<Dataset> leaves = new LinkedHashSet<>();
@@ -76,10 +76,9 @@ public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
   }
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      JavaStreamingContext jssc, long timeout) {
+      JavaStreamingContext jssc) {
     this(jsc, pipeline);
     this.jssc = jssc;
-    this.timeout = timeout;
     this.state = State.RUNNING;
   }
 
@@ -224,20 +223,15 @@ Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     return boundedDataset.getValues(pcollection);
   }
 
-  @Override
-  public void close(boolean gracefully) {
-    if (isStreamingPipeline()) {
-      // stop streaming context
-      if (timeout > 0) {
-        jssc.awaitTerminationOrTimeout(timeout);
-      } else {
-        jssc.awaitTermination();
+  @Override public void close(boolean gracefully) {
+    // Stopping streaming job if running
+    if (isStreamingPipeline() && !state.isTerminal()) {
+      try {
+        cancel(gracefully);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to cancel streaming job", e);
       }
-      // stop streaming context gracefully, so checkpointing (and other computations) get to
-      // finish before shutdown.
-      jssc.stop(false, gracefully);
     }
-    state = State.DONE;
     SparkContextFactory.stopSparkContext(jsc);
   }
 
@@ -248,21 +242,40 @@ public State getState() {
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException(
-        "Spark runner EvaluationContext does not support cancel.");
+    return cancel(true);
+  }
+
+  private State cancel(boolean gracefully) throws IOException {
+    if (isStreamingPipeline()) {
+      if (!state.isTerminal()) {
+        jssc.stop(false, gracefully);
+        state = State.CANCELLED;
+      }
+      return state;
+    } else {
+      throw new UnsupportedOperationException(
+          "Spark runner EvaluationContext does not support cancel.");
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return waitUntilFinish(Duration.millis(-1));
+    return waitUntilFinish(Duration.ZERO);
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
     if (isStreamingPipeline()) {
-      throw new UnsupportedOperationException(
-          "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
-              + "pipelines.");
+      if (duration.getMillis() < 1L) {
+        jssc.awaitTermination();
+        state = State.DONE;
+      } else {
+        jssc.awaitTermination(duration.getMillis());
+        if (jssc.getState().equals(StreamingContextState.STOPPED)) {
+          state = State.DONE;
+        }
+      }
+      return state;
     } else {
       // This is no-op, since Spark runner in batch is blocking.
       // It needs to be updated once SparkRunner supports non-blocking execution:
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 01398e494cc1..af90ff169bb6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -73,8 +73,7 @@ public JavaStreamingContext create() {
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    ctxt = new EvaluationContext(jsc, pipeline, jssc,
-        options.getTimeout());
+    ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index ec75eb7727cf..1619ade0fe0d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -68,7 +68,7 @@ public void testAssertion() throws Exception {
         .apply(Window.into(FixedWindows.of(windowDuration)));
 
     try {
-      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]);
+      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0], Duration.standardSeconds(1L));
     } catch (AssertionError e) {
       assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
           e.getMessage().equals(EXPECTED_ERR));
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index f69bd7f05026..3e75b1834cc1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -76,7 +76,7 @@ public void testFlattenUnbounded() throws Exception {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -95,7 +95,7 @@ public void testFlattenBoundedUnbounded() throws Exception {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 6b2486b40af9..d55ed39dbdd7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -118,7 +118,7 @@ public void testEarliest2Topics() throws Exception {
         .apply(ParDo.of(new FormatKVFn()))
         .apply(Distinct.create());
 
-    PAssertStreaming.runAndAssertContents(p, deduped, expected);
+    PAssertStreaming.runAndAssertContents(p, deduped, expected, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -143,10 +143,6 @@ public void testLatest() throws Exception {
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
-    // run for more than 1 batch interval, so that reading of latest is attempted in the
-    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
-    // in the second interval.
-    options.setTimeout(Duration.standardSeconds(3).getMillis());
 
     //------- test: read and format.
     Pipeline p = Pipeline.create(options);
@@ -168,7 +164,10 @@ public void testLatest() throws Exception {
         .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
         .apply(ParDo.of(new FormatKVFn()));
 
-    PAssertStreaming.runAndAssertContents(p, formatted, expected);
+    // run for more than 1 batch interval, so that reading of latest is attempted in the
+    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
+    // in the second interval.
+    PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
   }
 
   private static void produce(String topic, Map<String, String> messages) {
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index af93d8484e87..b57787f76ba1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -163,7 +163,8 @@ private static EvaluationResult run(SparkPipelineOptions options) {
     // requires a graceful stop so that checkpointing of the first run would finish successfully
     // before stopping and attempting to resume.
-    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
+    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED,
+        Duration.standardSeconds(1L));
   }
 
   @AfterClass
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4c503c48490b..9a15ff293446 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -67,8 +67,6 @@ public void testFixedWindows() throws Exception {
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
-    // graceful stop is on, so no worries about the timeout and window being equal
-    options.setTimeout(windowDuration.getMillis());
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -80,6 +78,6 @@ public void testFixedWindows() throws Exception {
         .apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS);
+    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
   }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 496735d1920a..23aca4311392 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ private PAssertStreaming() {
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected,
+                                                          Duration timeout,
                                                           boolean stopGracefully) {
     // Because PAssert does not support non-global windowing, but all our data is in one window,
     // we set up the assertion directly.
@@ -68,6 +70,7 @@ public static <T> EvaluationResult runAndAssertContents(Pipeline p,
 
     // run the pipeline.
     EvaluationResult res = (EvaluationResult) p.run();
+    res.waitUntilFinish(timeout);
     res.close(stopGracefully);
 
     // validate assertion succeeded (at least once).
     int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
@@ -86,8 +89,9 @@ public static <T> EvaluationResult runAndAssertContents(Pipeline p,
    */
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
-                                                          T[] expected) {
-    return runAndAssertContents(p, actual, expected, true);
+                                                          T[] expected,
+                                                          Duration timeout) {
+    return runAndAssertContents(p, actual, expected, timeout, true);
   }
 
   private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
index 1c0b68a684dc..f74c74ab47e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
@@ -31,7 +31,6 @@ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptio
   @Override
   protected void before() throws Throwable {
     super.before();
-    options.setTimeout(1000L);
     options.setStreaming(true);
   }
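
Note on usage: with the timeout pipeline option removed, a bounded wait on a streaming run is now
expressed through EvaluationResult.waitUntilFinish(Duration), and close() cancels a still-running
streaming job before stopping the Spark context. A minimal sketch of the resulting call pattern,
mirroring PAssertStreaming above (the pipeline p and the 3-second bound are illustrative, not part
of this patch):

    // Run the pipeline; the Spark runner hands back an EvaluationResult.
    EvaluationResult res = (EvaluationResult) p.run();
    // Block until the streaming job terminates or the bound elapses; per the new
    // waitUntilFinish(Duration), a duration under 1 msec waits indefinitely.
    res.waitUntilFinish(Duration.standardSeconds(3));
    // Gracefully cancel the job if it is still running, then stop the Spark context.
    res.close(true);

Duration here is org.joda.time.Duration, the same type imported in PAssertStreaming.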