Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 2ee610b

Browse files
kennknowlesdavorbonaci
authored andcommitted
Update tests to use DoFnRunner.ListOutputManager
----Release Notes---- [] ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=99150521
1 parent c4cf644 commit 2ee610b

File tree

5 files changed

+162
-114
lines changed

5 files changed

+162
-114
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,15 @@ public void finishBundle() {
230230
*/
231231
public List<OutputT> peekOutputElements() {
232232
// TODO: Should we return an unmodifiable list?
233-
return Lists.transform(fnRunner.getReceiver(mainOutputTag),
234-
new Function<Object, OutputT>() {
235-
@Override
236-
@SuppressWarnings("unchecked")
237-
public OutputT apply(Object input) {
238-
return ((WindowedValue<OutputT>) input).getValue();
239-
}
240-
});
233+
return Lists.transform(
234+
outputManager.getOutput(mainOutputTag),
235+
new Function<Object, OutputT>() {
236+
@Override
237+
@SuppressWarnings("unchecked")
238+
public OutputT apply(Object input) {
239+
return ((WindowedValue<OutputT>) input).getValue();
240+
}
241+
});
241242

242243
}
243244

@@ -272,12 +273,13 @@ public List<OutputT> takeOutputElements() {
272273
*/
273274
public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
274275
// TODO: Should we return an unmodifiable list?
275-
return Lists.transform(fnRunner.getReceiver(tag),
276-
new Function<Object, T>() {
277-
@Override
278-
public T apply(Object input) {
279-
return ((WindowedValue<T>) input).getValue();
280-
}});
276+
return Lists.transform(
277+
outputManager.getOutput(tag),
278+
new Function<Object, T>() {
279+
@Override
280+
public T apply(Object input) {
281+
return ((WindowedValue<T>) input).getValue();
282+
}});
281283
}
282284

283285
/**
@@ -323,6 +325,9 @@ enum State { UNSTARTED, STARTED, FINISHED }
323325
/** The original DoFn under test, if started. */
324326
DoFn<InputT, OutputT> fn;
325327

328+
/** The ListOutputManager to examine the outputs. */
329+
DoFnRunner.ListOutputManager outputManager;
330+
326331
/** The DoFnRunner if processing is in progress. */
327332
DoFnRunner<InputT, OutputT, List<WindowedValue<?>>> fnRunner;
328333

@@ -341,6 +346,7 @@ enum State { UNSTARTED, STARTED, FINISHED }
341346

342347
void resetState() {
343348
fn = null;
349+
outputManager = null;
344350
fnRunner = null;
345351
counterSet = null;
346352
state = State.UNSTARTED;
@@ -358,11 +364,12 @@ void initializeState() {
358364
: sideInputs.entrySet()) {
359365
runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
360366
}
367+
outputManager = new DoFnRunner.ListOutputManager();
361368
fnRunner = DoFnRunner.create(
362369
options,
363370
fn,
364371
DirectSideInputReader.of(runnerSideInputs),
365-
new DoFnRunner.ListOutputManager(),
372+
outputManager,
366373
mainOutputTag,
367374
sideOutputTags,
368375
DirectModeExecutionContext.create().createStepContext("stepName", "stepName"),

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ public interface OutputManager<ReceiverT> {
6666

6767
/** Outputs a single element to the provided receiver. */
6868
public void output(ReceiverT receiver, WindowedValue<?> output);
69-
7069
}
7170

7271
/** The DoFn being run. */
@@ -197,11 +196,6 @@ public void finishBundle() {
197196
}
198197
}
199198

200-
/** Returns the receiver who gets outputs with the provided tag. */
201-
public ReceiverT getReceiver(TupleTag<?> tag) {
202-
return context.getReceiver(tag);
203-
}
204-
205199
/**
206200
* A concrete implementation of {@link DoFn.Context} used for running
207201
* a {@link DoFn}.

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFnTest.java

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -51,36 +51,39 @@
5151

5252
/** Unit tests for {@link GroupAlsoByWindowsDoFn}. */
5353
@RunWith(JUnit4.class)
54-
@SuppressWarnings({"rawtypes", "unchecked"})
5554
public class GroupAlsoByWindowsDoFnTest {
5655
ExecutionContext execContext;
5756
CounterSet counters;
58-
TupleTag<KV<String, Iterable<String>>> outputTag;
5957

6058
@Before public void setUp() {
6159
execContext = new DirectModeExecutionContext();
6260
counters = new CounterSet();
63-
outputTag = new TupleTag<>();
6461
}
6562

6663
@Test public void testEmpty() throws Exception {
64+
TupleTag<KV<String, Iterable<String>>> outputTag = new TupleTag<>();
65+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
6766
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
6867
KV<String, Iterable<String>>, List<WindowedValue<?>>> runner =
69-
makeRunner(WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
68+
makeRunner(
69+
outputTag, outputManager, WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
7070

7171
runner.startBundle();
7272

7373
runner.finishBundle();
7474

75-
List<KV<String, Iterable<String>>> result = (List) runner.getReceiver(outputTag);
75+
List<WindowedValue<KV<String, Iterable<String>>>> result = outputManager.getOutput(outputTag);
7676

7777
assertEquals(0, result.size());
7878
}
7979

8080
@Test public void testFixedWindows() throws Exception {
81+
TupleTag<KV<String, Iterable<String>>> outputTag = new TupleTag<>();
82+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
8183
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
8284
KV<String, Iterable<String>>, List<WindowedValue<?>>> runner =
83-
makeRunner(WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
85+
makeRunner(
86+
outputTag, outputManager, WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
8487

8588
runner.startBundle();
8689

@@ -104,7 +107,7 @@ public class GroupAlsoByWindowsDoFnTest {
104107

105108
runner.finishBundle();
106109

107-
List<WindowedValue<KV<String, Iterable<String>>>> result = (List) runner.getReceiver(outputTag);
110+
List<WindowedValue<KV<String, Iterable<String>>>> result = outputManager.getOutput(outputTag);
108111

109112
assertEquals(2, result.size());
110113

@@ -124,10 +127,15 @@ public class GroupAlsoByWindowsDoFnTest {
124127
}
125128

126129
@Test public void testSlidingWindows() throws Exception {
130+
TupleTag<KV<String, Iterable<String>>> outputTag = new TupleTag<>();
131+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
127132
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
128133
KV<String, Iterable<String>>, List<WindowedValue<?>>> runner =
129-
makeRunner(WindowingStrategy.of(
130-
SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))));
134+
makeRunner(
135+
outputTag,
136+
outputManager,
137+
WindowingStrategy.of(
138+
SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))));
131139

132140
runner.startBundle();
133141

@@ -146,7 +154,7 @@ public class GroupAlsoByWindowsDoFnTest {
146154

147155
runner.finishBundle();
148156

149-
List<WindowedValue<KV<String, Iterable<String>>>> result = (List) runner.getReceiver(outputTag);
157+
List<WindowedValue<KV<String, Iterable<String>>>> result = outputManager.getOutput(outputTag);
150158

151159
assertEquals(3, result.size());
152160

@@ -173,10 +181,14 @@ public class GroupAlsoByWindowsDoFnTest {
173181
}
174182

175183
@Test public void testSlidingWindowsCombine() throws Exception {
184+
TupleTag<KV<String, Long>> outputTag = new TupleTag<>();
176185
CombineFn<Long, ?, Long> combineFn = new Sum.SumLongFn();
186+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
177187
DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>,
178188
List<WindowedValue<?>>> runner =
179189
makeRunner(
190+
outputTag,
191+
outputManager,
180192
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))),
181193
combineFn.<String>asKeyedFn());
182194

@@ -202,7 +214,7 @@ public class GroupAlsoByWindowsDoFnTest {
202214

203215
runner.finishBundle();
204216

205-
List<WindowedValue<KV<String, Long>>> result = (List) runner.getReceiver(outputTag);
217+
List<WindowedValue<KV<String, Long>>> result = outputManager.getOutput(outputTag);
206218

207219
assertEquals(3, result.size());
208220

@@ -219,9 +231,12 @@ public class GroupAlsoByWindowsDoFnTest {
219231
}
220232

221233
@Test public void testDiscontiguousWindows() throws Exception {
234+
TupleTag<KV<String, Iterable<String>>> outputTag = new TupleTag<>();
235+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
222236
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
223237
KV<String, Iterable<String>>, List<WindowedValue<?>>> runner =
224-
makeRunner(WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
238+
makeRunner(
239+
outputTag, outputManager, WindowingStrategy.of(FixedWindows.of(Duration.millis(10))));
225240

226241
runner.startBundle();
227242

@@ -245,7 +260,7 @@ public class GroupAlsoByWindowsDoFnTest {
245260

246261
runner.finishBundle();
247262

248-
List<WindowedValue<KV<String, Iterable<String>>>> result = (List) runner.getReceiver(outputTag);
263+
List<WindowedValue<KV<String, Iterable<String>>>> result = outputManager.getOutput(outputTag);
249264

250265
assertEquals(2, result.size());
251266

@@ -265,9 +280,12 @@ public class GroupAlsoByWindowsDoFnTest {
265280
}
266281

267282
@Test public void testSessions() throws Exception {
283+
TupleTag<KV<String, Iterable<String>>> outputTag = new TupleTag<>();
284+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
268285
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
269286
KV<String, Iterable<String>>, List<WindowedValue<?>>> runner =
270-
makeRunner(WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))));
287+
makeRunner(outputTag, outputManager,
288+
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))));
271289

272290
runner.startBundle();
273291

@@ -291,7 +309,7 @@ public class GroupAlsoByWindowsDoFnTest {
291309

292310
runner.finishBundle();
293311

294-
List<WindowedValue<KV<String, Iterable<String>>>> result = (List) runner.getReceiver(outputTag);
312+
List<WindowedValue<KV<String, Iterable<String>>>> result = outputManager.getOutput(outputTag);
295313

296314
assertEquals(2, result.size());
297315

@@ -311,11 +329,16 @@ public class GroupAlsoByWindowsDoFnTest {
311329
}
312330

313331
@Test public void testSessionsCombine() throws Exception {
332+
TupleTag<KV<String, Long>> outputTag = new TupleTag<>();
314333
CombineFn<Long, ?, Long> combineFn = new Sum.SumLongFn();
334+
DoFnRunner.ListOutputManager outputManager = new DoFnRunner.ListOutputManager();
315335
DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>,
316336
KV<String, Long>, List<WindowedValue<?>>> runner =
317-
makeRunner(WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))),
318-
combineFn.<String>asKeyedFn());
337+
makeRunner(
338+
outputTag,
339+
outputManager,
340+
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))),
341+
combineFn.<String>asKeyedFn());
319342
runner.startBundle();
320343

321344
runner.processElement(WindowedValue.valueInEmptyWindows(
@@ -338,7 +361,7 @@ public class GroupAlsoByWindowsDoFnTest {
338361

339362
runner.finishBundle();
340363

341-
List<WindowedValue<KV<String, Long>>> result = (List) runner.getReceiver(outputTag);
364+
List<WindowedValue<KV<String, Long>>> result = outputManager.getOutput(outputTag);
342365

343366
assertThat(result, Matchers.contains(
344367
WindowMatchers.isSingleWindowedValue(
@@ -349,40 +372,53 @@ public class GroupAlsoByWindowsDoFnTest {
349372
15, 15, 25)));
350373
}
351374

352-
private DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
353-
KV<String, Iterable<String>>, List<WindowedValue<?>>> makeRunner(
375+
private <ReceiverT>
376+
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
377+
KV<String, Iterable<String>>, ReceiverT>
378+
makeRunner(
379+
TupleTag<KV<String, Iterable<String>>> outputTag,
380+
DoFnRunner.OutputManager<ReceiverT> outputManager,
354381
WindowingStrategy<? super String, IntervalWindow> windowingStrategy) {
382+
355383
GroupAlsoByWindowsDoFn<String, String, Iterable<String>, IntervalWindow> fn =
356384
GroupAlsoByWindowsDoFn.createForIterable(windowingStrategy, StringUtf8Coder.of());
357-
return makeRunner(windowingStrategy, fn);
385+
386+
return makeRunner(outputTag, outputManager, windowingStrategy, fn);
358387
}
359388

360-
private DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>,
361-
KV<String, Long>, List<WindowedValue<?>>> makeRunner(
362-
WindowingStrategy<? super String, IntervalWindow> windowingStrategy,
363-
KeyedCombineFn<String, Long, ?, Long> combineFn) {
389+
private <ReceiverT>
390+
DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>, ReceiverT>
391+
makeRunner(
392+
TupleTag<KV<String, Long>> outputTag,
393+
DoFnRunner.OutputManager<ReceiverT> outputManager,
394+
WindowingStrategy<? super String, IntervalWindow> windowingStrategy,
395+
KeyedCombineFn<String, Long, ?, Long> combineFn) {
396+
364397
GroupAlsoByWindowsDoFn<String, Long, Long, IntervalWindow> fn =
365-
GroupAlsoByWindowsDoFn.create(windowingStrategy, combineFn,
366-
StringUtf8Coder.of(), BigEndianLongCoder.of());
398+
GroupAlsoByWindowsDoFn.create(
399+
windowingStrategy, combineFn, StringUtf8Coder.of(), BigEndianLongCoder.of());
367400

368-
return makeRunner(windowingStrategy, fn);
401+
return makeRunner(outputTag, outputManager, windowingStrategy, fn);
369402
}
370403

371-
private <InputT, OutputT> DoFnRunner<KV<String, Iterable<WindowedValue<InputT>>>,
372-
KV<String, OutputT>, List<WindowedValue<?>>> makeRunner(
404+
private <InputT, OutputT, ReceiverT>
405+
DoFnRunner<KV<String, Iterable<WindowedValue<InputT>>>, KV<String, OutputT>, ReceiverT>
406+
makeRunner(
407+
TupleTag<KV<String, OutputT>> outputTag,
408+
DoFnRunner.OutputManager<ReceiverT> outputManager,
373409
WindowingStrategy<? super String, IntervalWindow> windowingStrategy,
374410
GroupAlsoByWindowsDoFn<String, InputT, OutputT, IntervalWindow> fn) {
375-
return
376-
DoFnRunner.create(
377-
PipelineOptionsFactory.create(),
378-
fn,
379-
NullSideInputReader.empty(),
380-
new DoFnRunner.ListOutputManager(),
381-
(TupleTag<KV<String, OutputT>>) (TupleTag) outputTag,
382-
new ArrayList<TupleTag<?>>(),
383-
execContext.getStepContext("merge", "merge"),
384-
counters.getAddCounterMutator(),
385-
windowingStrategy);
411+
412+
return DoFnRunner.create(
413+
PipelineOptionsFactory.create(),
414+
fn,
415+
NullSideInputReader.empty(),
416+
outputManager,
417+
outputTag,
418+
new ArrayList<TupleTag<?>>(),
419+
execContext.getStepContext("merge", "merge"),
420+
counters.getAddCounterMutator(),
421+
windowingStrategy);
386422
}
387423

388424
private BoundedWindow window(long start, long end) {

0 commit comments

Comments
 (0)