Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AUTO_RANDOM mode #1242

Open
wants to merge 3 commits into
base: branch-2.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gcs/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release Notes

## Next
1. Add AUTO_RANDOM as new fadvise mode.

## 2.2.26 - 2024-12-09
1. Upgrade google-cloud-storage to 2.44.1
Expand Down
16 changes: 16 additions & 0 deletions gcs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,22 @@ permissions (not authorized) to execute these requests.
streaming requests as soon as first backward read or forward read for
more than `fs.gs.inputstream.inplace.seek.limit` bytes was detected.

* `AUTO_RANDOM` - This mode complements `AUTO` mode, which starts in
  sequential mode and adapts to bounded range requests. `AUTO_RANDOM`
  mode starts with a bounded channel and adapts to sequential requests if
  consecutive requests fall within `fs.gs.inputstream.min.range.request.size`.
  gzip-encoded objects bypass this adaptation and always use a
  streaming (unbounded) channel. This helps in cases where a customer's
  egress limits are being breached, because `AUTO` mode always leads to
  one unbounded channel per file. `AUTO_RANDOM` avoids such unwanted
  unbounded channels.

* `fs.gs.fadvise.request.track.count` (default: `3`)

Self-adaptive fadvise modes use the distance between served requests to
decide the access pattern. This property controls how many such requests
are tracked.

* `fs.gs.inputstream.inplace.seek.limit` (default: `8388608`)

If forward seeks are within this many bytes of the current position, seeks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ public class GoogleHadoopFileSystemConfiguration {
public static final HadoopConfigurationProperty<Integer> GCS_BATCH_THREADS =
new HadoopConfigurationProperty<>("fs.gs.batch.threads", 15);

/**
 * Configuration key for the number of requests to track when adapting the access pattern in
 * self-adaptive fadvise modes, i.e. AUTO and AUTO_RANDOM.
 */
public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3);

/**
* Configuration key for enabling the use of Rewrite requests for copy operations. Rewrite request
* has the same effect as Copy request, but it can handle moving large objects that may
Expand Down Expand Up @@ -667,6 +674,8 @@ private static GoogleCloudStorageReadOptions getReadChannelOptions(Configuration
.setTraceLogTimeThreshold(GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(config, config::getLong))
.setTraceLogExcludeProperties(
ImmutableSet.copyOf(GCS_TRACE_LOG_EXCLUDE_PROPERTIES.getStringCollection(config)))
.setBlockSize(BLOCK_SIZE.get(config, config::getLong))
.setFadviseRequestTrackCount(GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
"fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
PartFileCleanupType.ALWAYS);
put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", "");
put("fs.gs.fadvise.request.track.count", 3);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import javax.annotation.Nullable;

/** Provides seekable read access to GCS via java-storage library. */
Expand Down Expand Up @@ -210,14 +212,40 @@ private class ContentReadChannel {
// in-place seeks.
private byte[] skipBuffer = null;
private ReadableByteChannel byteChannel = null;
// Keeps track of distance between consecutive requests
private BoundedList<Long> consecutiveRequestsDistances;
// Keeps track of last index of last served Request.
private long servedRequestLastIndex = -1;
private boolean randomAccess;

private class BoundedList<E> extends LinkedList<E> {
private int limit;

public BoundedList(int limit) {
this.limit = limit;
}

@Override
public boolean add(E o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can vectorized IO call this concurrently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, VectoredRead don't use GcsReadChannel concurrently using multiple threads. GcsReadChannel class in not thread safe so is the FifoQueue.

super.add(o);
while (size() > limit) {
super.removeFirst();
}
return true;
}
}

public ContentReadChannel(
GoogleCloudStorageReadOptions readOptions, StorageResourceId resourceId) {
this.blobId =
BlobId.of(
resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId());
this.randomAccess = readOptions.getFadvise() == Fadvise.RANDOM;
this.randomAccess =
readOptions.getFadvise() == Fadvise.RANDOM
|| readOptions.getFadvise() == Fadvise.AUTO_RANDOM;
if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) {
consecutiveRequestsDistances = new BoundedList<>(readOptions.getFadviseRequestTrackCount());
}
}

public int readContent(ByteBuffer dst) throws IOException {
Expand Down Expand Up @@ -304,6 +332,7 @@ public int readContent(ByteBuffer dst) throws IOException {
int partialBytes = partiallyReadBytes(remainingBeforeRead, dst);
totalBytesRead += partialBytes;
currentPosition += partialBytes;
contentChannelCurrentPosition += partialBytes;
logger.atFine().log(
"Closing contentChannel after %s exception for '%s'.", e.getMessage(), resourceId);
closeContentChannel();
Expand All @@ -321,6 +350,10 @@ private int partiallyReadBytes(int remainingBeforeRead, ByteBuffer dst) {
return partialReadBytes;
}

// Sequential-pattern detection applies only in AUTO_RANDOM mode, while we are still in
// random (bounded) access, and never for gzip-encoded objects.
private boolean shouldDetectSequentialAccess() {
  return readOptions.getFadvise() == Fadvise.AUTO_RANDOM && randomAccess && !gzipEncoded;
}

// Random-pattern detection applies only in AUTO mode, while we are still in sequential
// access, and never for gzip-encoded objects.
private boolean shouldDetectRandomAccess() {
  return readOptions.getFadvise() == Fadvise.AUTO && !randomAccess && !gzipEncoded;
}
Expand All @@ -329,6 +362,10 @@ private void setRandomAccess() {
randomAccess = true;
}

// Switches back to sequential (streaming) reads; invoked by AUTO_RANDOM mode once a
// sequential access pattern is detected.
private void unsetRandomAccess() {
  randomAccess = false;
}

private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException {
checkArgument(
bytesToRead > 0, "bytesToRead should be greater than 0, but was %s", bytesToRead);
Expand All @@ -341,6 +378,9 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException
return serveFooterContent();
}

// Should be updated only if content is not served from cached footer
updateAccessPattern();

setChannelBoundaries(bytesToRead);

ReadableByteChannel readableByteChannel =
Expand Down Expand Up @@ -426,12 +466,16 @@ private long getRangeRequestEnd(long startPosition, long bytesToRead) {
if (gzipEncoded) {
return objectSize;
}

long endPosition = objectSize;

if (randomAccess) {
// opening a channel for whole object doesn't make sense as anyhow it will not be utilized
// for further reads.
endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize());
} else {
if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) {
endPosition = min(startPosition + readOptions.getBlockSize(), objectSize);
}
}
if (footerContent != null) {
// If footer is cached open just till footerStart.
Expand All @@ -451,6 +495,7 @@ public void closeContentChannel() {
"Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId);
} finally {
byteChannel = null;
servedRequestLastIndex = contentChannelCurrentPosition;
reset();
}
}
Expand Down Expand Up @@ -521,34 +566,70 @@ private void performPendingSeeks() {
if (isInRangeSeek()) {
skipInPlace();
} else {
if (isRandomAccessPattern()) {
setRandomAccess();
}
// close existing contentChannel as requested bytes can't be served from current
// contentChannel;
closeContentChannel();
}
}

/**
 * Re-evaluates the access pattern before a new range request. AUTO_RANDOM can only
 * relax from random to sequential; AUTO can only tighten from sequential to random.
 * The fixed modes (RANDOM, SEQUENTIAL) never adapt.
 */
private void updateAccessPattern() {
  switch (readOptions.getFadvise()) {
    case AUTO_RANDOM:
      if (isSequentialAccessPattern()) {
        unsetRandomAccess();
      }
      break;
    case AUTO:
      if (isRandomAccessPattern()) {
        setRandomAccess();
      }
      break;
    default:
      // Fixed fadvise modes: nothing to adapt.
      break;
  }
}

/**
 * Records the distance from the previously served request and reports whether the recent
 * request history looks sequential.
 *
 * <p>The distance sample is recorded unconditionally (even when detection is disabled) so
 * the sliding window always reflects the latest requests. Detection itself requires
 * AUTO_RANDOM mode on a non-gzip object (see {@link #shouldDetectSequentialAccess}) and at
 * least two tracked samples; the pattern is sequential only if every tracked distance is a
 * forward step within the configured in-place seek limit.
 */
private boolean isSequentialAccessPattern() {
  // Always record the distance so the window stays current across mode switches.
  if (servedRequestLastIndex != -1 && consecutiveRequestsDistances != null) {
    consecutiveRequestsDistances.add(currentPosition - servedRequestLastIndex);
  }

  if (!shouldDetectSequentialAccess()) {
    return false;
  }

  // Require at least two samples before declaring a pattern.
  if (consecutiveRequestsDistances.size() < 2) {
    return false;
  }

  // Use the configured fs.gs.inputstream.inplace.seek.limit (not the static default),
  // consistent with the forward-seek threshold used by isRandomAccessPattern().
  long seekLimit = readOptions.getInplaceSeekLimit();
  for (long distance : consecutiveRequestsDistances) {
    // A negative distance is a backward seek; a large forward distance is a jump.
    if (distance < 0 || distance > seekLimit) {
      return false;
    }
  }
  return true;
}

private boolean isRandomAccessPattern() {
if (!shouldDetectRandomAccess()) {
return false;
}
if (currentPosition < contentChannelCurrentPosition) {
if (servedRequestLastIndex == -1) {
return false;
}

if (currentPosition < servedRequestLastIndex) {
logger.atFine().log(
"Detected backward read from %s to %s position, switching to random IO for '%s'",
contentChannelCurrentPosition, currentPosition, resourceId);
servedRequestLastIndex, currentPosition, resourceId);
return true;
}
if (contentChannelCurrentPosition >= 0
&& contentChannelCurrentPosition + readOptions.getInplaceSeekLimit() < currentPosition) {
if (servedRequestLastIndex >= 0
&& servedRequestLastIndex + readOptions.getInplaceSeekLimit() < currentPosition) {
logger.atFine().log(
"Detected forward read from %s to %s position over %s threshold,"
+ " switching to random IO for '%s'",
contentChannelCurrentPosition,
currentPosition,
readOptions.getInplaceSeekLimit(),
resourceId);
servedRequestLastIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public abstract class GoogleCloudStorageReadOptions {
/** Read access-pattern hint controlling how range requests are issued. */
public enum Fadvise {
  // Starts with sequential (streaming) reads; switches to random on backward seeks or
  // large forward seeks.
  AUTO,
  // Always issues bounded range requests.
  RANDOM,
  // Always issues unbounded (streaming) requests.
  SEQUENTIAL,
  // Starts with bounded range requests; switches to sequential once consecutive reads
  // stay within the in-place seek limit.
  AUTO_RANDOM
}

public static final int DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS = 200;
Expand All @@ -43,6 +44,7 @@ public enum Fadvise {
public static final boolean DEFAULT_FAST_FAIL_ON_NOT_FOUND = true;
public static final boolean DEFAULT_SUPPORT_GZIP_ENCODING = true;
public static final long DEFAULT_INPLACE_SEEK_LIMIT = 8 * 1024 * 1024;
public static final long BLOCK_SIZE = 64 * 1024 * 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we take the connector block_size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we define it's own constants. Defaults getting picked in [GoogleHadoopFileSystemConfiguration.java] (https://github.com/GoogleCloudDataproc/hadoop-connectors/pull/1242/files#diff-f06c91b66e47300ff6c940ca14f152898b99e6e48033502fd4c1dd69c07f0c68) is using the connector BLOCK_SIZE.

public static final Fadvise DEFAULT_FADVISE = Fadvise.SEQUENTIAL;
public static final int DEFAULT_MIN_RANGE_REQUEST_SIZE = 2 * 1024 * 1024;
public static final boolean GRPC_CHECKSUMS_ENABLED_DEFAULT = false;
Expand Down Expand Up @@ -75,6 +77,8 @@ public static Builder builder() {
.setGrpcReadMessageTimeoutMillis(DEFAULT_GRPC_READ_MESSAGE_TIMEOUT_MILLIS)
.setTraceLogEnabled(TRACE_LOGGING_ENABLED_DEFAULT)
.setTraceLogTimeThreshold(0L)
.setBlockSize(BLOCK_SIZE)
.setFadviseRequestTrackCount(3)
.setTraceLogExcludeProperties(ImmutableSet.of());
}

Expand Down Expand Up @@ -133,6 +137,10 @@ public static Builder builder() {
/** See {@link Builder#setTraceLogTimeThreshold(long)} . */
public abstract long getTraceLogTimeThreshold();

public abstract long getBlockSize();

public abstract int getFadviseRequestTrackCount();

/** Mutable builder for GoogleCloudStorageReadOptions. */
@AutoValue.Builder
public abstract static class Builder {
Expand Down Expand Up @@ -200,6 +208,9 @@ public abstract static class Builder {
* <ul>
* <li>{@code AUTO} - automatically switches to {@code RANDOM} mode if backward read or
* forward read for more than {@link #setInplaceSeekLimit} bytes is detected.
* <li>{@code AUTO_RANDOM} - Uses {@code RANDOM} to start with and automatically switches to
* {@code SEQUENTIAL} mode if more than 2 requests fall within {@link
* #setInplaceSeekLimit} limits.
*       <li>{@code RANDOM} - sends HTTP requests with a {@code Range} header set to the
*           greater of the user-provided read buffer size and the configured minimum range
*           request size.
* <li>{@code SEQUENTIAL} - sends HTTP requests with unbounded {@code Range} header.
Expand Down Expand Up @@ -242,6 +253,10 @@ public abstract static class Builder {
/** Sets the property for gRPC read message timeout in milliseconds. */
public abstract Builder setGrpcReadMessageTimeoutMillis(long grpcMessageTimeout);

public abstract Builder setBlockSize(long blockSize);

public abstract Builder setFadviseRequestTrackCount(int requestTrackCount);

abstract GoogleCloudStorageReadOptions autoBuild();

public GoogleCloudStorageReadOptions build() {
Expand Down
Loading