-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for requestPays in java-storage call flow #995
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,7 +51,7 @@ | |
@VisibleForTesting | ||
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { | ||
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); | ||
|
||
private static GoogleCloudStorageImpl delegate; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this static? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having the base class and derived class having same field is an anti pattern |
||
private final GoogleCloudStorageOptions storageOptions; | ||
private final Storage storage; | ||
|
||
|
@@ -65,6 +65,7 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { | |
.setNameFormat("gcsio-storage-client-write-channel-pool-%d") | ||
.setDaemon(true) | ||
.build()); | ||
|
||
/** | ||
* Having an instance of gscImpl to redirect calls to Json client while new client implementation | ||
* is in WIP. | ||
|
@@ -78,13 +79,14 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { | |
@Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn) | ||
throws IOException { | ||
super( | ||
GoogleCloudStorageImpl.builder() | ||
.setOptions(options) | ||
.setCredentials(credentials) | ||
.setHttpTransport(httpTransport) | ||
.setHttpRequestInitializer(httpRequestInitializer) | ||
.setDownscopedAccessTokenFn(downscopedAccessTokenFn) | ||
.build()); | ||
delegate = | ||
GoogleCloudStorageImpl.builder() | ||
.setOptions(options) | ||
.setCredentials(credentials) | ||
.setHttpTransport(httpTransport) | ||
.setHttpRequestInitializer(httpRequestInitializer) | ||
.setDownscopedAccessTokenFn(downscopedAccessTokenFn) | ||
.build()); | ||
this.storageOptions = options; | ||
this.storage = | ||
clientLibraryStorage == null ? createStorage(credentials, options) : clientLibraryStorage; | ||
|
@@ -108,7 +110,12 @@ public WritableByteChannel create(StorageResourceId resourceId, CreateObjectOpti | |
|
||
GoogleCloudStorageClientWriteChannel channel = | ||
new GoogleCloudStorageClientWriteChannel( | ||
storage, storageOptions, resourceIdWithGeneration, options, backgroundTasksThreadPool); | ||
storage, | ||
storageOptions, | ||
resourceIdWithGeneration, | ||
options, | ||
requesterShouldPay(resourceIdWithGeneration.getBucketName()), | ||
backgroundTasksThreadPool); | ||
channel.initialize(); | ||
return channel; | ||
} | ||
|
@@ -144,7 +151,8 @@ private SeekableByteChannel open( | |
itemInfo == null ? getItemInfo(resourceId) : itemInfo, | ||
readOptions, | ||
errorExtractor, | ||
storageOptions); | ||
storageOptions, | ||
requesterShouldPay(resourceId.getBucketName())); | ||
} | ||
|
||
@Override | ||
|
@@ -160,6 +168,11 @@ public void close() { | |
} | ||
} | ||
|
||
@Override | ||
public GoogleCloudStorageImpl getDelegate() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is overriding the same method as base class? |
||
return delegate; | ||
} | ||
|
||
/** | ||
* Gets the object generation for a write operation | ||
* | ||
|
@@ -195,6 +208,10 @@ private static Storage createStorage( | |
.getService(); | ||
} | ||
|
||
private boolean requesterShouldPay(String bucketName) throws IOException { | ||
return getDelegate().requesterShouldPay(bucketName); | ||
} | ||
|
||
public static Builder builder() { | ||
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ | |
|
||
import com.google.cloud.WriteChannel; | ||
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; | ||
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; | ||
import com.google.cloud.storage.BlobId; | ||
import com.google.cloud.storage.BlobInfo; | ||
import com.google.cloud.storage.Storage; | ||
|
@@ -46,18 +45,18 @@ class GoogleCloudStorageClientWriteChannel extends AbstractGoogleAsyncWriteChann | |
private final StorageResourceId resourceId; | ||
private WriteChannel writeChannel; | ||
private boolean uploadSucceeded = false; | ||
// TODO: not supported as of now | ||
// private final String requesterPaysProject; | ||
|
||
public GoogleCloudStorageClientWriteChannel( | ||
Storage storage, | ||
GoogleCloudStorageOptions storageOptions, | ||
StorageResourceId resourceId, | ||
CreateObjectOptions createOptions, | ||
boolean requesterPays, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wont this break the public API? Maybe add a new constructor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider renaming this to something like requesterPaysEnabled or something similar |
||
ExecutorService uploadThreadPool) { | ||
super(uploadThreadPool, storageOptions.getWriteChannelOptions()); | ||
this.resourceId = resourceId; | ||
this.writeChannel = getClientWriteChannel(storage, resourceId, createOptions, storageOptions); | ||
this.writeChannel = | ||
getClientWriteChannel(storage, resourceId, createOptions, storageOptions, requesterPays); | ||
} | ||
|
||
@Override | ||
|
@@ -90,14 +89,13 @@ private static WriteChannel getClientWriteChannel( | |
Storage storage, | ||
StorageResourceId resourceId, | ||
CreateObjectOptions createOptions, | ||
GoogleCloudStorageOptions storageOptions) { | ||
AsyncWriteChannelOptions channelOptions = storageOptions.getWriteChannelOptions(); | ||
GoogleCloudStorageOptions storageOptions, | ||
boolean requesterPays) { | ||
WriteChannel writeChannel = | ||
storage.writer( | ||
getBlobInfo(resourceId, createOptions), | ||
generateWriteOptions(createOptions, storageOptions)); | ||
writeChannel.setChunkSize(channelOptions.getUploadChunkSize()); | ||
|
||
generateWriteOptions(createOptions, storageOptions, requesterPays)); | ||
writeChannel.setChunkSize(storageOptions.getWriteChannelOptions().getUploadChunkSize()); | ||
return writeChannel; | ||
} | ||
|
||
|
@@ -156,9 +154,10 @@ public Boolean call() throws Exception { | |
} | ||
|
||
private static BlobWriteOption[] generateWriteOptions( | ||
CreateObjectOptions createOptions, GoogleCloudStorageOptions storageOptions) { | ||
CreateObjectOptions createOptions, | ||
GoogleCloudStorageOptions storageOptions, | ||
boolean requesterPays) { | ||
List<BlobWriteOption> blobWriteOptions = new ArrayList<>(); | ||
|
||
blobWriteOptions.add(BlobWriteOption.disableGzipContent()); | ||
blobWriteOptions.add(BlobWriteOption.generationMatch()); | ||
if (createOptions.getKmsKeyName() != null) { | ||
|
@@ -171,6 +170,10 @@ private static BlobWriteOption[] generateWriteOptions( | |
blobWriteOptions.add( | ||
BlobWriteOption.encryptionKey(storageOptions.getEncryptionKey().value())); | ||
} | ||
if (requesterPays) { | ||
blobWriteOptions.add( | ||
BlobWriteOption.userProject(storageOptions.getRequesterPaysOptions().getProjectId())); | ||
} | ||
return blobWriteOptions.toArray(new BlobWriteOption[blobWriteOptions.size()]); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,13 +33,17 @@ | |
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; | ||
import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper; | ||
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor; | ||
import com.google.cloud.hadoop.util.RequesterPaysOptions; | ||
import com.google.cloud.storage.BlobId; | ||
import com.google.cloud.storage.Storage; | ||
import com.google.cloud.storage.Storage.BlobSourceOption; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.protobuf.ByteString; | ||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.List; | ||
import java.util.Random; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
@@ -60,6 +64,10 @@ public class GoogleCloudStorageClientReadChannelTest { | |
private static final ByteString CONTENT = | ||
GoogleCloudStorageTestHelper.createTestData(OBJECT_SIZE); | ||
|
||
private ArgumentCaptor<BlobId> blobIdCaptor = ArgumentCaptor.forClass(BlobId.class); | ||
private ArgumentCaptor<BlobSourceOption> blobSourceOptionCaptor = | ||
ArgumentCaptor.forClass(BlobSourceOption.class); | ||
|
||
private static final GoogleCloudStorageReadOptions DEFAULT_READ_OPTION = | ||
GoogleCloudStorageReadOptions.builder() | ||
.setFadvise(Fadvise.RANDOM) | ||
|
@@ -88,11 +96,48 @@ public class GoogleCloudStorageClientReadChannelTest { | |
|
||
@Before | ||
public void setUp() throws IOException { | ||
|
||
fakeReadChannel = spy(new FakeReadChannel(CONTENT)); | ||
when(mockedStorage.reader(any(), any())).thenReturn(fakeReadChannel); | ||
when(mockedStorage.reader(blobIdCaptor.capture(), blobSourceOptionCaptor.capture())) | ||
.thenReturn(fakeReadChannel); | ||
readChannel = getJavaStorageChannel(DEFAULT_ITEM_INFO, DEFAULT_READ_OPTION); | ||
} | ||
|
||
@Test | ||
public void verifyRequesterPaysOption() throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have any integration test for requesterPays? |
||
String dummyProjectId = "dummyProjectId"; | ||
int readBytes = 100; | ||
fakeReadChannel = spy(new FakeReadChannel(CONTENT)); | ||
when(mockedStorage.reader(blobIdCaptor.capture(), blobSourceOptionCaptor.capture())) | ||
.thenReturn(fakeReadChannel); | ||
readChannel = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what are we really testing here? |
||
new GoogleCloudStorageClientReadChannel( | ||
mockedStorage, | ||
DEFAULT_ITEM_INFO, | ||
DEFAULT_READ_OPTION, | ||
GrpcErrorTypeExtractor.INSTANCE, | ||
GoogleCloudStorageOptions.DEFAULT.toBuilder() | ||
.setRequesterPaysOptions( | ||
RequesterPaysOptions.DEFAULT.toBuilder().setProjectId(dummyProjectId).build()) | ||
.build(), /*requesterPays*/ | ||
true); | ||
getJavaStorageChannel(DEFAULT_ITEM_INFO, DEFAULT_READ_OPTION); | ||
int startPosition = 0; | ||
readChannel.position(startPosition); | ||
assertThat(readChannel.position()).isEqualTo(startPosition); | ||
|
||
ByteBuffer buffer = ByteBuffer.allocate(readBytes); | ||
readChannel.read(buffer); | ||
verifyContent(buffer, startPosition, readBytes); | ||
List<BlobSourceOption> optionsList = blobSourceOptionCaptor.getAllValues(); | ||
assertThat(optionsList).contains(BlobSourceOption.userProject(dummyProjectId)); | ||
verify(fakeReadChannel, times(1)).seek(anyLong()); | ||
verify(fakeReadChannel, times(1)).limit(anyLong()); | ||
verify(fakeReadChannel, times(1)).read(any()); | ||
|
||
verifyNoMoreInteractions(fakeReadChannel); | ||
} | ||
|
||
@Test | ||
public void inValidSeekPositions() { | ||
int seekPosition = -1; | ||
|
@@ -535,6 +580,7 @@ private GoogleCloudStorageClientReadChannel getJavaStorageChannel( | |
objectInfo, | ||
readOptions, | ||
GrpcErrorTypeExtractor.INSTANCE, | ||
GoogleCloudStorageOptions.DEFAULT.toBuilder().build()); | ||
GoogleCloudStorageOptions.DEFAULT.toBuilder().build(), /*requesterPays*/ | ||
false); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this final?