Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ include 'temporal-remote-data-encoder'
include 'temporal-shaded'
include 'temporal-workflowcheck'
include 'temporal-envconfig'
include 'temporal-s3-external-storage'
14 changes: 14 additions & 0 deletions temporal-s3-external-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
description = '''Temporal S3 External Storage Driver'''

ext {
    // Version of the AWS SDK for Java v2 that supplies the S3 client.
    awsSdkVersion = '2.25.0'
}

dependencies {
    api project(':temporal-sdk')
    // Declared as `api` rather than `implementation`: the public constructor of
    // AwsSdkV2S3Client takes software.amazon.awssdk.services.s3.S3Client, so the AWS SDK
    // types are part of this module's public API and consumers need them on their
    // compile classpath.
    api "software.amazon.awssdk:s3:${awsSdkVersion}"

    // NOTE(review): junitVersion, mockitoVersion and logbackVersion are not defined in this
    // script; presumably they come from the root build — confirm.
    testImplementation "junit:junit:${junitVersion}"
    testImplementation "org.mockito:mockito-core:${mockitoVersion}"
    testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.temporal.contrib.aws.s3driver;

import io.temporal.common.Experimental;
import io.temporal.common.converter.StorageDriverException;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
 * {@link S3Client} implementation backed by the AWS SDK for Java v2. Each call is forwarded to a
 * caller-supplied {@link software.amazon.awssdk.services.s3.S3Client}; any exception raised while
 * doing so is logged at WARN level and rethrown wrapped in a {@link StorageDriverException}.
 *
 * <p><b>Retry behavior:</b> this adapter performs no retries of its own. The AWS SDK v2 has
 * built-in retry logic that is configured on the
 * {@link software.amazon.awssdk.services.s3.S3Client} instance, and users <b>should</b> set an
 * appropriate retry policy for production use:
 *
 * <pre>{@code
 * software.amazon.awssdk.services.s3.S3Client awsS3 =
 *     software.amazon.awssdk.services.s3.S3Client.builder()
 *         .region(Region.US_EAST_1)
 *         .overrideConfiguration(o -> o.retryPolicy(
 *             RetryPolicy.builder().numRetries(3).build()))
 *         .build();
 * S3Client client = new AwsSdkV2S3Client(awsS3);
 * }</pre>
 *
 * <p><b>Impact on workflows:</b> S3 failures during workflow replay will block workflow progress
 * until retries succeed or the operation is abandoned. Configure retry policies and timeouts
 * accordingly to avoid indefinite blocking.
 */
@Experimental
public final class AwsSdkV2S3Client implements S3Client {

  private static final Logger log = LoggerFactory.getLogger(AwsSdkV2S3Client.class);

  /** The AWS SDK client that performs the actual S3 calls; never {@code null}. */
  private final software.amazon.awssdk.services.s3.S3Client delegate;

  /**
   * Wraps the given AWS SDK v2 S3 client.
   *
   * @param delegate the AWS SDK v2 S3 client that every operation is forwarded to
   * @throws NullPointerException if {@code delegate} is {@code null}
   */
  public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) {
    this.delegate = Objects.requireNonNull(delegate, "delegate");
  }

  /**
   * Uploads {@code data} as the complete content of the object {@code key} in {@code bucket}.
   *
   * @param bucket the S3 bucket name
   * @param key the S3 object key
   * @param data the object content
   * @throws StorageDriverException if building or sending the request fails for any reason
   */
  @Override
  public void putObject(String bucket, String key, byte[] data) {
    log.debug("S3 PutObject: bucket={}, key={}, size={} bytes", bucket, key, data.length);
    try {
      PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(key).build();
      RequestBody body = RequestBody.fromBytes(data);
      delegate.putObject(request, body);
    } catch (Exception e) {
      log.warn("S3 PutObject failed: bucket={}, key={}", bucket, key, e);
      String location = "bucket=" + bucket + ", key=" + key;
      throw new StorageDriverException("Failed to upload object to S3: " + location, e);
    }
  }

  /**
   * Downloads the object {@code key} from {@code bucket} and returns its full content.
   *
   * <p>The response stream is drained into memory in 8 KiB chunks and closed before returning.
   *
   * @param bucket the S3 bucket name
   * @param key the S3 object key
   * @return the object content
   * @throws StorageDriverException if the request, the read, or closing the stream fails
   */
  @Override
  public byte[] getObject(String bucket, String key) {
    log.debug("S3 GetObject: bucket={}, key={}", bucket, key);
    try {
      GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
      try (InputStream body = delegate.getObject(request)) {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        for (int count = body.read(chunk); count != -1; count = body.read(chunk)) {
          collected.write(chunk, 0, count);
        }
        return collected.toByteArray();
      }
    } catch (Exception e) {
      log.warn("S3 GetObject failed: bucket={}, key={}", bucket, key, e);
      String location = "bucket=" + bucket + ", key=" + key;
      throw new StorageDriverException("Failed to download object from S3: " + location, e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.temporal.contrib.aws.s3driver;

import io.temporal.common.Experimental;

/**
 * The minimal set of S3 operations that {@link S3StorageDriver} relies on. Keeping the driver
 * behind this interface lets callers plug in a different S3 client implementation and lets tests
 * substitute a mock.
 *
 * <p>Implementations <b>must</b> be thread-safe: one {@code S3Client} instance may be used
 * concurrently by multiple workflow threads and activity workers.
 *
 * @see AwsSdkV2S3Client
 */
@Experimental
public interface S3Client {

  /**
   * Stores {@code data} as an object in S3.
   *
   * @param bucket the name of the target S3 bucket
   * @param key the key under which the object is written
   * @param data the bytes that make up the object
   * @throws io.temporal.common.converter.StorageDriverException if the upload fails
   */
  void putObject(String bucket, String key, byte[] data);

  /**
   * Fetches the content of an object from S3.
   *
   * @param bucket the name of the S3 bucket to read from
   * @param key the key of the object to read
   * @return the bytes that make up the object
   * @throws io.temporal.common.converter.StorageDriverException if the download fails
   */
  byte[] getObject(String bucket, String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package io.temporal.contrib.aws.s3driver;

import io.temporal.common.Experimental;
import io.temporal.common.converter.StorageDriver;
import io.temporal.common.converter.StorageDriverClaim;
import io.temporal.common.converter.StorageDriverException;
import io.temporal.common.converter.StorageDriverRetrieveContext;
import io.temporal.common.converter.StorageDriverStoreContext;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed
 * storage with SHA-256 hashing for natural deduplication.
 *
 * <p>S3 object keys are constructed as:
 *
 * <pre>
 * [{keyPrefix}/]{namespace}/{workflowId}/[{activityType}/]{sha256-hash}
 * </pre>
 *
 * <p>The {@code keyPrefix} part is present only when a non-empty prefix is configured (a trailing
 * {@code /} is added if the prefix does not already end with one), and the {@code activityType}
 * segment is present only when the store context supplies an activity type. The namespace,
 * workflow id and activity type segments are URL-encoded.
 *
 * <p>Example usage:
 *
 * <pre>{@code
 * software.amazon.awssdk.services.s3.S3Client awsS3 =
 *     software.amazon.awssdk.services.s3.S3Client.builder()
 *         .region(Region.US_EAST_1)
 *         .build();
 *
 * S3StorageDriver driver = new S3StorageDriver(
 *     S3StorageDriverOptions.newBuilder()
 *         .setClient(new AwsSdkV2S3Client(awsS3))
 *         .setBucket("my-temporal-payloads")
 *         .build());
 *
 * ExternalStorage externalStorage = ExternalStorage.newBuilder()
 *     .addDriver(driver)
 *     .build();
 * }</pre>
 *
 * <p><b>Important:</b> Temporal does not manage the lifecycle of stored objects. Users must
 * configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at
 * least for the workflow lifetime plus the namespace retention period.
 *
 * @implNote Each payload is held in memory as a {@code byte[]}, so peak memory usage during store
 *     and retrieve operations is approximately 2–3× the payload size (original bytes, hash
 *     computation buffer, and S3 request/response buffer). S3 provides data integrity via checksums
 *     automatically. Content-addressed storage means that duplicate payloads produce identical
 *     object keys, making writes idempotent — re-uploading the same payload simply overwrites the
 *     same S3 object with identical content.
 */
@Experimental
public final class S3StorageDriver implements StorageDriver {

  private static final Logger log = LoggerFactory.getLogger(S3StorageDriver.class);

  private static final String DRIVER_TYPE = "aws.s3driver";
  static final String CLAIM_KEY_BUCKET = "bucket";
  static final String CLAIM_KEY_OBJECT_KEY = "key";

  private final S3Client client;
  private final String bucket;
  private final String keyPrefix;
  private final String driverName;

  /**
   * Creates a new S3 storage driver with the given options.
   *
   * <p>The client and bucket are validated eagerly so that a misconfigured driver fails at
   * construction time rather than on the first store or retrieve call. A {@code null} key prefix
   * is treated as "no prefix".
   *
   * @param options the driver configuration options
   * @throws NullPointerException if {@code options}, its client, or its bucket is {@code null}
   */
  public S3StorageDriver(@Nonnull S3StorageDriverOptions options) {
    Objects.requireNonNull(options, "options");
    this.client = Objects.requireNonNull(options.getClient(), "options.client");
    this.bucket = Objects.requireNonNull(options.getBucket(), "options.bucket");
    String configuredPrefix = options.getKeyPrefix();
    this.keyPrefix = configuredPrefix == null ? "" : configuredPrefix;
    this.driverName = options.getDriverName();
  }

  /** Returns the driver name taken from the options this driver was built with. */
  @Override
  public String name() {
    return driverName;
  }

  /** Returns the fixed type identifier of this driver, {@code "aws.s3driver"}. */
  @Override
  public String type() {
    return DRIVER_TYPE;
  }

  /**
   * Uploads each payload to the configured bucket and returns one claim per payload, in the same
   * order as {@code payloads}.
   *
   * <p>Each claim carries the bucket name and the object key the payload was written to. Payloads
   * are uploaded one at a time; if an upload fails, the exception is propagated immediately and
   * payloads uploaded earlier in the same call are left in place.
   *
   * @param context supplies the namespace, workflow id and optional activity type used to build
   *     the object key
   * @param payloads the payload bytes to store
   * @return the claims identifying the stored objects
   * @throws StorageDriverException if an upload fails
   */
  @Override
  public List<StorageDriverClaim> store(StorageDriverStoreContext context, List<byte[]> payloads)
      throws StorageDriverException {
    List<StorageDriverClaim> claims = new ArrayList<>(payloads.size());
    for (byte[] payload : payloads) {
      String hash = sha256Hex(payload);
      String objectKey = buildObjectKey(context, hash);

      log.debug(
          "Storing payload to S3: bucket={}, key={}, size={} bytes",
          bucket,
          objectKey,
          payload.length);
      try {
        client.putObject(bucket, objectKey, payload);
      } catch (StorageDriverException e) {
        log.warn("Failed to store payload to S3: bucket={}, key={}", bucket, objectKey, e);
        throw e;
      }

      Map<String, String> claimData = new HashMap<>();
      claimData.put(CLAIM_KEY_BUCKET, bucket);
      claimData.put(CLAIM_KEY_OBJECT_KEY, objectKey);
      claims.add(new StorageDriverClaim(claimData));
    }
    return claims;
  }

  /**
   * Retrieves payloads from S3 using the configured bucket, returning them in the same order as
   * {@code claims}.
   *
   * <p><b>Security note:</b> This method always uses the bucket configured at construction time
   * ({@code this.bucket}) and ignores any bucket value stored in claim data. Although claims
   * include a bucket field for informational purposes, honoring it at retrieval time would be a
   * confused-deputy vulnerability — an attacker who can craft or tamper with claim tokens could
   * redirect reads to an arbitrary bucket. The object key, by contrast, is taken verbatim from the
   * claim and is not checked against the configured key prefix.
   *
   * @param context the retrieve context (not consulted by this implementation)
   * @param claims the claims identifying the objects to download
   * @return the downloaded payload bytes
   * @throws StorageDriverException if a claim has no object key or a download fails
   */
  @Override
  public List<byte[]> retrieve(
      StorageDriverRetrieveContext context, List<StorageDriverClaim> claims)
      throws StorageDriverException {
    List<byte[]> results = new ArrayList<>(claims.size());
    for (StorageDriverClaim claim : claims) {
      String objectKey = claim.getClaimData().get(CLAIM_KEY_OBJECT_KEY);
      if (objectKey == null) {
        throw new StorageDriverException(
            "Claim is missing required '" + CLAIM_KEY_OBJECT_KEY + "' field: " + claim);
      }
      log.debug("Retrieving payload from S3: bucket={}, key={}", bucket, objectKey);
      // Always use the configured bucket for security — claim data could be tampered.
      try {
        results.add(client.getObject(bucket, objectKey));
      } catch (StorageDriverException e) {
        log.warn("Failed to retrieve payload from S3: bucket={}, key={}", bucket, objectKey, e);
        throw e;
      }
    }
    return results;
  }

  /**
   * Builds the S3 object key for a payload.
   *
   * <p>Each path segment derived from user-controlled input (namespace, workflowId, activityType)
   * is URL-encoded to prevent path-injection attacks (e.g., a workflowId containing {@code "../"}
   * could escape the intended key hierarchy).
   *
   * @param context the store context providing namespace, workflowId, and optional activityType
   * @param hash the SHA-256 hex digest of the payload
   * @return the fully-qualified S3 object key
   */
  private String buildObjectKey(StorageDriverStoreContext context, String hash) {
    StringBuilder sb = new StringBuilder();
    if (!keyPrefix.isEmpty()) {
      sb.append(keyPrefix);
      if (!keyPrefix.endsWith("/")) {
        sb.append('/');
      }
    }
    sb.append(encodePathSegment(context.getNamespace())).append('/');
    sb.append(encodePathSegment(context.getWorkflowId())).append('/');
    if (context.getActivityType() != null) {
      sb.append(encodePathSegment(context.getActivityType())).append('/');
    }
    sb.append(hash);
    return sb.toString();
  }

  /**
   * URL-encodes a single path segment, replacing {@code +} with {@code %20} so that spaces are
   * represented correctly in S3 object keys.
   *
   * <p>{@link URLEncoder} leaves {@code '.'} unencoded, so a segment that is exactly {@code "."}
   * or {@code ".."} would otherwise come through verbatim and form a relative-path element in the
   * key. Those two segments are therefore percent-encoded explicitly. The result cannot collide
   * with a genuine segment, because a literal {@code "%2E"} input is encoded as {@code "%252E"}.
   *
   * @param segment the raw segment value; must not be {@code null}
   * @return the encoded segment, containing no {@code '/'} and never equal to {@code "."} or
   *     {@code ".."}
   */
  private static String encodePathSegment(String segment) {
    if (".".equals(segment)) {
      return "%2E";
    }
    if ("..".equals(segment)) {
      return "%2E%2E";
    }
    try {
      return URLEncoder.encode(segment, "UTF-8").replace("+", "%20");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is guaranteed to be available on all JVMs
      throw new AssertionError("UTF-8 encoding not available", e);
    }
  }

  /**
   * Computes the SHA-256 digest of {@code data} and renders it as a lowercase hexadecimal string
   * with two characters per digest byte.
   *
   * @param data the bytes to hash
   * @return the lowercase hex encoding of the SHA-256 digest
   * @throws StorageDriverException if the JVM provides no SHA-256 implementation
   */
  static String sha256Hex(byte[] data) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(data);
      StringBuilder hexString = new StringBuilder(hash.length * 2);
      for (byte b : hash) {
        // Mask to an unsigned value; pad single-digit results so every byte yields two chars.
        String hex = Integer.toHexString(0xff & b);
        if (hex.length() == 1) {
          hexString.append('0');
        }
        hexString.append(hex);
      }
      return hexString.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new StorageDriverException("SHA-256 algorithm not available", e);
    }
  }
}
Loading