Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions java/lance-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ prost = "0.14.1"
roaring = "0.11"
prost-types = "0.14.1"
chrono = "0.4.41"
futures = "0.3"

[profile.dev]
debug = "line-tables-only"
Expand Down
81 changes: 80 additions & 1 deletion java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::DataType;
use futures::StreamExt;
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::{
JNIEnv,
Expand All @@ -16,10 +17,13 @@ use lance::table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, R
use lance_io::utils::CachedFileSize;
use std::iter::once;

use lance::dataset::fragment::FileFragment;
use lance::dataset::fragment::{FileFragment, FragReadConfig};
use lance::io::ObjectStoreParams;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_datafusion::utils::StreamingWriteSource;
use lance_io::ffi::to_ffi_arrow_array_stream;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
use lance_io::stream::RecordBatchStreamAdapter;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -79,6 +83,81 @@ fn inner_count_rows_native(
Ok(res)
}

/// JNI entry point for `Fragment.nativeReadRange` on the Java side.
///
/// Exports a stream of record batches for a logical row range of one fragment
/// through the Arrow C stream interface: the Java caller allocates an
/// `ArrowArrayStream` and passes its memory address as `stream_addr`; on
/// success this function writes a filled `FFI_ArrowArrayStream` there.
/// On error, `ok_or_throw_without_return!` converts the `Err` into a thrown
/// Java exception and returns normally (no value to return).
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Fragment_nativeReadRange(
    mut env: JNIEnv,
    _jfragment: JObject,   // the Java Fragment receiver; unused, state lives in jdataset
    jdataset: JObject,     // Java Dataset holding the native BlockingDataset handle
    fragment_id: jint,
    offset: jint,          // starting logical row (0-based, skips deleted rows)
    num_rows: jint,
    columns_obj: JObject, // List<String>
    batch_size: jint,
    stream_addr: jlong,    // address of a caller-allocated FFI_ArrowArrayStream
) {
    ok_or_throw_without_return!(
        env,
        inner_read_range(
            &mut env,
            jdataset,
            fragment_id,
            offset,
            num_rows,
            columns_obj,
            batch_size,
            stream_addr,
        )
    )
}

/// Opens `fragment_id` of the dataset behind `jdataset`, reads logical rows
/// `[offset, offset + num_rows)` restricted to `columns_obj` (empty list =
/// all columns), and writes an FFI Arrow stream of the batches to
/// `stream_addr`.
///
/// Errors if the fragment does not exist, the projection is invalid, or the
/// requested range is out of `u32` bounds.
#[allow(clippy::too_many_arguments)]
fn inner_read_range(
    env: &mut JNIEnv,
    jdataset: JObject,
    fragment_id: jint,
    offset: jint,
    num_rows: jint,
    columns_obj: JObject, // List<String>
    batch_size: jint,
    stream_addr: jlong,
) -> Result<()> {
    // Validate at the JNI boundary. The Java wrapper performs the same checks,
    // but nothing forces callers through it, and a negative jint would wrap to
    // a huge value in the `as u32` casts below.
    if offset < 0 || num_rows <= 0 || batch_size <= 0 {
        return Err(Error::input_error(format!(
            "invalid read range: offset={offset}, num_rows={num_rows}, batch_size={batch_size}"
        )));
    }
    // Compute the exclusive end in i64 so `offset + num_rows` cannot overflow
    // i32 (e.g. offset near i32::MAX) before the cast to u32.
    let end = offset as i64 + num_rows as i64;
    if end > u32::MAX as i64 {
        return Err(Error::input_error(format!(
            "read range end {end} exceeds u32::MAX"
        )));
    }

    let columns: Vec<String> = env.get_strings(&columns_obj)?;

    // Borrow the native dataset only long enough to resolve the projection and
    // fragment; the JNI field guard must not be held across the await below.
    let (fragment, projection) = {
        let dataset =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }?;

        // Empty column list means "project everything".
        let projection = if columns.is_empty() {
            dataset.inner.schema().clone()
        } else {
            dataset.inner.schema().project(&columns)?
        };

        let Some(fragment) = dataset.inner.get_fragment(fragment_id as usize) else {
            return Err(Error::input_error(format!(
                "Fragment not found: {fragment_id}"
            )));
        };
        (fragment, projection)
    };

    let (arrow_schema, batch_stream) = RT.block_on(async {
        let reader = fragment
            .open(&projection, FragReadConfig::default())
            .await?;
        let schema = Arc::new(arrow_schema::Schema::from(&projection));
        let range = offset as u32..end as u32;
        // `read_range` yields futures of batches; buffer them so decode work
        // overlaps across CPU cores.
        let fut_stream = reader.read_range(range, batch_size as u32)?;
        let batch_stream = fut_stream.buffered(get_num_compute_intensive_cpus());
        Ok::<_, Error>((schema, batch_stream))
    })?;

    let record_batch_stream = RecordBatchStreamAdapter::new(arrow_schema, batch_stream);
    let ffi_stream = to_ffi_arrow_array_stream(record_batch_stream, RT.handle().clone())?;
    // SAFETY: the Java caller guarantees `stream_addr` points at a live,
    // caller-owned FFI_ArrowArrayStream allocation; write_unaligned because
    // JNI gives no alignment guarantee for the raw address.
    unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) }
    Ok(())
}

///////////////////
// Write Methods //
///////////////////
Expand Down
42 changes: 42 additions & 0 deletions java/src/main/java/org/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,6 +133,46 @@ public int countRows() {
return countRowsNative(dataset, fragmentMetadata.getId());
}

/**
* Read a contiguous range of logical rows from this fragment.
*
* <p>Logical rows skip deleted rows, so row 0 is the first non-deleted row. Data is streamed
* batch-by-batch rather than materialized fully in memory.
*
* @param offset the starting logical row index (0-based)
* @param numRows the number of rows to read
* @param columns the columns to read (empty list for all columns)
* @param batchSize the maximum number of rows per batch
* @return an ArrowReader that streams the requested rows
*/
public ArrowReader readRange(int offset, int numRows, List<String> columns, int batchSize)
throws IOException {
Preconditions.checkArgument(offset >= 0, "offset must be non-negative");
Preconditions.checkArgument(numRows > 0, "numRows must be positive");
Preconditions.checkArgument(batchSize > 0, "batchSize must be positive");
BufferAllocator allocator = dataset.allocator();
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
nativeReadRange(
dataset,
fragmentMetadata.getId(),
offset,
numRows,
columns,
batchSize,
stream.memoryAddress());
return Data.importArrayStream(allocator, stream);
}
}

  /**
   * Native implementation backing {@link #readRange}: fills the {@code ArrowArrayStream} at
   * {@code streamAddress} with batches for the requested logical row range.
   *
   * @param dataset the dataset owning the native handle
   * @param fragmentId id of the fragment to read
   * @param offset starting logical row index (0-based)
   * @param numRows number of rows to read
   * @param columns columns to project (empty list for all columns)
   * @param batchSize maximum rows per batch
   * @param streamAddress memory address of a caller-allocated ArrowArrayStream
   */
  private native void nativeReadRange(
      Dataset dataset,
      int fragmentId,
      int offset,
      int numRows,
      List<String> columns,
      int batchSize,
      long streamAddress);

/**
* Merge the new columns into this Fragment, will return the new fragment with the same
* FragmentId. This operation will perform a left-join with the right table (new data in stream)
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/org/lance/FragmentMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/** Metadata of a Fragment in the dataset. Matching to lance Fragment. */
public class FragmentMetadata implements Serializable {
private static final long serialVersionUID = -5886811251944130460L;
private final int id;
private final int id; // FIXME: change to long
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fragment ID should be a long, but that is a fairly significant change, so I left it for a follow-up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened follow-up #6498 to change the fragment ID type to long.

private final List<DataFile> files;
private final long physicalRows;
private final DeletionFile deletionFile;
Expand Down
65 changes: 65 additions & 0 deletions java/src/test/java/org/lance/FragmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.lance.operation.Update;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -344,4 +345,68 @@ void testMergeColumns(@TempDir Path tempDir) throws Exception {
}
}
}

@Test
void testReadRange(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("testReadRange").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int rowCount = 50;
FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(fragmentMeta));
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
Fragment fragment = dataset.getFragments().get(0);

// Read a range of rows from the middle
try (ArrowReader reader = fragment.readRange(10, 20, Collections.emptyList(), 1024)) {
int totalRead = 0;
int batchCount = 0;
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
IntVector idVector = (IntVector) root.getVector("id");
for (int i = 0; i < root.getRowCount(); i++) {
assertEquals(10 + totalRead + i, idVector.get(i));
}
totalRead += root.getRowCount();
batchCount++;
}
assertEquals(20, totalRead);
assertTrue(batchCount == 1);
}

// Read with specific columns
try (ArrowReader reader = fragment.readRange(0, 5, Collections.singletonList("id"), 1024)) {
int totalRead = 0;
int batchCount = 0;
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(1, root.getSchema().getFields().size());
assertEquals("id", root.getSchema().getFields().get(0).getName());
IntVector idVector = (IntVector) root.getVector("id");
for (int i = 0; i < root.getRowCount(); i++) {
assertEquals(totalRead + i, idVector.get(i));
}
totalRead += root.getRowCount();
batchCount++;
}
assertEquals(5, totalRead);
assertTrue(batchCount == 1);
}

// Read with custom batch size
try (ArrowReader reader = fragment.readRange(0, 10, Collections.emptyList(), 3)) {
int totalRead = 0;
int batchCount = 0;
while (reader.loadNextBatch()) {
totalRead += reader.getVectorSchemaRoot().getRowCount();
batchCount++;
}
assertEquals(10, totalRead);
assertTrue(batchCount > 1);
}
}
}
}
}
Loading