Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for userProject in java-storage #1205

Open
wants to merge 2 commits into
base: branch-2.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.hadoop.gcsio;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.GoogleLogger;
import com.google.gson.Gson;
Expand All @@ -43,10 +44,14 @@
public class GoogleCloudStorageClientGrpcTracingInterceptor implements ClientInterceptor {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
public static final String IDEMPOTENCY_TOKEN_HEADER = "x-goog-gcs-idempotency-token";
public static final String USER_PROJECT_HEADER = "x-goog-user-project";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing this as an interceptor? Why not veneer provide this as a config while creating the client. Even better, they should be able to figure this out from the GCE VM they are running (if it is running from VM), correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing this as an interceptor?

This is Trace interceptor, here we trace the request for userProject filed. If this field is in headers we do log it. It points towards the requesterPays filed being set for gcsbucket in request.

Why not veneer provide this as a config while creating the client.

Via this code we are not updating the header to use the provided project but other way round. Java-storage client do provide a way for requester-pays to be set.

No. It's a userProject header is the one which carries information of requesterpays feature of gcs bucket. This has nothing to with the account/project the VM is in. This feature can even be used from machines which are not part of GCP.

private static final DateTimeFormatter dtf =
DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
private static final String DEFAULT_HEADER_VALUE = "NOT-FOUND";
private static final Metadata.Key<String> idempotencyKey =
Metadata.Key.of(IDEMPOTENCY_TOKEN_HEADER, Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> userProjectKey =
Metadata.Key.of(USER_PROJECT_HEADER, Metadata.ASCII_STRING_MARSHALLER);

@NonNull
static String fmtProto(@NonNull Object obj) {
Expand Down Expand Up @@ -215,15 +220,33 @@
}

private String getInvocationId() {
return headers.get(idempotencyKey);
String value = headers.get(idempotencyKey);
if (Strings.isNullOrEmpty(value)) {
logger.atWarning().log(

Check warning on line 225 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcTracingInterceptor.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcTracingInterceptor.java#L225

Added line #L225 was not covered by tests
"Received a null or empty value for invocation Id: %s for rpc method: %s",
value, rpcMethod);
value = DEFAULT_HEADER_VALUE;

Check warning on line 228 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcTracingInterceptor.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcTracingInterceptor.java#L228

Added line #L228 was not covered by tests
}
return value;
}

private ImmutableMap.Builder<String, Object> getStreamContext() {
protected String getRequesterPaysProject() {
String value = headers.get(userProjectKey);
return value == null ? DEFAULT_HEADER_VALUE : value;
}

private ImmutableMap.Builder<String, Object> getHeaderValues() {
return new ImmutableMap.Builder<String, Object>()
.put(GoogleCloudStorageTracingFields.RPC_METHOD.name, rpcMethod)
.put(
GoogleCloudStorageTracingFields.REQUESTER_PAYS_PROJECT.name,
getRequesterPaysProject())
.put(GoogleCloudStorageTracingFields.IDEMPOTENCY_TOKEN.name, getInvocationId());
}

private ImmutableMap.Builder<String, Object> getStreamContext() {
return getHeaderValues().put(GoogleCloudStorageTracingFields.RPC_METHOD.name, rpcMethod);
}

private ImmutableMap.Builder<String, Object> getCommonTraceFields() {
return getStreamContext()
.put(GoogleCloudStorageTracingFields.CURRENT_TIME.name, dtf.format(LocalDateTime.now()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -74,6 +75,7 @@

private final GoogleCloudStorageOptions storageOptions;
private final Storage storage;
private final RequesterPaysManager requesterPaysManager;

// Error extractor to map APi exception to meaningful ErrorTypes.
private static final ErrorTypeExtractor errorExtractor = GrpcErrorTypeExtractor.INSTANCE;
Expand Down Expand Up @@ -114,6 +116,8 @@
? createStorage(
credentials, options, gRPCInterceptors, pCUExecutorService, downscopedAccessTokenFn)
: clientLibraryStorage;
this.requesterPaysManager =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are adding more than the userProject in this PR? If yes, please update the PR descriptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. it is very specific to requesterPays feature. RequesterPaysManager is created with an idea that multiple GoogleCloundStorage client will be able to share the same caching logic of requesterPaysInfo.

new RequesterPaysManager(options.getRequesterPaysOptions(), this::shouldRequesterPay);
}

@Override
Expand All @@ -137,7 +141,11 @@
}

return new GoogleCloudStorageClientWriteChannel(
storage, storageOptions, resourceIdWithGeneration, options);
storage,
storageOptions,
resourceIdWithGeneration,
options,
requesterPaysManager::requesterShouldPay);
}

@Override
Expand All @@ -157,7 +165,8 @@
itemInfo == null ? getItemInfo(resourceId) : itemInfo,
readOptions,
errorExtractor,
storageOptions);
storageOptions,
requesterPaysManager::requesterShouldPay);
}

@Override
Expand Down Expand Up @@ -270,6 +279,15 @@
.getService();
}

private Boolean shouldRequesterPay(String bucketName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move this to RequestPaysManager?

try {
storage.testIamPermissions(bucketName, ImmutableList.of("storage.buckets.get"));

Check warning on line 284 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java#L284

Added line #L284 was not covered by tests
} catch (StorageException e) {
return errorExtractor.userProjectMissing(e);
}
return false;

Check warning on line 288 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java#L287-L288

Added lines #L287 - L288 were not covered by tests
}

private static ImmutableMap<String, String> getUpdatedHeadersWithUserAgent(
GoogleCloudStorageOptions storageOptions) {
ImmutableMap<String, String> httpRequestHeaders =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;

/** Provides seekable read access to GCS via java-storage library. */
Expand All @@ -64,6 +65,7 @@
private long objectSize;
private final ErrorTypeExtractor errorExtractor;
private ContentReadChannel contentReadChannel;
private Function<String, Boolean> requesterShouldPay;
private boolean gzipEncoded = false;
private boolean open = true;

Expand All @@ -76,7 +78,8 @@
GoogleCloudStorageItemInfo itemInfo,
GoogleCloudStorageReadOptions readOptions,
ErrorTypeExtractor errorExtractor,
GoogleCloudStorageOptions storageOptions)
GoogleCloudStorageOptions storageOptions,
Function<String, Boolean> requesterShouldPay)
throws IOException {
validate(itemInfo);
this.storage = storage;
Expand All @@ -87,6 +90,7 @@
this.readOptions = readOptions;
this.storageOptions = storageOptions;
this.contentReadChannel = new ContentReadChannel(readOptions, resourceId);
this.requesterShouldPay = requesterShouldPay;
initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize());
}

Expand Down Expand Up @@ -582,6 +586,10 @@
blobReadOptions.add(
BlobSourceOption.decryptionKey(storageOptions.getEncryptionKey().value()));
}
if (requesterShouldPay.apply((blobId.getBucket()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we have this as a boolean value per GCSClientReadChannel? i.e. what is the value of recomputing this for reach GCS ReadChannel?

blobReadOptions.add(
BlobSourceOption.userProject(storageOptions.getRequesterPaysOptions().getProjectId()));

Check warning on line 591 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java#L590-L591

Added lines #L590 - L591 were not covered by tests
}
return blobReadOptions.toArray(new BlobSourceOption[blobReadOptions.size()]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Implements WritableByteChannel to provide write access to GCS via java-storage client */
class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {
Expand All @@ -46,11 +47,12 @@ public GoogleCloudStorageClientWriteChannel(
Storage storage,
GoogleCloudStorageOptions storageOptions,
StorageResourceId resourceId,
CreateObjectOptions createOptions)
CreateObjectOptions createOptions,
Function<String, Boolean> requesterShouldPay)
throws IOException {
this.resourceId = resourceId;
BlobWriteSession blobWriteSession =
getBlobWriteSession(storage, resourceId, createOptions, storageOptions);
getBlobWriteSession(storage, resourceId, createOptions, storageOptions, requesterShouldPay);
this.writableByteChannel = blobWriteSession.open();
}

Expand All @@ -73,16 +75,19 @@ private static BlobWriteSession getBlobWriteSession(
Storage storage,
StorageResourceId resourceId,
CreateObjectOptions createOptions,
GoogleCloudStorageOptions storageOptions) {
GoogleCloudStorageOptions storageOptions,
Function<String, Boolean> requesterShouldPay) {
return storage.blobWriteSession(
getBlobInfo(resourceId, createOptions),
generateWriteOptions(createOptions, storageOptions));
generateWriteOptions(resourceId, createOptions, storageOptions, requesterShouldPay));
}

private static BlobWriteOption[] generateWriteOptions(
CreateObjectOptions createOptions, GoogleCloudStorageOptions storageOptions) {
StorageResourceId resourceId,
CreateObjectOptions createOptions,
GoogleCloudStorageOptions storageOptions,
Function<String, Boolean> requesterShouldPay) {
List<BlobWriteOption> blobWriteOptions = new ArrayList<>();

blobWriteOptions.add(BlobWriteOption.disableGzipContent());
blobWriteOptions.add(BlobWriteOption.generationMatch());
if (createOptions.getKmsKeyName() != null) {
Expand All @@ -95,6 +100,10 @@ private static BlobWriteOption[] generateWriteOptions(
blobWriteOptions.add(
BlobWriteOption.encryptionKey(storageOptions.getEncryptionKey().value()));
}
if (requesterShouldPay.apply(resourceId.getBucketName())) {
blobWriteOptions.add(
BlobWriteOption.userProject(storageOptions.getRequesterPaysOptions().getProjectId()));
}
return blobWriteOptions.toArray(new BlobWriteOption[blobWriteOptions.size()]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum GoogleCloudStorageTracingFields {
// GRPC specific fields
RPC_METHOD("rpcMethod"),
IDEMPOTENCY_TOKEN("idempotency-token"),
REQUESTER_PAYS_PROJECT("requesterPaysProject"),
REQUEST_COUNTER("requestCounter"),
RESPONSE_COUNTER("responseCounter"),
REQUEST_MESSAGE_AS_STRING("reqMessageAsString"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import com.google.cloud.hadoop.util.RequesterPaysOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.util.function.Function;

@VisibleForTesting
public class RequesterPaysManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using this in the JSON path as well? Looks like now autobuckets will be there in both GCSImpl and GCSClientImpl?

private final RequesterPaysOptions requesterPaysOptions;
private final Function<String, Boolean> shouldRequesterPay;
private final LoadingCache<String, Boolean> autoBuckets =
CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.build(
new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String bucketName) {
return shouldRequesterPay.apply(bucketName);
}
});

public RequesterPaysManager(
RequesterPaysOptions requesterPaysOptions, Function<String, Boolean> shouldRequesterPay) {
this.requesterPaysOptions = requesterPaysOptions;
this.shouldRequesterPay = shouldRequesterPay;
}

public boolean requesterShouldPay(String bucketName) {
if (bucketName == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when can this happen?

return false;

Check warning on line 50 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/RequesterPaysManager.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/RequesterPaysManager.java#L50

Added line #L50 was not covered by tests
}
switch (requesterPaysOptions.getMode()) {
case ENABLED:
return true;
case CUSTOM:
return requesterPaysOptions.getBuckets().contains(bucketName);
case AUTO:
return autoBuckets.getUnchecked(bucketName);
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ private GoogleCloudStorageClientReadChannel getJavaStorageChannel(
objectInfo,
readOptions,
GrpcErrorTypeExtractor.INSTANCE,
GoogleCloudStorageOptions.DEFAULT.toBuilder().build());
GoogleCloudStorageOptions.DEFAULT.toBuilder().build(),
(bucketName) -> false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ private GoogleCloudStorageClientWriteChannel getJavaStorageChannel() throws IOEx
.setContentEncoding(CONTENT_ENCODING)
.setMetadata(GoogleCloudStorageTestHelper.getDecodedMetadata(metadata))
.setKmsKeyName(KMS_KEY)
.build());
.build(),
(bucketName) -> false);
}

private static void verifyBlobInfoProperties(
Expand Down
Loading