Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 67e99cc

Browse files
dpmills authored and davorbonaci committed
Makes side inputs per window.
This is a backwards-incompatible change. - sideInput() can no longer be called from startBundle/finishBundle. - Calls to sideInput() now return values only in a specific window corresponding to the window of the main input element, not the whole side input PCollectionView ----Release Notes---- - sideInput() can no longer be called from startBundle/finishBundle. - Calls to sideInput() now return values only in a specific window corresponding to the window of the main input element, not the whole side input PCollectionView. For PCollections and side inputs that are both windowed by GlobalWindows, this is identical to the old behavior [] ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=89356757
1 parent da8dfda commit 67e99cc

File tree

15 files changed

+573
-82
lines changed

15 files changed

+573
-82
lines changed

examples/src/main/java/com/google/cloud/dataflow/examples/TopWikipediaSessions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> s
126126
public int compare(KV<String, Long> o1, KV<String, Long> o2) {
127127
return Long.compare(o1.getValue(), o2.getValue());
128128
}
129-
}));
129+
}).withoutDefaults());
130130
}
131131
}
132132

sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.cloud.dataflow.sdk.transforms.ParDo;
2626
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
2727
import com.google.cloud.dataflow.sdk.transforms.View;
28+
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
29+
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
2830
import com.google.cloud.dataflow.sdk.values.PCollection;
2931
import com.google.cloud.dataflow.sdk.values.PCollectionView;
3032
import com.google.common.base.Optional;
@@ -80,7 +82,8 @@ private DataflowAssert() {}
8082
* {@link PCollection PCollection&lt;T&gt;}.
8183
*/
8284
public static <T> IterableAssert<T> that(PCollection<T> actual) {
83-
return new IterableAssert<>(actual.apply(View.<T>asIterable()))
85+
return
86+
new IterableAssert<>(inGlobalWindows(actual).apply(View.<T>asIterable()))
8487
.setCoder(actual.getCoder());
8588
}
8689

@@ -101,7 +104,7 @@ public static <T> IterableAssert<T> thatSingletonIterable(PCollection<Iterable<T
101104
+ " single Coder<T> to apply to the elements.");
102105
}
103106

104-
return new IterableAssert<>(actual.apply(View.<Iterable<T>>asSingleton()))
107+
return new IterableAssert<>(inGlobalWindows(actual).apply(View.<Iterable<T>>asSingleton()))
105108
.setCoder(tCoder);
106109
}
107110

@@ -118,7 +121,7 @@ public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> ac
118121
* {@code PCollection PCollection<T>}, which must be a singleton.
119122
*/
120123
public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
121-
return new SingletonAssert<>(actual.apply(View.<T>asSingleton()))
124+
return new SingletonAssert<>(inGlobalWindows(actual).apply(View.<T>asSingleton()))
122125
.setCoder(actual.getCoder());
123126
}
124127

@@ -344,6 +347,16 @@ public SingletonAssert<T> is(T expectedValue) {
344347

345348
////////////////////////////////////////////////////////////////////////
346349

350+
/**
351+
* Returns a new PCollection equivalent to the input, but with all elements
352+
* in the GlobalWindow. Preserves ordering if the input is ordered.
353+
*/
354+
private static <T> PCollection<T> inGlobalWindows(PCollection<T> input) {
355+
return input
356+
.apply(Window.<T>into(new GlobalWindows()))
357+
.setOrdered(input.isOrdered());
358+
}
359+
347360
/**
348361
* An assertion checker that takes a single {@link PCollectionView PCollectionView&lt;A&gt;}
349362
* and an assertion over {@code A}, and checks it within a dataflow pipeline.

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java

Lines changed: 144 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,15 @@ public class Combine {
5959
/**
6060
* Returns a {@link Globally Combine.Globally} {@code PTransform}
6161
* that uses the given {@code SerializableFunction} to combine all
62-
* the elements of the input {@code PCollection} into a singleton
63-
* {@code PCollection} value. The types of the input elements and the
64-
* output value must be the same.
62+
* the elements in each window of the input {@code PCollection} into a
63+
* single value in the output {@code PCollection}. The types of the input
64+
* elements and the output elements must be the same.
6565
*
66-
* <p>If the input {@code PCollection} is empty, the ouput will contain a the
67-
* default value of the combining function if the input is windowed into
68-
* the {@link GlobalWindows}; otherwise, the output will be empty. Note: this
69-
* behavior is subject to change.
66+
* <p> If the input {@code PCollection} is windowed into {@link GlobalWindows},
67+
* a default value in the {@link GlobalWindow} will be output if the input
68+
* {@code PCollection} is empty. To use this with inputs with other windowing,
69+
* either {@link Globally#withoutDefaults} or {@link Globally#asSingletonView}
70+
* must be called.
7071
*
7172
* <p> See {@link Globally Combine.Globally} for more information.
7273
*/
@@ -77,21 +78,22 @@ public static <V> Globally<V, V> globally(
7778

7879
/**
7980
* Returns a {@link Globally Combine.Globally} {@code PTransform}
80-
* that uses the given {@code CombineFn} to combine all the elements
81-
* of the input {@code PCollection} into a singleton {@code PCollection}
82-
* value. The types of the input elements and the output value can
83-
* differ.
81+
* that uses the given {@code CombineFn} to combine all
82+
* the elements in each window of the input {@code PCollection} into a
83+
* single value in the output {@code PCollection}. The types of the input
84+
* elements and the output elements can differ.
8485
*
85-
* If the input {@code PCollection} is empty, the ouput will contain a the
86-
* default value of the combining function if the input is windowed into
87-
* the {@link GlobalWindows}; otherwise, the output will be empty. Note: this
88-
* behavior is subject to change.
86+
* <p> If the input {@code PCollection} is windowed into {@link GlobalWindows},
87+
* a default value in the {@link GlobalWindow} will be output if the input
88+
* {@code PCollection} is empty. To use this with inputs with other windowing,
89+
* either {@link Globally#withoutDefaults} or {@link Globally#asSingletonView}
90+
* must be called.
8991
*
9092
* <p> See {@link Globally Combine.Globally} for more information.
9193
*/
9294
public static <VI, VO> Globally<VI, VO> globally(
9395
CombineFn<? super VI, ?, VO> fn) {
94-
return new Globally<>(fn);
96+
return new Globally<>(fn, true);
9597
}
9698

9799
/**
@@ -1053,10 +1055,9 @@ public Coder<VO> getDefaultOutputCoder(
10531055

10541056
/**
10551057
* {@code Combine.Globally<VI, VO>} takes a {@code PCollection<VI>}
1056-
* and returns a {@code PCollection<VO>} whose single element is the result of
1057-
* combining all the elements of the input {@code PCollection},
1058-
* using a specified}
1059-
* {@link CombineFn CombineFn&lt;VI, VA, VO&gt;}. It is common
1058+
* and returns a {@code PCollection<VO>} whose elements are the result of
1059+
* combining all the elements in each window of the input {@code PCollection},
1060+
* using a specified {@link CombineFn CombineFn&lt;VI, VA, VO&gt;}. It is common
10601061
* for {@code VI == VO}, but not required. Common combining
10611062
* functions include sums, mins, maxes, and averages of numbers,
10621063
* conjunctions and disjunctions of booleans, statistical
@@ -1074,6 +1075,11 @@ public Coder<VO> getDefaultOutputCoder(
10741075
* intermediate results combined further, in an arbitrary tree
10751076
* reduction pattern, until a single result value is produced.
10761077
*
1078+
* <p> If the input {@code PCollection} is windowed into {@link GlobalWindows},
1079+
* a default value in the {@link GlobalWindow} will be output if the input
1080+
* {@code PCollection} is empty. To use this with inputs with other windowing,
1081+
* either {@link #withoutDefaults} or {@link #asSingletonView} must be called.
1082+
*
10771083
* <p> By default, the {@code Coder} of the output {@code PValue<VO>}
10781084
* is inferred from the concrete type of the
10791085
* {@code CombineFn<VI, VA, VO>}'s output type {@code VO}.
@@ -1090,9 +1096,36 @@ public static class Globally<VI, VO>
10901096
extends PTransform<PCollection<VI>, PCollection<VO>> {
10911097

10921098
private final CombineFn<? super VI, ?, VO> fn;
1099+
private final boolean insertDefault;
10931100

1094-
private Globally(CombineFn<? super VI, ?, VO> fn) {
1101+
private Globally(CombineFn<? super VI, ?, VO> fn, boolean insertDefault) {
10951102
this.fn = fn;
1103+
this.insertDefault = insertDefault;
1104+
}
1105+
1106+
@Override
1107+
@SuppressWarnings("unchecked")
1108+
public Globally<VI, VO> withName(String name) {
1109+
return (Globally<VI, VO>) super.withName(name);
1110+
}
1111+
1112+
/**
1113+
* Returns a {@link PTransform} that produces a {@code PCollectionView}
1114+
* whose elements are the result of combining elements per-window in
1115+
* the input {@code PCollection}. If a value is requested from the view
1116+
* for a window that is not present, the result of calling the {@code CombineFn}
1117+
* on empty input will be returned.
1118+
*/
1119+
public GloballyAsSingletonView<VI, VO> asSingletonView() {
1120+
return new GloballyAsSingletonView<>(fn, insertDefault);
1121+
}
1122+
1123+
/**
1124+
* Returns a {@link PTransform} identical to this, but that does not attempt to
1125+
* provide a default value in the case of empty input.
1126+
*/
1127+
public Globally<VI, VO> withoutDefaults() {
1128+
return new Globally<>(fn, false);
10961129
}
10971130

10981131
@Override
@@ -1103,7 +1136,13 @@ public PCollection<VO> apply(PCollection<VI> input) {
11031136
.apply(Combine.<Void, VI, VO>perKey(fn.<Void>asKeyedFn()))
11041137
.apply(Values.<VO>create());
11051138

1106-
if (input.getWindowFn().isCompatible(new GlobalWindows())) {
1139+
if (insertDefault) {
1140+
if (!output.getWindowFn().isCompatible(new GlobalWindows())) {
1141+
throw new IllegalStateException(
1142+
"Attempted to add default value to PCollection not windowed by GlobalWindows. "
1143+
+ "Instead, use Combine.globally().withoutDefaults() or "
1144+
+ "Combine.globally().asSingletonView().");
1145+
}
11071146
return insertDefaultValueIfEmpty(output);
11081147
} else {
11091148
return output;
@@ -1117,15 +1156,15 @@ private PCollection<VO> insertDefaultValueIfEmpty(PCollection<VO> maybeEmpty) {
11171156
.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
11181157
.apply(ParDo.of(
11191158
new DoFn<Void, VO>() {
1120-
@Override
1121-
public void processElement(DoFn<Void, VO>.ProcessContext c) {
1122-
Iterator<VO> combined = c.sideInput(maybeEmptyView).iterator();
1123-
if (combined.hasNext()) {
1124-
c.output(combined.next());
1125-
} else {
1126-
c.output(fn.apply(Collections.<VI>emptyList()));
1127-
}
1159+
@Override
1160+
public void processElement(DoFn<Void, VO>.ProcessContext c) {
1161+
Iterator<VO> combined = c.sideInput(maybeEmptyView).iterator();
1162+
if (combined.hasNext()) {
1163+
c.output(combined.next());
1164+
} else {
1165+
c.output(fn.apply(Collections.<VI>emptyList()));
11281166
}
1167+
}
11291168
}).withSideInputs(maybeEmptyView))
11301169
.setCoder(maybeEmpty.getCoder());
11311170
}
@@ -1136,6 +1175,81 @@ protected String getKindString() {
11361175
}
11371176
}
11381177

1178+
/**
1179+
* {@code Combine.GloballyAsSingletonView<VI, VO>} takes a {@code PCollection<VI>}
1180+
* and returns a {@code PCollectionView<VO>} whose elements are the result of
1181+
* combining all the elements in each window of the input {@code PCollection},
1182+
* using a specified {@link CombineFn CombineFn&lt;VI, VA, VO&gt;}. It is common for
1183+
* {@code VI == VO}, but not required. Common combining
1184+
* functions include sums, mins, maxes, and averages of numbers,
1185+
* conjunctions and disjunctions of booleans, statistical
1186+
* aggregations, etc.
1187+
*
1188+
* <p> Example of use:
1189+
* <pre> {@code
1190+
* PCollection<Integer> pc = ...;
1191+
* PCollectionView<Integer> sum = pc.apply(
1192+
* Combine.globally(new Sum.SumIntegerFn()).asSingletonView());
1193+
* } </pre>
1194+
*
1195+
* <p> Combining can happen in parallel, with different subsets of the
1196+
* input {@code PCollection} being combined separately, and their
1197+
* intermediate results combined further, in an arbitrary tree
1198+
* reduction pattern, until a single result value is produced.
1199+
*
1200+
* <p> If a value is requested from the view for a window that is not present
1201+
* and {@code insertDefault} is true, the result of calling the {@code CombineFn}
1202+
* on empty input will be returned. If {@code insertDefault} is false, an
1203+
* exception will be thrown instead.
1204+
*
1205+
* <p> By default, the {@code Coder} of the output {@code PValue<VO>}
1206+
* is inferred from the concrete type of the
1207+
* {@code CombineFn<VI, VA, VO>}'s output type {@code VO}.
1208+
*
1209+
* <p> See also {@link #perKey}/{@link PerKey Combine.PerKey} and
1210+
* {@link #groupedValues}/{@link GroupedValues Combine.GroupedValues},
1211+
* which are useful for combining values associated with each key in
1212+
* a {@code PCollection} of {@code KV}s.
1213+
*
1214+
* @param <VI> type of input values
1215+
* @param <VO> type of output values
1216+
*/
1217+
public static class GloballyAsSingletonView<VI, VO>
1218+
extends PTransform<PCollection<VI>, PCollectionView<VO>> {
1219+
1220+
private final CombineFn<? super VI, ?, VO> fn;
1221+
private final boolean insertDefault;
1222+
1223+
private GloballyAsSingletonView(CombineFn<? super VI, ?, VO> fn, boolean insertDefault) {
1224+
this.fn = fn;
1225+
this.insertDefault = insertDefault;
1226+
}
1227+
1228+
@Override
1229+
@SuppressWarnings("unchecked")
1230+
public GloballyAsSingletonView<VI, VO> withName(String name) {
1231+
return (GloballyAsSingletonView<VI, VO>) super.withName(name);
1232+
}
1233+
1234+
@Override
1235+
public PCollectionView<VO> apply(PCollection<VI> input) {
1236+
PCollection<VO> combined = input
1237+
.apply(Combine.globally(fn).withoutDefaults());
1238+
if (insertDefault) {
1239+
return combined
1240+
.apply(View.<VO>asSingleton().withDefaultValue(
1241+
fn.apply(Collections.<VI>emptyList())));
1242+
} else {
1243+
return combined.apply(View.<VO>asSingleton());
1244+
}
1245+
}
1246+
1247+
@Override
1248+
protected String getKindString() {
1249+
return "Combine.GloballyAsSingletonView";
1250+
}
1251+
}
1252+
11391253
/**
11401254
* Converts a {@link SerializableFunction} from {@code Iterable<V>}s
11411255
* to {@code V}s into a simple {@link CombineFn} over {@code V}s.

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,6 @@ public abstract class Context {
6666
*/
6767
public abstract PipelineOptions getPipelineOptions();
6868

69-
/**
70-
* Returns the value of the side input.
71-
*
72-
* @throws IllegalArgumentException if this is not a side input
73-
* @see ParDo#withSideInputs
74-
*/
75-
public abstract <T> T sideInput(PCollectionView<T> view);
76-
7769
/**
7870
* Adds the given element to the main output {@code PCollection}.
7971
*
@@ -207,6 +199,19 @@ public abstract class ProcessContext extends Context {
207199
*/
208200
public abstract I element();
209201

202+
/**
203+
* Returns the value of the side input for the window corresponding to the
204+
* window of the main input element.
205+
*
206+
* <p> See
207+
* {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn#getSideInputWindow}
208+
* for how this corresponding window is determined.
209+
*
210+
* @throws IllegalArgumentException if this is not a side input
211+
* @see ParDo#withSideInputs
212+
*/
213+
public abstract <T> T sideInput(PCollectionView<T> view);
214+
210215
/**
211216
* Returns this {@code DoFn}'s state associated with the input
212217
* element's key. This state can be used by the {@code DoFn} to

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Top.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,9 @@ public class Top {
8080
* {@code KV}s and return the top values associated with each key.
8181
*/
8282
public static <T, C extends Comparator<T> & Serializable>
83-
PTransform<PCollection<T>, PCollection<List<T>>> of(int count, C compareFn) {
83+
Combine.Globally<T, List<T>> of(int count, C compareFn) {
8484
return Combine.globally(new TopCombineFn<>(count, compareFn))
8585
.withName("Top");
86-
8786
}
8887

8988
/**
@@ -121,7 +120,7 @@ PTransform<PCollection<T>, PCollection<List<T>>> of(int count, C compareFn) {
121120
* {@code KV}s and return the top values associated with each key.
122121
*/
123122
public static <T extends Comparable<T>>
124-
PTransform<PCollection<T>, PCollection<List<T>>> smallest(int count) {
123+
Combine.Globally<T, List<T>> smallest(int count) {
125124
return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()))
126125
.withName("Top.Smallest");
127126
}
@@ -161,7 +160,7 @@ PTransform<PCollection<T>, PCollection<List<T>>> smallest(int count) {
161160
* {@code KV}s and return the top values associated with each key.
162161
*/
163162
public static <T extends Comparable<T>>
164-
PTransform<PCollection<T>, PCollection<List<T>>> largest(int count) {
163+
Combine.Globally<T, List<T>> largest(int count) {
165164
return Combine.globally(new TopCombineFn<>(count, new Largest<T>()))
166165
.withName("Top.Largest");
167166
}

0 commit comments

Comments
 (0)