Skip to content

Commit 4f1a897

Browse files
Add support for readFully at the S3SeekableInputStream level (#293)
## Description of change We want to support `readFully` as a part of our ongoing effort to integrate with Iceberg S3FileIO by default. [RangeReadable in Iceberg](https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/io/RangeReadable.java#L47) **Note:** Here's the comparison with existing read method: | Aspect | `read(byte[] buffer, int offset, int length)` | `readFully(long position, byte[] buffer, int offset, int length)` | |--------|----------------------------------------------|-------------------------------------------------------------------| | Method Signature | Reads from current stream position | Reads from specified position | | Position Behavior | Advances the stream position by the number of bytes actually read | Does not modify the stream position (position-independent read) | | Return & Error Handling | Returns `int` indicating bytes read; returns -1 at end of stream; may return fewer bytes than requested | Returns `void`; throws `IOException` if unable to read exact number of requested bytes | #### Relevant issues Once this feature is released as `1.2.0`, we will update this [Iceberg PR](apache/iceberg#13361) #### Does this contribution introduce any breaking changes to the existing APIs or behaviors? No #### Does this contribution introduce any new public APIs or behaviors? Yes #### How was the contribution tested? Ran existing and new tests locally #### Does this contribution need a changelog entry? - [x] I have updated the CHANGELOG or README if appropriate --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
1 parent bbc4aef commit 4f1a897

File tree

5 files changed

+170
-0
lines changed

5 files changed

+170
-0
lines changed

input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStream.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package software.amazon.s3.analyticsaccelerator;
1717

18+
import java.io.EOFException;
1819
import java.io.IOException;
1920
import java.nio.ByteBuffer;
2021
import java.util.List;
@@ -234,6 +235,41 @@ public void readVectored(
234235
logicalIO.readVectored(ranges, allocate);
235236
}
236237

238+
/**
239+
* Fill the provided buffer with the contents of the input source starting at {@code position} for
240+
* the given {@code offset} and {@code length}.
241+
*
242+
* @param position start position of the read
243+
* @param buffer target buffer to copy data
244+
* @param offset offset in the buffer to copy the data
245+
* @param length size of the read
246+
* @throws IOException if an I/O error occurs
247+
*/
248+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
249+
throwIfClosed("cannot read from closed stream");
250+
validatePositionedReadArgs(position, buffer, offset, length);
251+
252+
if (length == 0) {
253+
return;
254+
}
255+
256+
this.telemetry.measureVerbose(
257+
() ->
258+
Operation.builder()
259+
.name(OPERATION_READ)
260+
.attribute(StreamAttributes.uri(this.s3URI))
261+
.attribute(StreamAttributes.etag(this.logicalIO.metadata().getEtag()))
262+
.attribute(StreamAttributes.range(position, position + length - 1))
263+
.build(),
264+
() -> {
265+
int bytesRead = this.logicalIO.read(buffer, offset, length, position);
266+
if (bytesRead < length) {
267+
throw new EOFException(
268+
"Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
269+
}
270+
});
271+
}
272+
237273
/**
238274
* Releases all resources associated with the {@link S3SeekableInputStream}.
239275
*

input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/SeekableInputStream.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ public abstract void readVectored(
8080
Consumer<ByteBuffer> release)
8181
throws IOException;
8282

83+
/**
84+
* Fill the provided buffer with the contents of the input source starting at {@code position} for
85+
* the given {@code offset} and {@code length}.
86+
*
87+
* @param position start position of the read
88+
* @param buffer target buffer to copy data
89+
* @param offset offset in the buffer to copy the data
90+
* @param length size of the read
91+
* @throws IOException if an I/O error occurs
92+
*/
93+
public abstract void readFully(long position, byte[] buffer, int offset, int length)
94+
throws IOException;
95+
8396
/**
8497
* Validates the arguments for a read operation. This method is available to use in all subclasses
8598
* to ensure consistency.

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/model/InMemorySeekableStream.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package software.amazon.s3.analyticsaccelerator.model;
1717

18+
import java.io.IOException;
1819
import java.nio.ByteBuffer;
1920
import java.util.List;
2021
import java.util.function.Consumer;
@@ -84,6 +85,28 @@ public void readVectored(
8485
}
8586
}
8687

88+
@Override
89+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
90+
// Save current position of stream
91+
long prevPosition = this.position;
92+
if (position >= this.contentLength) {
93+
throw new IOException("Position is beyond end of stream");
94+
}
95+
96+
data.position((int) position);
97+
int bytesAvailable = this.contentLength - (int) position;
98+
int bytesToRead = Math.min(length, bytesAvailable);
99+
data.get(buffer, offset, bytesToRead);
100+
if (bytesToRead < length) {
101+
throw new IOException(
102+
"Reached the end of stream with " + (length - bytesToRead) + " bytes left to read");
103+
}
104+
105+
// Restore original position
106+
this.position = prevPosition;
107+
data.position((int) this.position);
108+
}
109+
87110
@Override
88111
public int read() {
89112
if (this.position >= this.contentLength) {

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public void readVectored(
131131
this.delegate.readVectored(ranges, allocate, release);
132132
}
133133

134+
@Override
135+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
136+
this.delegate.readFully(position, buffer, offset, length);
137+
}
138+
134139
@Override
135140
public int read() throws IOException {
136141
return this.delegate.read();

input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,99 @@ public void testInsufficientBuffer() throws IOException {
461461
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
462462
SpotBugsLambdaWorkaround.assertReadResult(
463463
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
464+
assertThrows(
465+
IndexOutOfBoundsException.class, () -> seekableInputStream.readFully(0, new byte[0], 0, 8));
466+
}
467+
468+
@Test
469+
void testReadFullyWithInvalidArgument() throws IOException {
470+
// Given: seekable stream
471+
try (S3SeekableInputStream stream = getTestStream()) {
472+
// When & Then: reading with invalid arguments, exception is thrown
473+
// -1 is invalid position
474+
assertThrows(IllegalArgumentException.class, () -> stream.readFully(-1, new byte[10], 0, 5));
475+
// -1 is invalid length
476+
assertThrows(IllegalArgumentException.class, () -> stream.readFully(0, new byte[10], 0, -1));
477+
// Requesting more data than byte buffer size
478+
assertThrows(IndexOutOfBoundsException.class, () -> stream.readFully(0, new byte[5], 0, 10));
479+
}
480+
}
481+
482+
@Test
483+
void testReadFullyHappyCase() throws IOException {
484+
// Given: seekable stream
485+
try (S3SeekableInputStream stream = getTestStream()) {
486+
// When: reading 5 bytes from position 3
487+
byte[] buf = new byte[5];
488+
stream.readFully(3, buf, 0, 5);
489+
490+
// Then: buffer contains the expected 5 bytes from position 3
491+
byte[] expected = TEST_DATA.substring(3, 8).getBytes(StandardCharsets.UTF_8);
492+
assertArrayEquals(expected, buf);
493+
494+
// Position should remain unchanged after readFully
495+
assertEquals(0, stream.getPos());
496+
}
497+
}
498+
499+
@Test
500+
void testReadFullyDoesNotAlterPosition() throws IOException {
501+
// Given: seekable stream with data "test-data12345678910"
502+
try (S3SeekableInputStream stream = getTestStream()) {
503+
// When:
504+
// 1) Reading first 5 bytes from position 0 (should be "test-")
505+
// 2) Reading 5 bytes from position 10 using readFully (should be "23456")
506+
// 3) Reading next 5 bytes from current position (should be "data1")
507+
byte[] one = new byte[5];
508+
byte[] two = new byte[5];
509+
byte[] three = new byte[5];
510+
511+
int numBytesRead1 = stream.read(one, 0, one.length);
512+
stream.readFully(10, two, 0, two.length);
513+
int numBytesRead3 = stream.read(three, 0, three.length);
514+
515+
// Then: readFully did not alter the position and reads #1 and #3 return subsequent bytes
516+
// First read should return 5 bytes
517+
assertEquals(5, numBytesRead1);
518+
// Third read should also return 5 bytes, continuing from where first read left off
519+
assertEquals(5, numBytesRead3);
520+
521+
// Verify the actual content of each buffer
522+
assertEquals("test-", new String(one, StandardCharsets.UTF_8));
523+
assertEquals("data1", new String(three, StandardCharsets.UTF_8));
524+
assertEquals("23456", new String(two, StandardCharsets.UTF_8));
525+
526+
// Verify the stream position is at 10 (5 + 5) after all reads
527+
assertEquals(10, stream.getPos());
528+
}
529+
}
530+
531+
@Test
532+
public void testReadFullyOnClosedStream() throws IOException {
533+
S3SeekableInputStream seekableInputStream = getTestStream();
534+
seekableInputStream.close();
535+
assertThrows(IOException.class, () -> seekableInputStream.readFully(0, new byte[8], 0, 8));
536+
}
537+
538+
@Test
539+
public void testZeroLengthReadFully() throws IOException {
540+
S3SeekableInputStream seekableInputStream = getTestStream();
541+
assertDoesNotThrow(() -> seekableInputStream.readFully(0, new byte[0], 0, 0));
542+
}
543+
544+
@Test
545+
void testReadFullyThrowsWhenInsufficientBytes() throws IOException {
546+
// Given: seekable stream with TEST_DATA (20 bytes)
547+
try (S3SeekableInputStream stream = getTestStream()) {
548+
// When & Then: trying to read beyond available data should throw IOException
549+
byte[] buffer = new byte[10];
550+
551+
// Try to read 10 bytes starting at position 15 (only 5 bytes available)
552+
assertThrows(IOException.class, () -> stream.readFully(15, buffer, 0, 10));
553+
554+
// Verify stream position remains unchanged after failed readFully
555+
assertEquals(0, stream.getPos());
556+
}
464557
}
465558

466559
private S3SeekableInputStream getTestStream() {

0 commit comments

Comments
 (0)