Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
const std::string VIRTUAL_COLUMN_PREFIX = "__DORIS_VIRTUAL_COL__";
const std::string ICEBERG_ROWID_COL = "__DORIS_ICEBERG_ROWID_COL__";

/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
constexpr int MAX_DECIMAL32_PRECISION = 9;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exchange/exchange_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool e
}

Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) {
auto rows = block->rows();
{
SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block));
}
{
auto rows = block->rows();
SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
const auto& channel_ids = _partitioner->get_channel_ids();

Expand Down
28 changes: 25 additions & 3 deletions be/src/exec/operator/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "exec/sink/scale_writer_partitioning_exchanger.hpp"
#include "exec/sink/tablet_sink_hash_partitioner.h"
#include "exprs/vexpr.h"
#include "format/transformer/merge_partitioner.h"
#include "runtime/runtime_profile.h"
#include "util/uid_util.h"

Expand Down Expand Up @@ -175,6 +176,20 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
custom_profile()->add_info_string(
"Partitioner", fmt::format("ScaleWriterPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::MERGE_PARTITIONED) {
if (!p._has_merge_partition_info) {
return Status::InternalError("Merge partition info is missing");
}
_partition_count = channels.size();
const bool use_new_shuffle_hash_method =
_state->query_options().__isset.enable_new_shuffle_hash_method &&
_state->query_options().enable_new_shuffle_hash_method;
_partitioner = std::make_unique<MergePartitioner>(_partition_count, p._merge_partition_info,
use_new_shuffle_hash_method);
RETURN_IF_ERROR(_partitioner->init({}));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
custom_profile()->add_info_string("Partitioner",
fmt::format("MergePartitioner({})", _partition_count));
}

return Status::OK();
Expand Down Expand Up @@ -258,7 +273,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::MERGE_PARTITIONED) {
RETURN_IF_ERROR(_partitioner->open(state));
}
return Status::OK();
Expand Down Expand Up @@ -306,13 +322,18 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
sink.output_partition.type == TPartitionType::MERGE_PARTITIONED);
#endif
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
_output_tuple_id = sink.output_tuple_id;
}
if (sink.output_partition.__isset.merge_partition_info) {
_merge_partition_info = sink.output_partition.merge_partition_info;
_has_merge_partition_info = true;
}

if (_part_type != TPartitionType::UNPARTITIONED) {
// if there is only one destination, we need to use broadcast
Expand Down Expand Up @@ -512,8 +533,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, Block* block, bool eos)
(local_state.current_channel_idx + 1) % local_state.channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
_part_type == TPartitionType::MERGE_PARTITIONED) {
RETURN_IF_ERROR(local_state._writer->write(state, block, eos));
} else if (_part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/operator/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<Exchan
RuntimeState* _state = nullptr;

const std::vector<TExpr> _texprs;
TMergePartitionInfo _merge_partition_info;
bool _has_merge_partition_info = false;

const RowDescriptor& _row_desc;
TTupleId _output_tuple_id = -1;
Expand Down
33 changes: 33 additions & 0 deletions be/src/exec/operator/iceberg_delete_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "iceberg_delete_sink_operator.h"

#include "common/status.h"

namespace doris {
#include "common/compile_check_begin.h"

/// Initializes the local state of the Iceberg delete sink.
///
/// Runs the base-class initialization first and returns its error, if any.
/// Then, with the exec-time and init timers running, passes the parent
/// operator's object pool to the writer through init_properties().
///
/// @param state runtime state forwarded to the base-class init.
/// @param info  local sink state info forwarded to the base-class init.
/// @return the first non-OK status from Base::init() or init_properties(),
///         otherwise Status::OK().
Status IcebergDeleteSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);
    auto& p = _parent->cast<Parent>();
    RETURN_IF_ERROR(_writer->init_properties(p._pool));
    return Status::OK();
}

// Pairs with the compile_check_begin.h include at the top of this namespace.
#include "common/compile_check_end.h"
} // namespace doris
87 changes: 87 additions & 0 deletions be/src/exec/operator/iceberg_delete_sink_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "exec/sink/viceberg_delete_sink.h"
#include "operator.h"

namespace doris {
#include "common/compile_check_begin.h"

class IcebergDeleteSinkOperatorX;

/// Local sink state for the Iceberg delete sink: the AsyncWriterSink
/// specialization that pairs the VIcebergDeleteSink writer with
/// IcebergDeleteSinkOperatorX.
class IcebergDeleteSinkLocalState final
        : public AsyncWriterSink<VIcebergDeleteSink, IcebergDeleteSinkOperatorX> {
public:
    using Parent = IcebergDeleteSinkOperatorX;
    using Base = AsyncWriterSink<VIcebergDeleteSink, Parent>;
    ENABLE_FACTORY_CREATOR(IcebergDeleteSinkLocalState);

    /// Forwards the owning operator and runtime state to the base class.
    IcebergDeleteSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state) {}

    /// Defined out of line in the matching .cpp file.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

    /// Delegates to Base::open() while the exec-time and open timers run.
    Status open(RuntimeState* state) override {
        SCOPED_TIMER(exec_time_counter());
        SCOPED_TIMER(_open_timer);
        return Base::open(state);
    }

    friend class IcebergDeleteSinkOperatorX;
};

/// Sink operator for Iceberg deletes. Builds the output expression contexts
/// from the thrift expressions and forwards incoming blocks to its local state.
class IcebergDeleteSinkOperatorX final : public DataSinkOperatorX<IcebergDeleteSinkLocalState> {
public:
    using Base = DataSinkOperatorX<IcebergDeleteSinkLocalState>;

    /// @param pool          object pool later handed to the writer by the local state.
    /// @param operator_id   id forwarded to the base operator.
    /// @param row_desc      input row descriptor; stored by reference.
    /// @param t_output_expr thrift output expressions; stored by reference.
    ///
    /// Both references are kept as members, so the referenced objects must
    /// outlive this operator. The two trailing zeros are passed to the base
    /// constructor as-is; their meaning is defined by DataSinkOperatorX.
    IcebergDeleteSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
                               const std::vector<TExpr>& t_output_expr)
            : Base(operator_id, 0, 0),
              _row_desc(row_desc),
              _t_output_expr(t_output_expr),
              _pool(pool) {}

    /// Initializes the base operator, then creates the expression contexts
    /// from the thrift expressions.
    Status init(const TDataSink& thrift_sink) override {
        RETURN_IF_ERROR(Base::init(thrift_sink));
        RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
        return Status::OK();
    }

    /// Prepares the base operator, then prepares and opens the output
    /// expression contexts against the stored row descriptor.
    Status prepare(RuntimeState* state) override {
        RETURN_IF_ERROR(Base::prepare(state));
        RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
        return VExpr::open(_output_vexpr_ctxs, state);
    }

    /// Adds the block's row count to the input-rows counter and passes the
    /// block to the local state's sink().
    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
        auto& sink_state = get_local_state(state);
        SCOPED_TIMER(sink_state.exec_time_counter());
        COUNTER_UPDATE(sink_state.rows_input_counter(), static_cast<int64_t>(in_block->rows()));
        return sink_state.sink(state, in_block, eos);
    }

private:
    friend class IcebergDeleteSinkLocalState;
    template <typename Writer, typename Parent>
        requires(std::is_base_of_v<AsyncResultWriter, Writer>)
    friend class AsyncWriterSink;

    // Non-owning reference supplied at construction.
    const RowDescriptor& _row_desc;
    // Filled by init() from _t_output_expr.
    VExprContextSPtrs _output_vexpr_ctxs;
    // Non-owning reference supplied at construction.
    const std::vector<TExpr>& _t_output_expr;
    ObjectPool* _pool = nullptr;
};

#include "common/compile_check_end.h"
} // namespace doris
35 changes: 35 additions & 0 deletions be/src/exec/operator/iceberg_merge_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "iceberg_merge_sink_operator.h"

#include "common/status.h"

namespace doris {
#include "common/compile_check_begin.h"

/// Initializes the local state of the Iceberg merge sink.
///
/// Base-class initialization runs first and its error, if any, is returned.
/// Then, with the exec-time and init timers running, the parent operator's
/// object pool and row descriptor are passed to the writer's
/// init_properties(), whose status becomes the result.
Status IcebergMergeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));

    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);

    auto& parent_op = _parent->cast<Parent>();
    RETURN_IF_ERROR(_writer->init_properties(parent_op._pool, parent_op._row_desc));
    return Status::OK();
}

#include "common/compile_check_end.h"
} // namespace doris
86 changes: 86 additions & 0 deletions be/src/exec/operator/iceberg_merge_sink_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "exec/sink/viceberg_merge_sink.h"
#include "operator.h"

namespace doris {
#include "common/compile_check_begin.h"

class IcebergMergeSinkOperatorX;

/// Local sink state for the Iceberg merge sink: the AsyncWriterSink
/// specialization that pairs the VIcebergMergeSink writer with
/// IcebergMergeSinkOperatorX.
class IcebergMergeSinkLocalState final
        : public AsyncWriterSink<VIcebergMergeSink, IcebergMergeSinkOperatorX> {
public:
    using Parent = IcebergMergeSinkOperatorX;
    using Base = AsyncWriterSink<VIcebergMergeSink, Parent>;
    ENABLE_FACTORY_CREATOR(IcebergMergeSinkLocalState);

    /// Forwards the owning operator and runtime state to the base class.
    IcebergMergeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state) {}

    /// Defined out of line in the matching .cpp file.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

    /// Delegates to Base::open() while the exec-time and open timers run.
    Status open(RuntimeState* state) override {
        SCOPED_TIMER(exec_time_counter());
        SCOPED_TIMER(_open_timer);
        return Base::open(state);
    }

    friend class IcebergMergeSinkOperatorX;
};

/// Sink operator for Iceberg merges. Builds the output expression contexts
/// from the thrift expressions and forwards incoming blocks to its local state.
class IcebergMergeSinkOperatorX final : public DataSinkOperatorX<IcebergMergeSinkLocalState> {
public:
    using Base = DataSinkOperatorX<IcebergMergeSinkLocalState>;

    /// @param pool          object pool later handed to the writer by the local state.
    /// @param operator_id   id forwarded to the base operator.
    /// @param row_desc      input row descriptor; stored by reference.
    /// @param t_output_expr thrift output expressions; stored by reference.
    ///
    /// Both references are kept as members, so the referenced objects must
    /// outlive this operator. The two trailing zeros are passed to the base
    /// constructor as-is; their meaning is defined by DataSinkOperatorX.
    IcebergMergeSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
                              const std::vector<TExpr>& t_output_expr)
            : Base(operator_id, 0, 0),
              _row_desc(row_desc),
              _t_output_expr(t_output_expr),
              _pool(pool) {}

    /// Initializes the base operator, then creates the expression contexts
    /// from the thrift expressions.
    Status init(const TDataSink& thrift_sink) override {
        RETURN_IF_ERROR(Base::init(thrift_sink));
        RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
        return Status::OK();
    }

    /// Prepares the base operator, then prepares and opens the output
    /// expression contexts against the stored row descriptor.
    Status prepare(RuntimeState* state) override {
        RETURN_IF_ERROR(Base::prepare(state));
        RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
        return VExpr::open(_output_vexpr_ctxs, state);
    }

    /// Adds the block's row count to the input-rows counter and passes the
    /// block to the local state's sink().
    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
        auto& sink_state = get_local_state(state);
        SCOPED_TIMER(sink_state.exec_time_counter());
        COUNTER_UPDATE(sink_state.rows_input_counter(), static_cast<int64_t>(in_block->rows()));
        return sink_state.sink(state, in_block, eos);
    }

private:
    friend class IcebergMergeSinkLocalState;
    template <typename Writer, typename Parent>
        requires(std::is_base_of_v<AsyncResultWriter, Writer>)
    friend class AsyncWriterSink;

    // Non-owning reference supplied at construction.
    const RowDescriptor& _row_desc;
    // Filled by init() from _t_output_expr.
    VExprContextSPtrs _output_vexpr_ctxs;
    // Non-owning reference supplied at construction.
    const std::vector<TExpr>& _t_output_expr;
    ObjectPool* _pool = nullptr;
};

#include "common/compile_check_end.h"
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include "exec/operator/hashjoin_build_sink.h"
#include "exec/operator/hashjoin_probe_operator.h"
#include "exec/operator/hive_table_sink_operator.h"
#include "exec/operator/iceberg_delete_sink_operator.h"
#include "exec/operator/iceberg_merge_sink_operator.h"
#include "exec/operator/iceberg_table_sink_operator.h"
#include "exec/operator/jdbc_scan_operator.h"
#include "exec/operator/jdbc_table_sink_operator.h"
Expand Down Expand Up @@ -813,6 +815,8 @@ DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(TVFTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
DECLARE_OPERATOR(IcebergDeleteSinkLocalState)
DECLARE_OPERATOR(IcebergMergeSinkLocalState)
DECLARE_OPERATOR(MCTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(BlackholeSinkLocalState)
Expand Down Expand Up @@ -933,6 +937,8 @@ template class AsyncWriterSink<doris::VTabletWriterV2, OlapTableSinkV2OperatorX>
template class AsyncWriterSink<doris::VHiveTableWriter, HiveTableSinkOperatorX>;
template class AsyncWriterSink<doris::VIcebergTableWriter, IcebergTableSinkOperatorX>;
template class AsyncWriterSink<doris::VIcebergTableWriter, SpillIcebergTableSinkOperatorX>;
template class AsyncWriterSink<doris::VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
template class AsyncWriterSink<doris::VIcebergMergeSink, IcebergMergeSinkOperatorX>;
template class AsyncWriterSink<doris::VMCTableWriter, MCTableSinkOperatorX>;
template class AsyncWriterSink<doris::VTVFTableWriter, TVFTableSinkOperatorX>;

Expand Down
Loading
Loading