Skip to content

Commit 38f9c1c

Browse files
authored
HADOOP-19559. S3A: Prerequisite features for AAL default ON. (#8067)
Backports: * HADOOP-19394. S3A: Integrate with AAL's readVectored(). (#7720) * HADOOP-19664. S3A: Analytics stream to use Java sync client. (#7909) * HADOOP-19698. S3A: Add AAL dependency to LICENSE-binary. * HADOOP-19587. S3A: Adds in support for SSE-C to AAL (#7906) * HADOOP-19365. S3A: Adds in support for auditing for AAL. (#7723)
1 parent e4d02cc commit 38f9c1c

File tree

15 files changed

+273
-34
lines changed

15 files changed

+273
-34
lines changed

LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ org.xerial.snappy:snappy-java:1.1.10.4
361361
org.yaml:snakeyaml:2.0
362362
org.wildfly.openssl:wildfly-openssl:2.2.5.Final
363363
software.amazon.awssdk:bundle:2.29.52
364+
software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0
364365

365366
--------------------------------------------------------------------------------
366367
This product bundles various third-party components under other open source

hadoop-project/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@
204204
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
205205
<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
206206
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
207-
<amazon-s3-analyticsaccelerator-s3.version>1.2.1</amazon-s3-analyticsaccelerator-s3.version>
207+
<amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
208208
<aws.eventstream.version>1.0.1</aws.eventstream.version>
209209
<hsqldb.version>2.7.1</hsqldb.version>
210210
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,10 @@ private FSDataInputStream executeOpen(
19281928
.withCallbacks(createInputStreamCallbacks(auditSpan))
19291929
.withContext(readContext.build())
19301930
.withObjectAttributes(createObjectAttributes(path, fileStatus))
1931-
.withStreamStatistics(inputStreamStats);
1931+
.withStreamStatistics(inputStreamStats)
1932+
.withEncryptionSecrets(getEncryptionSecrets())
1933+
.withAuditSpan(auditSpan);
1934+
19321935
return new FSDataInputStream(getStore().readObject(parameters));
19331936
}
19341937

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222

2323
import software.amazon.awssdk.core.SdkRequest;
24+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2425
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
2526
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
2627
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
5051
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
5152
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
5253
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
54+
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
55+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
5356

5457
/**
5558
* Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
193196
|| request instanceof CreateSessionRequest;
194197
}
195198

199+
/**
200+
* If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
201+
* of requests which are made outside S3A's requestFactory.
202+
*
203+
* @param executionAttributes request execution attributes
204+
* @return true if request is audited outside of current span
205+
*/
206+
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
207+
return executionAttributes.getAttribute(SPAN_ID) != null
208+
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
209+
}
210+
196211
/**
197212
* Predicate which returns true if the request is part of the
198213
* multipart upload API -and which therefore must be rejected

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
6262
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
6363
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
64+
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
6465
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
6566
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
6667
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
6970
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
7071
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
7172
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
73+
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
74+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
7275

7376
/**
7477
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
8588
private static final Logger LOG =
8689
LoggerFactory.getLogger(LoggingAuditor.class);
8790

88-
8991
/**
9092
* Some basic analysis for the logs.
9193
*/
@@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
267269
*/
268270
private class LoggingAuditSpan extends AbstractAuditSpanImpl {
269271

270-
private final HttpReferrerAuditHeader referrer;
272+
private HttpReferrerAuditHeader referrer;
273+
274+
/**
275+
* Builder for the referrer header. Requests that execute outside S3A, such as in AAL, will initially have SpanId
276+
* of the outside-span operation. For such requests, the spanId and operation name in this builder are overwritten
277+
* in the modifyHttpRequest execution interceptor.
278+
*/
279+
private final HttpReferrerAuditHeader.Builder headerBuilder;
271280

272281
/**
273282
* Attach Range of data for GetObject Request.
@@ -300,7 +309,7 @@ private LoggingAuditSpan(
300309
final String path2) {
301310
super(spanId, operationName);
302311

303-
this.referrer = HttpReferrerAuditHeader.builder()
312+
this.headerBuilder = HttpReferrerAuditHeader.builder()
304313
.withContextId(getAuditorId())
305314
.withSpanId(spanId)
306315
.withOperationName(operationName)
@@ -312,8 +321,9 @@ private LoggingAuditSpan(
312321
currentThreadID())
313322
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
314323
.withEvaluated(context.getEvaluatedEntries())
315-
.withFilter(filters)
316-
.build();
324+
.withFilter(filters);
325+
326+
this.referrer = this.headerBuilder.build();
317327

318328
this.description = referrer.buildHttpReferrer();
319329
}
@@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
384394
SdkHttpRequest httpRequest = context.httpRequest();
385395
SdkRequest sdkRequest = context.request();
386396

397+
// If spanId and operationName are set in execution attributes, then use these values,
398+
// instead of the ones in the current span. This is useful when requests are happening in dependencies such as
399+
// the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
400+
// will attach the current spanId and operationName via execution attributes during its request creation. These
401+
// can then be used to update the values in the logger and referrer header. Without this overwriting, the operation
402+
// name and corresponding span will be whichever is active on the thread the request is getting executed on.
403+
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);
404+
405+
String spanId = isRequestAuditedOutsideCurrentSpan ?
406+
executionAttributes.getAttribute(SPAN_ID) : getSpanId();
407+
408+
String operationName = isRequestAuditedOutsideCurrentSpan ?
409+
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();
410+
411+
if (isRequestAuditedOutsideCurrentSpan) {
412+
this.headerBuilder.withSpanId(spanId);
413+
this.headerBuilder.withOperationName(operationName);
414+
this.referrer = this.headerBuilder.build();
415+
}
416+
387417
// attach range for GetObject requests
388418
attachRangeFromRequest(httpRequest, executionAttributes);
389419

@@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
400430
.appendHeader(HEADER_REFERRER, header)
401431
.build();
402432
}
433+
403434
if (LOG.isDebugEnabled()) {
404435
LOG.debug("[{}] {} Executing {} with {}; {}",
405436
currentThreadID(),
406-
getSpanId(),
407-
getOperationName(),
437+
spanId,
438+
operationName,
408439
analyzer.analyze(context.request()),
409440
header);
410441
}
@@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context,
533564
+ analyzer.analyze(context.request());
534565
final String unaudited = getSpanId() + " "
535566
+ UNAUDITED_OPERATION + " " + error;
567+
// If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it
568+
// as an audited request.
536569
if (isRequestNotAlwaysInSpan(context.request())) {
537-
// can get by auditing during a copy, so don't overreact
570+
// can get by auditing during a copy, so don't overreact.
538571
LOG.debug(unaudited);
539-
} else {
572+
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
540573
final RuntimeException ex = new AuditFailureException(unaudited);
541574
LOG.debug(unaudited, ex);
542575
if (isRejectOutOfSpan()) {
@@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context,
547580
super.beforeExecution(context, executionAttributes);
548581
}
549582
}
550-
551583
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -993,10 +993,9 @@ public InputStreamType streamType() {
993993
private class FactoryCallbacks implements StreamFactoryCallbacks {
994994

995995
@Override
996-
public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
997-
// Needs support of the CRT before the requireCRT can be used
998-
LOG.debug("Stream factory requested async client");
999-
return clientManager().getOrCreateAsyncClient();
996+
public S3Client getOrCreateSyncClient() throws IOException {
997+
LOG.debug("Stream factory requested sync client");
998+
return clientManager().getOrCreateS3Client();
1000999
}
10011000

10021001
@Override

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,22 @@
2121

2222
import java.io.EOFException;
2323
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Consumer;
29+
import java.util.function.IntFunction;
30+
import java.util.Optional;
2431

32+
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
33+
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
2534
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
2635
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
36+
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
37+
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
2738
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
39+
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
2840
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
2941
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
3042
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -37,6 +49,11 @@
3749
import org.apache.hadoop.fs.s3a.Retries;
3850
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
3951
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
52+
import org.apache.hadoop.fs.FileRange;
53+
import org.apache.hadoop.fs.VectoredReadUtils;
54+
55+
import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
56+
4057

4158
/**
4259
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
@@ -128,6 +145,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
128145
return bytesRead;
129146
}
130147

148+
/**
149+
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
150+
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
151+
* {@inheritDoc}
152+
*/
153+
@Override
154+
public void readVectored(List<? extends FileRange> ranges,
155+
IntFunction<ByteBuffer> allocate) throws IOException {
156+
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
157+
}
158+
159+
/**
160+
* Perform a vectored read via AAL: each requested range is wrapped in an {@link ObjectRange}
161+
* and passed to the AAL stream together with the supplied allocate and release callbacks.
162+
* {@inheritDoc}
163+
*/
164+
@Override
165+
public void readVectored(final List<? extends FileRange> ranges,
166+
final IntFunction<ByteBuffer> allocate,
167+
final Consumer<ByteBuffer> release) throws IOException {
168+
LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
169+
throwIfClosed();
170+
171+
List<ObjectRange> objectRanges = new ArrayList<>();
172+
173+
for (FileRange range : ranges) {
174+
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
175+
ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
176+
objectRanges.add(objectRange);
177+
range.setData(result);
178+
}
179+
180+
// AAL does not do any range coalescing, so input and combined ranges are the same.
181+
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
182+
inputStream.readVectored(objectRanges, allocate, release);
183+
}
131184

132185
@Override
133186
public boolean seekToNewSource(long l) throws IOException {
@@ -205,6 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
205258
.etag(parameters.getObjectAttributes().getETag()).build());
206259
}
207260

261+
262+
if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
263+
EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
264+
.ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
265+
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
266+
}
267+
268+
openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
269+
.operationName(parameters.getAuditSpan().getOperationName())
270+
.spanId(parameters.getAuditSpan().getSpanId())
271+
.build());
272+
208273
return openStreamInformationBuilder.build();
209274
}
210275

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323

2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
26-
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
26+
2727
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
2828
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
29+
import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
2930
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
3031

3132
import org.apache.hadoop.conf.Configuration;
@@ -96,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
9697
vectorContext.setMinSeekForVectoredReads(0);
9798

9899
return new StreamFactoryRequirements(0,
99-
0, vectorContext,
100-
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
100+
0, vectorContext);
101101
}
102102

103103
@Override
@@ -118,7 +118,7 @@ private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
118118

119119
private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
120120
return () -> new S3SeekableInputStreamFactory(
121-
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
121+
new S3SyncSdkObjectClient(callbacks().getOrCreateSyncClient()),
122122
seekableInputStreamConfiguration);
123123
}
124124

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import java.io.IOException;
2222

23-
import software.amazon.awssdk.services.s3.S3AsyncClient;
23+
import software.amazon.awssdk.services.s3.S3Client;
2424

2525
import org.apache.hadoop.fs.s3a.Statistic;
2626
import org.apache.hadoop.fs.StreamCapabilities;
@@ -80,12 +80,11 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
8080
interface StreamFactoryCallbacks {
8181

8282
/**
83-
* Get the Async S3Client, raising a failure to create as an IOException.
84-
* @param requireCRT is the CRT required.
83+
* Get the Sync S3Client, raising a failure to create as an IOException.
8584
* @return the Sync S3 client
8685
* @throws IOException failure to create the client.
8786
*/
88-
S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
87+
S3Client getOrCreateSyncClient() throws IOException;
8988

9089
void incrementFactoryStatistic(Statistic statistic);
9190
}

0 commit comments

Comments
 (0)