@@ -39,12 +39,6 @@ public interface SparkPipelineOptions
   String getSparkMaster();
   void setSparkMaster(String master);
 
-  @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
-      + "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-  void setTimeout(Long timeoutMillis);
-
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(1000)
   Long getBatchIntervalMillis();
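
Note: with the option gone, a bounded streaming run is no longer configured up front; the bound is passed where the caller waits on the result. A minimal before/after sketch (hypothetical caller code, not part of this diff; assumes a constructed `options` and `Pipeline p`):

    // Before: the wait bound was fixed at construction time via pipeline options.
    options.setTimeout(3000L);                   // setter removed by this change
    EvaluationResult res = (EvaluationResult) p.run();
    res.close(true);                             // close() blocked for up to the timeout

    // After: the bound is supplied at the point where the caller actually waits.
    EvaluationResult res = (EvaluationResult) p.run();
    res.waitUntilFinish(Duration.millis(3000));  // org.joda.time.Duration
    res.close(true);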
@@ -137,11 +137,8 @@ public EvaluationResult run(Pipeline pipeline) {
 
       // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
       return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
-          pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
+          pipeline, jssc) : contextFactory.getCtxt();
     } else {
-      if (mOptions.getTimeout() > 0) {
-        LOG.info("Timeout is ignored by the SparkRunner in batch.");
-      }
       JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
       EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
       SparkPipelineTranslator translator = new TransformTranslator.Translator();
@@ -45,6 +45,7 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.StreamingContextState;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
@@ -57,7 +58,6 @@ public class EvaluationContext implements EvaluationResult {
   private JavaStreamingContext jssc;
   private final SparkRuntimeContext runtime;
   private final Pipeline pipeline;
-  private long timeout;
   private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
   private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
   private final Set<Dataset> leaves = new LinkedHashSet<>();
@@ -76,10 +76,9 @@ public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
   }
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-                           JavaStreamingContext jssc, long timeout) {
+                           JavaStreamingContext jssc) {
     this(jsc, pipeline);
     this.jssc = jssc;
-    this.timeout = timeout;
     this.state = State.RUNNING;
   }

@@ -224,20 +223,15 @@ <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     return boundedDataset.getValues(pcollection);
   }
 
-  @Override
-  public void close(boolean gracefully) {
-    if (isStreamingPipeline()) {
-      // stop streaming context
-      if (timeout > 0) {
-        jssc.awaitTerminationOrTimeout(timeout);
-      } else {
-        jssc.awaitTermination();
+  @Override public void close(boolean gracefully) {
+    // Stopping streaming job if running
+    if (isStreamingPipeline() && !state.isTerminal()) {
+      try {
+        cancel(gracefully);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to cancel streaming job", e);
       }
-      // stop streaming context gracefully, so checkpointing (and other computations) get to
-      // finish before shutdown.
-      jssc.stop(false, gracefully);
     }
     state = State.DONE;
     SparkContextFactory.stopSparkContext(jsc);
   }

@@ -248,21 +242,40 @@ public State getState() {
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException(
-        "Spark runner EvaluationContext does not support cancel.");
+    return cancel(true);
+  }
+
+  private State cancel(boolean gracefully) throws IOException {
+    if (isStreamingPipeline()) {
+      if (!state.isTerminal()) {
+        jssc.stop(false, gracefully);
+        state = State.CANCELLED;
+      }
+      return state;
+    } else {
+      throw new UnsupportedOperationException(
+          "Spark runner EvaluationContext does not support cancel.");
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return waitUntilFinish(Duration.millis(-1));
+    return waitUntilFinish(Duration.ZERO);
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
     if (isStreamingPipeline()) {
-      throw new UnsupportedOperationException(
-          "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
-              + "pipelines.");
+      if (duration.getMillis() < 1L) {
+        jssc.awaitTermination();
+        state = State.DONE;
+      } else {
+        jssc.awaitTermination(duration.getMillis());
+        if (jssc.getState().equals(StreamingContextState.STOPPED)) {
+          state = State.DONE;
+        }
+      }
+      return state;
     } else {
       // This is no-op, since Spark runner in batch is blocking.
       // It needs to be updated once SparkRunner supports non-blocking execution:
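
Taken together, these methods give a streaming run the standard result lifecycle: wait (optionally bounded), inspect state, cancel, close. A minimal driver sketch (hypothetical code, not part of this diff; assumes a constructed streaming `Pipeline pipeline`, with `State` being Beam's `PipelineResult.State`):

    static void runBounded(Pipeline pipeline) throws IOException {
      EvaluationResult result = (EvaluationResult) pipeline.run();

      // Bounded wait; note that Duration.ZERO (the new no-arg default) blocks indefinitely.
      State state = result.waitUntilFinish(Duration.standardSeconds(30));

      if (!state.isTerminal()) {
        result.cancel();  // stops the JavaStreamingContext gracefully; state becomes CANCELLED
      }
      result.close(true); // skips cancellation on a terminal state; also stops the SparkContext
    }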
@@ -73,8 +73,7 @@ public JavaStreamingContext create() {
 
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    ctxt = new EvaluationContext(jsc, pipeline, jssc,
-        options.getTimeout());
+    ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 
@@ -68,7 +68,7 @@ public void testAssertion() throws Exception {
         .apply(Window.<String>into(FixedWindows.of(windowDuration)));
 
     try {
-      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]);
+      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0], Duration.standardSeconds(1L));
     } catch (AssertionError e) {
       assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
           e.getMessage().equals(EXPECTED_ERR));
@@ -76,7 +76,7 @@ public void testFlattenUnbounded() throws Exception {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -95,7 +95,7 @@ public void testFlattenBoundedUnbounded() throws Exception {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 
 }
@@ -118,7 +118,7 @@ public void testEarliest2Topics() throws Exception {
         .apply(ParDo.of(new FormatKVFn()))
         .apply(Distinct.<String>create());
 
-    PAssertStreaming.runAndAssertContents(p, deduped, expected);
+    PAssertStreaming.runAndAssertContents(p, deduped, expected, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -143,10 +143,6 @@ public void testLatest() throws Exception {
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
-    // run for more than 1 batch interval, so that reading of latest is attempted in the
-    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
-    // in the second interval.
-    options.setTimeout(Duration.standardSeconds(3).getMillis());
 
     //------- test: read and format.
     Pipeline p = Pipeline.create(options);
@@ -168,7 +164,10 @@ public void testLatest() throws Exception {
         .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
         .apply(ParDo.of(new FormatKVFn()));
 
-    PAssertStreaming.runAndAssertContents(p, formatted, expected);
+    // run for more than 1 batch interval, so that reading of latest is attempted in the
+    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
+    // in the second interval.
+    PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
   }
 
   private static void produce(String topic, Map<String, String> messages) {
@@ -163,7 +163,8 @@ private static EvaluationResult run(SparkPipelineOptions options) {
 
     // requires a graceful stop so that checkpointing of the first run would finish successfully
     // before stopping and attempting to resume.
-    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
+    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED,
+        Duration.standardSeconds(1L));
   }
 
   @AfterClass
@@ -67,8 +67,6 @@ public void testFixedWindows() throws Exception {
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
-    // graceful stop is on, so no worries about the timeout and window being equal
-    options.setTimeout(windowDuration.getMillis());
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -80,6 +78,6 @@ public void testFixedWindows() throws Exception {
         .apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS);
+    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
   }
 }
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ private PAssertStreaming() {
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected,
+                                                          Duration timeout,
                                                           boolean stopGracefully) {
     // Because PAssert does not support non-global windowing, but all our data is in one window,
     // we set up the assertion directly.
@@ -68,6 +70,7 @@ public static <T> EvaluationResult runAndAssertContents(Pipeline p,
 
     // run the pipeline.
     EvaluationResult res = (EvaluationResult) p.run();
+    res.waitUntilFinish(timeout);
     res.close(stopGracefully);
     // validate assertion succeeded (at least once).
     int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
@@ -86,8 +89,9 @@ public static <T> EvaluationResult runAndAssertContents(Pipeline p,
    */
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
-                                                          T[] expected) {
-    return runAndAssertContents(p, actual, expected, true);
+                                                          T[] expected,
+                                                          Duration timeout) {
+    return runAndAssertContents(p, actual, expected, timeout, true);
   }
 
   private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {
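
The tests above all go through the four-argument overload, which defaults to a graceful stop; a call site that needs a non-graceful stop can pass the flag through the five-argument variant. A hypothetical call (`p`, `output`, and `expected` assumed to exist):

    PAssertStreaming.runAndAssertContents(
        p, output, expected,
        Duration.standardSeconds(2), // bounds res.waitUntilFinish(...) before closing
        false);                      // stop without waiting for in-flight batches to finish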
@@ -31,7 +31,6 @@ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptions {
   @Override
   protected void before() throws Throwable {
     super.before();
-    options.setTimeout(1000L);
     options.setStreaming(true);
   }
 