diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 75a28fec4542..341ee9442c4c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1043,7 +1043,8 @@ impl RegionServerInner { RegionRequest::Alter(_) | RegionRequest::Flush(_) | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => RegionChange::None, + | RegionRequest::Truncate(_) + | RegionRequest::BuildIndex(_) => RegionChange::None, RegionRequest::Catchup(_) => RegionChange::Catchup, }; diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 0066118f2c50..eabd98b135ed 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -222,6 +222,18 @@ impl RegionEngine for MetricEngine { } } RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await, + RegionRequest::BuildIndex(_) => { + if self.inner.is_physical_region(region_id) { + self.inner + .mito + .handle_request(region_id, request) + .await + .context(error::MitoFlushOperationSnafu) + .map(|response| response.affected_rows) + } else { + UnsupportedRegionRequestSnafu { request }.fail() + } + } RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(), RegionRequest::Delete(_) => { if self.inner.is_physical_region(region_id) { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d48f75fa91a8..3fb3a8abd849 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -40,6 +40,8 @@ mod edit_region_test; mod filter_deleted_test; #[cfg(test)] mod flush_test; +#[cfg(test)] +mod index_build_test; #[cfg(any(test, feature = "test"))] pub mod listener; #[cfg(test)] diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 86cd89a27846..57791221f922 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -37,7 +37,7 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, 
build_rows_for_key, column_metadata_to_column_schema, put_rows, }; -async fn put_and_flush( +pub(crate) async fn put_and_flush( engine: &MitoEngine, region_id: RegionId, column_schemas: &[ColumnSchema], @@ -74,7 +74,7 @@ async fn flush(engine: &MitoEngine, region_id: RegionId) { assert_eq!(0, result.affected_rows); } -async fn compact(engine: &MitoEngine, region_id: RegionId) { +pub(crate) async fn compact(engine: &MitoEngine, region_id: RegionId) { let result = engine .handle_request( region_id, @@ -85,7 +85,7 @@ async fn compact(engine: &MitoEngine, region_id: RegionId) { assert_eq!(result.affected_rows, 0); } -async fn delete_and_flush( +pub(crate) async fn delete_and_flush( engine: &MitoEngine, region_id: RegionId, column_schemas: &[ColumnSchema], diff --git a/src/mito2/src/engine/index_build_test.rs b/src/mito2/src/engine/index_build_test.rs new file mode 100644 index 000000000000..6fe27929e530 --- /dev/null +++ b/src/mito2/src/engine/index_build_test.rs @@ -0,0 +1,302 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Index build tests for mito engine. +//! 
+use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::Rows; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AlterKind, RegionAlterRequest, RegionRequest, SetIndexOption}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::{IndexBuildMode, MitoConfig, Mode}; +use crate::engine::MitoEngine; +use crate::engine::compaction_test::{compact, put_and_flush}; +use crate::engine::listener::IndexBuildListener; +use crate::read::scan_region::Scanner; +use crate::sst::location; +use crate::test_util::{ + CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, reopen_region, rows_schema, +}; + +// wait listener receives enough success count. +async fn wait_finish(listener: &IndexBuildListener, times: usize) { + listener.wait_finish(times).await; +} + +fn async_build_mode_config(is_create_on_flush: bool) -> MitoConfig { + let mut config = MitoConfig::default(); + config.index.build_mode = IndexBuildMode::Async; + if !is_create_on_flush { + config.inverted_index.create_on_flush = Mode::Disable; + config.fulltext_index.create_on_flush = Mode::Disable; + config.bloom_filter_index.create_on_flush = Mode::Disable; + } + config +} + +/// Get the number of generated index files for existed sst files in the scanner. +async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: RegionId) -> usize { + let region = engine.get_region(region_id).unwrap(); + let access_layer = region.access_layer.clone(); + // When there is no file, return 0 directly. + // Because we can't know region file ids here. 
+ if scanner.file_ids().is_empty() { + return 0; + } + let mut index_files_count: usize = 0; + for region_file_id in scanner.file_ids() { + let index_path = location::index_file_path( + access_layer.table_dir(), + region_file_id, + access_layer.path_type(), + ); + if access_layer + .object_store() + .exists(&index_path) + .await + .unwrap() + { + index_files_count += 1; + } + } + index_files_count +} + +#[allow(dead_code)] +fn assert_listener_counts( + listener: &IndexBuildListener, + expected_begin_count: usize, + + expected_success_count: usize, +) { + assert_eq!(listener.begin_count(), expected_begin_count); + assert_eq!(listener.success_count(), expected_success_count); +} + +#[tokio::test] +async fn test_index_build_type_flush() { + let mut env = TestEnv::with_prefix("test_index_build_type_flush_").await; + let listener = Arc::new(IndexBuildListener::default()); + let engine = env + .create_engine_with( + async_build_mode_config(true), + None, + Some(listener.clone()), + None, + ) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build_with_index(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 2), + }; + put_rows(&engine, region_id, rows).await; + + // Before first flush is finished, index file and data file should not exist. 
+ let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_memtables() == 1); + assert!(scanner.num_files() == 0); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 0); + + flush_region(&engine, region_id, None).await; + + // When first flush is just finished, index file should not exist. + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_memtables() == 0); + assert!(scanner.num_files() == 1); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 0); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(2, 4), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id, None).await; + + // After 2 index build task are finished, 2 index files should exist. + wait_finish(&listener, 2).await; + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 2); +} + +#[tokio::test] +async fn test_index_build_type_compact() { + let mut env = TestEnv::with_prefix("test_index_build_type_compact_").await; + let listener = Arc::new(IndexBuildListener::default()); + let engine = env + .create_engine_with( + async_build_mode_config(true), + None, + Some(listener.clone()), + None, + ) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.trigger_file_num", "100") // Make sure compaction is not triggered by file num. 
+ .build_with_index(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 20..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; + put_and_flush(&engine, region_id, &column_schemas, 40..50).await; + + // Before compaction is triggered, files should be 4, and not all index files are built. + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_files() == 4); + assert!(num_of_index_files(&engine, &scanner, region_id).await < 4); + + // Note: Compaction has been implicitly triggered by the flush operations above. + // This explicit compaction call serves to make the process deterministic for the test. + compact(&engine, region_id).await; + + // After compaction is finished, files should be 2, and not all index files are built yet. + listener.clear_success_count(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_files() == 2); + assert!(num_of_index_files(&engine, &scanner, region_id).await < 2); + + // Wait a while to make sure index build tasks are finished. 
+ wait_finish(&listener, 2).await; + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_files() == 2); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 2); +} + +#[tokio::test] +async fn test_index_build_type_schema_change() { + let mut env = TestEnv::with_prefix("test_index_build_type_schema_change_").await; + let listener = Arc::new(IndexBuildListener::default()); + let engine = env + .create_engine_with( + async_build_mode_config(true), + None, + Some(listener.clone()), + None, + ) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + // Create a region without index. + let request = CreateRequestBuilder::new().build(); + let table_dir = request.table_dir.clone(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Flush and make sure there is no index file. + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + reopen_region(&engine, region_id, table_dir, true, HashMap::new()).await; + + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_files() == 1); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 0); + + // Set Index and make sure index file is built without flush or compaction. 
+ let set_index_request = RegionAlterRequest { + kind: AlterKind::SetIndexes { + options: vec![SetIndexOption::Inverted { + column_name: "tag_0".to_string(), + }], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(set_index_request)) + .await + .unwrap(); + wait_finish(&listener, 1).await; + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert!(scanner.num_files() == 1); + assert!(num_of_index_files(&engine, &scanner, region_id).await == 1); +} diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index e2ede93b37bb..317c3cdfd00b 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -23,6 +23,8 @@ use common_telemetry::info; use store_api::storage::{FileId, RegionId}; use tokio::sync::Notify; +use crate::sst::file::RegionFileId; + /// Mito engine background event listener. #[async_trait] pub trait EventListener: Send + Sync { @@ -71,6 +73,12 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that region starts to send a region change result to worker. async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {} + + /// Notifies the listener that the index build task is executed successfully. + async fn on_index_build_success(&self, _region_file_id: RegionFileId) {} + + /// Notifies the listener that the index build task is started. + async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {} } pub type EventListenerRef = Arc; @@ -298,3 +306,48 @@ impl EventListener for NotifyRegionChangeResultListener { self.notify.notified().await; } } + +#[derive(Default)] +pub struct IndexBuildListener { + notify: Notify, + success_count: AtomicUsize, + start_count: AtomicUsize, +} + +impl IndexBuildListener { + /// Wait until index build is done for `times` times. 
+ pub async fn wait_finish(&self, times: usize) { + while self.success_count.load(Ordering::Relaxed) < times { + self.notify.notified().await; + } + } + + /// Clears the success count. + pub fn clear_success_count(&self) { + self.success_count.store(0, Ordering::Relaxed); + } + + /// Returns the success count. + pub fn success_count(&self) -> usize { + self.success_count.load(Ordering::Relaxed) + } + + /// Returns the start count. + pub fn begin_count(&self) -> usize { + self.start_count.load(Ordering::Relaxed) + } +} + +#[async_trait] +impl EventListener for IndexBuildListener { + async fn on_index_build_success(&self, region_file_id: RegionFileId) { + info!("Region {} index build successfully", region_file_id); + self.success_count.fetch_add(1, Ordering::Relaxed); + self.notify.notify_one(); + } + + async fn on_index_build_begin(&self, region_file_id: RegionFileId) { + info!("Region {} index build begin", region_file_id); + self.start_count.fetch_add(1, Ordering::Relaxed); + } +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 46ad4a97b26c..ad6d7c7caa31 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -599,6 +599,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build index asynchronously in region {}", region_id))] + BuildIndexAsync { + region_id: RegionId, + source: Arc, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to convert value"))] ConvertValue { source: datatypes::error::Error, @@ -1210,7 +1218,7 @@ impl ErrorExt for Error { InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable, - FlushRegion { source, .. } => source.status_code(), + FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, RegionTruncated { .. 
} => StatusCode::Cancelled, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 16933f6827d4..aac7090174dc 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -234,6 +234,18 @@ impl MitoRegion { ) } + /// Returns whether the region should abort index building. + pub(crate) fn should_abort_index(&self) -> bool { + matches!( + self.manifest_ctx.state.load(), + RegionRoleState::Follower + | RegionRoleState::Leader(RegionLeaderState::Dropping) + | RegionRoleState::Leader(RegionLeaderState::Truncating) + | RegionRoleState::Leader(RegionLeaderState::Downgrading) + | RegionRoleState::Leader(RegionLeaderState::Staging) + ) + } + /// Returns whether the region is downgrading. pub(crate) fn is_downgrading(&self) -> bool { matches!( diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d7748fd0778c..ce013b15d31c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -35,9 +35,9 @@ use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint} use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{ - AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest, - RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, - RegionOpenRequest, RegionRequest, RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest, + RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, + RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::RegionId; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -593,10 +593,6 @@ pub(crate) enum WorkerRequest { /// Keep the manifest of a region up to date. SyncRegion(RegionSyncRequest), - /// Build indexes of a region. 
- #[allow(dead_code)] - BuildIndexRegion(RegionBuildIndexRequest), - /// Bulk inserts request and region metadata. BulkInserts { metadata: Option, @@ -692,6 +688,11 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Compact(v), }), + RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::BuildIndex(v), + }), RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), @@ -754,6 +755,7 @@ pub(crate) enum DdlRequest { Alter(RegionAlterRequest), Flush(RegionFlushRequest), Compact(RegionCompactRequest), + BuildIndex(RegionBuildIndexRequest), Truncate(RegionTruncateRequest), Catchup(RegionCatchupRequest), } @@ -919,6 +921,8 @@ pub(crate) struct RegionChangeResult { pub(crate) sender: OptionOutputTx, /// Result from the manifest manager. pub(crate) result: Result<()>, + /// Used for index build in schema change. + pub(crate) need_index: bool, } /// Request to edit a region directly. @@ -944,7 +948,7 @@ pub(crate) struct RegionEditResult { } #[derive(Debug)] -pub(crate) struct RegionBuildIndexRequest { +pub(crate) struct BuildIndexRequest { pub(crate) region_id: RegionId, pub(crate) build_type: IndexBuildType, /// files need to build index, empty means all. 
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index a79adcf7876c..20e8cb66d3cb 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -29,22 +29,23 @@ use common_telemetry::{debug, info, warn}; use datatypes::arrow::record_batch::RecordBatch; use puffin_manager::SstPuffinManager; use smallvec::{SmallVec, smallvec}; +use snafu::ResultExt; use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, FileId, RegionId}; use strum::IntoStaticStr; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory}; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::write_cache::{UploadTracker, WriteCacheRef}; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; -use crate::error::Result; +use crate::error::{BuildIndexAsyncSnafu, Error, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::{Batch, BatchReader}; use crate::region::options::IndexOptions; -use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; +use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState}; use crate::request::{ BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime, @@ -437,6 +438,9 @@ pub enum IndexBuildOutcome { Aborted(String), } +/// Mpsc output result sender. +pub type ResultMpscSender = Sender>; + pub struct IndexBuildTask { /// The file meta to build index for. pub file_meta: FileMeta, @@ -449,69 +453,94 @@ pub struct IndexBuildTask { /// Otherwise, it should be built from the access layer. pub indexer_builder: Arc, /// Request sender to notify the region worker. 
- pub(crate) request_sender: mpsc::Sender, - /// Optional sender to send the result back to the caller. - pub result_sender: Option>, + pub(crate) request_sender: Sender, + /// Index build result sender. + pub(crate) result_sender: ResultMpscSender, } impl IndexBuildTask { - fn into_index_build_job(mut self, version_control: &VersionControlRef) -> Job { - let version_data = version_control.current(); + /// Notify the caller the job is success. + pub async fn on_success(&mut self, outcome: IndexBuildOutcome) { + let _ = self.result_sender.send(Ok(outcome)).await; + } + + /// Send index build error to waiter. + pub async fn on_failure(&mut self, err: Arc) { + let _ = self + .result_sender + .send(Err(err.clone()).context(BuildIndexAsyncSnafu { + region_id: self.file_meta.region_id, + })) + .await; + } + fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job { Box::pin(async move { - self.do_index_build(version_data).await; + self.do_index_build(version_control).await; }) } - async fn do_index_build(&mut self, version_data: VersionControlData) { - let outcome = match self.index_build(&version_data).await { - Ok(outcome) => outcome, + async fn do_index_build(&mut self, version_control: VersionControlRef) { + match self.index_build(version_control).await { + Ok(outcome) => self.on_success(outcome).await, Err(e) => { warn!( e; "Index build task failed, region: {}, file_id: {}", self.file_meta.region_id, self.file_meta.file_id, ); - IndexBuildOutcome::Aborted(format!("Index build failed: {}", e)) + self.on_failure(e.into()).await } }; - if let Some(sender) = self.result_sender.take() { - let _ = sender.send(outcome); - } } // Checks if the SST file still exists in object store and version to avoid conflict with compaction. 
- async fn check_sst_file_exists(&self, version: &VersionRef) -> bool { - let region_id = self.file_meta.region_id; + async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool { let file_id = self.file_meta.file_id; + let level = self.file_meta.level; + // We should check current version instead of the version when the job is created. + let version = version_control.current().version; - let found_in_version = version - .ssts - .levels() - .iter() - .flat_map(|level| level.files.iter()) - .any(|(id, handle)| { - *id == self.file_meta.file_id && !handle.is_deleted() && !handle.compacting() - }); - if !found_in_version { + let Some(level_files) = version.ssts.levels().get(level as usize) else { warn!( - "File id {} not found in region version for index build, region: {}", - file_id, region_id + "File id {} not found in level {} for index build, region: {}", + file_id, level, self.file_meta.region_id ); - false - } else { - // If the file's metadata is present in the current version, the physical SST file - // is guaranteed to exist on object store. The file purger removes the physical - // file only after its metadata is removed from the version. - true + return false; + }; + + match level_files.files.get(&file_id) { + Some(handle) if !handle.is_deleted() && !handle.compacting() => { + // If the file's metadata is present in the current version, the physical SST file + // is guaranteed to exist on object store. The file purger removes the physical + // file only after its metadata is removed from the version. 
+ true + } + _ => { + warn!( + "File id {} not found in region version for index build, region: {}", + file_id, self.file_meta.region_id + ); + false + } } } async fn index_build( &mut self, - version_data: &VersionControlData, + version_control: VersionControlRef, ) -> Result { - let version = &version_data.version; let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await; + + // Check SST file existence before building index to avoid failure of parquet reader. + if !self.check_sst_file_exists(&version_control).await { + // Calls abort to clean up index files. + indexer.abort().await; + return Ok(IndexBuildOutcome::Aborted(format!( + "SST file not found during index build, region: {}, file_id: {}", + self.file_meta.region_id, self.file_meta.file_id + ))); + } + let mut parquet_reader = self .access_layer .read_sst(FileHandle::new( @@ -538,7 +567,7 @@ impl IndexBuildTask { if index_output.file_size > 0 { // Check SST file existence again after building index. - if !self.check_sst_file_exists(version).await { + if !self.check_sst_file_exists(&version_control).await { // Calls abort to clean up index files. indexer.abort().await; return Ok(IndexBuildOutcome::Aborted(format!( @@ -662,7 +691,8 @@ impl IndexBuildScheduler { version_control: &VersionControlRef, task: IndexBuildTask, ) -> Result<()> { - let job = task.into_index_build_job(version_control); + // We should clone version control to expand the lifetime. 
+ let job = task.into_index_build_job(version_control.clone()); self.scheduler.schedule(job)?; Ok(()) } @@ -683,7 +713,7 @@ mod tests { use object_store::services::Memory; use puffin_manager::PuffinManagerFactory; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; - use tokio::sync::{mpsc, oneshot}; + use tokio::sync::mpsc; use super::*; use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType}; @@ -1086,7 +1116,7 @@ mod tests { async fn test_index_build_task_sst_not_exist() { let env = SchedulerEnv::new().await; let (tx, _rx) = mpsc::channel(4); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, mut result_rx) = mpsc::channel::>(4); let mut scheduler = env.mock_index_build_scheduler(); let metadata = Arc::new(sst_region_metadata()); let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; @@ -1112,13 +1142,17 @@ mod tests { file_purger, indexer_builder, request_sender: tx, - result_sender: Some(result_tx), + result_sender: result_tx, }; // Schedule the build task and check result. scheduler.schedule_build(&version_control, task).unwrap(); - match result_rx.await.unwrap() { - IndexBuildOutcome::Aborted(_) => {} + match result_rx.recv().await.unwrap() { + Ok(outcome) => { + if outcome == IndexBuildOutcome::Finished { + panic!("Expect aborted result due to missing SST file") + } + } _ => panic!("Expect aborted result due to missing SST file"), } } @@ -1148,7 +1182,7 @@ mod tests { // Create mock task. let (tx, mut rx) = mpsc::channel(4); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, @@ -1158,13 +1192,18 @@ mod tests { file_purger, indexer_builder, request_sender: tx, - result_sender: Some(result_tx), + result_sender: result_tx, }; scheduler.schedule_build(&version_control, task).unwrap(); // The task should finish successfully. 
- assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished); + match result_rx.recv().await.unwrap() { + Ok(outcome) => { + assert_eq!(outcome, IndexBuildOutcome::Finished); + } + _ => panic!("Expect finished result"), + } // A notification should be sent to the worker to update the manifest. let worker_req = rx.recv().await.unwrap().request; @@ -1210,7 +1249,7 @@ mod tests { // Create mock task. let (tx, _rx) = mpsc::channel(4); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, @@ -1220,7 +1259,7 @@ mod tests { file_purger, indexer_builder, request_sender: tx, - result_sender: Some(result_tx), + result_sender: result_tx, }; scheduler.schedule_build(&version_control, task).unwrap(); @@ -1252,7 +1291,12 @@ mod tests { } // The task should finish successfully. - assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished); + match result_rx.recv().await.unwrap() { + Ok(outcome) => { + assert_eq!(outcome, IndexBuildOutcome::Finished); + } + _ => panic!("Expect finished result"), + } // The index file should exist after the task finishes. assert!( @@ -1301,7 +1345,7 @@ mod tests { // Create mock task. let (tx, mut rx) = mpsc::channel(4); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, @@ -1311,13 +1355,18 @@ mod tests { file_purger, indexer_builder, request_sender: tx, - result_sender: Some(result_tx), + result_sender: result_tx, }; scheduler.schedule_build(&version_control, task).unwrap(); // The task should finish successfully. 
- assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished); + match result_rx.recv().await.unwrap() { + Ok(outcome) => { + assert_eq!(outcome, IndexBuildOutcome::Finished); + } + _ => panic!("Expect finished result"), + } // No index is built, so no notification should be sent to the worker. let _ = rx.recv().await.is_none(); @@ -1376,7 +1425,7 @@ mod tests { // Create mock task. let (tx, mut _rx) = mpsc::channel(4); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, @@ -1386,13 +1435,18 @@ mod tests { file_purger, indexer_builder, request_sender: tx, - result_sender: Some(result_tx), + result_sender: result_tx, }; scheduler.schedule_build(&version_control, task).unwrap(); // The task should finish successfully. - assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished); + match result_rx.recv().await.unwrap() { + Ok(outcome) => { + assert_eq!(outcome, IndexBuildOutcome::Finished); + } + _ => panic!("Expect finished result"), + } // The write cache should contain the uploaded index file. 
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin); diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8deba23ff8b9..4aac6f2e4bea 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -832,6 +832,68 @@ impl CreateRequestBuilder { partition_expr_json: self.partition_expr_json.clone(), } } + + pub fn build_with_index(&self) -> RegionCreateRequest { + let mut column_id = 0; + let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); + let mut primary_key = Vec::with_capacity(self.tag_num); + let nullable = !self.all_not_null; + for i in 0..self.tag_num { + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + nullable, + ) + .with_inverted_index(true), + semantic_type: SemanticType::Tag, + column_id, + }); + primary_key.push(column_id); + column_id += 1; + } + for i in 0..self.field_num { + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + nullable, + ), + semantic_type: SemanticType::Field, + column_id, + }); + column_id += 1; + } + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + self.ts_type.clone(), + // Time index is always not null. 
+ false, + ), + semantic_type: SemanticType::Timestamp, + column_id, + }); + let mut options = self.options.clone(); + if let Some(topic) = &self.kafka_topic { + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + }); + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + } + RegionCreateRequest { + engine: self.engine.clone(), + column_metadatas, + primary_key: self.primary_key.clone().unwrap_or(primary_key), + options, + table_dir: self.table_dir.clone(), + path_type: PathType::Bare, + partition_expr_json: self.partition_expr_json.clone(), + } + } } /// Creates value for timestamp millis. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 9ceb10d2eca5..87c25cd964fc 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -67,6 +67,7 @@ use crate::request::{ WorkerRequest, WorkerRequestWithTime, }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::sst::file::RegionFileId; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::IndexBuildScheduler; use crate::sst::index::intermediate::IntermediateManager; @@ -931,9 +932,6 @@ impl RegionWorkerLoop { WorkerRequest::EditRegion(request) => { self.handle_region_edit(request).await; } - WorkerRequest::BuildIndexRegion(request) => { - self.handle_rebuild_index(request).await; - } WorkerRequest::Stop => { debug_assert!(!self.running.load(Ordering::Relaxed)); } @@ -1005,6 +1003,11 @@ impl RegionWorkerLoop { .await; continue; } + DdlRequest::BuildIndex(req) => { + self.handle_build_index_request(ddl.region_id, req, ddl.sender) + .await; + continue; + } DdlRequest::Truncate(req) => { self.handle_truncate_request(ddl.region_id, req, ddl.sender) .await; @@ -1216,6 +1219,20 @@ impl WorkerListener { .await; } } + + pub(crate) async fn on_index_build_success(&self, _region_file_id: RegionFileId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + 
listener.on_index_build_success(_region_file_id).await; + } + } + + pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_index_build_begin(_region_file_id).await; + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 23d00ab4e743..e02c4cd33c13 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -146,6 +146,7 @@ impl RegionWorkerLoop { request: RegionAlterRequest, sender: OptionOutputTx, ) { + let need_index = need_change_index(&request.kind); let new_meta = match metadata_after_alteration(&version.metadata, request) { Ok(new_meta) => new_meta, Err(e) => { @@ -158,7 +159,7 @@ impl RegionWorkerLoop { metadata: new_meta, sst_format: region.sst_format(), }; - self.handle_manifest_region_change(region, change, sender) + self.handle_manifest_region_change(region, change, need_index, sender); } /// Handles requests that changes region options, like TTL. It only affects memory state @@ -280,3 +281,16 @@ fn log_option_update( option_name, region_id, prev_value, cur_value ); } + +/// Used to determine whether we can build index directly after schema change. +fn need_change_index(kind: &AlterKind) -> bool { + match kind { + // `SetIndexes` is a fast-path operation because it can build indexes for existing SSTs + // in the background, without needing to wait for a flush or compaction cycle. + AlterKind::SetIndexes { options: _ } => true, + // For AddColumns, DropColumns, UnsetIndexes and ModifyColumnTypes, we don't treat them as index changes. + // Index files still need to be rebuilt after schema changes, + // but this will happen automatically during flush or compaction. 
+ _ => false, + } +} diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index ffa0aa4468d4..b50134c372f6 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -17,10 +17,14 @@ use common_telemetry::{error, info, warn}; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; +use crate::config::IndexBuildMode; use crate::error::RegionNotFoundSnafu; use crate::metrics::COMPACTION_REQUEST_COUNT; use crate::region::MitoRegionRef; -use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; +use crate::request::{ + BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx, +}; +use crate::sst::index::IndexBuildType; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -80,9 +84,26 @@ impl RegionWorkerLoop { region.file_purger.clone(), ); + let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add); + // compaction finished. request.on_success(); + // In async mode, create indexes after compact if new files are created. + if self.config.index.build_mode == IndexBuildMode::Async + && !index_build_file_metas.is_empty() + { + self.handle_rebuild_index( + BuildIndexRequest { + region_id, + build_type: IndexBuildType::Compact, + file_metas: index_build_file_metas, + }, + OptionOutputTx::new(None), + ) + .await; + } + // Schedule next compaction if necessary. 
self.compaction_scheduler .on_compaction_finished( diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 517e69721f85..04dbb4ae7877 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -22,11 +22,12 @@ use store_api::logstore::LogStore; use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; -use crate::config::MitoConfig; +use crate::config::{IndexBuildMode, MitoConfig}; use crate::error::{RegionNotFoundSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; use crate::region::MitoRegionRef; -use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; +use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; +use crate::sst::index::IndexBuildType; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -246,9 +247,24 @@ impl RegionWorkerLoop { return; } + let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add); + // Notifies waiters and observes the flush timer. request.on_success(); + // In async mode, create indexes after flush. + if self.config.index.build_mode == IndexBuildMode::Async { + self.handle_rebuild_index( + BuildIndexRequest { + region_id, + build_type: IndexBuildType::Flush, + file_metas: index_build_file_metas, + }, + OptionOutputTx::new(None), + ) + .await; + } + // Handle pending requests for the region. 
if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) = self.flush_scheduler.on_flush_success(region_id) diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 9f19df3bdec6..be4bde6583ba 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -26,6 +26,7 @@ use store_api::storage::RegionId; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileType, IndexKey}; +use crate::config::IndexBuildMode; use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, @@ -34,9 +35,10 @@ use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD; use crate::region::version::VersionBuilder; use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; use crate::request::{ - BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, - RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime, + BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest, + RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime, }; +use crate::sst::index::IndexBuildType; use crate::sst::location; use crate::worker::{RegionWorkerLoop, WorkerListener}; @@ -117,6 +119,18 @@ impl RegionWorkerLoop { // Sends the result. change_result.sender.send(change_result.result.map(|_| 0)); + // In async mode, rebuild index after index metadata changed. + if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index { + self.handle_rebuild_index( + BuildIndexRequest { + region_id: region.region_id, + build_type: IndexBuildType::SchemaChange, + file_metas: Vec::new(), + }, + OptionOutputTx::new(None), + ) + .await; + } // Handles the stalled requests. 
self.handle_region_stalled_requests(&change_result.region_id) .await; @@ -345,6 +359,7 @@ impl RegionWorkerLoop { &self, region: MitoRegionRef, change: RegionChange, + need_index: bool, sender: OptionOutputTx, ) { // Marks the region as altering. @@ -371,6 +386,7 @@ impl RegionWorkerLoop { sender, result, new_meta, + need_index, }), }; listener diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index 4128e09183e8..71f9bc206f41 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -17,14 +17,18 @@ use std::collections::HashMap; use std::sync::Arc; -use common_telemetry::{error, warn}; +use common_telemetry::{debug, error, warn}; +use store_api::region_request::RegionBuildIndexRequest; use store_api::storage::{FileId, RegionId}; -use tokio::sync::oneshot; +use tokio::sync::mpsc; +use crate::error::Result; use crate::region::MitoRegionRef; -use crate::request::{IndexBuildFailed, IndexBuildFinished, RegionBuildIndexRequest}; -use crate::sst::file::FileHandle; -use crate::sst::index::{IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl}; +use crate::request::{BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, OptionOutputTx}; +use crate::sst::file::{FileHandle, RegionFileId}; +use crate::sst::index::{ + IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender, +}; use crate::sst::parquet::WriteOptions; use crate::worker::RegionWorkerLoop; @@ -34,7 +38,7 @@ impl RegionWorkerLoop { region: &MitoRegionRef, file: FileHandle, build_type: IndexBuildType, - result_sender: Option>, + result_sender: ResultMpscSender, ) -> IndexBuildTask { let version = region.version(); let access_layer = region.access_layer.clone(); @@ -76,9 +80,32 @@ impl RegionWorkerLoop { } } - pub(crate) async fn handle_rebuild_index(&mut self, request: RegionBuildIndexRequest) { + /// Handles manual build index requests. 
+ /// TODO(SNC123): Support admin function of manual index building later. + pub(crate) async fn handle_build_index_request( + &mut self, + region_id: RegionId, + _req: RegionBuildIndexRequest, + sender: OptionOutputTx, + ) { + self.handle_rebuild_index( + BuildIndexRequest { + region_id, + build_type: IndexBuildType::Manual, + file_metas: Vec::new(), + }, + sender, + ) + .await; + } + + pub(crate) async fn handle_rebuild_index( + &mut self, + request: BuildIndexRequest, + mut sender: OptionOutputTx, + ) { let region_id = request.region_id; - let Some(region) = self.regions.get_region(region_id) else { + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; @@ -107,13 +134,59 @@ impl RegionWorkerLoop { .collect::>() }; + if build_tasks.is_empty() { + debug!( + "No files need to build index for region {}, request: {:?}", + region_id, request + ); + sender.send(Ok(0)); + return; + } + + let num_tasks = build_tasks.len(); + let (tx, mut rx) = mpsc::channel::>(num_tasks); + for file_handle in build_tasks { - let task = - self.new_index_build_task(®ion, file_handle, request.build_type.clone(), None); + debug!( + "Scheduling index build for region {}, file_id {}", + region_id, + file_handle.meta_ref().file_id + ); + + if region.should_abort_index() { + warn!( + "Region {} is in state {:?}, abort index rebuild process for file_id {}", + region_id, + region.state(), + file_handle.meta_ref().file_id + ); + break; + } + + let task = self.new_index_build_task( + ®ion, + file_handle.clone(), + request.build_type.clone(), + tx.clone(), + ); let _ = self .index_build_scheduler .schedule_build(®ion.version_control, task); + self.listener + .on_index_build_begin(RegionFileId::new(region_id, file_handle.meta_ref().file_id)) + .await; } + // Wait for all index build tasks to finish and notify the caller. 
+ common_runtime::spawn_global(async move { + for _ in 0..num_tasks { + if let Some(Err(e)) = rx.recv().await { + warn!(e; "Index build task failed for region: {}", region_id); + sender.send(Err(e)); + return; + } + } + sender.send(Ok(0)); + }); } pub(crate) async fn handle_index_build_finished( @@ -131,11 +204,17 @@ impl RegionWorkerLoop { return; } }; + region.version_control.apply_edit( Some(request.edit.clone()), &[], region.file_purger.clone(), ); + for file_meta in &request.edit.files_to_add { + self.listener + .on_index_build_success(RegionFileId::new(region_id, file_meta.file_id)) + .await; + } } pub(crate) async fn handle_index_build_failed( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 23927bdca944..344d35eaa07d 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -146,6 +146,7 @@ pub enum RegionRequest { Alter(RegionAlterRequest), Flush(RegionFlushRequest), Compact(RegionCompactRequest), + BuildIndex(RegionBuildIndexRequest), Truncate(RegionTruncateRequest), Catchup(RegionCatchupRequest), BulkInserts(RegionBulkInsertsRequest), @@ -1354,6 +1355,9 @@ impl Default for RegionCompactRequest { } } +#[derive(Debug, Clone, Default)] +pub struct RegionBuildIndexRequest {} + /// Truncate region request. #[derive(Debug)] pub enum RegionTruncateRequest { @@ -1413,6 +1417,7 @@ impl fmt::Display for RegionRequest { RegionRequest::Alter(_) => write!(f, "Alter"), RegionRequest::Flush(_) => write!(f, "Flush"), RegionRequest::Compact(_) => write!(f, "Compact"), + RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"), RegionRequest::Truncate(_) => write!(f, "Truncate"), RegionRequest::Catchup(_) => write!(f, "Catchup"), RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),