Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ee46a0eeaaa21b62bb3b8dcd90374dbef9dc8fdc" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
3 changes: 3 additions & 0 deletions src/common/function/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod build_index_table;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
Expand All @@ -26,6 +27,7 @@ use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
use reconcile_table::ReconcileTableFunction;

use crate::admin::build_index_table::BuildIndexFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;

Expand All @@ -40,6 +42,7 @@ impl AdminFunction {
registry.register(CompactRegionFunction::factory());
registry.register(FlushTableFunction::factory());
registry.register(CompactTableFunction::factory());
registry.register(BuildIndexFunction::factory());
registry.register(FlushFlowFunction::factory());
registry.register(ReconcileCatalogFunction::factory());
registry.register(ReconcileDatabaseFunction::factory());
Expand Down
80 changes: 80 additions & 0 deletions src/common/function/src/admin/build_index_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::datatypes::DataType as ArrowDataType;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::*;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ResultExt, ensure};
use table::requests::BuildIndexTableRequest;

use crate::handlers::TableMutationHandlerRef;

#[admin_fn(
name = BuildIndexFunction,
display_name = build_index,
sig_fn = build_index_signature,
ret = uint64
)]
pub(crate) async fn build_index(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "build_index",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};

let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;

let affected_rows = table_mutation_handler
.build_index(
BuildIndexTableRequest {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;

Ok(Value::from(affected_rows as u64))
}

fn build_index_signature() -> Signature {
Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
}
11 changes: 10 additions & 1 deletion src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use common_query::Output;
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
use table::requests::{
BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
Expand All @@ -47,6 +49,13 @@ pub trait TableMutationHandler: Send + Sync {
ctx: QueryContextRef,
) -> Result<AffectedRows>;

/// Trigger a index build task for table.
async fn build_index(
&self,
request: BuildIndexTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;

/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;
Expand Down
11 changes: 10 additions & 1 deletion src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ impl FunctionState {
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
InsertRequest,
};

use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
Expand Down Expand Up @@ -120,6 +121,14 @@ impl FunctionState {
Ok(ROWS)
}

async fn build_index(
&self,
_request: BuildIndexTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn flush_region(
&self,
_region_id: RegionId,
Expand Down
3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,8 @@ impl RegionServerInner {
RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

Expand Down
12 changes: 12 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ impl RegionEngine for MetricEngine {
}
}
RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
RegionRequest::BuildIndex(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
RegionRequest::Delete(_) => {
if self.inner.is_physical_region(region_id) {
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ impl Compactor for DefaultCompactor {
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn new_file_handle_with_size_and_sequence(
level,
file_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ mod edit_region_test;
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod index_build_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows_for_key, column_metadata_to_column_schema, put_rows,
};

async fn put_and_flush(
pub(crate) async fn put_and_flush(
engine: &MitoEngine,
region_id: RegionId,
column_schemas: &[ColumnSchema],
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn flush(engine: &MitoEngine, region_id: RegionId) {
assert_eq!(0, result.affected_rows);
}

async fn compact(engine: &MitoEngine, region_id: RegionId) {
pub(crate) async fn compact(engine: &MitoEngine, region_id: RegionId) {
let result = engine
.handle_request(
region_id,
Expand All @@ -85,7 +85,7 @@ async fn compact(engine: &MitoEngine, region_id: RegionId) {
assert_eq!(result.affected_rows, 0);
}

async fn delete_and_flush(
pub(crate) async fn delete_and_flush(
engine: &MitoEngine,
region_id: RegionId,
column_schemas: &[ColumnSchema],
Expand Down
Loading