Commit 1343077

SSE_C changes (#281)
## Description of change

Support passing the SSE_C customer key so that it can be passed on to S3 for encryption and decryption. S3A currently holds this customer key as an Optional String (https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecretOperations.java#L41), while Iceberg holds it as a String (https://github.com/apache/iceberg/blob/f9cc62eb0d98e360b452a3ab8fdc6efdc4969f6e/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java#L499). This change therefore accepts the key as an Optional String.

#### Relevant issues

#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?

#### Does this contribution introduce any new public APIs or behaviors?

#### How was the contribution tested?

#### Does this contribution need a changelog entry?

- [ ] I have updated the CHANGELOG or README if appropriate

---

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
1 parent a4ca8c8 commit 1343077

File tree

16 files changed

+673
-45
lines changed

.github/workflows/gradle-integration-test.yml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ env:
   S3_TEST_BUCKET : ${{ vars.S3_TEST_BUCKET }}
   S3_TEST_PREFIX : ${{ vars.S3_TEST_PREFIX }}
   ROLE_TO_ASSUME: ${{ secrets.S3_TEST_ASSUME_ROLE_ARN }}
+  CUSTOMER_KEY: ${{ secrets.CUSTOMER_KEY }}

 jobs:
   build:

README.md

Lines changed: 15 additions & 0 deletions
@@ -71,6 +71,21 @@ When the `S3SeekableInputStreamFactory` is no longer required to create new stre
 s3SeekableInputStreamFactory.close();
 ```

+### Accessing SSE_C encrypted objects
+
+To access SSE_C encrypted objects using AAL, set the customer key that was used to encrypt the object on the `OpenStreamInformation` object, and pass that object when creating the stream. The customer key must be Base64-encoded.
+
+```
+OpenStreamInformation openStreamInformation =
+    OpenStreamInformation.builder()
+        .encryptionSecrets(
+            EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64EncodedCustomerKey)).build())
+        .build();
+
+S3SeekableInputStream s3SeekableInputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key), openStreamInformation);
+
+```
+
 ### Using with Hadoop

 If you are using Analytics Accelerator Library for Amazon S3 with Hadoop, you need to set the stream type to `analytics` in the Hadoop configuration. An example configuration is as follows:
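
The README addition above assumes that a `base64EncodedCustomerKey` already exists. As a minimal sketch of how such a value might be produced, the following generates a random 256-bit key (the length stated in the `EncryptionSecrets` Javadoc) and Base64-encodes it. `SsecKeyGenerator` and `newBase64Key` are hypothetical names used only for illustration; they are not part of the library.

```java
import java.security.SecureRandom;
import java.util.Base64;

/** Hypothetical helper, not part of AAL: creates a Base64-encoded 256-bit key for SSE-C. */
final class SsecKeyGenerator {
  private static final int KEY_LENGTH_BYTES = 32; // 256 bits

  private SsecKeyGenerator() {}

  /** Returns a freshly generated random key, Base64-encoded. */
  static String newBase64Key() {
    byte[] key = new byte[KEY_LENGTH_BYTES];
    new SecureRandom().nextBytes(key);
    return Base64.getEncoder().encodeToString(key);
  }

  public static void main(String[] args) {
    String base64EncodedCustomerKey = newBase64Key();
    // Print only the length so the key itself is not leaked to the console.
    // 32 raw bytes always encode to 44 Base64 characters, padding included.
    System.out.println(base64EncodedCustomerKey.length());
  }
}
```

The same key must be supplied on every later read of the object, so in practice it would be stored in a secrets manager rather than regenerated.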
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.s3.analyticsaccelerator.request;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.Optional;
+import lombok.Builder;
+import lombok.Getter;
+
+/**
+ * Contains encryption secrets for Server-Side Encryption with Customer-Provided Keys (SSE-C). This
+ * class manages the customer-provided encryption key used for SSE-C operations with Amazon S3.
+ */
+@Getter
+public class EncryptionSecrets {
+
+  /**
+   * The customer-provided encryption key for SSE-C operations. When present, this key will be used
+   * for server-side encryption. The key must be Base64 encoded and exactly 256 bits (32 bytes) when
+   * decoded.
+   */
+  private final Optional<String> ssecCustomerKey;
+
+  /**
+   * The Base64-encoded MD5 hash of the customer key. This hash is automatically calculated from the
+   * customer key and is used by Amazon S3 to verify the integrity of the encryption key during
+   * transmission. Will be null if no customer key is provided.
+   */
+  private final String ssecCustomerKeyMd5;
+
+  /**
+   * Constructs an EncryptionSecrets instance with the specified SSE-C customer key.
+   *
+   * <p>This constructor processes the SSE-C (Server-Side Encryption with Customer-Provided Keys)
+   * encryption key and calculates its MD5 hash as required by Amazon S3. The process involves:
+   *
+   * <ol>
+   *   <li>Accepting a Base64-encoded encryption key
+   *   <li>Decoding the Base64 key back to bytes
+   *   <li>Computing the MD5 hash of these bytes
+   *   <li>Encoding the MD5 hash in Base64 format
+   * </ol>
+   *
+   * @param sseCustomerKey An Optional containing the Base64-encoded encryption key, or empty if no
+   *     encryption is needed
+   */
+  @Builder
+  public EncryptionSecrets(Optional<String> sseCustomerKey) {
+    this.ssecCustomerKey = sseCustomerKey;
+    this.ssecCustomerKeyMd5 =
+        sseCustomerKey
+            .map(
+                key -> {
+                  try {
+                    MessageDigest md = MessageDigest.getInstance("MD5");
+                    return Base64.getEncoder()
+                        .encodeToString(md.digest(Base64.getDecoder().decode(key)));
+                  } catch (NoSuchAlgorithmException e) {
+                    throw new IllegalStateException("MD5 algorithm not available", e);
+                  }
+                })
+            .orElse(null);
+  }
+}
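
The four steps listed in the constructor Javadoc (accept a Base64 key, decode it, hash the bytes with MD5, Base64-encode the digest) can be sketched in isolation as follows. This is an illustrative stand-alone version that uses only the JDK; `SsecKeyMd5` and `base64Md5` are hypothetical names and not part of the commit.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical stand-alone sketch of the key-MD5 derivation described in the Javadoc. */
final class SsecKeyMd5 {
  private SsecKeyMd5() {}

  /** Returns the Base64-encoded MD5 digest of the decoded key bytes. */
  static String base64Md5(String base64Key) {
    // decode() throws IllegalArgumentException if the input is not valid Base64.
    byte[] rawKey = Base64.getDecoder().decode(base64Key);
    try {
      byte[] digest = MessageDigest.getInstance("MD5").digest(rawKey);
      return Base64.getEncoder().encodeToString(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 algorithm not available", e);
    }
  }

  public static void main(String[] args) {
    // An all-zero 32-byte key, for illustration only; never use a predictable key.
    String base64Key = Base64.getEncoder().encodeToString(new byte[32]);
    // An MD5 digest is 16 bytes, so its Base64 form is always 24 characters.
    System.out.println(base64Md5(base64Key).length());
  }
}
```

Note that the digest is computed over the decoded key bytes, not over the Base64 text, which is why the decode step comes first.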

common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenStreamInformation.java

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@
 import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.Getter;
+import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
 import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;

@@ -41,6 +42,7 @@ public class OpenStreamInformation {
   private final StreamAuditContext streamAuditContext;
   private final ObjectMetadata objectMetadata;
   private final InputPolicy inputPolicy;
+  private final EncryptionSecrets encryptionSecrets;

   /** Default set of settings for {@link OpenStreamInformation} */
   public static final OpenStreamInformation DEFAULT = OpenStreamInformation.builder().build();
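
The hunks shown here do not include the code that turns `EncryptionSecrets` into an S3 request. As a rough sketch of what an SSE-C request carries, Amazon S3 expects three headers: the algorithm (`AES256`), the Base64-encoded key, and the Base64-encoded MD5 of the key. The class and method below (`SsecHeaders`, `forRequest`) are hypothetical and build a plain header map; how the library actually attaches these values to a request is an assumption not confirmed by this page.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: maps an optional SSE-C key and its MD5 to the S3 request headers. */
final class SsecHeaders {
  private SsecHeaders() {}

  /** Returns the three SSE-C headers, or an empty map when no key is present. */
  static Map<String, String> forRequest(Optional<String> base64Key, String base64KeyMd5) {
    if (!base64Key.isPresent()) {
      return Collections.emptyMap();
    }
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("x-amz-server-side-encryption-customer-algorithm", "AES256");
    headers.put("x-amz-server-side-encryption-customer-key", base64Key.get());
    headers.put("x-amz-server-side-encryption-customer-key-MD5", base64KeyMd5);
    return headers;
  }

  public static void main(String[] args) {
    // Placeholder strings stand in for a real key and digest.
    Map<String, String> headers = forRequest(Optional.of("<base64 key>"), "<base64 md5>");
    headers.keySet().forEach(System.out::println);
  }
}
```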

common/src/test/java/software/amazon/s3/analyticsaccelerator/util/OpenStreamInformationTest.java

Lines changed: 67 additions & 0 deletions
@@ -17,13 +17,25 @@

 import static org.junit.jupiter.api.Assertions.*;

+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
 import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;

 public class OpenStreamInformationTest {

+  private static final String CUSTOMER_KEY = "32-bytes-long-key-for-testing-123";
+
+  /**
+   * To generate the base64 encoded md5 value for a customer key use the cli command echo -n
+   * "customer_key" | base64 | base64 -d | openssl md5 -binary | base64
+   */
+  private static final String EXPECTED_BASE64_MD5 = "R+k8pqEVUmkxDfaH5MqIdw==";
+
   @Test
   public void testDefaultInstance() {
     OpenStreamInformation info = OpenStreamInformation.DEFAULT;
@@ -32,24 +44,36 @@ public void testDefaultInstance() {
     assertNull(info.getStreamAuditContext(), "Default streamContext should be null");
     assertNull(info.getObjectMetadata(), "Default objectMetadata should be null");
     assertNull(info.getInputPolicy(), "Default inputPolicy should be null");
+    assertNull(info.getEncryptionSecrets(), "Default encryptionSecrets should be null");
   }

   @Test
   public void testBuilderWithAllFields() {
     StreamAuditContext mockContext = Mockito.mock(StreamAuditContext.class);
     ObjectMetadata mockMetadata = Mockito.mock(ObjectMetadata.class);
     InputPolicy mockPolicy = Mockito.mock(InputPolicy.class);
+    String base64Key =
+        Base64.getEncoder().encodeToString(CUSTOMER_KEY.getBytes(StandardCharsets.UTF_8));
+    EncryptionSecrets secrets =
+        EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64Key)).build();

     OpenStreamInformation info =
         OpenStreamInformation.builder()
             .streamAuditContext(mockContext)
             .objectMetadata(mockMetadata)
             .inputPolicy(mockPolicy)
+            .encryptionSecrets(secrets)
             .build();

     assertSame(mockContext, info.getStreamAuditContext(), "StreamContext should match");
     assertSame(mockMetadata, info.getObjectMetadata(), "ObjectMetadata should match");
     assertSame(mockPolicy, info.getInputPolicy(), "InputPolicy should match");
+    assertEquals(
+        base64Key,
+        info.getEncryptionSecrets().getSsecCustomerKey().get(),
+        "Customer key should match");
+    assertNotNull(info.getEncryptionSecrets().getSsecCustomerKeyMd5(), "MD5 should not be null");
+    assertEquals(EXPECTED_BASE64_MD5, info.getEncryptionSecrets().getSsecCustomerKeyMd5());
   }

   @Test
@@ -103,4 +127,47 @@ public void testNullFields() {
     assertNull(info.getObjectMetadata(), "ObjectMetadata should be null");
     assertNull(info.getInputPolicy(), "InputPolicy should be null");
   }
+
+  @Test
+  public void testDefaultInstanceEncryptionSecrets() {
+    OpenStreamInformation info = OpenStreamInformation.DEFAULT;
+    assertNull(info.getEncryptionSecrets(), "Default encryptionSecrets should be null");
+  }
+
+  @Test
+  public void testBuilderWithEncryptionSecrets() {
+    // Create a sample base64 encoded key
+    String base64Key =
+        Base64.getEncoder().encodeToString(CUSTOMER_KEY.getBytes(StandardCharsets.UTF_8));
+    EncryptionSecrets secrets =
+        EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64Key)).build();
+
+    OpenStreamInformation info = OpenStreamInformation.builder().encryptionSecrets(secrets).build();
+
+    assertNotNull(info.getEncryptionSecrets(), "EncryptionSecrets should not be null");
+    assertTrue(
+        info.getEncryptionSecrets().getSsecCustomerKey().isPresent(),
+        "Customer key should be present");
+    assertEquals(
+        base64Key,
+        info.getEncryptionSecrets().getSsecCustomerKey().get(),
+        "Customer key should match");
+    assertNotNull(info.getEncryptionSecrets().getSsecCustomerKeyMd5(), "MD5 should not be null");
+    assertEquals(EXPECTED_BASE64_MD5, info.getEncryptionSecrets().getSsecCustomerKeyMd5());
+  }
+
+  @Test
+  public void testBuilderWithEmptyEncryptionSecrets() {
+    EncryptionSecrets secrets =
+        EncryptionSecrets.builder().sseCustomerKey(Optional.empty()).build();
+
+    OpenStreamInformation info = OpenStreamInformation.builder().encryptionSecrets(secrets).build();
+
+    assertNotNull(info.getEncryptionSecrets(), "EncryptionSecrets should not be null");
+    assertFalse(
+        info.getEncryptionSecrets().getSsecCustomerKey().isPresent(),
+        "Customer key should be empty");
+    assertNull(
+        info.getEncryptionSecrets().getSsecCustomerKeyMd5(), "MD5 should be null for empty key");
+  }
 }
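
The tests above establish two cases a caller has to handle: `OpenStreamInformation.DEFAULT` returns `null` from `getEncryptionSecrets()`, and an `EncryptionSecrets` built with `Optional.empty()` has no key. A minimal sketch of reading the key safely in both cases follows. `Secrets` and `Info` are hypothetical stand-ins that mirror only the two getters involved, so that the example runs without the library on the classpath.

```java
import java.util.Optional;

/** Hypothetical sketch; Secrets and Info are stand-ins, not the AAL classes. */
final class NullSafeSecrets {
  private NullSafeSecrets() {}

  /** Stand-in for EncryptionSecrets, exposing only the getter used below. */
  static final class Secrets {
    private final Optional<String> ssecCustomerKey;

    Secrets(Optional<String> ssecCustomerKey) {
      this.ssecCustomerKey = ssecCustomerKey;
    }

    Optional<String> getSsecCustomerKey() {
      return ssecCustomerKey;
    }
  }

  /** Stand-in for OpenStreamInformation; the secrets field may be null, as in DEFAULT. */
  static final class Info {
    private final Secrets encryptionSecrets;

    Info(Secrets encryptionSecrets) {
      this.encryptionSecrets = encryptionSecrets;
    }

    Secrets getEncryptionSecrets() {
      return encryptionSecrets;
    }
  }

  /** Collapses "no secrets object" and "secrets without a key" into one empty Optional. */
  static Optional<String> customerKey(Info info) {
    return Optional.ofNullable(info.getEncryptionSecrets()).flatMap(Secrets::getSsecCustomerKey);
  }

  public static void main(String[] args) {
    System.out.println(customerKey(new Info(null)).isPresent()); // prints false
    System.out.println(customerKey(new Info(new Secrets(Optional.of("a2V5")))).get()); // prints a2V5
  }
}
```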

input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java

Lines changed: 32 additions & 12 deletions
@@ -46,6 +46,7 @@
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;

 /** Base class for the integration tests */
@@ -99,7 +100,11 @@ protected void testAndCompareStreamReadPattern(
     // Read using the standard S3 async client
     Crc32CChecksum directChecksum = new Crc32CChecksum();
     executeReadPatternDirectly(
-        s3ClientKind, s3Object, streamReadPattern, Optional.of(directChecksum));
+        s3ClientKind,
+        s3Object,
+        streamReadPattern,
+        Optional.of(directChecksum),
+        OpenStreamInformation.DEFAULT);

     // Read using the AAL S3
     Crc32CChecksum aalChecksum = new Crc32CChecksum();
@@ -108,7 +113,8 @@ protected void testAndCompareStreamReadPattern(
         s3Object,
         streamReadPattern,
         AALInputStreamConfigurationKind,
-        Optional.of(aalChecksum));
+        Optional.of(aalChecksum),
+        OpenStreamInformation.DEFAULT);

     // Assert checksums
     assertChecksums(directChecksum, aalChecksum);
@@ -140,7 +146,8 @@ protected void testChangingEtagMidStream(
       S3URI s3URI =
           s3Object.getObjectUri(this.getS3ExecutionContext().getConfiguration().getBaseUri());
       S3AsyncClient s3Client = this.getS3ExecutionContext().getS3Client();
-      S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
+      S3SeekableInputStream stream =
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);

       // Read first 100 bytes
       readAndAssert(stream, buffer, 0, 100);
@@ -171,7 +178,11 @@ protected void testChangingEtagMidStream(
       assertDoesNotThrow(
           () ->
               executeReadPatternOnAAL(
-                  s3Object, s3AALClientStreamReader, streamReadPattern, Optional.of(datChecksum)));
+                  s3Object,
+                  s3AALClientStreamReader,
+                  streamReadPattern,
+                  Optional.of(datChecksum),
+                  OpenStreamInformation.DEFAULT));
       assert (datChecksum.getChecksumBytes().length > 0);
     }
   }
@@ -199,7 +210,7 @@ protected void testReadVectored(
         this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {

       S3SeekableInputStream s3SeekableInputStream =
-          s3AALClientStreamReader.createReadStream(s3Object);
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);

       List<ObjectRange> objectRanges = new ArrayList<>();
       objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, 500));
@@ -217,7 +228,7 @@ protected void testReadVectored(
         ByteBuffer byteBuffer = objectRange.getByteBuffer().join();

         S3SeekableInputStream verificationStream =
-            s3AALClientStreamReader.createReadStream(s3Object);
+            s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
         verificationStream.seek(objectRange.getOffset());
         byte[] buffer = new byte[objectRange.getLength()];
         int readBytes = verificationStream.read(buffer, 0, buffer.length);
@@ -273,7 +284,8 @@ protected void testChangingEtagAfterStreamPassesAndReturnsCachedObject(
     // Create the s3DATClientStreamReader - that creates the shared state
     try (S3AALClientStreamReader s3AALClientStreamReader =
         this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
-      S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
+      S3SeekableInputStream stream =
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
       Crc32CChecksum datChecksum = calculateCRC32C(stream, bufferSize);

       S3URI s3URI =
@@ -287,7 +299,8 @@ protected void testChangingEtagAfterStreamPassesAndReturnsCachedObject(
               AsyncRequestBody.fromBytes(generateRandomBytes(bufferSize)))
           .join();

-      S3SeekableInputStream cacheStream = s3AALClientStreamReader.createReadStream(s3Object);
+      S3SeekableInputStream cacheStream =
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
       Crc32CChecksum cachedChecksum = calculateCRC32C(cacheStream, bufferSize);

       // Assert checksums
@@ -351,7 +364,11 @@ protected void testAALReadConcurrency(
     // Read using the standard S3 async client. We do this once, to calculate the checksums
     Crc32CChecksum directChecksum = new Crc32CChecksum();
     executeReadPatternDirectly(
-        s3ClientKind, s3Object, streamReadPattern, Optional.of(directChecksum));
+        s3ClientKind,
+        s3Object,
+        streamReadPattern,
+        Optional.of(directChecksum),
+        OpenStreamInformation.DEFAULT);

     // Create the s3DATClientStreamReader - that creates the shared state
     try (S3AALClientStreamReader s3AALClientStreamReader =
@@ -374,7 +391,8 @@ protected void testAALReadConcurrency(
                       s3Object,
                       s3AALClientStreamReader,
                       streamReadPattern,
-                      Optional.of(datChecksum));
+                      Optional.of(datChecksum),
+                      OpenStreamInformation.DEFAULT);

                   // Assert checksums
                   assertChecksums(directChecksum, datChecksum);
@@ -418,7 +436,8 @@ protected void testSmallObjectPrefetching(
         this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {

       // First stream
-      S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
+      S3SeekableInputStream stream =
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
       Crc32CChecksum firstChecksum = calculateCRC32C(stream, (int) s3Object.getSize());

       S3URI s3URI =
@@ -433,7 +452,8 @@ protected void testSmallObjectPrefetching(
           .join();

       // Create second stream
-      S3SeekableInputStream secondStream = s3AALClientStreamReader.createReadStream(s3Object);
+      S3SeekableInputStream secondStream =
+          s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
       Crc32CChecksum secondChecksum = calculateCRC32C(secondStream, (int) s3Object.getSize());

       if (s3Object.getSize() < 8 * ONE_MB) {
