Commit 6adc348

feat: support parallel table operations in COPY DATABASE (#7213)

Authored by WenyXu <[email protected]>

Squashed commit messages:

* feat: support parallel table operations in COPY DATABASE
* feat(cli): add a new `parallelism` parameter to control the parallelism during export
* chore: add sqlness tests
* chore: clippy
* chore: apply suggestions from CR
* refactor(cli): improve parallelism configuration for data export and import

Signed-off-by: WenyXu <[email protected]>

1 parent cc61af7 · commit 6adc348
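In user-facing terms, COPY DATABASE (both TO and FROM) gains a `parallelism` WITH option bounding how many tables are copied concurrently, e.g. something like `COPY DATABASE public TO '/tmp/export/' WITH (format = 'parquet', parallelism = 8)` (the exact statement shape here is an illustration based on the options parsed in the diffs below), and the CLI export/import tools gain matching `--db-parallelism`/`--table-parallelism` flags.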

File tree

7 files changed: +361 −60 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

src/cli/src/data/export.rs

Lines changed: 24 additions & 9 deletions
@@ -67,9 +67,17 @@ pub struct ExportCommand {
     #[clap(long, default_value_t = default_database())]
     database: String,
 
-    /// Parallelism of the export.
-    #[clap(long, short = 'j', default_value = "1")]
-    export_jobs: usize,
+    /// The number of databases exported in parallel.
+    /// For example, if there are 20 databases and `db_parallelism` is 4,
+    /// 4 databases will be exported concurrently.
+    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
+    db_parallelism: usize,
+
+    /// The number of tables exported in parallel within a single database.
+    /// For example, if a database has 30 tables and `parallelism` is 8,
+    /// 8 tables will be exported concurrently.
+    #[clap(long, default_value = "4")]
+    table_parallelism: usize,
 
     /// Max retry times for each job.
     #[clap(long, default_value = "3")]
@@ -210,10 +218,11 @@ impl ExportCommand {
             schema,
             database_client,
             output_dir: self.output_dir.clone(),
-            parallelism: self.export_jobs,
+            export_jobs: self.db_parallelism,
             target: self.target.clone(),
             start_time: self.start_time.clone(),
             end_time: self.end_time.clone(),
+            parallelism: self.table_parallelism,
             s3: self.s3,
             ddl_local_dir: self.ddl_local_dir.clone(),
             s3_bucket: self.s3_bucket.clone(),
@@ -251,10 +260,11 @@ pub struct Export {
     schema: Option<String>,
     database_client: DatabaseClient,
     output_dir: Option<String>,
-    parallelism: usize,
+    export_jobs: usize,
     target: ExportTarget,
     start_time: Option<String>,
     end_time: Option<String>,
+    parallelism: usize,
     s3: bool,
     ddl_local_dir: Option<String>,
     s3_bucket: Option<String>,
@@ -464,7 +474,7 @@ impl Export {
 
     async fn export_create_table(&self) -> Result<()> {
         let timer = Instant::now();
-        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
         let db_names = self.get_db_names().await?;
         let db_count = db_names.len();
         let operator = Arc::new(self.build_prefer_fs_operator().await?);
@@ -625,13 +635,13 @@ impl Export {
 
     async fn export_database_data(&self) -> Result<()> {
         let timer = Instant::now();
-        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
         let db_names = self.get_db_names().await?;
         let db_count = db_names.len();
         let mut tasks = Vec::with_capacity(db_count);
         let operator = Arc::new(self.build_operator().await?);
         let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
-        let with_options = build_with_options(&self.start_time, &self.end_time);
+        let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
 
         for schema in db_names {
             let semaphore_moved = semaphore.clone();
@@ -888,13 +898,18 @@ impl Tool for Export {
 }
 
 /// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
-fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
+fn build_with_options(
+    start_time: &Option<String>,
+    end_time: &Option<String>,
+    parallelism: usize,
+) -> String {
     let mut options = vec!["format = 'parquet'".to_string()];
     if let Some(start) = start_time {
         options.push(format!("start_time = '{}'", start));
     }
     if let Some(end) = end_time {
         options.push(format!("end_time = '{}'", end));
     }
+    options.push(format!("parallelism = {}", parallelism));
     options.join(", ")
 }
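Taken together, the exporter now has two concurrency knobs: `-j`/`--db-parallelism` (alias `--export-jobs`) bounds how many databases are exported at once, while `--table-parallelism` is forwarded to each generated statement as a `parallelism` WITH option. A minimal, self-contained sketch: `build_with_options` below is copied from the diff above, and the `main` wrapper exists only to show the string the exporter now emits.

/// Standalone copy of the updated `build_with_options` from export.rs,
/// reproduced to show the WITH clause the exporter appends.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.push(format!("parallelism = {}", parallelism));
    options.join(", ")
}

fn main() {
    let opts = build_with_options(&Some("2024-01-01 00:00:00".to_string()), &None, 8);
    // The exporter interpolates this into `COPY DATABASE ... WITH (...)`.
    assert_eq!(
        opts,
        "format = 'parquet', start_time = '2024-01-01 00:00:00', parallelism = 8"
    );
}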

src/cli/src/data/import.rs

Lines changed: 6 additions & 4 deletions
@@ -56,9 +56,11 @@ pub struct ImportCommand {
     #[clap(long, default_value_t = default_database())]
     database: String,
 
-    /// Parallelism of the import.
-    #[clap(long, short = 'j', default_value = "1")]
-    import_jobs: usize,
+    /// The number of databases imported in parallel.
+    /// For example, if there are 20 databases and `db_parallelism` is 4,
+    /// 4 databases will be imported concurrently.
+    #[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
+    db_parallelism: usize,
 
     /// Max retry times for each job.
     #[clap(long, default_value = "3")]
@@ -109,7 +111,7 @@ impl ImportCommand {
             schema,
             database_client,
             input_dir: self.input_dir.clone(),
-            parallelism: self.import_jobs,
+            parallelism: self.db_parallelism,
             target: self.target.clone(),
         }))
     }
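The import side only renames the database-level knob, and clap's `alias` keeps the old spelling working. A minimal sketch of that behavior, assuming the `clap` crate with the derive feature (`ImportOpts` is a hypothetical stand-in for `ImportCommand`):

use clap::Parser;

#[derive(Parser, Debug)]
struct ImportOpts {
    /// The number of databases imported in parallel.
    #[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
    db_parallelism: usize,
}

fn main() {
    // Both the new flag and the legacy alias populate the same field.
    let new = ImportOpts::parse_from(["import", "--db-parallelism", "4"]);
    let old = ImportOpts::parse_from(["import", "--import-jobs", "4"]);
    assert_eq!(new.db_parallelism, 4);
    assert_eq!(old.db_parallelism, 4);
}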

src/operator/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ common-query.workspace = true
 common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-sql.workspace = true
+common-stat.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 datafusion.workspace = true
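The new `common-stat` dependency supplies `get_total_cpu_cores`, which copy_database.rs below uses as the default `parallelism`. As a rough approximation of its behavior (an assumption; the real helper may consult cgroup quotas or cached system stats), a standard-library equivalent looks like:

// Hypothetical stand-in for common_stat::get_total_cpu_cores — illustrative
// only; the real implementation may differ (e.g. cgroup-aware accounting).
use std::num::NonZeroUsize;
use std::thread::available_parallelism;

fn get_total_cpu_cores() -> usize {
    available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
}

fn main() {
    println!("default COPY DATABASE parallelism: {}", get_total_cpu_cores());
}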

src/operator/src/statement/copy_database.rs

Lines changed: 104 additions & 47 deletions
@@ -12,21 +12,27 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::path::Path;
 use std::str::FromStr;
+use std::sync::Arc;
 
 use client::{Output, OutputData, OutputMeta};
+use common_catalog::format_full_table_name;
 use common_datasource::file_format::Format;
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::build_backend;
+use common_stat::get_total_cpu_cores;
 use common_telemetry::{debug, error, info, tracing};
+use futures::future::try_join_all;
 use object_store::Entry;
 use regex::Regex;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt, ensure};
 use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
 use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
 use table::table_reference::TableReference;
+use tokio::sync::Semaphore;
 
 use crate::error;
 use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu};
@@ -35,6 +41,16 @@ use crate::statement::StatementExecutor;
 pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
 pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
 pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error";
+pub(crate) const PARALLELISM_KEY: &str = "parallelism";
+
+/// Get parallelism from options, default to total CPU cores.
+fn parse_parallelism_from_option_map(options: &HashMap<String, String>) -> usize {
+    options
+        .get(PARALLELISM_KEY)
+        .and_then(|v| v.parse::<usize>().ok())
+        .unwrap_or_else(get_total_cpu_cores)
+        .max(1)
+}
 
 impl StatementExecutor {
     #[tracing::instrument(skip_all)]
@@ -51,22 +67,26 @@ impl StatementExecutor {
             }
         );
 
+        let parallelism = parse_parallelism_from_option_map(&req.with);
         info!(
-            "Copy database {}.{} to dir: {}, time: {:?}",
-            req.catalog_name, req.schema_name, req.location, req.time_range
+            "Copy database {}.{} to dir: {}, time: {:?}, parallelism: {}",
+            req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
         );
         let table_names = self
            .catalog_manager
            .table_names(&req.catalog_name, &req.schema_name, Some(&ctx))
            .await
            .context(CatalogSnafu)?;
+        let num_tables = table_names.len();
 
         let suffix = Format::try_from(&req.with)
            .context(error::ParseFileFormatSnafu)?
            .suffix();
 
-        let mut exported_rows = 0;
-        for table_name in table_names {
+        let mut tasks = Vec::with_capacity(num_tables);
+        let semaphore = Arc::new(Semaphore::new(parallelism));
+
+        for (i, table_name) in table_names.into_iter().enumerate() {
             let table = self
                 .get_table(&TableReference {
                     catalog: &req.catalog_name,
@@ -89,33 +109,40 @@ impl StatementExecutor {
             {
                 continue;
             }
+
+            let semaphore_moved = semaphore.clone();
             let mut table_file = req.location.clone();
             table_file.push_str(&table_name);
             table_file.push_str(suffix);
-            info!(
-                "Copy table: {}.{}.{} to {}",
-                req.catalog_name, req.schema_name, table_name, table_file
-            );
-
-            let exported = self
-                .copy_table_to(
-                    CopyTableRequest {
-                        catalog_name: req.catalog_name.clone(),
-                        schema_name: req.schema_name.clone(),
-                        table_name,
-                        location: table_file,
-                        with: req.with.clone(),
-                        connection: req.connection.clone(),
-                        pattern: None,
-                        direction: CopyDirection::Export,
-                        timestamp_range: req.time_range,
-                        limit: None,
-                    },
-                    ctx.clone(),
-                )
-                .await?;
-            exported_rows += exported;
+            let table_no = i + 1;
+            let moved_ctx = ctx.clone();
+            let full_table_name =
+                format_full_table_name(&req.catalog_name, &req.schema_name, &table_name);
+            let copy_table_req = CopyTableRequest {
+                catalog_name: req.catalog_name.clone(),
+                schema_name: req.schema_name.clone(),
+                table_name,
+                location: table_file.clone(),
+                with: req.with.clone(),
+                connection: req.connection.clone(),
+                pattern: None,
+                direction: CopyDirection::Export,
+                timestamp_range: req.time_range,
+                limit: None,
+            };
+
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                info!(
+                    "Copy table({}/{}): {} to {}",
+                    table_no, num_tables, full_table_name, table_file
+                );
+                self.copy_table_to(copy_table_req, moved_ctx).await
+            });
         }
+
+        let results = try_join_all(tasks).await?;
+        let exported_rows = results.into_iter().sum();
         Ok(Output::new_with_affected_rows(exported_rows))
     }
 
@@ -134,9 +161,10 @@ impl StatementExecutor {
             }
         );
 
+        let parallelism = parse_parallelism_from_option_map(&req.with);
         info!(
-            "Copy database {}.{} from dir: {}, time: {:?}",
-            req.catalog_name, req.schema_name, req.location, req.time_range
+            "Copy database {}.{} from dir: {}, time: {:?}, parallelism: {}",
+            req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
        );
         let suffix = Format::try_from(&req.with)
            .context(error::ParseFileFormatSnafu)?
@@ -150,8 +178,8 @@ impl StatementExecutor {
            .and_then(|v| bool::from_str(v).ok())
            .unwrap_or(false);
 
-        let mut rows_inserted = 0;
-        let mut insert_cost = 0;
+        let mut tasks = Vec::with_capacity(entries.len());
+        let semaphore = Arc::new(Semaphore::new(parallelism));
 
         for e in entries {
             let table_name = match parse_file_name_to_copy(&e) {
@@ -165,6 +193,7 @@ impl StatementExecutor {
                     }
                 }
             };
+
             let req = CopyTableRequest {
                 catalog_name: req.catalog_name.clone(),
                 schema_name: req.schema_name.clone(),
@@ -177,23 +206,36 @@ impl StatementExecutor {
                 timestamp_range: None,
                 limit: None,
             };
-            debug!("Copy table, arg: {:?}", req);
-            match self.copy_table_from(req, ctx.clone()).await {
-                Ok(o) => {
-                    let (rows, cost) = o.extract_rows_and_cost();
-                    rows_inserted += rows;
-                    insert_cost += cost;
-                }
-                Err(err) => {
-                    if continue_on_error {
-                        error!(err; "Failed to import file to table: {}", table_name);
-                        continue;
-                    } else {
-                        return Err(err);
-                    }
-                }
-            }
+            let moved_ctx = ctx.clone();
+            let moved_table_name = table_name.clone();
+            let moved_semaphore = semaphore.clone();
+            tasks.push(async move {
+                let _permit = moved_semaphore.acquire().await.unwrap();
+                debug!("Copy table, arg: {:?}", req);
+                match self.copy_table_from(req, moved_ctx).await {
+                    Ok(o) => {
+                        let (rows, cost) = o.extract_rows_and_cost();
+                        Ok((rows, cost))
+                    }
+                    Err(err) => {
+                        if continue_on_error {
+                            error!(err; "Failed to import file to table: {}", moved_table_name);
+                            Ok((0, 0))
+                        } else {
+                            Err(err)
+                        }
+                    }
+                }
+            });
         }
+
+        let results = try_join_all(tasks).await?;
+        let (rows_inserted, insert_cost) = results
+            .into_iter()
+            .fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| {
+                (acc_rows + rows, acc_cost + cost)
+            });
+
         Ok(Output::new(
             OutputData::AffectedRows(rows_inserted),
             OutputMeta::new_with_cost(insert_cost),
@@ -229,15 +271,18 @@ async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::R
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
 
+    use common_stat::get_total_cpu_cores;
     use object_store::ObjectStore;
     use object_store::services::Fs;
     use object_store::util::normalize_dir;
    use path_slash::PathExt;
    use table::requests::CopyDatabaseRequest;
 
-    use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy};
+    use crate::statement::copy_database::{
+        list_files_to_copy, parse_file_name_to_copy, parse_parallelism_from_option_map,
+    };
 
     #[tokio::test]
     async fn test_list_files_and_parse_table_name() {
@@ -276,4 +321,16 @@ mod tests {
             listed
         );
     }
+
+    #[test]
+    fn test_parse_parallelism_from_option_map() {
+        let options = HashMap::new();
+        assert_eq!(
+            parse_parallelism_from_option_map(&options),
+            get_total_cpu_cores()
+        );
+
+        let options = HashMap::from([("parallelism".to_string(), "0".to_string())]);
+        assert_eq!(parse_parallelism_from_option_map(&options), 1);
+    }
 }
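Both the export and import paths now follow the same pattern: build one future per table, gate each behind a shared `Semaphore` sized by `parallelism`, and drive them with `try_join_all`, which fails fast on the first error (on import, `continue_on_error` converts failures into `Ok((0, 0))` instead). A self-contained sketch of that pattern, with the hypothetical `copy_one` standing in for `copy_table_to`/`copy_table_from`:

// Self-contained sketch of the bounded-concurrency pattern used above.
use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

async fn copy_one(table: &str) -> Result<usize, String> {
    Ok(table.len()) // pretend each table exports `len` rows
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let tables = vec!["t1", "t2", "t3", "t4"];
    let parallelism = 2;
    let semaphore = Arc::new(Semaphore::new(parallelism));

    let tasks: Vec<_> = tables
        .into_iter()
        .map(|table| {
            let semaphore = semaphore.clone();
            async move {
                // At most `parallelism` copies hold a permit at once.
                let _permit = semaphore.acquire().await.unwrap();
                copy_one(table).await
            }
        })
        .collect();

    // Fails fast on the first Err, like COPY DATABASE without continue_on_error.
    let exported_rows: usize = try_join_all(tasks).await?.into_iter().sum();
    assert_eq!(exported_rows, 8);
    Ok(())
}

As in the commit, the futures are not spawned onto separate tasks; `try_join_all` polls them concurrently within one task, so the semaphore caps in-flight copies rather than threads. Note also the defaulting rules from `parse_parallelism_from_option_map`: an absent or unparsable `parallelism` option falls back to the machine's CPU core count, and any value below 1 is clamped to 1, as the new unit test verifies.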
