1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

33 changes: 24 additions & 9 deletions src/cli/src/data/export.rs
@@ -67,9 +67,17 @@ pub struct ExportCommand {
#[clap(long, default_value_t = default_database())]
database: String,

/// Parallelism of the export.
#[clap(long, short = 'j', default_value = "1")]
export_jobs: usize,
/// The number of databases exported in parallel.
/// For example, if there are 20 databases and `db_parallelism` is 4,
/// 4 databases will be exported concurrently.
#[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
db_parallelism: usize,

/// The number of tables exported in parallel within a single database.
/// For example, if a database has 30 tables and `table_parallelism` is 8,
/// 8 tables will be exported concurrently.
#[clap(long, default_value = "4")]
table_parallelism: usize,

/// Max retry times for each job.
#[clap(long, default_value = "3")]
@@ -210,10 +218,11 @@ impl ExportCommand {
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
export_jobs: self.db_parallelism,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
parallelism: self.table_parallelism,
s3: self.s3,
ddl_local_dir: self.ddl_local_dir.clone(),
s3_bucket: self.s3_bucket.clone(),
@@ -251,10 +260,11 @@ pub struct Export {
schema: Option<String>,
database_client: DatabaseClient,
output_dir: Option<String>,
parallelism: usize,
export_jobs: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
parallelism: usize,
s3: bool,
ddl_local_dir: Option<String>,
s3_bucket: Option<String>,
@@ -464,7 +474,7 @@ impl Export {

async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let semaphore = Arc::new(Semaphore::new(self.export_jobs));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = Arc::new(self.build_prefer_fs_operator().await?);
@@ -625,13 +635,13 @@ impl Export {

async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let semaphore = Arc::new(Semaphore::new(self.export_jobs));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
let operator = Arc::new(self.build_operator().await?);
let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
let with_options = build_with_options(&self.start_time, &self.end_time);
let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);

for schema in db_names {
let semaphore_moved = semaphore.clone();
@@ -888,13 +898,18 @@ impl Tool for Export {
}

/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
fn build_with_options(
start_time: &Option<String>,
end_time: &Option<String>,
parallelism: usize,
) -> String {
let mut options = vec!["format = 'parquet'".to_string()];
if let Some(start) = start_time {
options.push(format!("start_time = '{}'", start));
}
if let Some(end) = end_time {
options.push(format!("end_time = '{}'", end));
}
options.push(format!("parallelism = {}", parallelism));
options.join(", ")
}
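With this change the exporter has two independent knobs: `db_parallelism` (the old `--export-jobs`, kept as an alias) bounds how many databases are exported at once on the client side, while `table_parallelism` is forwarded to the server as the `parallelism` entry of the generated `WITH` clause. A minimal sketch of the string `build_with_options` now produces, written as a hypothetical test (the name and placement are illustrative, not part of this diff):

```rust
#[test]
fn with_options_include_parallelism() {
    // Expected shape given the code above: format first, optional time bounds,
    // then the table-level parallelism appended last.
    let opts = build_with_options(&Some("2024-01-01 00:00:00".to_string()), &None, 8);
    assert_eq!(
        opts,
        "format = 'parquet', start_time = '2024-01-01 00:00:00', parallelism = 8"
    );
}
```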
10 changes: 6 additions & 4 deletions src/cli/src/data/import.rs
@@ -56,9 +56,11 @@ pub struct ImportCommand {
#[clap(long, default_value_t = default_database())]
database: String,

/// Parallelism of the import.
#[clap(long, short = 'j', default_value = "1")]
import_jobs: usize,
/// The number of databases imported in parallel.
/// For example, if there are 20 databases and `db_parallelism` is 4,
/// 4 databases will be imported concurrently.
#[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
db_parallelism: usize,

/// Max retry times for each job.
#[clap(long, default_value = "3")]
@@ -109,7 +111,7 @@ impl ImportCommand {
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
parallelism: self.db_parallelism,
target: self.target.clone(),
}))
}
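The rename keeps the old spelling working through a clap alias, so existing scripts that pass `--import-jobs` keep functioning. A self-contained sketch of that pattern (struct and values are illustrative, not taken from this diff):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Demo {
    /// The number of databases imported in parallel.
    #[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
    db_parallelism: usize,
}

fn main() {
    // Both spellings parse into the same field.
    let new_style = Demo::parse_from(["demo", "--db-parallelism", "4"]);
    let old_style = Demo::parse_from(["demo", "--import-jobs", "4"]);
    assert_eq!(new_style.db_parallelism, old_style.db_parallelism);
}
```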
1 change: 1 addition & 0 deletions src/operator/Cargo.toml
@@ -36,6 +36,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-sql.workspace = true
common-stat.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
151 changes: 104 additions & 47 deletions src/operator/src/statement/copy_database.rs
@@ -12,21 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_catalog::format_full_table_name;
use common_datasource::file_format::Format;
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_stat::get_total_cpu_cores;
use common_telemetry::{debug, error, info, tracing};
use futures::future::try_join_all;
use object_store::Entry;
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::table_reference::TableReference;
use tokio::sync::Semaphore;

use crate::error;
use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu};
@@ -35,6 +41,16 @@ use crate::statement::StatementExecutor;
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error";
pub(crate) const PARALLELISM_KEY: &str = "parallelism";

/// Gets the parallelism from the option map, defaulting to the total number of CPU cores and clamping to at least 1.
fn parse_parallelism_from_option_map(options: &HashMap<String, String>) -> usize {
options
.get(PARALLELISM_KEY)
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or_else(get_total_cpu_cores)
.max(1)
}

impl StatementExecutor {
#[tracing::instrument(skip_all)]
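The helper above clamps the value to at least 1 and falls back to the CPU-core count whenever the option is missing or unparsable. A minimal standalone sketch of that rule, with a stand-in for `common_stat::get_total_cpu_cores` (names and values are illustrative):

```rust
use std::collections::HashMap;

// Stand-in for common_stat::get_total_cpu_cores, fixed for illustration.
fn cpu_cores() -> usize {
    8
}

fn parallelism_from(options: &HashMap<String, String>) -> usize {
    options
        .get("parallelism")
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or_else(cpu_cores)
        .max(1)
}

fn main() {
    let mut opts = HashMap::new();
    assert_eq!(parallelism_from(&opts), 8); // missing -> CPU cores
    opts.insert("parallelism".to_string(), "0".to_string());
    assert_eq!(parallelism_from(&opts), 1); // zero -> clamped to 1
    opts.insert("parallelism".to_string(), "abc".to_string());
    assert_eq!(parallelism_from(&opts), 8); // unparsable -> CPU cores
}
```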
@@ -51,22 +67,26 @@ impl StatementExecutor {
}
);

let parallelism = parse_parallelism_from_option_map(&req.with);
info!(
"Copy database {}.{} to dir: {}, time: {:?}",
req.catalog_name, req.schema_name, req.location, req.time_range
"Copy database {}.{} to dir: {}, time: {:?}, parallelism: {}",
req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
);
let table_names = self
.catalog_manager
.table_names(&req.catalog_name, &req.schema_name, Some(&ctx))
.await
.context(CatalogSnafu)?;
let num_tables = table_names.len();

let suffix = Format::try_from(&req.with)
.context(error::ParseFileFormatSnafu)?
.suffix();

let mut exported_rows = 0;
for table_name in table_names {
let mut tasks = Vec::with_capacity(num_tables);
let semaphore = Arc::new(Semaphore::new(parallelism));

for (i, table_name) in table_names.into_iter().enumerate() {
let table = self
.get_table(&TableReference {
catalog: &req.catalog_name,
@@ -89,33 +109,40 @@
{
continue;
}

let semaphore_moved = semaphore.clone();
let mut table_file = req.location.clone();
table_file.push_str(&table_name);
table_file.push_str(suffix);
info!(
"Copy table: {}.{}.{} to {}",
req.catalog_name, req.schema_name, table_name, table_file
);

let exported = self
.copy_table_to(
CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file,
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
},
ctx.clone(),
)
.await?;
exported_rows += exported;
let table_no = i + 1;
let moved_ctx = ctx.clone();
let full_table_name =
format_full_table_name(&req.catalog_name, &req.schema_name, &table_name);
let copy_table_req = CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file.clone(),
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
};

tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
info!(
"Copy table({}/{}): {} to {}",
table_no, num_tables, full_table_name, table_file
);
self.copy_table_to(copy_table_req, moved_ctx).await
});
}

let results = try_join_all(tasks).await?;
let exported_rows = results.into_iter().sum();
Ok(Output::new_with_affected_rows(exported_rows))
}
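The export path now follows a bounded-concurrency pattern: the per-table copy futures are collected and awaited together with `try_join_all`, while a semaphore caps how many run at once. A minimal self-contained sketch of that pattern (the table list and the simulated copy are illustrative):

```rust
use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

// Stand-in for `copy_table_to`: pretend every table exports 100 rows.
async fn copy_one_table(table: &str) -> Result<usize, String> {
    println!("copying {table}");
    Ok(100)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let parallelism = 4;
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let tables = ["t1", "t2", "t3", "t4", "t5"];

    let tasks = tables.into_iter().map(|table| {
        let semaphore = semaphore.clone();
        async move {
            // Holding the permit for the whole copy bounds the concurrency.
            let _permit = semaphore.acquire().await.unwrap();
            copy_one_table(table).await
        }
    });

    let exported_rows: usize = try_join_all(tasks).await?.into_iter().sum();
    println!("exported {exported_rows} rows");
    Ok(())
}
```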

@@ -134,9 +161,10 @@ impl StatementExecutor {
}
);

let parallelism = parse_parallelism_from_option_map(&req.with);
info!(
"Copy database {}.{} from dir: {}, time: {:?}",
req.catalog_name, req.schema_name, req.location, req.time_range
"Copy database {}.{} from dir: {}, time: {:?}, parallelism: {}",
req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
);
let suffix = Format::try_from(&req.with)
.context(error::ParseFileFormatSnafu)?
@@ -150,8 +178,8 @@
.and_then(|v| bool::from_str(v).ok())
.unwrap_or(false);

let mut rows_inserted = 0;
let mut insert_cost = 0;
let mut tasks = Vec::with_capacity(entries.len());
let semaphore = Arc::new(Semaphore::new(parallelism));

for e in entries {
let table_name = match parse_file_name_to_copy(&e) {
@@ -165,6 +193,7 @@
}
}
};

let req = CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
@@ -177,23 +206,36 @@
timestamp_range: None,
limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {
Ok(o) => {
let (rows, cost) = o.extract_rows_and_cost();
rows_inserted += rows;
insert_cost += cost;
}
Err(err) => {
if continue_on_error {
error!(err; "Failed to import file to table: {}", table_name);
continue;
} else {
return Err(err);
let moved_ctx = ctx.clone();
let moved_table_name = table_name.clone();
let moved_semaphore = semaphore.clone();
tasks.push(async move {
let _permit = moved_semaphore.acquire().await.unwrap();
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, moved_ctx).await {
Ok(o) => {
let (rows, cost) = o.extract_rows_and_cost();
Ok((rows, cost))
}
Err(err) => {
if continue_on_error {
error!(err; "Failed to import file to table: {}", moved_table_name);
Ok((0, 0))
} else {
Err(err)
}
}
}
}
});
}

let results = try_join_all(tasks).await?;
let (rows_inserted, insert_cost) = results
.into_iter()
.fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| {
(acc_rows + rows, acc_cost + cost)
});

Ok(Output::new(
OutputData::AffectedRows(rows_inserted),
OutputMeta::new_with_cost(insert_cost),
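On the import side each file yields a `(rows, cost)` pair; when `continue_on_error` is set a failed file contributes `(0, 0)` instead of aborting the whole copy, and the pairs are folded into the totals reported to the user. A small sketch of that aggregation (values are illustrative):

```rust
fn main() {
    let continue_on_error = true;

    // Stand-ins for the per-file outcomes of `copy_table_from`.
    let results: Vec<Result<(usize, usize), String>> = vec![
        Ok((120, 3)),
        Err("corrupt parquet file".to_string()),
        Ok((80, 2)),
    ];

    let (rows_inserted, insert_cost) = results
        .into_iter()
        .map(|r| match r {
            Ok(pair) => pair,
            Err(_) if continue_on_error => (0, 0), // skip the file, keep going
            Err(err) => panic!("import failed: {err}"), // the real code returns the error
        })
        .fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| {
            (acc_rows + rows, acc_cost + cost)
        });

    assert_eq!((rows_inserted, insert_cost), (200, 5));
}
```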
@@ -229,15 +271,18 @@ async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::R

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use common_stat::get_total_cpu_cores;
use object_store::ObjectStore;
use object_store::services::Fs;
use object_store::util::normalize_dir;
use path_slash::PathExt;
use table::requests::CopyDatabaseRequest;

use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy};
use crate::statement::copy_database::{
list_files_to_copy, parse_file_name_to_copy, parse_parallelism_from_option_map,
};

#[tokio::test]
async fn test_list_files_and_parse_table_name() {
@@ -276,4 +321,16 @@ mod tests {
listed
);
}

#[test]
fn test_parse_parallelism_from_option_map() {
let options = HashMap::new();
assert_eq!(
parse_parallelism_from_option_map(&options),
get_total_cpu_cores()
);

let options = HashMap::from([("parallelism".to_string(), "0".to_string())]);
assert_eq!(parse_parallelism_from_option_map(&options), 1);
}
}