Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 7cbc12a

Browse files
jkff authored and davorbonaci committed
Renames fork => dynamic split in the Dataflow Java SDK.
----Release Notes---- [] ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=89358962
1 parent b426cbd commit 7cbc12a

20 files changed

+272
-245
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -308,7 +308,7 @@ public Reader.Progress getProgress() {
308308
}
309309

310310
@Override
311-
public Reader.ForkResult requestFork(Reader.ForkRequest request) {
311+
public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest request) {
312312
return null;
313313
}
314314
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReader.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -96,8 +96,8 @@ public Progress getProgress() {
9696
}
9797

9898
@Override
99-
public ForkResult requestFork(ForkRequest forkRequest) {
100-
// For now fork is not supported because this source
99+
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
100+
// For now dynamic splitting is not supported because this source
101101
// is used only when an entire table needs to be read by each worker (used
102102
// as a side input for instance).
103103
throw new UnsupportedOperationException();

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkProgressUpdater.java

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818

1919
import static com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.buildStatus;
2020
import static com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.uniqueId;
21-
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.toForkRequest;
21+
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.toDynamicSplitRequest;
2222
import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudDuration;
2323
import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
2424
import static com.google.cloud.dataflow.sdk.util.TimeUtil.toCloudDuration;
@@ -79,15 +79,15 @@ protected long getWorkUnitLeaseExpirationTimestamp() {
7979
@Override
8080
protected void reportProgressHelper() throws Exception {
8181
WorkItemStatus status = buildStatus(workItem, false/*completed*/, worker.getOutputCounters(),
82-
worker.getOutputMetrics(), options, worker.getWorkerProgress(), forkResultToReport,
82+
worker.getOutputMetrics(), options, worker.getWorkerProgress(), dynamicSplitResultToReport,
8383
null/*sourceOperationResponse*/, null/*errors*/,
8484
getNextReportIndex());
8585
status.setRequestedLeaseDuration(toCloudDuration(Duration.millis(requestedLeaseDurationMs)));
8686

8787
WorkItemServiceState result = workUnitClient.reportWorkItemStatus(status);
8888
if (result != null) {
8989
// Resets state after a successful progress report.
90-
forkResultToReport = null;
90+
dynamicSplitResultToReport = null;
9191
nextReportIndex++;
9292

9393
progressReportIntervalMs = nextProgressReportInterval(
@@ -96,8 +96,9 @@ protected void reportProgressHelper() throws Exception {
9696

9797
ApproximateProgress suggestedStopPoint = result.getSuggestedStopPoint();
9898
if (suggestedStopPoint != null) {
99-
LOG.info("Proposing fork of work unit {} at {}", workString(), suggestedStopPoint);
100-
forkResultToReport = worker.requestFork(toForkRequest(suggestedStopPoint));
99+
LOG.info("Proposing dynamic split of work unit {} at {}", workString(), suggestedStopPoint);
100+
dynamicSplitResultToReport = worker.requestDynamicSplit(
101+
toDynamicSplitRequest(suggestedStopPoint));
101102
}
102103
}
103104
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -211,7 +211,7 @@ private void reportStatus(DataflowWorkerHarnessOptions options, String status, W
211211
static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
212212
@Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
213213
DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress,
214-
@Nullable Reader.ForkResult forkResult,
214+
@Nullable Reader.DynamicSplitResult dynamicSplitResult,
215215
@Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors,
216216
long finalReportIndex) {
217217
WorkItemStatus status = new WorkItemStatus();
@@ -256,11 +256,13 @@ static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
256256
if (progress != null) {
257257
status.setProgress(readerProgressToCloudProgress(progress));
258258
}
259-
if (forkResult instanceof Reader.ForkResultWithPosition) {
260-
Reader.ForkResultWithPosition asPosition = (Reader.ForkResultWithPosition) forkResult;
259+
if (dynamicSplitResult instanceof Reader.DynamicSplitResultWithPosition) {
260+
Reader.DynamicSplitResultWithPosition asPosition =
261+
(Reader.DynamicSplitResultWithPosition) dynamicSplitResult;
261262
status.setStopPosition(toCloudPosition(asPosition.getAcceptedPosition()));
262-
} else if (forkResult != null) {
263-
throw new IllegalArgumentException("Unexpected type of fork result: " + forkResult);
263+
} else if (dynamicSplitResult != null) {
264+
throw new IllegalArgumentException(
265+
"Unexpected type of dynamic split result: " + dynamicSplitResult);
264266
}
265267

266268
if (workItem.getSourceOperationTask() != null) {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/FileBasedReader.java

Lines changed: 23 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919
import static com.google.api.client.util.Preconditions.checkNotNull;
2020
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
2121
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
22-
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.forkRequestToApproximateProgress;
22+
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.splitRequestToApproximateProgress;
2323

2424
import com.google.api.services.dataflow.model.ApproximateProgress;
2525
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -196,38 +196,40 @@ public Progress getProgress() {
196196
}
197197

198198
@Override
199-
public ForkResult requestFork(ForkRequest forkRequest) {
200-
checkNotNull(forkRequest);
201-
202-
// Currently, file-based Reader only supports fork at a byte offset.
203-
ApproximateProgress forkProgress = forkRequestToApproximateProgress(forkRequest);
204-
com.google.api.services.dataflow.model.Position forkPosition = forkProgress.getPosition();
205-
if (forkPosition == null) {
206-
LOG.warn("FileBasedReader only supports fork at a Position. Requested: {}", forkRequest);
199+
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
200+
checkNotNull(splitRequest);
201+
202+
// Currently, file-based Reader only supports split at a byte offset.
203+
ApproximateProgress splitProgress = splitRequestToApproximateProgress(splitRequest);
204+
com.google.api.services.dataflow.model.Position splitPosition = splitProgress.getPosition();
205+
if (splitPosition == null) {
206+
LOG.warn("FileBasedReader only supports split at a Position. Requested: {}",
207+
splitRequest);
207208
return null;
208209
}
209-
Long forkOffset = forkPosition.getByteOffset();
210-
if (forkOffset == null) {
211-
LOG.warn("FileBasedReader only supports fork at byte offset. Requested: {}", forkPosition);
210+
Long splitOffset = splitPosition.getByteOffset();
211+
if (splitOffset == null) {
212+
LOG.warn("FileBasedReader only supports split at byte offset. Requested: {}",
213+
splitPosition);
212214
return null;
213215
}
214-
if (forkOffset <= offset) {
215-
LOG.info("Already progressed to offset {} which is after the requested fork offset {}",
216-
offset, forkOffset);
216+
if (splitOffset <= offset) {
217+
LOG.info("Already progressed to offset {} which is after the requested split offset {}",
218+
offset, splitOffset);
217219
return null;
218220
}
219221

220-
if (endOffset != null && forkOffset >= endOffset) {
222+
if (endOffset != null && splitOffset >= endOffset) {
221223
LOG.info(
222-
"Fork requested at an offset beyond the end of the current range: {} >= {}",
223-
forkOffset, endOffset);
224+
"Split requested at an offset beyond the end of the current range: {} >= {}",
225+
splitOffset, endOffset);
224226
return null;
225227
}
226228

227-
this.endOffset = forkOffset;
228-
LOG.info("Forked FileBasedReader at offset {}", forkOffset);
229+
this.endOffset = splitOffset;
230+
LOG.info("Split FileBasedReader at offset {}", splitOffset);
229231

230-
return new ForkResultWithPosition(cloudPositionToReaderPosition(forkPosition));
232+
return new DynamicSplitResultWithPosition(cloudPositionToReaderPosition(splitPosition));
231233
}
232234

233235
/**

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader.java

Lines changed: 20 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919
import static com.google.api.client.util.Preconditions.checkNotNull;
2020
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
2121
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
22-
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.forkRequestToApproximateProgress;
22+
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.splitRequestToApproximateProgress;
2323

2424
import com.google.api.client.util.Preconditions;
2525
import com.google.api.services.dataflow.model.ApproximateProgress;
@@ -225,42 +225,43 @@ public Progress getProgress() {
225225
* {@code KV<K, Reiterable<V>>} to be returned by the {@link GroupingShuffleReaderIterator}.
226226
*/
227227
@Override
228-
public ForkResult requestFork(ForkRequest forkRequest) {
229-
checkNotNull(forkRequest);
230-
ApproximateProgress forkProgress = forkRequestToApproximateProgress(forkRequest);
231-
com.google.api.services.dataflow.model.Position forkPosition = forkProgress.getPosition();
232-
if (forkPosition == null) {
233-
LOG.warn("GroupingShuffleReader only supports fork at a Position. Requested: {}",
234-
forkRequest);
228+
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
229+
checkNotNull(splitRequest);
230+
ApproximateProgress splitProgress = splitRequestToApproximateProgress(
231+
splitRequest);
232+
com.google.api.services.dataflow.model.Position splitPosition = splitProgress.getPosition();
233+
if (splitPosition == null) {
234+
LOG.warn("GroupingShuffleReader only supports split at a Position. Requested: {}",
235+
splitRequest);
235236
return null;
236237
}
237-
String forkShufflePosition = forkPosition.getShufflePosition();
238-
if (forkShufflePosition == null) {
239-
LOG.warn("GroupingShuffleReader only supports fork at a shuffle position. Requested: {}",
240-
forkPosition);
238+
String splitShufflePosition = splitPosition.getShufflePosition();
239+
if (splitShufflePosition == null) {
240+
LOG.warn("GroupingShuffleReader only supports split at a shuffle position. Requested: {}",
241+
splitPosition);
241242
return null;
242243
}
243244
ByteArrayShufflePosition newStopPosition =
244-
ByteArrayShufflePosition.fromBase64(forkShufflePosition);
245+
ByteArrayShufflePosition.fromBase64(splitShufflePosition);
245246
if (newStopPosition.compareTo(promisedPosition) <= 0) {
246247
LOG.info("Already progressed to promised shuffle position {} "
247-
+ "which is after the requested fork shuffle position {}",
248-
promisedPosition.encodeBase64(), forkShufflePosition);
248+
+ "which is after the requested split shuffle position {}",
249+
promisedPosition.encodeBase64(), splitShufflePosition);
249250
return null;
250251
}
251252

252253
if (this.stopPosition != null && newStopPosition.compareTo(this.stopPosition) >= 0) {
253254
LOG.info(
254-
"Fork requested at a shuffle position beyond the end of the current range: "
255+
"Split requested at a shuffle position beyond the end of the current range: "
255256
+ "{} >= current stop position: {}",
256-
forkShufflePosition, this.stopPosition.encodeBase64());
257+
splitShufflePosition, this.stopPosition.encodeBase64());
257258
return null;
258259
}
259260

260261
this.stopPosition = newStopPosition;
261-
LOG.info("Forked GroupingShuffleReader at {}", forkShufflePosition);
262+
LOG.info("Split GroupingShuffleReader at {}", splitShufflePosition);
262263

263-
return new ForkResultWithPosition(cloudPositionToReaderPosition(forkPosition));
264+
return new DynamicSplitResultWithPosition(cloudPositionToReaderPosition(splitPosition));
264265
}
265266

266267
/**

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/InMemoryReader.java

Lines changed: 22 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919
import static com.google.api.client.util.Preconditions.checkNotNull;
2020
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
2121
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
22-
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.forkRequestToApproximateProgress;
22+
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.splitRequestToApproximateProgress;
2323
import static java.lang.Math.min;
2424

2525
import com.google.api.services.dataflow.model.ApproximateProgress;
@@ -124,38 +124,39 @@ public Progress getProgress() {
124124
}
125125

126126
@Override
127-
public ForkResult requestFork(ForkRequest forkRequest) {
128-
checkNotNull(forkRequest);
129-
130-
com.google.api.services.dataflow.model.Position forkPosition =
131-
forkRequestToApproximateProgress(forkRequest).getPosition();
132-
if (forkPosition == null) {
133-
LOG.warn("InMemoryReader only supports fork at a Position. Requested: {}", forkRequest);
127+
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
128+
checkNotNull(splitRequest);
129+
130+
com.google.api.services.dataflow.model.Position splitPosition =
131+
splitRequestToApproximateProgress(splitRequest).getPosition();
132+
if (splitPosition == null) {
133+
LOG.warn("InMemoryReader only supports split at a Position. Requested: {}",
134+
splitRequest);
134135
return null;
135136
}
136137

137-
Long forkIndex = forkPosition.getRecordIndex();
138-
if (forkIndex == null) {
139-
LOG.warn("InMemoryReader only supports fork at a record index. Requested: {}",
140-
forkPosition);
138+
Long splitIndex = splitPosition.getRecordIndex();
139+
if (splitIndex == null) {
140+
LOG.warn("InMemoryReader only supports split at a record index. Requested: {}",
141+
splitPosition);
141142
return null;
142143
}
143-
if (forkIndex <= index) {
144-
LOG.info("Already progressed to index {} which is after the requested fork index {}",
145-
index, forkIndex);
144+
if (splitIndex <= index) {
145+
LOG.info("Already progressed to index {} which is after the requested split index {}",
146+
index, splitIndex);
146147
return null;
147148
}
148-
if (forkIndex >= endPosition) {
149+
if (splitIndex >= endPosition) {
149150
LOG.info(
150-
"Fork requested at an index beyond the end of the current range: {} >= {}",
151-
forkIndex, endPosition);
151+
"Split requested at an index beyond the end of the current range: {} >= {}",
152+
splitIndex, endPosition);
152153
return null;
153154
}
154155

155-
this.endPosition = forkIndex.intValue();
156-
LOG.info("Forked InMemoryReader at index {}", forkIndex);
156+
this.endPosition = splitIndex.intValue();
157+
LOG.info("Split InMemoryReader at index {}", splitIndex);
157158

158-
return new ForkResultWithPosition(cloudPositionToReaderPosition(forkPosition));
159+
return new DynamicSplitResultWithPosition(cloudPositionToReaderPosition(splitPosition));
159160
}
160161
}
161162
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/SourceTranslationUtils.java

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -85,14 +85,16 @@ public static Source sourceSpecToCloudSource(@Nullable SourceFormat.SourceSpec s
8585
return (spec == null) ? null : ((DataflowSourceSpec) spec).cloudSource;
8686
}
8787

88-
public static ApproximateProgress forkRequestToApproximateProgress(
89-
@Nullable Reader.ForkRequest stopRequest) {
90-
return (stopRequest == null) ? null : ((DataflowForkRequest) stopRequest).approximateProgress;
88+
public static ApproximateProgress splitRequestToApproximateProgress(
89+
@Nullable Reader.DynamicSplitRequest splitRequest) {
90+
return (splitRequest == null)
91+
? null : ((DataflowDynamicSplitRequest) splitRequest).approximateProgress;
9192
}
9293

93-
public static Reader.ForkRequest toForkRequest(
94+
public static Reader.DynamicSplitRequest toDynamicSplitRequest(
9495
@Nullable ApproximateProgress approximateProgress) {
95-
return (approximateProgress == null) ? null : new DataflowForkRequest(approximateProgress);
96+
return (approximateProgress == null)
97+
? null : new DataflowDynamicSplitRequest(approximateProgress);
9698
}
9799

98100
static class DataflowReaderProgress implements Reader.Progress {
@@ -175,10 +177,10 @@ public static Source dictionaryToCloudSource(Map<String, Object> params) throws
175177
return res;
176178
}
177179

178-
private static class DataflowForkRequest implements Reader.ForkRequest {
180+
private static class DataflowDynamicSplitRequest implements Reader.DynamicSplitRequest {
179181
public final ApproximateProgress approximateProgress;
180182

181-
private DataflowForkRequest(ApproximateProgress approximateProgress) {
183+
private DataflowDynamicSplitRequest(ApproximateProgress approximateProgress) {
182184
this.approximateProgress = approximateProgress;
183185
}
184186
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/MapTaskExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -85,8 +85,9 @@ public Reader.Progress getWorkerProgress() throws Exception {
8585
}
8686

8787
@Override
88-
public Reader.ForkResult requestFork(Reader.ForkRequest forkRequest) throws Exception {
89-
return getReadOperation().requestFork(forkRequest);
88+
public Reader.DynamicSplitResult requestDynamicSplit(
89+
Reader.DynamicSplitRequest splitRequest) throws Exception {
90+
return getReadOperation().requestDynamicSplit(splitRequest);
9091
}
9192

9293
ReadOperation getReadOperation() throws Exception {

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -220,20 +220,21 @@ public Reader.Progress getProgress() {
220220
}
221221

222222
/**
223-
* Relays the fork request to {@code ReaderIterator}.
223+
* Relays the split request to {@code ReaderIterator}.
224224
*/
225-
public Reader.ForkResult requestFork(Reader.ForkRequest forkRequest) {
225+
public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest splitRequest) {
226226
synchronized (initializationStateLock) {
227227
if (isFinished()) {
228228
LOG.warn("Iterator is in the Finished state, returning null stop position.");
229229
return null;
230230
}
231231
synchronized (sourceIteratorLock) {
232232
if (readerIterator == null) {
233-
LOG.warn("Iterator has not been initialized, refusing to fork at {}", forkRequest);
233+
LOG.warn("Iterator has not been initialized, refusing to split at {}",
234+
splitRequest);
234235
return null;
235236
}
236-
return readerIterator.requestFork(forkRequest);
237+
return readerIterator.requestDynamicSplit(splitRequest);
237238
}
238239
}
239240
}

0 commit comments

Comments
 (0)