From edd142eb11c4fd378d5d8371c54273059d23961d Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Tue, 12 Aug 2025 12:08:11 -0700 Subject: [PATCH 1/4] Add crt implementation of upload to support progress --- .../s3/internal/CrtS3TransferManager.java | 50 +++++++++++++++++++ .../s3/internal/GenericS3TransferManager.java | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java index d9dded426e37..3d4673582845 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java @@ -19,6 +19,7 @@ import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -33,11 +34,15 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload; +import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload; import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; +import software.amazon.awssdk.transfer.s3.model.CompletedUpload; import software.amazon.awssdk.transfer.s3.model.FileUpload; import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload; +import software.amazon.awssdk.transfer.s3.model.Upload; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Validate; @@ -54,6 +59,51 @@ class CrtS3TransferManager extends GenericS3TransferManager { this.s3AsyncClient = s3AsyncClient; } + @Override + public final Upload upload(UploadRequest uploadRequest) { + Validate.paramNotNull(uploadRequest, "uploadRequest"); + + AsyncRequestBody requestBody = uploadRequest.requestBody(); + + CompletableFuture returnFuture = new CompletableFuture<>(); + + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, + requestBody.contentLength().orElse(null)); + progressUpdater.transferInitiated(); + // requestBody = progressUpdater.wrapRequestBody(requestBody); + progressUpdater.registerCompletion(returnFuture); + + S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable(); + + Consumer attachObservable = + b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable) + .put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); + + PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachObservable); + + progressUpdater.transferInitiated(); + progressUpdater.registerCompletion(returnFuture); + + try { + assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); + + CompletableFuture future = + s3AsyncClient.putObject(putObjectRequest, requestBody); + + // Forward upload cancellation to future + CompletableFutureUtils.forwardExceptionTo(returnFuture, future); + + CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture, + r -> CompletedUpload.builder() + .response(r) + .build()); + } catch (Throwable throwable) { + returnFuture.completeExceptionally(throwable); + } + + return new DefaultUpload(returnFuture, progressUpdater.progress()); + } + @Override public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { Validate.paramNotNull(uploadFileRequest, "uploadFileRequest"); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index 29bcdc34d93f..2bed0dd682e6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -128,7 +128,7 @@ class GenericS3TransferManager implements S3TransferManager { } @Override - public final Upload upload(UploadRequest uploadRequest) { + public Upload upload(UploadRequest uploadRequest) { Validate.paramNotNull(uploadRequest, "uploadRequest"); AsyncRequestBody requestBody = uploadRequest.requestBody(); From d945a0c4eb5867e6fa970e4e78bc878aecfbd076 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Wed, 13 Aug 2025 11:03:09 -0700 Subject: [PATCH 2/4] Refactors --- .../s3/internal/CrtS3TransferManager.java | 31 +++---------------- .../s3/internal/GenericS3TransferManager.java | 21 ++++++++----- .../s3/internal/CrtS3TransferManagerTest.java | 3 +- 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java index 3d4673582845..d099a41398e2 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java @@ -19,7 +19,6 @@ import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; -import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -70,36 +69,14 @@ public final Upload upload(UploadRequest uploadRequest) { TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody.contentLength().orElse(null)); progressUpdater.transferInitiated(); - // requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); - S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable(); - - Consumer attachObservable = - b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable) - .put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); - - PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachObservable); + Consumer attachProgress = + b -> b.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); - progressUpdater.transferInitiated(); - progressUpdater.registerCompletion(returnFuture); + PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachProgress); - try { - assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); - - CompletableFuture future = - s3AsyncClient.putObject(putObjectRequest, requestBody); - - // Forward upload cancellation to future - CompletableFutureUtils.forwardExceptionTo(returnFuture, future); - - CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture, - r -> CompletedUpload.builder() - .response(r) - .build()); - } catch (Throwable throwable) { - returnFuture.completeExceptionally(throwable); - } + doUpload(putObjectRequest, requestBody, returnFuture); return new DefaultUpload(returnFuture, progressUpdater.progress()); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index 2bed0dd682e6..de7ffc6ca949 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -148,8 +148,15 @@ public Upload upload(UploadRequest uploadRequest) { putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener); } + doUpload(putObjectRequest, requestBody, returnFuture); + + return new DefaultUpload(returnFuture, progressUpdater.progress()); + } + + protected void doUpload(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody, + CompletableFuture returnFuture) { try { - assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); + assertNotUnsupportedArn(putObjectRequest.bucket(), "upload"); CompletableFuture future = s3AsyncClient.putObject(putObjectRequest, requestBody); @@ -157,15 +164,15 @@ public Upload upload(UploadRequest uploadRequest) { // Forward upload cancellation to future CompletableFutureUtils.forwardExceptionTo(returnFuture, future); - CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture, - r -> CompletedUpload.builder() - .response(r) - .build()); + CompletableFutureUtils.forwardTransformedResultTo( + future, + returnFuture, + r -> CompletedUpload.builder() + .response(r) + .build()); } catch (Throwable throwable) { returnFuture.completeExceptionally(throwable); } - - return new DefaultUpload(returnFuture, progressUpdater.progress()); } /** diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java index b3628d18ce23..ba4da1547dc6 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import com.google.common.jimfs.Jimfs; @@ -105,6 +106,6 @@ private void verifyCrtInRequestAttributes() { assertThat(actual.overrideConfiguration()).isPresent(); SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES); assertThat(attribute).isNotNull(); - assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull(); + assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull(); } } From 083499a826d2607eac20bb8d4cac34373738c09b Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Wed, 13 Aug 2025 12:04:13 -0700 Subject: [PATCH 3/4] Add changelog + tests --- .../bugfix-S3TransferManager-77ad713.json | 6 ++++ .../s3/internal/CrtS3TransferManagerTest.java | 33 ++++++++++++++++--- 2 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 .changes/next-release/bugfix-S3TransferManager-77ad713.json diff --git a/.changes/next-release/bugfix-S3TransferManager-77ad713.json b/.changes/next-release/bugfix-S3TransferManager-77ad713.json new file mode 100644 index 000000000000..5998a5d3e1f4 --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-77ad713.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fix bug in progress reporting in upload when using the CRT client." +} diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java index ba4da1547dc6..454cd2b7d987 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java @@ -37,13 +37,16 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.exceptions.verification.WantedButNotInvoked; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.http.SdkHttpExecutionAttributes; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; @ExtendWith(MockitoExtension.class) public class CrtS3TransferManagerTest { @@ -81,7 +84,7 @@ void uploadDirectory_shouldUseCrtUploadFile() { .completionFuture() .join(); - verifyCrtInRequestAttributes(); + verifyCrtInRequestAttributes(true); } @Test @@ -94,18 +97,38 @@ void uploadFile_shouldUseCrtUploadFile() { .completionFuture() .join(); - verifyCrtInRequestAttributes(); + verifyCrtInRequestAttributes(true); } - private void verifyCrtInRequestAttributes() { - ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture()); + @Test + void upload_shouldUseCrtUpload() { + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build())); + transferManager.upload(UploadRequest.builder() + .putObjectRequest(PutObjectRequest.builder().bucket("test").key("test").build()) + .requestBody(AsyncRequestBody.fromString("test")) + .build()) + .completionFuture() + .join(); + + verifyCrtInRequestAttributes(false); + } + + private void verifyCrtInRequestAttributes(boolean verifyObservable) { + ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + try { + verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture()); + } catch (WantedButNotInvoked e) { + verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(AsyncRequestBody.class).capture()); + } PutObjectRequest actual = requestArgumentCaptor.getValue(); assertThat(actual.overrideConfiguration()).isPresent(); SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES); assertThat(attribute).isNotNull(); assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull(); + if (verifyObservable) { + assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull(); + } } } From 78670485fa9af45146d678e9f59199b21984d4ee Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 14 Aug 2025 16:26:14 -0700 Subject: [PATCH 4/4] Added integ test --- ...3TransferManagerUploadIntegrationTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index eb8102d3a470..c2d2f88f6622 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.transfer.s3; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.IOException; @@ -215,4 +216,31 @@ void upload_file_Interupted_CancelsTheListener(S3TransferManager tm) { assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1)) .isNotEqualTo(100.0); } + + @ParameterizedTest + @MethodSource("transferManagers") + void upload_asyncRequestBody_ReportsProgressCorrectly(S3TransferManager tm) throws IOException { + String content = RandomStringUtils.randomAscii(OBJ_SIZE); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + Upload upload = + tm.upload(UploadRequest.builder() + .putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY)) + .requestBody(AsyncRequestBody.fromString(content)) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .build()); + + upload.completionFuture().join(); + ResponseInputStream obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8))) + .isEqualTo(ChecksumUtils.computeCheckSum(obj)); + + assertListenerForSuccessfulTransferComplete(transferListener); + + // ensure intermediate progress is reported + assertThat(transferListener.getRatioTransferredList()).hasSizeGreaterThan(2); + } }