Skip to content

Commit 80c60d0

Browse files
authored
Parallel split for multipart GetObject File Download (#6425)
Parallel split for multipart GetObject File Download
1 parent 17f6a6f commit 80c60d0

File tree

27 files changed

+2363
-47
lines changed

27 files changed

+2363
-47
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "S3",
4+
"contributor": "",
5+
"description": "Add support for parallel download of individual parts (part-get) for multipart GetObject in the S3 async client and Transfer Manager"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,15 @@ interface SplitResult<ResponseT, ResultT>
381381
*/
382382
CompletableFuture<ResultT> resultFuture();
383383

384+
/**
385+
* Indicates whether the split async response transformer supports sending individual transformers non-serially and
386+
* receiving back data from the many {@link AsyncResponseTransformer#onStream(SdkPublisher) publishers} non-serially.
387+
* @return true if non-serial data is supported, false otherwise
388+
*/
389+
default Boolean parallelSplitSupported() {
390+
return false;
391+
}
392+
384393
static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
385394
return DefaultAsyncResponseTransformerSplitResult.builder();
386395
}
@@ -413,6 +422,20 @@ interface Builder<ResponseT, ResultT>
413422
* @return an instance of this Builder
414423
*/
415424
Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> future);
425+
426+
/**
427+
* Whether the AsyncResponseTransformers emitted by the {@link SplitResult#publisher()} support
428+
* parallel streaming of multiple content bodies.
429+
* @return whether parallel split is supported, as currently configured on this builder
430+
*/
431+
Boolean parallelSplitSupported();
432+
433+
/**
434+
* Sets whether the AsyncResponseTransformers emitted by the {@link SplitResult#publisher()} support
435+
* parallel streaming of multiple content bodies.
436+
* @return an instance of this Builder
437+
*/
438+
Builder<ResponseT, ResultT> parallelSplitSupported(Boolean parallelSplitSupported);
416439
}
417440
}
418441

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncResponseTransformerListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.reactivestreams.Subscriber;
2121
import software.amazon.awssdk.annotations.SdkInternalApi;
2222
import software.amazon.awssdk.annotations.SdkProtectedApi;
23+
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
2324
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2425
import software.amazon.awssdk.core.async.SdkPublisher;
2526
import software.amazon.awssdk.utils.Logger;
@@ -108,6 +109,11 @@ public String name() {
108109
return delegate.name();
109110
}
110111

112+
@Override
113+
public SplitResult<ResponseT, ResultT> split(SplittingTransformerConfiguration splitConfig) {
114+
return delegate.split(splitConfig);
115+
}
116+
111117
static void invoke(Runnable runnable, String callbackName) {
112118
try {
113119
runnable.run();

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/DefaultAsyncResponseTransformerSplitResult.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@ public final class DefaultAsyncResponseTransformerSplitResult<ResponseT, ResultT
2727

2828
private final SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher;
2929
private final CompletableFuture<ResultT> future;
30+
private final Boolean parallelSplitSupported;
3031

3132
private DefaultAsyncResponseTransformerSplitResult(Builder<ResponseT, ResultT> builder) {
3233
this.publisher = Validate.paramNotNull(
3334
builder.publisher(), "asyncResponseTransformerPublisher");
3435
this.future = Validate.paramNotNull(
3536
builder.resultFuture(), "future");
37+
this.parallelSplitSupported = Validate.getOrDefault(builder.parallelSplitSupported(), () -> false);
3638
}
3739

3840
/**
@@ -52,6 +54,11 @@ public CompletableFuture<ResultT> resultFuture() {
5254
return this.future;
5355
}
5456

57+
@Override
58+
public Boolean parallelSplitSupported() {
59+
return this.parallelSplitSupported;
60+
}
61+
5562
@Override
5663
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> toBuilder() {
5764
return new DefaultBuilder<>(this);
@@ -65,13 +72,15 @@ public static class DefaultBuilder<ResponseT, ResultT>
6572
implements AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> {
6673
private SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher;
6774
private CompletableFuture<ResultT> future;
75+
private Boolean parallelSplitSupported;
6876

6977
DefaultBuilder() {
7078
}
7179

7280
DefaultBuilder(DefaultAsyncResponseTransformerSplitResult<ResponseT, ResultT> split) {
7381
this.publisher = split.publisher;
7482
this.future = split.future;
83+
this.parallelSplitSupported = split.parallelSplitSupported;
7584
}
7685

7786
@Override
@@ -92,14 +101,28 @@ public CompletableFuture<ResultT> resultFuture() {
92101
}
93102

94103
@Override
95-
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> future) {
104+
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> resultFuture(
105+
CompletableFuture<ResultT> future) {
96106
this.future = future;
97107
return this;
98108
}
99109

110+
@Override
111+
public Boolean parallelSplitSupported() {
112+
return parallelSplitSupported;
113+
}
114+
115+
@Override
116+
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> parallelSplitSupported(
117+
Boolean parallelSplitSupported) {
118+
this.parallelSplitSupported = parallelSplitSupported;
119+
return this;
120+
}
121+
100122
@Override
101123
public AsyncResponseTransformer.SplitResult<ResponseT, ResultT> build() {
102124
return new DefaultAsyncResponseTransformerSplitResult<>(this);
103125
}
126+
104127
}
105128
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
import java.util.function.Supplier;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.annotations.ThreadSafe;
25+
import software.amazon.awssdk.utils.Logger;
26+
27+
/**
28+
* Subscription which can emit {@link Subscriber#onNext(T)} signals to a subscriber, based on the demand received with the
29+
* {@link Subscription#request(long)}. It tracks the outstandingDemand that has not yet been fulfilled and used a Supplier
30+
* passed to it to create the object it needs to emit.
31+
* @param <T> the type of object to emit to the subscriber.
32+
*/
33+
@SdkInternalApi
34+
@ThreadSafe
35+
public final class EmittingSubscription<T> implements Subscription {
36+
private static final Logger log = Logger.loggerFor(EmittingSubscription.class);
37+
38+
private Subscriber<? super T> downstreamSubscriber;
39+
private final AtomicBoolean emitting;
40+
private final AtomicLong outstandingDemand;
41+
private final Runnable onCancel;
42+
private final AtomicBoolean isCancelled;
43+
private final Supplier<T> supplier;
44+
45+
private EmittingSubscription(Builder<T> builder) {
46+
this.downstreamSubscriber = builder.downstreamSubscriber;
47+
this.onCancel = builder.onCancel;
48+
this.supplier = builder.supplier;
49+
this.isCancelled = new AtomicBoolean();
50+
this.outstandingDemand = new AtomicLong(0);
51+
this.emitting = new AtomicBoolean();
52+
}
53+
54+
public static <T> Builder<T> builder() {
55+
return new Builder<>();
56+
}
57+
58+
@Override
59+
public void request(long n) {
60+
if (n <= 0) {
61+
downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive"));
62+
return;
63+
}
64+
long newDemand = outstandingDemand.updateAndGet(current -> {
65+
if (Long.MAX_VALUE - current < n) {
66+
return Long.MAX_VALUE;
67+
}
68+
return current + n;
69+
});
70+
log.trace(() -> String.format("new outstanding demand: %s", newDemand));
71+
emit();
72+
}
73+
74+
@Override
75+
public void cancel() {
76+
isCancelled.set(true);
77+
downstreamSubscriber = null;
78+
onCancel.run();
79+
}
80+
81+
private void emit() {
82+
do {
83+
if (!emitting.compareAndSet(false, true)) {
84+
return;
85+
}
86+
try {
87+
if (doEmit()) {
88+
return;
89+
}
90+
} finally {
91+
emitting.compareAndSet(true, false);
92+
}
93+
} while (outstandingDemand.get() > 0);
94+
}
95+
96+
private boolean doEmit() {
97+
long demand = outstandingDemand.get();
98+
99+
while (demand > 0) {
100+
if (isCancelled.get()) {
101+
return true;
102+
}
103+
if (outstandingDemand.get() > 0) {
104+
demand = outstandingDemand.decrementAndGet();
105+
T value;
106+
try {
107+
value = supplier.get();
108+
} catch (Exception e) {
109+
downstreamSubscriber.onError(e);
110+
return true;
111+
}
112+
downstreamSubscriber.onNext(value);
113+
}
114+
}
115+
return false;
116+
}
117+
118+
public static class Builder<T> {
119+
private Subscriber<? super T> downstreamSubscriber;
120+
private Runnable onCancel;
121+
private Supplier<T> supplier;
122+
123+
public Builder<T> downstreamSubscriber(Subscriber<? super T> subscriber) {
124+
this.downstreamSubscriber = subscriber;
125+
return this;
126+
}
127+
128+
public Builder<T> onCancel(Runnable onCancel) {
129+
this.onCancel = onCancel;
130+
return this;
131+
}
132+
133+
public Builder<T> supplier(Supplier<T> supplier) {
134+
this.supplier = supplier;
135+
return this;
136+
}
137+
138+
public EmittingSubscription<T> build() {
139+
return new EmittingSubscription<>(this);
140+
}
141+
}
142+
143+
144+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import software.amazon.awssdk.annotations.SdkInternalApi;
4242
import software.amazon.awssdk.core.FileTransformerConfiguration;
4343
import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior;
44+
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
4445
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4546
import software.amazon.awssdk.core.async.SdkPublisher;
4647
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -76,6 +77,18 @@ private FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fil
7677
this.position = position;
7778
}
7879

80+
FileTransformerConfiguration config() {
81+
return configuration.toBuilder().build();
82+
}
83+
84+
Path path() {
85+
return path;
86+
}
87+
88+
long position() {
89+
return position;
90+
}
91+
7992
private static long determineFilePositionToWrite(Path path, FileTransformerConfiguration fileConfiguration) {
8093
if (fileConfiguration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
8194
try {
@@ -89,7 +102,7 @@ private static long determineFilePositionToWrite(Path path, FileTransformerConfi
89102
if (fileConfiguration.fileWriteOption() == WRITE_TO_POSITION) {
90103
return Validate.getOrDefault(fileConfiguration.position(), () -> 0L);
91104
}
92-
return 0L;
105+
return 0L;
93106
}
94107

95108
private AsynchronousFileChannel createChannel(Path path) throws IOException {
@@ -183,6 +196,7 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
183196
private final Path path;
184197
private final CompletableFuture<Void> future;
185198
private final Consumer<Throwable> onErrorMethod;
199+
private final Object closeLock = new Object();
186200

187201
private volatile boolean writeInProgress = false;
188202
private volatile boolean closeOnLastWrite = false;
@@ -228,7 +242,7 @@ public void completed(Integer result, ByteBuffer attachment) {
228242
if (byteBuffer.hasRemaining()) {
229243
performWrite(byteBuffer);
230244
} else {
231-
synchronized (FileSubscriber.this) {
245+
synchronized (closeLock) {
232246
writeInProgress = false;
233247
if (closeOnLastWrite) {
234248
close();
@@ -256,7 +270,7 @@ public void onError(Throwable t) {
256270
public void onComplete() {
257271
log.trace(() -> "onComplete");
258272
// if write in progress, tell write to close on finish.
259-
synchronized (this) {
273+
synchronized (closeLock) {
260274
if (writeInProgress) {
261275
log.trace(() -> "writeInProgress = true, not closing");
262276
closeOnLastWrite = true;
@@ -284,4 +298,18 @@ public String toString() {
284298
return getClass() + ":" + path.toString();
285299
}
286300
}
287-
}
301+
302+
303+
@Override
304+
public SplitResult<ResponseT, ResponseT> split(SplittingTransformerConfiguration splitConfig) {
305+
if (configuration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
306+
return AsyncResponseTransformer.super.split(splitConfig);
307+
}
308+
CompletableFuture<ResponseT> future = new CompletableFuture<>();
309+
return (SplitResult<ResponseT, ResponseT>) SplitResult.<ResponseT, ResponseT>builder()
310+
.publisher(new FileAsyncResponseTransformerPublisher(this))
311+
.resultFuture(future)
312+
.parallelSplitSupported(true)
313+
.build();
314+
}
315+
}

0 commit comments

Comments
 (0)