Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ snafu.workspace = true
tokio.workspace = true
tracing.workspace = true
tempfile.workspace = true
uuid.workspace = true

[dev-dependencies]
approx.workspace = true
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use async_trait::async_trait;
use lance_core::Result;
use roaring::RoaringBitmap;

pub mod optimize;
pub mod scalar;
pub mod vector;

Expand Down
36 changes: 36 additions & 0 deletions rust/lance-index/src/optimize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Options controlling how indices are optimized for a column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OptimizeOptions {
    /// The number of existing indices to merge into a single index, plus the un-indexed fragments.
    ///
    /// - If this is zero, no merging is done: a new delta index is created that covers
    ///   just the un-indexed fragments.
    /// - If it is one, the un-indexed fragments are appended to the last index.
    /// - If it is greater than one, the last `num_indices_to_merge` indices are merged into
    ///   a single one, thus reducing the number of indices for this column.
    /// - If this number exceeds the number of existing indices, all existing indices are
    ///   merged into a single one, i.e. a complete re-write of the entire index.
    ///
    /// Note that the index is never re-trained during this operation.
    pub num_indices_to_merge: usize,
}

impl Default for OptimizeOptions {
    /// By default no merging happens (`num_indices_to_merge == 0`): a new delta
    /// index is created to cover only the un-indexed fragments.
    fn default() -> Self {
        Self {
            num_indices_to_merge: 0,
        }
    }
}
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/pq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<T: ArrowFloatType + Cosine + Dot + L2> ProductQuantizerImpl<T> {

/// Get the centroids for one sub-vector.
///
/// Returns a flatten `num_centroids * sub_vector_width` f32 array.
/// Returns a flattened `num_centroids * sub_vector_width` float array.
pub fn centroids(&self, sub_vector_idx: usize) -> &[T::Native] {
get_sub_vector_centroids(
self.codebook.as_slice(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2753,7 +2753,7 @@ mod test {

// UPDATE

dataset.optimize_indices().await.unwrap();
dataset.optimize_indices(None).await.unwrap();
let updated_version = dataset.version().version;

// APPEND -> DELETE
Expand Down
97 changes: 73 additions & 24 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ use std::sync::Arc;
use arrow_schema::DataType;
use async_trait::async_trait;
use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader};
use lance_index::pb::index::Implementation;
use lance_index::scalar::expression::IndexInformationProvider;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::ScalarIndex;
use lance_index::{pb, Index, IndexType, INDEX_FILE_NAME};
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::{
expression::IndexInformationProvider, lance_format::LanceIndexStore, ScalarIndex,
};
use lance_index::{
optimize::OptimizeAction, pb, pb::index::Implementation, Index, IndexType, INDEX_FILE_NAME,
};
use snafu::{location, Location};
use tracing::instrument;
use uuid::Uuid;
Expand Down Expand Up @@ -155,7 +157,7 @@ pub trait DatasetIndexExt {
) -> Result<()>;

/// Optimize indices.
async fn optimize_indices(&mut self) -> Result<()>;
async fn optimize_indices(&mut self, action: Option<OptimizeAction>) -> Result<()>;
}

async fn open_index_proto(dataset: &Dataset, reader: &dyn Reader) -> Result<pb::Index> {
Expand Down Expand Up @@ -279,31 +281,78 @@ impl DatasetIndexExt for Dataset {
Ok(())
}

/// Optimize Indices.
#[instrument(skip_all)]
async fn optimize_indices(&mut self) -> Result<()> {
async fn optimize_indices(&mut self, options: OptimizeOptions) -> Result<()> {
let dataset = Arc::new(self.clone());
// Append index
let indices = self.load_indices().await?;

// Collecting the indices by each column, cause it is possible we have multiple small
// delta indices on the same column.
let mut indices_by_column = HashMap::<i32, Vec<&lance_core::format::Index>>::new();
indices.iter().try_for_each(|idx| {
if idx.fields.len() != 1 {
// Do not support multi-column indices at the moment.
return Err(Error::Index {
message: "Only support optimize indices with 1 column at the moment"
.to_string(),
location: location!(),
});
}
indices_by_column
.entry(idx.fields[0])
.or_default()
.push(idx);
Ok(())
})?;
for (_, indices) in indices_by_column.iter_mut() {
indices.sort_by_key(|idx| idx.dataset_version);
}

let mut new_indices = vec![];
let mut removed_indices = vec![];
for idx in indices.as_slice() {
if idx.dataset_version == self.manifest.version {
continue;
}
let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else {
continue;
};

let new_idx = IndexMetadata {
uuid: new_id,
name: idx.name.clone(),
fields: idx.fields.clone(),
dataset_version: self.manifest.version,
fragment_bitmap: new_frag_ids,
};
removed_indices.push(idx.clone());
new_indices.push(new_idx);
for (&column_id, indices) in indices_by_column.iter() {
let column = dataset
.schema()
.field_by_id(column_id)
.ok_or(Error::Index {
message: format!(
"Optimize indices: column {} does not exist in dataset.",
column_id
),
location: location!(),
})?;

match action {
Some(OptimizeAction::CreateDelta) => {
todo!()
}
Some(OptimizeAction::Merge(_opts)) => {
todo!()
}
None | Some(OptimizeAction::Append) => {
// Default action is append to existing index.
// For backward compatibility.
let idx = indices.last().unwrap_or_else(|| {
panic!("Must has more than 1 index for column: {}", column.name)
});
let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await?
else {
continue;
};

let new_idx = IndexMetadata {
uuid: new_id,
name: idx.name.clone(),
fields: idx.fields.clone(),
dataset_version: self.manifest.version,
fragment_bitmap: new_frag_ids,
};
removed_indices.push((*idx).clone());
new_indices.push(new_idx);
}
}
}

if new_indices.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ mod tests {
.unwrap();
assert_eq!(results[0].num_rows(), 10); // Flat search.

dataset.optimize_indices().await.unwrap();
dataset.optimize_indices(None).await.unwrap();
let index = &dataset.load_indices().await.unwrap()[0];
assert!(unindexed_fragments(index, &dataset)
.await
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index/vector/ivf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(super) async fn build_partitions(
pq: Arc<dyn ProductQuantizer>,
metric_type: MetricType,
part_range: Range<u32>,
precomputed_partitons: Option<HashMap<u64, u32>>,
precomputed_partitions: Option<HashMap<u64, u32>>,
shuffle_partition_batches: usize,
shuffle_partition_concurrency: usize,
) -> Result<()> {
Expand All @@ -64,7 +64,7 @@ pub(super) async fn build_partitions(
column,
pq.clone(),
Some(part_range),
precomputed_partitons,
precomputed_partitions,
)?;

let stream = shuffle_dataset(
Expand Down