From 779bd68f56d5c89f8eb2659847f596f616e4a7ae Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 9 Jan 2024 17:06:06 -0800 Subject: [PATCH 1/7] optimize actions --- rust/lance-index/Cargo.toml | 1 + rust/lance-index/src/lib.rs | 1 + rust/lance-index/src/optimize.rs | 26 ++++++++++ rust/lance-index/src/vector/pq.rs | 2 +- rust/lance/src/index.rs | 55 +++++++++++++++++++--- rust/lance/src/index/vector/ivf/builder.rs | 2 +- 6 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 rust/lance-index/src/optimize.rs diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index d7bbe18c732..c373c92d58f 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -44,6 +44,7 @@ snafu.workspace = true tokio.workspace = true tracing.workspace = true tempfile.workspace = true +uuid.workspace = true [dev-dependencies] approx.workspace = true diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 8827c3fba94..7a7cdbbcb5d 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use lance_core::Result; use roaring::RoaringBitmap; +pub mod optimize; pub mod scalar; pub mod vector; diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs new file mode 100644 index 00000000000..333dac5054b --- /dev/null +++ b/rust/lance-index/src/optimize.rs @@ -0,0 +1,26 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub struct MergeIndex {} + +pub enum OptimizeAction { + Append, + + /// Create a delta index that covers the un-indexed rows from the + /// last index. + CreateDelta, + + /// Merge existing indices + Merge(MergeIndex), +} diff --git a/rust/lance-index/src/vector/pq.rs b/rust/lance-index/src/vector/pq.rs index d8d00a27641..76082a6eea9 100644 --- a/rust/lance-index/src/vector/pq.rs +++ b/rust/lance-index/src/vector/pq.rs @@ -159,7 +159,7 @@ impl ProductQuantizerImpl { /// Get the centroids for one sub-vector. /// - /// Returns a flatten `num_centroids * sub_vector_width` f32 array. + /// Returns a flatten `num_centroids * sub_vector_width` float array. pub fn centroids(&self, sub_vector_idx: usize) -> &[T::Native] { get_sub_vector_centroids( self.codebook.as_slice(), diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2d81c71dcd5..5827933444d 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -21,11 +21,13 @@ use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; +use datafusion::optimizer::OptimizerConfig; use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader}; +use lance_index::optimize::OptimizeAction; use lance_index::pb::index::Implementation; -use lance_index::scalar::expression::IndexInformationProvider; -use lance_index::scalar::lance_format::LanceIndexStore; -use lance_index::scalar::ScalarIndex; +use lance_index::scalar::{ + expression::IndexInformationProvider, lance_format::LanceIndexStore, ScalarIndex, +}; use lance_index::{pb, Index, IndexType, INDEX_FILE_NAME}; use snafu::{location, Location}; use tracing::instrument; @@ -155,7 +157,7 @@ pub trait DatasetIndexExt { ) -> Result<()>; /// Optimize indices. - async fn optimize_indices(&mut self) -> Result<()>; + async fn optimize_indices(&mut self, action: Option) -> Result<()>; } async fn open_index_proto(dataset: &Dataset, reader: &dyn Reader) -> Result { @@ -279,16 +281,57 @@ impl DatasetIndexExt for Dataset { Ok(()) } + /// Optimize Indices. #[instrument(skip_all)] - async fn optimize_indices(&mut self) -> Result<()> { + async fn optimize_indices(&mut self, action: Option) -> Result<()> { let dataset = Arc::new(self.clone()); - // Append index let indices = self.load_indices().await?; + let mut indices_by_column = HashMap::>::new(); + indices.iter().try_for_each(|idx| { + if idx.fields.len() != 1 { + return Err(Error::Index { + message: "Only support optimize indices with 1 column at the moment" + .to_string(), + location: location!(), + }); + } + indices_by_column[&idx.fields[0]].push(idx); + Ok(()) + })?; + + for (&column_id, indices) in indices_by_column.iter() { + let column = dataset + .schema() + .field_by_id(column_id) + .ok_or(Err(Error::Index { + message: format!( + "Optimize indices: column {} does not exist in dataset.", + column_id + ), + location: location!(), + }))?; + + match action { + Some(OptimizeAction::CreateDelta) => { + todo!() + } + Some(OptimizeAction::Merge(opts)) => { + todo!() + } + None | Some(OptimizeAction::Append) => { + // Default action is append to existing index. + // For backward compatibility. + todo!() + } + } + } + let mut new_indices = vec![]; let mut removed_indices = vec![]; for idx in indices.as_slice() { if idx.dataset_version == self.manifest.version { + // This is latest index, skip. continue; } let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else { diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 82838cc3797..108cb2dde7f 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -64,7 +64,7 @@ pub(super) async fn build_partitions( column, pq.clone(), Some(part_range), - precomputed_partitons, + precomputed_partitions, )?; let stream = shuffle_dataset( From 176cbec6a7b582e1ae4352a30896cddf3d7b303e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 9 Jan 2024 17:25:29 -0800 Subject: [PATCH 2/7] sort indices by dataset version; --- rust/lance-index/src/optimize.rs | 6 ++++++ rust/lance/src/index.rs | 21 +++++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs index 333dac5054b..d41d3e25623 100644 --- a/rust/lance-index/src/optimize.rs +++ b/rust/lance-index/src/optimize.rs @@ -24,3 +24,9 @@ pub enum OptimizeAction { /// Merge existing indices Merge(MergeIndex), } + +impl Default for OptimizeAction { + fn default() -> Self { + Self::Append + } +} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 5827933444d..1314735b5a9 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -21,14 +21,13 @@ use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; -use datafusion::optimizer::OptimizerConfig; use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader}; -use lance_index::optimize::OptimizeAction; -use lance_index::pb::index::Implementation; use lance_index::scalar::{ expression::IndexInformationProvider, lance_format::LanceIndexStore, ScalarIndex, }; -use lance_index::{pb, Index, IndexType, INDEX_FILE_NAME}; +use lance_index::{ + optimize::OptimizeAction, pb, pb::index::Implementation, Index, IndexType, INDEX_FILE_NAME, +}; use snafu::{location, Location}; use tracing::instrument; use uuid::Uuid; @@ -287,7 +286,7 @@ impl DatasetIndexExt for Dataset { let dataset = Arc::new(self.clone()); let indices = self.load_indices().await?; - let mut indices_by_column = HashMap::>::new(); + let mut indices_by_column = HashMap::>::new(); indices.iter().try_for_each(|idx| { if idx.fields.len() != 1 { return Err(Error::Index { @@ -296,21 +295,27 @@ impl DatasetIndexExt for Dataset { location: location!(), }); } - indices_by_column[&idx.fields[0]].push(idx); + indices_by_column + .entry(idx.fields[0]) + .or_default() + .push(idx); Ok(()) })?; + for (_, indices) in indices_by_column.iter_mut() { + indices.sort_by_key(|idx| idx.dataset_version); + } for (&column_id, indices) in indices_by_column.iter() { let column = dataset .schema() .field_by_id(column_id) - .ok_or(Err(Error::Index { + .ok_or(Error::Index { message: format!( "Optimize indices: column {} does not exist in dataset.", column_id ), location: location!(), - }))?; + })?; match action { Some(OptimizeAction::CreateDelta) => { From 53a0b4552d867b8b060138ed3ebcaa93607e7f1e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 9 Jan 2024 17:32:18 -0800 Subject: [PATCH 3/7] differnet actions --- rust/lance/src/index.rs | 45 +++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 1314735b5a9..dbf38668f2b 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -305,6 +305,9 @@ impl DatasetIndexExt for Dataset { indices.sort_by_key(|idx| idx.dataset_version); } + let mut new_indices = vec![]; + let mut removed_indices = vec![]; + for (&column_id, indices) in indices_by_column.iter() { let column = dataset .schema() @@ -321,39 +324,33 @@ impl DatasetIndexExt for Dataset { Some(OptimizeAction::CreateDelta) => { todo!() } - Some(OptimizeAction::Merge(opts)) => { + Some(OptimizeAction::Merge(_opts)) => { todo!() } None | Some(OptimizeAction::Append) => { // Default action is append to existing index. // For backward compatibility. - todo!() + let idx = indices.last().expect( + format!("Must has more than 1 index for column: {}", column.name).as_str(), + ); + let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? + else { + continue; + }; + + let new_idx = IndexMetadata { + uuid: new_id, + name: idx.name.clone(), + fields: idx.fields.clone(), + dataset_version: self.manifest.version, + fragment_bitmap: new_frag_ids, + }; + removed_indices.push((*idx).clone()); + new_indices.push(new_idx); } } } - let mut new_indices = vec![]; - let mut removed_indices = vec![]; - for idx in indices.as_slice() { - if idx.dataset_version == self.manifest.version { - // This is latest index, skip. - continue; - } - let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else { - continue; - }; - - let new_idx = IndexMetadata { - uuid: new_id, - name: idx.name.clone(), - fields: idx.fields.clone(), - dataset_version: self.manifest.version, - fragment_bitmap: new_frag_ids, - }; - removed_indices.push(idx.clone()); - new_indices.push(new_idx); - } - if new_indices.is_empty() { return Ok(()); } From 4b1b77536e0ed24219cc416fffb33bddc89fa166 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 9 Jan 2024 17:39:58 -0800 Subject: [PATCH 4/7] better comments --- rust/lance/src/index.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index dbf38668f2b..eae18eba426 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -286,9 +286,12 @@ impl DatasetIndexExt for Dataset { let dataset = Arc::new(self.clone()); let indices = self.load_indices().await?; + // Collecting the indices by each column, cause it is possible we have multiple small + // delta indices on the same column. let mut indices_by_column = HashMap::>::new(); indices.iter().try_for_each(|idx| { if idx.fields.len() != 1 { + // Do not support multi-column indices at the moment. return Err(Error::Index { message: "Only support optimize indices with 1 column at the moment" .to_string(), From 41f62e464be187eac7944855d3a8433dcc05815c Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 9 Jan 2024 19:16:19 -0800 Subject: [PATCH 5/7] update signature --- rust/lance/src/dataset/scanner.rs | 2 +- rust/lance/src/index/append.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a341b277bf9..9b8778cdf03 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2753,7 +2753,7 @@ mod test { // UPDATE - dataset.optimize_indices().await.unwrap(); + dataset.optimize_indices(None).await.unwrap(); let updated_version = dataset.version().version; // APPEND -> DELETE diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index a305391bac1..e28e07c0f98 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -193,7 +193,7 @@ mod tests { .unwrap(); assert_eq!(results[0].num_rows(), 10); // Flat search. - dataset.optimize_indices().await.unwrap(); + dataset.optimize_indices(None).await.unwrap(); let index = &dataset.load_indices().await.unwrap()[0]; assert!(unindexed_fragments(index, &dataset) .await From 52ba8b8040fa16bcd598d8fac66384b0c852d42b Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Wed, 10 Jan 2024 09:10:19 -0800 Subject: [PATCH 6/7] fix clippy --- rust/lance/src/index.rs | 6 +++--- rust/lance/src/index/vector/ivf/builder.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index eae18eba426..beb5562cd67 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -333,9 +333,9 @@ impl DatasetIndexExt for Dataset { None | Some(OptimizeAction::Append) => { // Default action is append to existing index. // For backward compatibility. - let idx = indices.last().expect( - format!("Must has more than 1 index for column: {}", column.name).as_str(), - ); + let idx = indices.last().unwrap_or_else(|| { + panic!("Must has more than 1 index for column: {}", column.name) + }); let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else { continue; diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 108cb2dde7f..81e9e613937 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -39,7 +39,7 @@ pub(super) async fn build_partitions( pq: Arc, metric_type: MetricType, part_range: Range, - precomputed_partitons: Option>, + precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, ) -> Result<()> { From d864724363dd1cb9ef2aa63188287e21a758038d Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Wed, 10 Jan 2024 09:26:04 -0800 Subject: [PATCH 7/7] options --- rust/lance-index/src/optimize.rs | 30 +++++++++++++++++------------- rust/lance/src/index.rs | 3 ++- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs index d41d3e25623..1207fe444e1 100644 --- a/rust/lance-index/src/optimize.rs +++ b/rust/lance-index/src/optimize.rs @@ -12,21 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub struct MergeIndex {} - -pub enum OptimizeAction { - Append, - - /// Create a delta index that covers the un-indexed rows from the - /// last index. - CreateDelta, - - /// Merge existing indices - Merge(MergeIndex), +pub struct OptimizeOptions { + /// The number of existing indices to merge into a single index, plus the un-indexed fragments. + /// + /// - If this is zero, then no merging will be done, which means a new delta index will be created + /// just to cover un-indexed fragments. + /// - If it is one, we will append the un-indexed fragments to the last index. + /// - If it is greater than one, we will merge the last `num_indices_to_merge` indices into a single + /// one, thus reduce the number of indices for this column. + /// - If this number exceeds the number of existing indices, we will merge all existing indices into + /// a single one. So it is a re-write of the entire index. + /// + /// Note that no re-train of the index happens during the operation. + pub num_indices_to_merge: usize, } -impl Default for OptimizeAction { +impl Default for OptimizeOptions { fn default() -> Self { - Self::Append + Self { + num_indices_to_merge: 0, + } } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index beb5562cd67..6db12d810e7 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader}; +use lance_index::optimize::OptimizeOptions; use lance_index::scalar::{ expression::IndexInformationProvider, lance_format::LanceIndexStore, ScalarIndex, }; @@ -282,7 +283,7 @@ impl DatasetIndexExt for Dataset { /// Optimize Indices. #[instrument(skip_all)] - async fn optimize_indices(&mut self, action: Option) -> Result<()> { + async fn optimize_indices(&mut self, options: OptimizeOptions) -> Result<()> { let dataset = Arc::new(self.clone()); let indices = self.load_indices().await?;