@@ -79,6 +79,11 @@ public class DirectPipelineRunner
7979 extends PipelineRunner <DirectPipelineRunner .EvaluationResults > {
8080 private static final Logger LOG = LoggerFactory .getLogger (DirectPipelineRunner .class );
8181
82+ /**
83+ * A source of random data, which can be seeded if determinism is desired.
84+ */
85+ private Random rand ;
86+
8287 /**
8388 * A map from PTransform class to the corresponding
8489 * TransformEvaluator to use to evaluate that transform.
@@ -225,7 +230,7 @@ private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombin
225230 PCollection <KV <K , Iterable <InputT >>> input ) {
226231
227232 PCollection <KV <K , OutputT >> output = input
228- .apply (ParDo .of (TestCombineDoFn .create (transform , input , testSerializability )));
233+ .apply (ParDo .of (TestCombineDoFn .create (transform , input , testSerializability , rand )));
229234
230235 try {
231236 output .setCoder (transform .getDefaultOutputCoder (input ));
@@ -236,30 +241,27 @@ private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombin
236241 }
237242
238243 /**
239- * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE
240- * and EXTRACT phases (see {@code com.google.cloud.dataflow.sdk.runners.worker.CombineValuesFn}).
241- * In order to emulate
242- * this for the {@link DirectPipelineRunner} and provide an experience
243- * closer to the service, go through heavy seralizability checks for
244- * the equivalent of the results of the ADD phase, but after the
245- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}
246- * shuffle, and the MERGE phase. Doing these checks
247- * ensure that not only is the accumulator coder serializable, but
248- * the accumulator coder can actually serialize the data in
249- * question.
244+ * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases (
245+ * see {@code com.google.cloud.dataflow.sdk.runners.worker.CombineValuesFn}). In order to emulate
246+ * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go
247+ * through heavy serializability checks for the equivalent of the results of the ADD phase, but
248+ * after the {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} shuffle, and the MERGE
249+ * phase. Doing these checks ensure that not only is the accumulator coder serializable, but
250+ * the accumulator coder can actually serialize the data in question.
250251 */
251- // @VisibleForTesting
252- @ SuppressWarnings ("serial" )
253252 public static class TestCombineDoFn <K , InputT , AccumT , OutputT >
254253 extends DoFn <KV <K , Iterable <InputT >>, KV <K , OutputT >> {
254+ private static final long serialVersionUID = 0L ;
255255 private final KeyedCombineFn <? super K , ? super InputT , AccumT , OutputT > fn ;
256256 private final Coder <AccumT > accumCoder ;
257257 private final boolean testSerializability ;
258+ private final Random rand ;
258259
259260 public static <K , InputT , AccumT , OutputT > TestCombineDoFn <K , InputT , AccumT , OutputT > create (
260261 Combine .GroupedValues <K , InputT , OutputT > transform ,
261262 PCollection <KV <K , Iterable <InputT >>> input ,
262- boolean testSerializability ) {
263+ boolean testSerializability ,
264+ Random rand ) {
263265
264266 Coder <AccumT > accumCoder ;
265267 try {
@@ -273,16 +275,19 @@ public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, Ou
273275 return new TestCombineDoFn (
274276 transform .getFn (),
275277 accumCoder ,
276- testSerializability );
278+ testSerializability ,
279+ rand );
277280 }
278281
279282 public TestCombineDoFn (
280283 KeyedCombineFn <? super K , ? super InputT , AccumT , OutputT > fn ,
281284 Coder <AccumT > accumCoder ,
282- boolean testSerializability ) {
285+ boolean testSerializability ,
286+ Random rand ) {
283287 this .fn = fn ;
284288 this .accumCoder = accumCoder ;
285289 this .testSerializability = testSerializability ;
290+ this .rand = rand ;
286291 }
287292
288293 @ Override
@@ -291,7 +296,7 @@ public void processElement(ProcessContext c) throws Exception {
291296 Iterable <InputT > values = c .element ().getValue ();
292297 List <AccumT > groupedPostShuffle =
293298 ensureSerializableByCoder (ListCoder .of (accumCoder ),
294- addInputsRandomly (fn , key , values , new Random () ),
299+ addInputsRandomly (fn , key , values , rand ),
295300 "After addInputs of KeyedCombineFn " + fn .toString ());
296301 AccumT merged =
297302 ensureSerializableByCoder (accumCoder ,
@@ -302,8 +307,11 @@ public void processElement(ProcessContext c) throws Exception {
302307 c .output (KV .of (key , fn .extractOutput (key , merged )));
303308 }
304309
305- // Create a random list of accumulators from the given list of values
306- // @VisibleForTesting
310+ /**
311+ * Create a random list of accumulators from the given list of values.
312+ *
313+ * <p>Visible for testing purposes only.
314+ */
307315 public static <K , AccumT , InputT > List <AccumT > addInputsRandomly (
308316 KeyedCombineFn <? super K , ? super InputT , AccumT , ?> fn ,
309317 K key ,
@@ -352,7 +360,7 @@ public <T> T ensureSerializableByCoder(
352360 public EvaluationResults run (Pipeline pipeline ) {
353361 LOG .info ("Executing pipeline using the DirectPipelineRunner." );
354362
355- Evaluator evaluator = new Evaluator ();
363+ Evaluator evaluator = new Evaluator (rand );
356364 evaluator .run (pipeline );
357365
358366 // Log all counter values for debugging purposes.
@@ -630,12 +638,15 @@ class Evaluator implements PipelineVisitor, EvaluationContext {
630638 */
631639 private final Map <PTransform <?, ?>, String > fullNames = new HashMap <>();
632640
633- // Use a random number generator with a fixed seed, so execution
634- // using this evaluator is deterministic. (If the user-defined
635- // functions, transforms, and coders are deterministic.)
636- Random rand = new Random (0 );
641+ private Random rand ;
637642
638- public Evaluator () {}
643+ public Evaluator () {
644+ this (new Random ());
645+ }
646+
647+ public Evaluator (Random rand ) {
648+ this .rand = rand ;
649+ }
639650
640651 public void run (Pipeline pipeline ) {
641652 pipeline .traverseTopologically (this );
@@ -953,6 +964,15 @@ private DirectPipelineRunner(DirectPipelineOptions options) {
953964 this .options = options ;
954965 // (Re-)register standard IO factories. Clobbers any prior credentials.
955966 IOChannelUtils .registerStandardIOFactories (options );
967+ long randomSeed ;
968+ if (options .getDirectPipelineRunnerRandomSeed () != null ) {
969+ randomSeed = options .getDirectPipelineRunnerRandomSeed ();
970+ } else {
971+ randomSeed = new Random ().nextLong ();
972+ }
973+
974+ LOG .info ("DirectPipelineRunner using random seed {}." , randomSeed );
975+ rand = new Random (randomSeed );
956976 }
957977
958978 public DirectPipelineOptions getPipelineOptions () {
0 commit comments