From c8943a3d9a1305759cde6dea44d51d158f66d514 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 11 Nov 2025 09:10:02 +0000 Subject: [PATCH 1/6] feat: support parallel table operations in COPY DATABASE Signed-off-by: WenyXu --- Cargo.lock | 1 + src/operator/Cargo.toml | 1 + src/operator/src/statement/copy_database.rs | 129 +++++++++++++------- 3 files changed, 87 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee2358214560..648fe718e3ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8659,6 +8659,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-sql", + "common-stat", "common-telemetry", "common-test-util", "common-time", diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index d883c15689b7..42c831ebc1a6 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -36,6 +36,7 @@ common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true common-sql.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index c7cf0b47b0c3..60e92b3422c8 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -12,14 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::path::Path; use std::str::FromStr; +use std::sync::Arc; use client::{Output, OutputData, OutputMeta}; +use common_catalog::format_full_table_name; use common_datasource::file_format::Format; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; +use common_stat::get_total_cpu_cores; use common_telemetry::{debug, error, info, tracing}; +use futures::future::try_join_all; use object_store::Entry; use regex::Regex; use session::context::QueryContextRef; @@ -27,6 +32,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use table::table_reference::TableReference; +use tokio::sync::Semaphore; use crate::error; use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu}; @@ -35,6 +41,15 @@ use crate::statement::StatementExecutor; pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error"; +pub(crate) const PARALLELISM_KEY: &str = "parallelism"; + +/// Get parallelism from options, default to total CPU cores. 
+fn parse_parallelism_from_option_map(options: &HashMap<String, String>) -> usize {
+    options
+        .get(PARALLELISM_KEY)
+        .and_then(|v| v.parse::<usize>().ok())
+        .unwrap_or(get_total_cpu_cores())
+}
 
 impl StatementExecutor {
     #[tracing::instrument(skip_all)]
@@ -51,22 +66,26 @@ impl StatementExecutor {
             }
         );
 
+        let parallelism = parse_parallelism_from_option_map(&req.with);
         info!(
-            "Copy database {}.{} to dir: {}, time: {:?}",
-            req.catalog_name, req.schema_name, req.location, req.time_range
+            "Copy database {}.{} to dir: {}, time: {:?}, parallelism: {}",
+            req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
         );
         let table_names = self
             .catalog_manager
             .table_names(&req.catalog_name, &req.schema_name, Some(&ctx))
             .await
             .context(CatalogSnafu)?;
+        let num_tables = table_names.len();
 
         let suffix = Format::try_from(&req.with)
             .context(error::ParseFileFormatSnafu)?
             .suffix();
 
-        let mut exported_rows = 0;
-        for table_name in table_names {
+        let mut tasks = Vec::with_capacity(num_tables);
+        let semaphore = Arc::new(Semaphore::new(parallelism));
+
+        for (i, table_name) in table_names.into_iter().enumerate() {
             let table = self
                 .get_table(&TableReference {
                     catalog: &req.catalog_name,
@@ -89,33 +108,40 @@
             {
                 continue;
             }
+
+            let semaphore_moved = semaphore.clone();
             let mut table_file = req.location.clone();
             table_file.push_str(&table_name);
             table_file.push_str(suffix);
-            info!(
-                "Copy table: {}.{}.{} to {}",
-                req.catalog_name, req.schema_name, table_name, table_file
-            );
+            let table_no = i + 1;
+            let moved_ctx = ctx.clone();
+            let full_table_name =
+                format_full_table_name(&req.catalog_name, &req.schema_name, &table_name);
+            let copy_table_req = CopyTableRequest {
+                catalog_name: req.catalog_name.clone(),
+                schema_name: req.schema_name.clone(),
+                table_name,
+                location: table_file.clone(),
+                with: req.with.clone(),
+                connection: req.connection.clone(),
+                pattern: None,
+                direction: CopyDirection::Export,
+                timestamp_range: req.time_range,
+                limit: None,
+            };
 
-            let exported = self
-                .copy_table_to(
-                    CopyTableRequest {
-                        catalog_name: req.catalog_name.clone(),
-                        schema_name: req.schema_name.clone(),
-                        table_name,
-                        location: table_file,
-                        with: req.with.clone(),
-                        connection: req.connection.clone(),
-                        pattern: None,
-                        direction: CopyDirection::Export,
-                        timestamp_range: req.time_range,
-                        limit: None,
-                    },
-                    ctx.clone(),
-                )
-                .await?;
-            exported_rows += exported;
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                info!(
+                    "Copy table({}/{}): {} to {}",
+                    table_no, num_tables, full_table_name, table_file
+                );
+                self.copy_table_to(copy_table_req, moved_ctx).await
+            });
         }
+
+        let results = try_join_all(tasks).await?;
+        let exported_rows = results.into_iter().sum();
         Ok(Output::new_with_affected_rows(exported_rows))
     }
 
@@ -134,9 +160,10 @@ impl StatementExecutor {
             }
         );
 
+        let parallelism = parse_parallelism_from_option_map(&req.with);
         info!(
-            "Copy database {}.{} from dir: {}, time: {:?}",
-            req.catalog_name, req.schema_name, req.location, req.time_range
+            "Copy database {}.{} from dir: {}, time: {:?}, parallelism: {}",
+            req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
         );
         let suffix = Format::try_from(&req.with)
            .context(error::ParseFileFormatSnafu)?
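The export path above turns each table copy into a future that first acquires a semaphore permit, then lets try_join_all drive the whole batch and fail fast on the first error. A minimal, self-contained sketch of that pattern, with a stub in place of copy_table_to and hypothetical table names (illustration only, not the executor code itself):

use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

// Stub standing in for `StatementExecutor::copy_table_to`; returns "exported rows".
async fn copy_one_table(table: &str) -> Result<usize, String> {
    Ok(table.len())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let tables = ["cpu_metrics", "memory_stats", "event_logs", "sensors"];
    let parallelism = 2;
    let semaphore = Arc::new(Semaphore::new(parallelism));

    let tasks = tables.iter().map(|table| {
        let semaphore = semaphore.clone();
        async move {
            // At most `parallelism` copies hold a permit (and therefore run) at once.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            copy_one_table(table).await
        }
    });

    // Fails fast: the first Err aborts the whole batch, mirroring copy_database_to.
    let exported_rows: usize = try_join_all(tasks).await?.into_iter().sum();
    println!("exported {exported_rows} rows");
    Ok(())
}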
@@ -150,8 +177,8 @@ impl StatementExecutor { .and_then(|v| bool::from_str(v).ok()) .unwrap_or(false); - let mut rows_inserted = 0; - let mut insert_cost = 0; + let mut tasks = Vec::with_capacity(entries.len()); + let semaphore = Arc::new(Semaphore::new(parallelism)); for e in entries { let table_name = match parse_file_name_to_copy(&e) { @@ -165,6 +192,7 @@ impl StatementExecutor { } } }; + let req = CopyTableRequest { catalog_name: req.catalog_name.clone(), schema_name: req.schema_name.clone(), @@ -177,23 +205,36 @@ impl StatementExecutor { timestamp_range: None, limit: None, }; - debug!("Copy table, arg: {:?}", req); - match self.copy_table_from(req, ctx.clone()).await { - Ok(o) => { - let (rows, cost) = o.extract_rows_and_cost(); - rows_inserted += rows; - insert_cost += cost; - } - Err(err) => { - if continue_on_error { - error!(err; "Failed to import file to table: {}", table_name); - continue; - } else { - return Err(err); + let moved_ctx = ctx.clone(); + let moved_table_name = table_name.clone(); + let moved_semaphore = semaphore.clone(); + tasks.push(async move { + let _permit = moved_semaphore.acquire().await.unwrap(); + debug!("Copy table, arg: {:?}", req); + match self.copy_table_from(req, moved_ctx).await { + Ok(o) => { + let (rows, cost) = o.extract_rows_and_cost(); + Ok((rows, cost)) + } + Err(err) => { + if continue_on_error { + error!(err; "Failed to import file to table: {}", moved_table_name); + Ok((0, 0)) + } else { + Err(err) + } } } - } + }); } + + let results = try_join_all(tasks).await?; + let (rows_inserted, insert_cost) = results + .into_iter() + .fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| { + (acc_rows + rows, acc_cost + cost) + }); + Ok(Output::new( OutputData::AffectedRows(rows_inserted), OutputMeta::new_with_cost(insert_cost), From 069a8e095ba4eed535669b635d4cfde7903fee20 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 11 Nov 2025 10:03:41 +0000 Subject: [PATCH 2/6] feat(cli): add a new `parallelism` parameter to control the parallelism during export Signed-off-by: WenyXu --- src/cli/src/data/export.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/cli/src/data/export.rs b/src/cli/src/data/export.rs index 5ddc2a39bcaf..d5ef6a96c7b1 100644 --- a/src/cli/src/data/export.rs +++ b/src/cli/src/data/export.rs @@ -89,6 +89,10 @@ pub struct ExportCommand { #[clap(long)] end_time: Option, + /// The parallelism of the export. 
+ #[clap(long, default_value = "8")] + parallelism: usize, + /// The basic authentication for connecting to the server #[clap(long)] auth_basic: Option, @@ -210,10 +214,11 @@ impl ExportCommand { schema, database_client, output_dir: self.output_dir.clone(), - parallelism: self.export_jobs, + export_jobs: self.export_jobs, target: self.target.clone(), start_time: self.start_time.clone(), end_time: self.end_time.clone(), + parallelism: self.parallelism, s3: self.s3, ddl_local_dir: self.ddl_local_dir.clone(), s3_bucket: self.s3_bucket.clone(), @@ -251,10 +256,11 @@ pub struct Export { schema: Option, database_client: DatabaseClient, output_dir: Option, - parallelism: usize, + export_jobs: usize, target: ExportTarget, start_time: Option, end_time: Option, + parallelism: usize, s3: bool, ddl_local_dir: Option, s3_bucket: Option, @@ -464,7 +470,7 @@ impl Export { async fn export_create_table(&self) -> Result<()> { let timer = Instant::now(); - let semaphore = Arc::new(Semaphore::new(self.parallelism)); + let semaphore = Arc::new(Semaphore::new(self.export_jobs)); let db_names = self.get_db_names().await?; let db_count = db_names.len(); let operator = Arc::new(self.build_prefer_fs_operator().await?); @@ -625,13 +631,13 @@ impl Export { async fn export_database_data(&self) -> Result<()> { let timer = Instant::now(); - let semaphore = Arc::new(Semaphore::new(self.parallelism)); + let semaphore = Arc::new(Semaphore::new(self.export_jobs)); let db_names = self.get_db_names().await?; let db_count = db_names.len(); let mut tasks = Vec::with_capacity(db_count); let operator = Arc::new(self.build_operator().await?); let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?); - let with_options = build_with_options(&self.start_time, &self.end_time); + let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism); for schema in db_names { let semaphore_moved = semaphore.clone(); @@ -888,7 +894,11 @@ impl Tool for Export { } /// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports. 
-fn build_with_options(start_time: &Option, end_time: &Option) -> String { +fn build_with_options( + start_time: &Option, + end_time: &Option, + parallelism: usize, +) -> String { let mut options = vec!["format = 'parquet'".to_string()]; if let Some(start) = start_time { options.push(format!("start_time = '{}'", start)); @@ -896,5 +906,6 @@ fn build_with_options(start_time: &Option, end_time: &Option) -> if let Some(end) = end_time { options.push(format!("end_time = '{}'", end)); } + options.push(format!("parallelism = {}", parallelism)); options.join(", ") } From 02b7e1e3dc5cc6b1df4bfc5df33936402087558b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 13 Nov 2025 03:35:55 +0000 Subject: [PATCH 3/6] chore: add sqlness tests Signed-off-by: WenyXu --- src/operator/src/statement/copy_database.rs | 22 ++- .../copy/copy_database_from_fs_parquet.result | 142 ++++++++++++++++++ .../copy/copy_database_from_fs_parquet.sql | 66 ++++++++ 3 files changed, 227 insertions(+), 3 deletions(-) diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index 60e92b3422c8..f62d1b5a9318 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -48,7 +48,8 @@ fn parse_parallelism_from_option_map(options: &HashMap) -> usize options .get(PARALLELISM_KEY) .and_then(|v| v.parse::().ok()) - .unwrap_or(get_total_cpu_cores()) + .unwrap_or_else(|| get_total_cpu_cores()) + .max(1) } impl StatementExecutor { @@ -270,15 +271,18 @@ async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::R #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; + use common_stat::get_total_cpu_cores; use object_store::ObjectStore; use object_store::services::Fs; use object_store::util::normalize_dir; use path_slash::PathExt; use table::requests::CopyDatabaseRequest; - use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy}; + use crate::statement::copy_database::{ + list_files_to_copy, parse_file_name_to_copy, parse_parallelism_from_option_map, + }; #[tokio::test] async fn test_list_files_and_parse_table_name() { @@ -317,4 +321,16 @@ mod tests { listed ); } + + #[test] + fn test_parse_parallelism_from_option_map() { + let options = HashMap::new(); + assert_eq!( + parse_parallelism_from_option_map(&options), + get_total_cpu_cores() + ); + + let options = HashMap::from([("parallelism".to_string(), "0".to_string())]); + assert_eq!(parse_parallelism_from_option_map(&options), 1); + } } diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index 999271da8dce..2f77638f2d20 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -64,3 +64,145 @@ DROP TABLE demo; Affected Rows: 0 +CREATE TABLE cpu_metrics ( + host STRING, + `usage` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +INSERT INTO cpu_metrics VALUES +('host1', 66.6, 1655276557000), +('host2', 77.7, 1655276558000), +('host3', 88.8, 1655276559000); + +Affected Rows: 3 + +CREATE TABLE memory_stats ( + host STRING, + used DOUBLE, + `free` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +INSERT INTO memory_stats VALUES +('host1', 1024, 512, 1655276557000), +('host2', 2048, 1024, 1655276558000), +('host3', 4096, 2048, 1655276559000); + +Affected Rows: 3 + +CREATE TABLE 
event_logs ( + `id` INT, + `message` STRING, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +INSERT INTO event_logs VALUES +(1, 'start', 1655276557000), +(2, 'processing', 1655276558000), +(3, 'finish', 1655276559000); + +Affected Rows: 3 + +CREATE TABLE sensors ( + sensor_id STRING, + temperature DOUBLE, + pressure INT, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +INSERT INTO sensors VALUES +('s1', 36.5, 1001, 1655276557000), +('s2', 37.2, 1003, 1655276558000), +('s3', 35.9, 998, 1655276559000); + +Affected Rows: 3 + +COPY DATABASE public TO '${SQLNESS_HOME}/export_parallel/' WITH (format='parquet', parallelism=2); + +Affected Rows: 12 + +DELETE FROM cpu_metrics; + +Affected Rows: 3 + +DELETE FROM memory_stats; + +Affected Rows: 3 + +DELETE FROM event_logs; + +Affected Rows: 3 + +DELETE FROM sensors; + +Affected Rows: 3 + +COPY DATABASE public FROM '${SQLNESS_HOME}/export_parallel/' WITH (parallelism=2); + +Affected Rows: 12 + +SELECT * FROM cpu_metrics; + ++-------+-------+---------------------+ +| host | usage | ts | ++-------+-------+---------------------+ +| host1 | 66.6 | 2022-06-15T07:02:37 | +| host2 | 77.7 | 2022-06-15T07:02:38 | +| host3 | 88.8 | 2022-06-15T07:02:39 | ++-------+-------+---------------------+ + +SELECT * FROM memory_stats; + ++-------+--------+--------+---------------------+ +| host | used | free | ts | ++-------+--------+--------+---------------------+ +| host1 | 1024.0 | 512.0 | 2022-06-15T07:02:37 | +| host2 | 2048.0 | 1024.0 | 2022-06-15T07:02:38 | +| host3 | 4096.0 | 2048.0 | 2022-06-15T07:02:39 | ++-------+--------+--------+---------------------+ + +SELECT * FROM event_logs; + ++----+------------+---------------------+ +| id | message | ts | ++----+------------+---------------------+ +| 1 | start | 2022-06-15T07:02:37 | +| 2 | processing | 2022-06-15T07:02:38 | +| 3 | finish | 2022-06-15T07:02:39 | ++----+------------+---------------------+ + +SELECT * FROM sensors; + ++-----------+-------------+----------+---------------------+ +| sensor_id | temperature | pressure | ts | ++-----------+-------------+----------+---------------------+ +| s1 | 36.5 | 1001 | 2022-06-15T07:02:37 | +| s2 | 37.2 | 1003 | 2022-06-15T07:02:38 | +| s3 | 35.9 | 998 | 2022-06-15T07:02:39 | ++-----------+-------------+----------+---------------------+ + +DROP TABLE cpu_metrics; + +Affected Rows: 0 + +DROP TABLE memory_stats; + +Affected Rows: 0 + +DROP TABLE event_logs; + +Affected Rows: 0 + +DROP TABLE sensors; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql index 671070e07b07..22cbb45d9518 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -25,3 +25,69 @@ DELETE FROM demo; COPY DATABASE public FROM '${SQLNESS_HOME}/demo/export/parquet_range/' LIMIT 2; DROP TABLE demo; + +CREATE TABLE cpu_metrics ( + host STRING, + `usage` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +INSERT INTO cpu_metrics VALUES +('host1', 66.6, 1655276557000), +('host2', 77.7, 1655276558000), +('host3', 88.8, 1655276559000); + +CREATE TABLE memory_stats ( + host STRING, + used DOUBLE, + `free` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +INSERT INTO memory_stats VALUES +('host1', 1024, 512, 1655276557000), +('host2', 2048, 1024, 1655276558000), +('host3', 4096, 2048, 1655276559000); + +CREATE TABLE event_logs ( + `id` INT, + `message` STRING, + ts TIMESTAMP TIME INDEX +); + 
+INSERT INTO event_logs VALUES +(1, 'start', 1655276557000), +(2, 'processing', 1655276558000), +(3, 'finish', 1655276559000); + +CREATE TABLE sensors ( + sensor_id STRING, + temperature DOUBLE, + pressure INT, + ts TIMESTAMP TIME INDEX +); + +INSERT INTO sensors VALUES +('s1', 36.5, 1001, 1655276557000), +('s2', 37.2, 1003, 1655276558000), +('s3', 35.9, 998, 1655276559000); + + +COPY DATABASE public TO '${SQLNESS_HOME}/export_parallel/' WITH (format='parquet', parallelism=2); + +DELETE FROM cpu_metrics; +DELETE FROM memory_stats; +DELETE FROM event_logs; +DELETE FROM sensors; + +COPY DATABASE public FROM '${SQLNESS_HOME}/export_parallel/' WITH (parallelism=2); + +SELECT * FROM cpu_metrics; +SELECT * FROM memory_stats; +SELECT * FROM event_logs; +SELECT * FROM sensors; + +DROP TABLE cpu_metrics; +DROP TABLE memory_stats; +DROP TABLE event_logs; +DROP TABLE sensors; From 40d5ed075943db8ba921585a3f644d7f714f524f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 13 Nov 2025 06:46:54 +0000 Subject: [PATCH 4/6] chore: clippy Signed-off-by: WenyXu --- src/operator/src/statement/copy_database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index f62d1b5a9318..cd8eeb6d7995 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -48,7 +48,7 @@ fn parse_parallelism_from_option_map(options: &HashMap) -> usize options .get(PARALLELISM_KEY) .and_then(|v| v.parse::().ok()) - .unwrap_or_else(|| get_total_cpu_cores()) + .unwrap_or_else(get_total_cpu_cores) .max(1) } From 4f18b7cb7e9710566a62f905b02aa41d9207a5fe Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 17 Nov 2025 03:29:11 +0000 Subject: [PATCH 5/6] chore: apply suggestions from CR Signed-off-by: WenyXu --- src/cli/src/data/export.rs | 2 +- .../copy/copy_database_from_fs_parquet.result | 36 ++++++++------- .../copy/copy_database_from_fs_parquet.sql | 45 ++++++++++++------- 3 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/cli/src/data/export.rs b/src/cli/src/data/export.rs index d5ef6a96c7b1..57974fd80421 100644 --- a/src/cli/src/data/export.rs +++ b/src/cli/src/data/export.rs @@ -90,7 +90,7 @@ pub struct ExportCommand { end_time: Option, /// The parallelism of the export. 
- #[clap(long, default_value = "8")] + #[clap(long, default_value = "4")] parallelism: usize, /// The basic authentication for connecting to the server diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index 2f77638f2d20..3ec38aa7ca7b 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -72,10 +72,11 @@ CREATE TABLE cpu_metrics ( Affected Rows: 0 -INSERT INTO cpu_metrics VALUES -('host1', 66.6, 1655276557000), -('host2', 77.7, 1655276558000), -('host3', 88.8, 1655276559000); +INSERT INTO cpu_metrics +VALUES + ('host1', 66.6, 1655276557000), + ('host2', 77.7, 1655276558000), + ('host3', 88.8, 1655276559000); Affected Rows: 3 @@ -88,10 +89,11 @@ CREATE TABLE memory_stats ( Affected Rows: 0 -INSERT INTO memory_stats VALUES -('host1', 1024, 512, 1655276557000), -('host2', 2048, 1024, 1655276558000), -('host3', 4096, 2048, 1655276559000); +INSERT INTO memory_stats +VALUES + ('host1', 1024, 512, 1655276557000), + ('host2', 2048, 1024, 1655276558000), + ('host3', 4096, 2048, 1655276559000); Affected Rows: 3 @@ -103,10 +105,11 @@ CREATE TABLE event_logs ( Affected Rows: 0 -INSERT INTO event_logs VALUES -(1, 'start', 1655276557000), -(2, 'processing', 1655276558000), -(3, 'finish', 1655276559000); +INSERT INTO event_logs +VALUES + (1, 'start', 1655276557000), + (2, 'processing', 1655276558000), + (3, 'finish', 1655276559000); Affected Rows: 3 @@ -119,10 +122,11 @@ CREATE TABLE sensors ( Affected Rows: 0 -INSERT INTO sensors VALUES -('s1', 36.5, 1001, 1655276557000), -('s2', 37.2, 1003, 1655276558000), -('s3', 35.9, 998, 1655276559000); +INSERT INTO sensors +VALUES + ('s1', 36.5, 1001, 1655276557000), + ('s2', 37.2, 1003, 1655276558000), + ('s3', 35.9, 998, 1655276559000); Affected Rows: 3 diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql index 22cbb45d9518..691bfd95e5e6 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -32,10 +32,11 @@ CREATE TABLE cpu_metrics ( ts TIMESTAMP TIME INDEX ); -INSERT INTO cpu_metrics VALUES -('host1', 66.6, 1655276557000), -('host2', 77.7, 1655276558000), -('host3', 88.8, 1655276559000); +INSERT INTO cpu_metrics +VALUES + ('host1', 66.6, 1655276557000), + ('host2', 77.7, 1655276558000), + ('host3', 88.8, 1655276559000); CREATE TABLE memory_stats ( host STRING, @@ -44,10 +45,11 @@ CREATE TABLE memory_stats ( ts TIMESTAMP TIME INDEX ); -INSERT INTO memory_stats VALUES -('host1', 1024, 512, 1655276557000), -('host2', 2048, 1024, 1655276558000), -('host3', 4096, 2048, 1655276559000); +INSERT INTO memory_stats +VALUES + ('host1', 1024, 512, 1655276557000), + ('host2', 2048, 1024, 1655276558000), + ('host3', 4096, 2048, 1655276559000); CREATE TABLE event_logs ( `id` INT, @@ -55,10 +57,11 @@ CREATE TABLE event_logs ( ts TIMESTAMP TIME INDEX ); -INSERT INTO event_logs VALUES -(1, 'start', 1655276557000), -(2, 'processing', 1655276558000), -(3, 'finish', 1655276559000); +INSERT INTO event_logs +VALUES + (1, 'start', 1655276557000), + (2, 'processing', 1655276558000), + (3, 'finish', 1655276559000); CREATE TABLE sensors ( sensor_id STRING, @@ -67,27 +70,37 @@ CREATE TABLE sensors ( ts TIMESTAMP TIME INDEX ); -INSERT INTO sensors VALUES -('s1', 36.5, 
1001, 1655276557000), -('s2', 37.2, 1003, 1655276558000), -('s3', 35.9, 998, 1655276559000); +INSERT INTO sensors +VALUES + ('s1', 36.5, 1001, 1655276557000), + ('s2', 37.2, 1003, 1655276558000), + ('s3', 35.9, 998, 1655276559000); COPY DATABASE public TO '${SQLNESS_HOME}/export_parallel/' WITH (format='parquet', parallelism=2); DELETE FROM cpu_metrics; + DELETE FROM memory_stats; + DELETE FROM event_logs; + DELETE FROM sensors; COPY DATABASE public FROM '${SQLNESS_HOME}/export_parallel/' WITH (parallelism=2); SELECT * FROM cpu_metrics; + SELECT * FROM memory_stats; + SELECT * FROM event_logs; + SELECT * FROM sensors; DROP TABLE cpu_metrics; + DROP TABLE memory_stats; + DROP TABLE event_logs; + DROP TABLE sensors; From 225b204f9eeea370a56f8232a87a7037503c54dd Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 17 Nov 2025 08:41:23 +0000 Subject: [PATCH 6/6] refactor(cli): improve parallelism configuration for data export and import Signed-off-by: WenyXu --- src/cli/src/data/export.rs | 22 +++++++++++++--------- src/cli/src/data/import.rs | 10 ++++++---- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/cli/src/data/export.rs b/src/cli/src/data/export.rs index 57974fd80421..007f8aa67c3c 100644 --- a/src/cli/src/data/export.rs +++ b/src/cli/src/data/export.rs @@ -67,9 +67,17 @@ pub struct ExportCommand { #[clap(long, default_value_t = default_database())] database: String, - /// Parallelism of the export. - #[clap(long, short = 'j', default_value = "1")] - export_jobs: usize, + /// The number of databases exported in parallel. + /// For example, if there are 20 databases and `db_parallelism` is 4, + /// 4 databases will be exported concurrently. + #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")] + db_parallelism: usize, + + /// The number of tables exported in parallel within a single database. + /// For example, if a database has 30 tables and `parallelism` is 8, + /// 8 tables will be exported concurrently. + #[clap(long, default_value = "4")] + table_parallelism: usize, /// Max retry times for each job. #[clap(long, default_value = "3")] @@ -89,10 +97,6 @@ pub struct ExportCommand { #[clap(long)] end_time: Option, - /// The parallelism of the export. - #[clap(long, default_value = "4")] - parallelism: usize, - /// The basic authentication for connecting to the server #[clap(long)] auth_basic: Option, @@ -214,11 +218,11 @@ impl ExportCommand { schema, database_client, output_dir: self.output_dir.clone(), - export_jobs: self.export_jobs, + export_jobs: self.db_parallelism, target: self.target.clone(), start_time: self.start_time.clone(), end_time: self.end_time.clone(), - parallelism: self.parallelism, + parallelism: self.table_parallelism, s3: self.s3, ddl_local_dir: self.ddl_local_dir.clone(), s3_bucket: self.s3_bucket.clone(), diff --git a/src/cli/src/data/import.rs b/src/cli/src/data/import.rs index 908f3d4c9fa4..ffe8b62c7ede 100644 --- a/src/cli/src/data/import.rs +++ b/src/cli/src/data/import.rs @@ -56,9 +56,11 @@ pub struct ImportCommand { #[clap(long, default_value_t = default_database())] database: String, - /// Parallelism of the import. - #[clap(long, short = 'j', default_value = "1")] - import_jobs: usize, + /// The number of databases imported in parallel. + /// For example, if there are 20 databases and `db_parallelism` is 4, + /// 4 databases will be imported concurrently. + #[clap(long, short = 'j', default_value = "1", alias = "import-jobs")] + db_parallelism: usize, /// Max retry times for each job. 
#[clap(long, default_value = "3")] @@ -109,7 +111,7 @@ impl ImportCommand { schema, database_client, input_dir: self.input_dir.clone(), - parallelism: self.import_jobs, + parallelism: self.db_parallelism, target: self.target.clone(), })) }
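Seen end to end, the CLI writes the option into the COPY statement's WITH clause and the server parses it back out, falling back to the machine's CPU count and clamping to at least one permit. A small round-trip sketch (a re-implementation for illustration, with get_total_cpu_cores() stubbed to 8; not the code from the patches above):

use std::collections::HashMap;

// Roughly what build_with_options contributes for the parallelism option.
fn with_clause(parallelism: usize) -> String {
    format!("format = 'parquet', parallelism = {parallelism}")
}

// Roughly what parse_parallelism_from_option_map does on the server side,
// with the CPU-count lookup stubbed to a constant for the example.
fn parse_parallelism(options: &HashMap<String, String>) -> usize {
    options
        .get("parallelism")
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(8)
        .max(1)
}

fn main() {
    println!("COPY DATABASE public TO '/backup/' WITH ({});", with_clause(4));

    // A missing or unparsable value falls back to the CPU-count default ...
    assert_eq!(parse_parallelism(&HashMap::new()), 8);
    // ... and `parallelism = 0` is clamped to a single permit.
    let zero = HashMap::from([("parallelism".to_string(), "0".to_string())]);
    assert_eq!(parse_parallelism(&zero), 1);
}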