Skip to content

Commit ad8079a

Browse files
authored
feat: add lance_dataset_compact_files for fragment compaction (#40)
## Summary Adds `lance_dataset_compact_files`, exposing upstream's `compact_files` so callers can merge small fragments and materialize pending deletion files in one pass. It's the operational follow-on to `_delete` / `_update` / `_merge_insert`: heavy mutation users accumulate small fragments and deletion files, and this is the cleanup step. A clean dataset (no fragment under the target size, no deletions worth materializing) is a no-op — the call succeeds with all-zero metrics and the version is unchanged. ## Surface `LanceCompactionOptions` carries the five common numeric knobs: | field | upstream default | |---|---| | `target_rows_per_fragment` | ~1Mi rows | | `max_rows_per_group` | 1024 | | `max_bytes_per_file` | writer cap | | `num_threads` | num compute-intensive CPUs | | `batch_size` | scanner default | Each uses `0` to mean "keep the upstream default", so `NULL` and a zero-initialized struct behave identically. Non-zero values go through a `usize` range check so the API doesn't silently truncate on 32-bit hosts. `LanceCompactionMetrics` mirrors upstream's four counts: `fragments_removed`, `fragments_added`, `files_removed`, `files_added`. The C++ surface is just `Dataset::compact_files(options=nullptr)`. ## Out of scope The knobs that don't fit the 0-sentinel convention: `materialize_deletions` (bool, default `true`), `materialize_deletions_threshold` (f32, default `0.1`), `compaction_mode` (3-variant enum), and the niche `defer_index_remap` / `binary_copy_read_batch_bytes` / `max_source_fragments` / `transaction_properties` fields. The struct is `repr(C)` and grows by appending, so any of these can land in a later PR without breaking existing callers. The `remap_options` argument to upstream is fixed at `None` for now (uses the default remapper). ## Test plan - 12 Rust integration tests: basic merge of small neighbors, data round-trip, no-op on a clean single fragment, deletion-file materialization after a partial-row delete on each fragment, each of the five numeric overrides flowing through, zero-init/NULL equivalence, optional `out_metrics`, NULL-dataset rejection, and untouched-on-error semantics. - C and C++ smoke tests in `compile_and_run_test` exercise the no-op path on the existing single-fragment fixture; the C side also pins the NULL-dataset rejection. - `cargo fmt`, `cargo clippy --all-targets -- -D warnings`, the full Rust test suite, and the C/C++ smoke compile are green locally.
1 parent 3f30e80 commit ad8079a

7 files changed

Lines changed: 591 additions & 0 deletions

File tree

include/lance/lance.h

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,69 @@ int32_t lance_dataset_merge_insert(
390390
LanceMergeInsertResult* out_result
391391
);
392392

393+
/* ─── lance_dataset_compact_files ─────────────────────────────────────────── */
394+
395+
/**
396+
* Tunable parameters for lance_dataset_compact_files. Pass NULL to use the
397+
* upstream defaults. Each numeric field uses 0 as a "keep upstream default"
398+
* sentinel; non-zero values are forwarded after a usize range check so the
399+
* API does not silently truncate on 32-bit hosts.
400+
*/
401+
typedef struct LanceCompactionOptions {
402+
/* Target row count per output fragment. Fragments below this size are
403+
candidates for being merged with neighbors. 0 = default (~1Mi rows). */
404+
uint64_t target_rows_per_fragment;
405+
/* Soft cap on rows per row group within an output fragment. 0 = default. */
406+
uint64_t max_rows_per_group;
407+
/* Soft cap on bytes per output fragment file. 0 = default (writer cap). */
408+
uint64_t max_bytes_per_file;
409+
/* Compute parallelism for compaction tasks. 0 = default
410+
(number of compute-intensive CPUs). */
411+
uint64_t num_threads;
412+
/* Scanner batch size for reading input fragments. 0 = default. */
413+
uint64_t batch_size;
414+
} LanceCompactionOptions;
415+
416+
/** Per-call compaction metrics returned via the optional out parameter. */
417+
typedef struct LanceCompactionMetrics {
418+
/* Number of input fragments that were rewritten and dropped. */
419+
uint64_t fragments_removed;
420+
/* Number of new fragments produced by the rewrite. */
421+
uint64_t fragments_added;
422+
/* Total files removed across the operation, including deletion files. */
423+
uint64_t files_removed;
424+
/* Total files added across the operation; one per new fragment. */
425+
uint64_t files_added;
426+
} LanceCompactionMetrics;
427+
428+
/**
429+
* Compact the dataset's fragments, committing a new manifest if anything
430+
* changed. Each compaction task merges adjacent small fragments and
431+
* materializes any deletion files in the process. A clean dataset (no
432+
* fragment under the target size, no deletions worth materializing) is a
433+
* no-op: the function returns success with all-zero metrics and the
434+
* dataset's version is unchanged.
435+
*
436+
* Mutates `dataset` in place — the same handle remains valid afterward and
437+
* sees the new version. Scanners already in flight against this dataset
438+
* keep their pre-compaction snapshot view.
439+
*
440+
* @param dataset Open dataset (not consumed). Must not be NULL.
441+
* @param options Tunable parameters. Pass NULL for upstream defaults.
442+
* @param out_metrics Optional. If non-NULL, on success receives the per-call
443+
* compaction metrics. On error the slot is left unchanged
444+
* — do not read it.
445+
* @return 0 on success, -1 on error. Error codes:
446+
* LANCE_ERR_INVALID_ARGUMENT for NULL `dataset` or for numeric
447+
* overrides that exceed usize::MAX on the running target;
448+
* LANCE_ERR_COMMIT_CONFLICT for a concurrent writer.
449+
*/
450+
int32_t lance_dataset_compact_files(
451+
LanceDataset* dataset,
452+
const LanceCompactionOptions* options,
453+
LanceCompactionMetrics* out_metrics
454+
);
455+
393456
/**
394457
* Export the dataset schema via Arrow C Data Interface.
395458
* @param out Pointer to caller-allocated ArrowSchema struct

include/lance/lance.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,19 @@ class Dataset {
423423
return merge_insert(on_columns, source, &params);
424424
}
425425

426+
/// Compact small or deleted-heavy fragments into larger ones, committing
427+
/// a new manifest. A clean dataset is a no-op — all-zero metrics and the
428+
/// version is unchanged. Pass `nullptr` for upstream defaults.
429+
/// Throws lance::Error on failure (commit conflict, ...).
430+
LanceCompactionMetrics compact_files(
431+
const LanceCompactionOptions* options = nullptr) {
432+
LanceCompactionMetrics metrics{};
433+
if (lance_dataset_compact_files(handle_.get(), options, &metrics) != 0) {
434+
check_error();
435+
}
436+
return metrics;
437+
}
438+
426439
/// Export the schema as an Arrow C Data Interface struct.
427440
void schema(ArrowSchema* out) const {
428441
if (lance_dataset_schema(handle_.get(), out) != 0) {

src/compact.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Compaction C API: rewrite small/deleted-heavy fragments into larger ones,
5+
//! committing a new manifest. Operates as a no-op (no version bump) when no
6+
//! fragments need compacting.
7+
//!
8+
//! Mutates the dataset in place under an exclusive write lock; existing
9+
//! scanners that already cloned the inner Arc keep their pre-compaction
10+
//! snapshot view.
11+
12+
use lance::dataset::optimize::{CompactionOptions, compact_files};
13+
use lance_core::Result;
14+
15+
use crate::dataset::LanceDataset;
16+
use crate::error::ffi_try;
17+
use crate::runtime::block_on;
18+
19+
/// Tunable parameters for `lance_dataset_compact_files`. Pass NULL to use the
20+
/// upstream defaults. Each numeric field uses `0` as a "keep upstream default"
21+
/// sentinel; explicit overrides are forwarded after a `usize` range check.
22+
///
23+
/// The struct is `#[repr(C)]` and ABI-stable within a minor version; new
24+
/// fields can be appended without breaking existing callers.
25+
#[repr(C)]
26+
pub struct LanceCompactionOptions {
27+
/// Target row count per output fragment. Fragments below this size are
28+
/// candidates for being merged with neighbors. `0` uses upstream's
29+
/// default (~1Mi rows).
30+
pub target_rows_per_fragment: u64,
31+
/// Soft cap on rows per row group within an output fragment. `0` uses
32+
/// upstream's default.
33+
pub max_rows_per_group: u64,
34+
/// Soft cap on bytes per output fragment file. `0` uses upstream's
35+
/// default (the writer's per-file cap).
36+
pub max_bytes_per_file: u64,
37+
/// Compute parallelism for compaction tasks. `0` uses upstream's default
38+
/// (the number of compute-intensive CPUs).
39+
pub num_threads: u64,
40+
/// Scanner batch size for reading input fragments. `0` uses upstream's
41+
/// default.
42+
pub batch_size: u64,
43+
}
44+
45+
/// Per-call compaction metrics returned via the optional out parameter.
46+
#[repr(C)]
47+
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
48+
pub struct LanceCompactionMetrics {
49+
/// Number of input fragments that were rewritten and dropped.
50+
pub fragments_removed: u64,
51+
/// Number of new fragments produced by the rewrite.
52+
pub fragments_added: u64,
53+
/// Total files removed across the operation, including deletion files.
54+
pub files_removed: u64,
55+
/// Total files added across the operation; one per new fragment.
56+
pub files_added: u64,
57+
}
58+
59+
/// Compact the dataset's fragments, committing a new manifest if anything
60+
/// changed.
61+
///
62+
/// Each compaction task merges adjacent small fragments and materializes any
63+
/// deletion files in the process. A clean dataset (no fragment under the
64+
/// target size, no deletions worth materializing) is a no-op: the function
65+
/// returns success with all-zero metrics and the dataset's version is
66+
/// unchanged.
67+
///
68+
/// - `dataset`: Open dataset (mutated; same handle remains valid afterward).
69+
/// Must not be NULL.
70+
/// - `options`: Optional. NULL uses upstream defaults; otherwise each field
71+
/// is treated as an override (`0` keeps the default for that field).
72+
/// - `out_metrics`: Optional. If non-NULL, on success receives the
73+
/// `LanceCompactionMetrics` for this call. On error the slot is untouched.
74+
///
75+
/// Returns 0 on success, -1 on error. Error codes:
76+
/// `LANCE_ERR_INVALID_ARGUMENT` for NULL `dataset` or for numeric overrides
77+
/// that exceed `usize::MAX` on the running target;
78+
/// `LANCE_ERR_COMMIT_CONFLICT` for a concurrent writer.
79+
#[unsafe(no_mangle)]
80+
pub unsafe extern "C" fn lance_dataset_compact_files(
81+
dataset: *mut LanceDataset,
82+
options: *const LanceCompactionOptions,
83+
out_metrics: *mut LanceCompactionMetrics,
84+
) -> i32 {
85+
ffi_try!(unsafe { compact_inner(dataset, options, out_metrics) }, neg)
86+
}
87+
88+
unsafe fn compact_inner(
89+
dataset: *mut LanceDataset,
90+
options: *const LanceCompactionOptions,
91+
out_metrics: *mut LanceCompactionMetrics,
92+
) -> Result<i32> {
93+
if dataset.is_null() {
94+
return Err(lance_core::Error::InvalidInput {
95+
source: "dataset must not be NULL".into(),
96+
location: snafu::location!(),
97+
});
98+
}
99+
100+
// SAFETY: `options` is either NULL (use defaults) or points to a valid
101+
// `LanceCompactionOptions` for the duration of this call. `resolve` only
102+
// reads through the pointer.
103+
let resolved = unsafe { resolve_options(options)? };
104+
105+
// SAFETY: `dataset` is non-NULL (checked above) and the caller guarantees
106+
// it points to a live `LanceDataset` not aliased mutably elsewhere.
107+
let ds = unsafe { &*dataset };
108+
let metrics = ds.with_mut(|d| block_on(compact_files(d, resolved, None)))?;
109+
110+
if !out_metrics.is_null() {
111+
// SAFETY: caller guarantees `out_metrics` (when non-NULL) points to
112+
// caller-owned, writable storage of size `sizeof(LanceCompactionMetrics)`.
113+
// We only write on success; on the error paths above the slot stays
114+
// untouched per the documented contract.
115+
unsafe {
116+
*out_metrics = LanceCompactionMetrics {
117+
fragments_removed: metrics.fragments_removed as u64,
118+
fragments_added: metrics.fragments_added as u64,
119+
files_removed: metrics.files_removed as u64,
120+
files_added: metrics.files_added as u64,
121+
};
122+
}
123+
}
124+
Ok(0)
125+
}
126+
127+
/// Apply caller-provided overrides onto a default `CompactionOptions`. NULL
128+
/// means "no overrides"; the per-field sentinel and overflow contract are
129+
/// documented on `LanceCompactionOptions`.
130+
unsafe fn resolve_options(options: *const LanceCompactionOptions) -> Result<CompactionOptions> {
131+
let mut resolved = CompactionOptions::default();
132+
if options.is_null() {
133+
return Ok(resolved);
134+
}
135+
136+
// SAFETY: `options` is non-NULL (checked above) and the caller guarantees
137+
// it points to a properly-initialized `LanceCompactionOptions` valid for
138+
// the duration of this call. We read by shared reference.
139+
let opts = unsafe { &*options };
140+
141+
if opts.target_rows_per_fragment > 0 {
142+
resolved.target_rows_per_fragment =
143+
u64_to_usize(opts.target_rows_per_fragment, "target_rows_per_fragment")?;
144+
}
145+
if opts.max_rows_per_group > 0 {
146+
resolved.max_rows_per_group = u64_to_usize(opts.max_rows_per_group, "max_rows_per_group")?;
147+
}
148+
if opts.max_bytes_per_file > 0 {
149+
resolved.max_bytes_per_file =
150+
Some(u64_to_usize(opts.max_bytes_per_file, "max_bytes_per_file")?);
151+
}
152+
if opts.num_threads > 0 {
153+
resolved.num_threads = Some(u64_to_usize(opts.num_threads, "num_threads")?);
154+
}
155+
if opts.batch_size > 0 {
156+
resolved.batch_size = Some(u64_to_usize(opts.batch_size, "batch_size")?);
157+
}
158+
Ok(resolved)
159+
}
160+
161+
/// Narrow `u64 -> usize` with an explicit error on overflow (32-bit hosts).
162+
/// Realistic compaction tunings fit in `usize` on every supported target,
163+
/// but a silent `as` cast would wrap on a 32-bit host.
164+
fn u64_to_usize(v: u64, field: &'static str) -> Result<usize> {
165+
usize::try_from(v).map_err(|_| lance_core::Error::InvalidInput {
166+
source: format!("{field}={v} exceeds usize::MAX on this target").into(),
167+
location: snafu::location!(),
168+
})
169+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
mod async_dispatcher;
1919
mod batch;
20+
mod compact;
2021
mod dataset;
2122
mod delete;
2223
mod error;
@@ -33,6 +34,7 @@ mod writer;
3334

3435
// Re-export all extern "C" symbols so they appear in the cdylib.
3536
pub use batch::*;
37+
pub use compact::*;
3638
pub use dataset::*;
3739
pub use delete::*;
3840
pub use error::{

0 commit comments

Comments
 (0)