Draft: changes from all commits (28 commits)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 2 additions & 3 deletions src/query/catalog/src/table.rs
@@ -37,7 +37,6 @@ use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::Pipeline;
use databend_common_storage::Histogram;
use databend_common_storage::StorageMetrics;
-use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::ChangeType;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
@@ -240,7 +239,7 @@ pub trait Table: Sync + Send {
copied_files: Option<UpsertTableCopiedFileReq>,
update_stream_meta: Vec<UpdateStreamMetaReq>,
overwrite: bool,
-prev_snapshot_id: Option<SnapshotId>,
+forbid_occ_retry: bool,
_deduplicated_label: Option<String>,
) -> Result<()> {
let (_, _, _, _, _, _) = (
@@ -249,7 +248,7 @@ pub trait Table: Sync + Send {
update_stream_meta,
pipeline,
overwrite,
-prev_snapshot_id,
+forbid_occ_retry,
);

Ok(())
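
The trait change above replaces snapshot pinning with an explicit retry switch: instead of passing the previous snapshot id, callers now tell the storage engine whether it may retry a conflicting commit at all. A minimal sketch of how an implementation might honor `forbid_occ_retry` in an optimistic-concurrency commit loop (the names and retry policy here are illustrative, not Databend's actual code):

// Illustrative only: a simplified OCC commit loop driven by `forbid_occ_retry`.
fn commit_with_occ(
    mut try_commit: impl FnMut() -> Result<(), String>,
    forbid_occ_retry: bool,
    max_retries: usize,
) -> Result<(), String> {
    let mut attempts = 0;
    loop {
        match try_commit() {
            Ok(()) => return Ok(()),
            // On a conflict, rebase on the latest snapshot and retry,
            // unless the caller asked to fail fast instead.
            Err(conflict) if !forbid_occ_retry && attempts < max_retries => {
                attempts += 1;
                eprintln!("commit conflict, retry {attempts}/{max_retries}: {conflict}");
            }
            Err(conflict) => return Err(conflict),
        }
    }
}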
20 changes: 8 additions & 12 deletions src/query/service/src/interpreters/access/privilege_access.rs
@@ -34,12 +34,14 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_sql::binder::MutationType;
use databend_common_sql::optimizer::get_udf_names;
+use databend_common_sql::plans::Append;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::Mutation;
use databend_common_sql::plans::OptimizeCompactBlock;
use databend_common_sql::plans::PresignAction;
use databend_common_sql::plans::Recluster;
use databend_common_sql::plans::RewriteKind;
+use databend_common_sql::BindContext;
use databend_common_sql::Planner;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
@@ -1002,17 +1004,18 @@ impl AccessChecker for PrivilegeAccess {
self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false)
.await?;
}
-// Others.
-Plan::Insert(plan) => {
-let target_table_privileges = if plan.overwrite {
+Plan::Append { s_expr, target_table_index, metadata, overwrite, .. } => {
+let target_table_privileges = if *overwrite {
vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]
} else {
vec![UserPrivilegeType::Insert]
};
+let (_, catalog, database, table) =
+Append::target_table(metadata, *target_table_index);
for privilege in target_table_privileges {
-self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, false, false).await?;
+self.validate_table_access(&catalog, &database, &table, privilege, false, false).await?;
}
-self.validate_insert_source(ctx, &plan.source).await?;
+self.check(ctx, &Plan::Query { s_expr: s_expr.clone(), metadata: metadata.clone(), bind_context: Box::new(BindContext::new()), rewrite_kind: None, formatted_ast: None, ignore_result: false }).await?;
}
Plan::InsertMultiTable(plan) => {
let target_table_privileges = if plan.overwrite {
@@ -1164,13 +1167,6 @@ impl AccessChecker for PrivilegeAccess {
self.validate_access(&GrantObject::Global, UserPrivilegeType::Alter, false, false)
.await?;
}
-Plan::CopyIntoTable(plan) => {
-self.validate_stage_access(&plan.stage_table_info.stage_info, UserPrivilegeType::Read).await?;
-self.validate_table_access(plan.catalog_info.catalog_name(), &plan.database_name, &plan.table_name, UserPrivilegeType::Insert, false, false).await?;
-if let Some(query) = &plan.query {
-self.check(ctx, query).await?;
-}
-}
Plan::CopyIntoLocation(plan) => {
self.validate_stage_access(&plan.stage, UserPrivilegeType::Write).await?;
let from = plan.from.clone();
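
The new `Plan::Append` arm unifies the removed `Plan::Insert` and `Plan::CopyIntoTable` paths: write-side privileges (INSERT, plus DELETE on overwrite) are validated against the target table resolved from the metadata, and read-side privileges are delegated by re-checking the append's source as an ordinary `Plan::Query`. A toy sketch of that delegation pattern, with illustrative types rather than Databend's:

// Illustrative recursion: validate the write side, then re-check the
// source expression as a plain query, as the new Append arm does.
enum Plan {
    Query { referenced_tables: Vec<String> },
    Append { source: Box<Plan>, target: String, overwrite: bool },
}

fn check_access(plan: &Plan) -> Result<(), String> {
    match plan {
        Plan::Query { referenced_tables } => {
            for t in referenced_tables {
                println!("require SELECT on {t}");
            }
            Ok(())
        }
        Plan::Append { source, target, overwrite } => {
            // INSERT always; DELETE too when the append overwrites.
            let mut needed = vec!["INSERT"];
            if *overwrite {
                needed.push("DELETE");
            }
            for p in needed {
                println!("require {p} on {target}");
            }
            // Delegate the read side to the query checker.
            check_access(source)
        }
    }
}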
232 changes: 232 additions & 0 deletions src/query/service/src/interpreters/interpreter_append.rs
@@ -0,0 +1,232 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::StringType;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::SendableDataBlockStream;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::plans::Append;
use databend_common_sql::plans::AppendType;
use databend_common_sql::IndexType;
use log::info;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::PipelineBuilder;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::MetadataRef;
use crate::stream::DataBlockStream;

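/// Unified interpreter for append-style writes: both `INSERT INTO ...`
/// and `COPY INTO <table>` plans are executed here by building the source
/// pipeline from the bound `SExpr` and committing the result to the
/// target table.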
pub struct AppendInterpreter {
ctx: Arc<QueryContext>,
s_expr: SExpr,
metadata: MetadataRef,
target_table_index: IndexType,
stage_table_info: Option<Box<StageTableInfo>>,
overwrite: bool,
forbid_occ_retry: bool,
append_type: AppendType,
}

#[async_trait::async_trait]
impl Interpreter for AppendInterpreter {
fn name(&self) -> &str {
"AppendInterpreter"
}

fn is_ddl(&self) -> bool {
false
}

#[fastrace::trace]
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if check_deduplicate_label(self.ctx.clone()).await? {
return Ok(PipelineBuildResult::create());
}
let (target_table, catalog, database, table) =
Append::target_table(&self.metadata, self.target_table_index);
target_table.check_mutable()?;

// 1. build source and append pipeline
let mut build_res = {
let mut physical_plan_builder =
PhysicalPlanBuilder::new(self.metadata.clone(), self.ctx.clone(), false);
let physical_plan = physical_plan_builder
.build(&self.s_expr, Default::default())
.await?;
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?
};

// 2. build commit pipeline
let copied_files_meta_req = match &self.stage_table_info {
Some(stage_table_info) => PipelineBuilder::build_upsert_copied_files_to_meta_req(
self.ctx.clone(),
target_table.as_ref(),
stage_table_info
.files_to_copy
.as_deref()
.unwrap_or_default(),
&stage_table_info.copy_into_table_options,
)?,
None => None,
};
let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), &self.metadata).await?;
target_table.commit_insertion(
self.ctx.clone(),
&mut build_res.main_pipeline,
copied_files_meta_req,
update_stream_meta,
self.overwrite,
self.forbid_occ_retry,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
)?;

// 3. Purge files on pipeline finished.
if let Some(stage_table_info) = &self.stage_table_info {
let files_to_copy = stage_table_info
.files_to_copy
.as_deref()
.unwrap_or_default();
info!(
"set files to be purged, # of copied files: {}, # of duplicated files: {}",
files_to_copy.len(),
stage_table_info.duplicated_files_detected.len()
);

let files_to_be_deleted = files_to_copy
.iter()
.map(|f| f.path.clone())
.chain(stage_table_info.duplicated_files_detected.clone())
.collect::<Vec<_>>();
PipelineBuilder::set_purge_files_on_finished(
self.ctx.clone(),
files_to_be_deleted,
&stage_table_info.copy_into_table_options,
stage_table_info.stage_info.clone(),
&mut build_res.main_pipeline,
)?;
}

// 4. Execute hook.
{
let hook_operator = HookOperator::create(
self.ctx.clone(),
catalog,
database,
table,
MutationKind::Insert,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}

Ok(build_res)
}

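/// COPY INTO reports one result row per input file; a plain INSERT
/// returns an empty result set.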
fn inject_result(&self) -> Result<SendableDataBlockStream> {
match &self.append_type {
AppendType::CopyInto => {
let blocks = self.get_copy_into_table_result()?;
Ok(Box::pin(DataBlockStream::create(None, blocks)))
}
AppendType::Insert => Ok(Box::pin(DataBlockStream::create(None, vec![]))),
}
}
}

impl AppendInterpreter {
pub fn try_create(
ctx: Arc<QueryContext>,
s_expr: SExpr,
metadata: MetadataRef,
stage_table_info: Option<Box<StageTableInfo>>,
overwrite: bool,
forbid_occ_retry: bool,
append_type: AppendType,
table_index: IndexType,
) -> Result<Self> {
Ok(AppendInterpreter {
ctx,
s_expr,
metadata,
stage_table_info,
overwrite,
forbid_occ_retry,
append_type,
target_table_index: table_index,
})
}

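/// Assemble the COPY INTO result columns (file, rows loaded, errors seen,
/// first error, first error line) from the query's CopyStatus, honoring
/// the `return_failed_only` copy option.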
fn get_copy_into_table_result(&self) -> Result<Vec<DataBlock>> {
let return_all = !self
.stage_table_info
.as_ref()
.unwrap()
.copy_into_table_options
.return_failed_only;
let cs = self.ctx.get_copy_status();

let mut results = cs.files.iter().collect::<Vec<_>>();
results.sort_by(|a, b| a.key().cmp(b.key()));

let n = cs.files.len();
let mut files = Vec::with_capacity(n);
let mut rows_loaded = Vec::with_capacity(n);
let mut errors_seen = Vec::with_capacity(n);
let mut first_error = Vec::with_capacity(n);
let mut first_error_line = Vec::with_capacity(n);

for entry in results {
let status = entry.value();
if let Some(err) = &status.error {
files.push(entry.key().clone());
rows_loaded.push(status.num_rows_loaded as i32);
errors_seen.push(err.num_errors as i32);
first_error.push(Some(err.first_error.error.to_string()));
first_error_line.push(Some(err.first_error.line as i32 + 1));
} else if return_all {
files.push(entry.key().clone());
rows_loaded.push(status.num_rows_loaded as i32);
errors_seen.push(0);
first_error.push(None);
first_error_line.push(None);
}
}
let blocks = vec![DataBlock::new_from_columns(vec![
StringType::from_data(files),
Int32Type::from_data(rows_loaded),
Int32Type::from_data(errors_seen),
StringType::from_opt_data(first_error),
Int32Type::from_opt_data(first_error_line),
])];
Ok(blocks)
}
}
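
For reference, `get_copy_into_table_result` above follows a plain struct-of-vectors pattern: per-file copy statuses are flattened into parallel column vectors and then packed into a single `DataBlock`. A self-contained sketch of the same pattern with simplified types (not Databend's `CopyStatus` API):

// Simplified stand-in for the per-file status tracked during COPY INTO.
struct FileStatus {
    path: String,
    rows_loaded: i32,
    first_error: Option<(String, i32)>, // (message, 1-based line number)
}

// Flatten statuses into parallel columns, honoring `return_failed_only`.
fn to_result_columns(
    statuses: &[FileStatus],
    return_failed_only: bool,
) -> (Vec<String>, Vec<i32>, Vec<Option<String>>, Vec<Option<i32>>) {
    let mut files = Vec::new();
    let mut rows = Vec::new();
    let mut first_errors = Vec::new();
    let mut first_error_lines = Vec::new();
    for s in statuses {
        if s.first_error.is_none() && return_failed_only {
            continue; // fully loaded files are hidden in failed-only mode
        }
        files.push(s.path.clone());
        rows.push(s.rows_loaded);
        first_errors.push(s.first_error.as_ref().map(|(m, _)| m.clone()));
        first_error_lines.push(s.first_error.as_ref().map(|(_, l)| *l));
    }
    (files, rows, first_errors, first_error_lines)
}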