Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit b426cbd

Browse files
bo-yuandavorbonaci
authored andcommitted
Modify the Dataflow Java SDK to support the combining function in PartialGroupByKey.
* Implements PGBKOp.Combiner via Combine.KeyedCombineFn. * Modify createPartialGroupByKeyOperation to pass combiner to PartialGroupByKeyOperation when the PartialGroupByKeyInstruction contains value combining function. * Add new constructors of PartialGroupByKeyOperation to initiate the PartialGroupByKeyOperation object with CombiningGroupingTable. * Set SDK major version to "2" to turn on in-place combining in PGBK combiner. ----Release Notes---- [] ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=89357867
1 parent 67e99cc commit b426cbd

File tree

4 files changed

+188
-18
lines changed

4 files changed

+188
-18
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
8080
private DataflowPipelineRunnerHooks hooks;
8181

8282
// Environment version information
83-
private static final String ENVIRONMENT_MAJOR_VERSION = "1";
83+
private static final String ENVIRONMENT_MAJOR_VERSION = "2";
8484

8585
/**
8686
* Construct a runner from the provided options.

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/MapTaskExecutorFactory.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.dataflow.sdk.runners.worker;
1818

19+
import static com.google.cloud.dataflow.sdk.util.Structs.getBytes;
20+
1921
import com.google.api.services.dataflow.model.FlattenInstruction;
2022
import com.google.api.services.dataflow.model.InstructionInput;
2123
import com.google.api.services.dataflow.model.InstructionOutput;
@@ -28,9 +30,12 @@
2830
import com.google.cloud.dataflow.sdk.coders.Coder;
2931
import com.google.cloud.dataflow.sdk.coders.KvCoder;
3032
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
33+
import com.google.cloud.dataflow.sdk.transforms.Combine;
3134
import com.google.cloud.dataflow.sdk.util.CloudObject;
3235
import com.google.cloud.dataflow.sdk.util.CoderUtils;
3336
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
37+
import com.google.cloud.dataflow.sdk.util.PropertyNames;
38+
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
3439
import com.google.cloud.dataflow.sdk.util.Serializer;
3540
import com.google.cloud.dataflow.sdk.util.WindowedValue;
3641
import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
@@ -188,18 +193,64 @@ static PartialGroupByKeyOperation createPartialGroupByKeyOperation(PipelineOptio
188193
OutputReceiver[] receivers =
189194
createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, 1);
190195

191-
PartialGroupByKeyOperation operation =
192-
new PartialGroupByKeyOperation(instruction.getSystemName(),
193-
new WindowingCoderGroupingKeyCreator(keyCoder),
194-
new CoderSizeEstimator(WindowedValue.getValueOnlyCoder(keyCoder)),
195-
new CoderSizeEstimator(valueCoder), 0.001/*sizeEstimatorSampleRate*/, PairInfo.create(),
196-
receivers, counterPrefix, addCounterMutator, stateSampler);
196+
PartialGroupByKeyOperation.Combiner valueCombiner = createValueCombiner(pgbk);
197+
198+
PartialGroupByKeyOperation operation = new PartialGroupByKeyOperation(
199+
instruction.getSystemName(),
200+
new WindowingCoderGroupingKeyCreator(keyCoder),
201+
new CoderSizeEstimator(WindowedValue.getValueOnlyCoder(keyCoder)),
202+
new CoderSizeEstimator(valueCoder), 0.001 /*sizeEstimatorSampleRate*/, valueCombiner,
203+
PairInfo.create(), receivers, counterPrefix, addCounterMutator, stateSampler);
197204

198205
attachInput(operation, pgbk.getInput(), priorOperations);
199206

200207
return operation;
201208
}
202209

210+
static ValueCombiner createValueCombiner(PartialGroupByKeyInstruction pgbk) throws Exception {
211+
if (pgbk.getValueCombiningFn() == null) {
212+
return null;
213+
}
214+
215+
Object deserializedFn = SerializableUtils.deserializeFromByteArray(
216+
getBytes(CloudObject.fromSpec(pgbk.getValueCombiningFn()), PropertyNames.SERIALIZED_FN),
217+
"serialized combine fn");
218+
return new ValueCombiner((Combine.KeyedCombineFn) deserializedFn);
219+
}
220+
221+
/**
222+
* Implements PGBKOp.Combiner via Combine.KeyedCombineFn.
223+
*/
224+
public static class ValueCombiner<K, VI, VA, VO>
225+
implements PartialGroupByKeyOperation.Combiner<K, VI, VA, VO> {
226+
private final Combine.KeyedCombineFn<K, VI, VA, VO> combineFn;
227+
228+
private ValueCombiner(Combine.KeyedCombineFn<K, VI, VA, VO> combineFn) {
229+
this.combineFn = combineFn;
230+
}
231+
232+
@Override
233+
public VA createAccumulator(K key) {
234+
return this.combineFn.createAccumulator(key);
235+
}
236+
237+
@Override
238+
public VA add(K key, VA accumulator, VI value) {
239+
this.combineFn.addInput(key, accumulator, value);
240+
return accumulator;
241+
}
242+
243+
@Override
244+
public VA merge(K key, Iterable<VA> accumulators) {
245+
return this.combineFn.mergeAccumulators(key, accumulators);
246+
}
247+
248+
@Override
249+
public VO extract(K key, VA accumulator) {
250+
return this.combineFn.extractOutput(key, accumulator);
251+
}
252+
}
253+
203254
/**
204255
* Implements PGBKOp.PairInfo via KVs.
205256
*/

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/PartialGroupByKeyOperation.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -133,43 +133,79 @@ public PartialGroupByKeyOperation(
133133
String counterPrefix,
134134
CounterSet.AddCounterMutator addCounterMutator,
135135
StateSampler stateSampler) {
136+
this(operationName, groupingKeyCreator, keySizeEstimator, valueSizeEstimator, null, pairInfo,
137+
receivers, counterPrefix, addCounterMutator, stateSampler);
138+
}
139+
140+
@SuppressWarnings({"rawtypes", "unchecked"})
141+
public PartialGroupByKeyOperation(
142+
String operationName,
143+
GroupingKeyCreator<?> groupingKeyCreator,
144+
SizeEstimator<?> keySizeEstimator, SizeEstimator<?> valueSizeEstimator,
145+
Combiner combineFn,
146+
PairInfo pairInfo,
147+
OutputReceiver[] receivers,
148+
String counterPrefix,
149+
CounterSet.AddCounterMutator addCounterMutator,
150+
StateSampler stateSampler) {
136151
super(operationName, receivers, counterPrefix, addCounterMutator, stateSampler);
137-
groupingTable = new BufferingGroupingTable(
138-
DEFAULT_MAX_GROUPING_TABLE_BYTES, groupingKeyCreator,
139-
pairInfo, keySizeEstimator, valueSizeEstimator);
152+
if (combineFn == null) {
153+
groupingTable = new BufferingGroupingTable(DEFAULT_MAX_GROUPING_TABLE_BYTES,
154+
groupingKeyCreator, pairInfo, keySizeEstimator, valueSizeEstimator);
155+
} else {
156+
groupingTable = new CombiningGroupingTable(DEFAULT_MAX_GROUPING_TABLE_BYTES,
157+
groupingKeyCreator, pairInfo, combineFn, keySizeEstimator, valueSizeEstimator);
158+
}
140159
}
141160

142-
@SuppressWarnings("unchecked")
161+
@SuppressWarnings({"rawtypes", "unchecked"})
143162
public PartialGroupByKeyOperation(
144163
String operationName,
145164
GroupingKeyCreator<?> groupingKeyCreator,
146165
SizeEstimator<?> keySizeEstimator, SizeEstimator<?> valueSizeEstimator,
147166
double sizeEstimatorSampleRate,
167+
Combiner combineFn,
148168
PairInfo pairInfo,
149169
OutputReceiver[] receivers,
150170
String counterPrefix,
151-
CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
171+
CounterSet.AddCounterMutator addCounterMutator,
172+
StateSampler stateSampler) {
152173
this(operationName, groupingKeyCreator,
153174
new SamplingSizeEstimator(keySizeEstimator, sizeEstimatorSampleRate, 1.0),
154-
new SamplingSizeEstimator(valueSizeEstimator, sizeEstimatorSampleRate, 1.0),
175+
new SamplingSizeEstimator(valueSizeEstimator, sizeEstimatorSampleRate, 1.0), combineFn,
155176
pairInfo, receivers, counterPrefix, addCounterMutator, stateSampler);
156177
}
157178

158179
/** Invoked by tests. */
180+
public PartialGroupByKeyOperation(GroupingKeyCreator<?> groupingKeyCreator,
181+
SizeEstimator<?> keySizeEstimator, SizeEstimator<?> valueSizeEstimator, PairInfo pairInfo,
182+
OutputReceiver outputReceiver, String counterPrefix,
183+
CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
184+
this(groupingKeyCreator,
185+
keySizeEstimator, valueSizeEstimator, null, pairInfo,
186+
outputReceiver,
187+
counterPrefix,
188+
addCounterMutator,
189+
stateSampler);
190+
}
191+
192+
/** Invoked by tests. */
193+
@SuppressWarnings({"rawtypes"})
159194
public PartialGroupByKeyOperation(
160195
GroupingKeyCreator<?> groupingKeyCreator,
161196
SizeEstimator<?> keySizeEstimator, SizeEstimator<?> valueSizeEstimator,
197+
Combiner combineFn,
162198
PairInfo pairInfo,
163199
OutputReceiver outputReceiver,
164200
String counterPrefix,
165201
CounterSet.AddCounterMutator addCounterMutator,
166202
StateSampler stateSampler) {
167203
this("PartialGroupByKeyOperation", groupingKeyCreator,
168-
keySizeEstimator, valueSizeEstimator, pairInfo,
169-
new OutputReceiver[]{ outputReceiver },
170-
counterPrefix,
171-
addCounterMutator,
172-
stateSampler);
204+
keySizeEstimator, valueSizeEstimator, combineFn, pairInfo,
205+
new OutputReceiver[]{ outputReceiver },
206+
counterPrefix,
207+
addCounterMutator,
208+
stateSampler);
173209
}
174210

175211
@Override

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/worker/PartialGroupByKeyOperationTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,89 @@ public void testRunPartialGroupByKeyOperation() throws Exception {
128128
counterSet);
129129
}
130130

131+
@Test
132+
public void testRunPartialGroupByKeyOperationWithCombiner() throws Exception {
133+
Coder keyCoder = StringUtf8Coder.of();
134+
Coder valueCoder = BigEndianIntegerCoder.of();
135+
136+
CounterSet counterSet = new CounterSet();
137+
String counterPrefix = "test-";
138+
StateSampler stateSampler = new StateSampler(
139+
counterPrefix, counterSet.getAddCounterMutator());
140+
TestReceiver receiver =
141+
new TestReceiver(new ElementByteSizeObservableCoder(
142+
WindowedValue.getValueOnlyCoder(KvCoder.of(keyCoder, valueCoder))),
143+
counterSet, counterPrefix);
144+
145+
Combiner<WindowedValue<String>, Integer, Integer, Integer> combineFn =
146+
new Combiner<WindowedValue<String>, Integer, Integer, Integer>() {
147+
public Integer createAccumulator(WindowedValue<String> key) {
148+
return 0;
149+
}
150+
public Integer add(WindowedValue<String> key, Integer accumulator, Integer value) {
151+
return accumulator + value;
152+
}
153+
public Integer merge(WindowedValue<String> key, Iterable<Integer> accumulators) {
154+
Integer sum = 0;
155+
for (Integer part : accumulators) {
156+
sum += part;
157+
}
158+
return sum;
159+
}
160+
public Integer extract(WindowedValue<String> key, Integer accumulator) {
161+
return accumulator;
162+
}
163+
};
164+
165+
PartialGroupByKeyOperation pgbkOperation =
166+
new PartialGroupByKeyOperation(
167+
new WindowingCoderGroupingKeyCreator(keyCoder),
168+
new CoderSizeEstimator(WindowedValue.getValueOnlyCoder(keyCoder)),
169+
new CoderSizeEstimator(valueCoder),
170+
combineFn,
171+
PairInfo.create(),
172+
receiver,
173+
counterPrefix,
174+
counterSet.getAddCounterMutator(),
175+
stateSampler);
176+
177+
pgbkOperation.start();
178+
179+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("hi", 4)));
180+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("there", 5)));
181+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("hi", 6)));
182+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("joe", 7)));
183+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("there", 8)));
184+
pgbkOperation.process(WindowedValue.valueInGlobalWindow(KV.of("hi", 9)));
185+
186+
pgbkOperation.finish();
187+
188+
assertThat(receiver.outputElems,
189+
IsIterableContainingInAnyOrder.<Object>containsInAnyOrder(
190+
WindowedValue.valueInGlobalWindow(KV.of("hi", 19)),
191+
WindowedValue.valueInGlobalWindow(KV.of("there", 13)),
192+
WindowedValue.valueInGlobalWindow(KV.of("joe", 7))));
193+
194+
// Exact counter values depend on size of encoded data. If encoding
195+
// changes, then these expected counters should change to match.
196+
assertEquals(
197+
new CounterSet(
198+
Counter.longs("test-PartialGroupByKeyOperation-start-msecs", SUM)
199+
.resetToValue(((Counter<Long>) counterSet.getExistingCounter(
200+
"test-PartialGroupByKeyOperation-start-msecs")).getAggregate(false)),
201+
Counter.longs("test-PartialGroupByKeyOperation-process-msecs", SUM)
202+
.resetToValue(((Counter<Long>) counterSet.getExistingCounter(
203+
"test-PartialGroupByKeyOperation-process-msecs")).getAggregate(false)),
204+
Counter.longs("test-PartialGroupByKeyOperation-finish-msecs", SUM)
205+
.resetToValue(((Counter<Long>) counterSet.getExistingCounter(
206+
"test-PartialGroupByKeyOperation-finish-msecs")).getAggregate(false)),
207+
Counter.longs("test_receiver_out-ElementCount", SUM)
208+
.resetToValue(3L),
209+
Counter.longs("test_receiver_out-MeanByteCount", MEAN)
210+
.resetToValue(3, 25L)),
211+
counterSet);
212+
}
213+
131214
// TODO: Add tests about early flushing when the table fills.
132215

133216
////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)