[feature](iceberg) Implements Iceberg UPDATE/DELETE/MERGE INTO functionality. #60482
kaka11chen wants to merge 2 commits into apache:master
Conversation
/review
Code Review: [feature](iceberg) Iceberg UPDATE/DELETE/MERGE INTO
This is a large feature PR adding Iceberg UPDATE/DELETE/MERGE INTO support across FE, BE, and Thrift layers. Below is a checkpoint-by-checkpoint review per the code-review skill.
Critical Checkpoint Conclusions
Goal & Test Coverage: The feature adds position-delete-based UPDATE/DELETE/MERGE INTO for Iceberg tables. The architecture is sound (merge partitioner for dual insert+delete routing, position delete file writer, RowDelta commit). However, regression test coverage is minimal — only a basic MERGE INTO scenario is tested. Standalone DELETE, UPDATE, MERGE edge cases (empty source, all-matched, all-unmatched, WHEN MATCHED THEN DELETE) are missing.
Minimal & Focused: Reasonable for a feature of this scope. Some thrift field duplication (TIcebergMergeSink copies 12 fields from TIcebergTableSink instead of composing).
Concurrency: IcebergTransaction has inconsistent synchronization — commitDataList is synchronized when adding but unsynchronized when reading in getUpdateCnt(), updateManifestAfterDelete(), etc. ConnectContext.icebergRowIdTargetTableId relies on single-threaded planning assumption — should be documented.
Lifecycle: No circular references or SIOF issues found.
Configuration: icebergWriteTargetFileSizeBytes session variable missing description annotation. enableIcebergMergePartitioning defaults to true — verify this is intentional as default-on.
Incompatible Changes: New thrift enum values (ICEBERG_DELETE_SINK, ICEBERG_MERGE_SINK, MERGE_PARTITIONED) require BE upgrade before FE during rolling upgrades.
Parallel Code Paths: SessionVariable.needIcebergRowId is dead code — never set by any production path. Should be removed.
Data Writes: Merge sink approach (position delete + data insert via RowDelta) is architecturally correct. Transaction rollback for DELETE/MERGE is a no-op — written but uncommitted files are not cleaned up.
FE-BE Variable Passing: setIcebergRowIdTargetTableId is called in all command paths (DELETE, UPDATE, MERGE, EXPLAIN). Save/restore pattern is consistent.
Observability: Adequate logging and profile counters in BE sinks.
Issues Found (ordered by severity)
| # | Severity | Location | Description |
|---|---|---|---|
| 1 | CRITICAL | merge_partitioner.cpp:240 | Self-referential insert_from causes use-after-free for string columns |
| 2 | CRITICAL | merge_partitioner.cpp:245 | Out-of-bounds access on insert_hashes when _insert_random is true |
| 3 | HIGH | IcebergTransaction.java:161 | NPE when table has no snapshot (empty table) in beginRewrite() |
| 4 | HIGH | IcebergTransaction.java:261-267 | Format version v2 check is commented out; operating on v1 tables will fail |
| 5 | HIGH | IcebergRowId.java:42-45 | createHiddenColumn() returns shared mutable singleton; concurrent mutation risk |
| 6 | MEDIUM | iceberg_partition_function.cpp:209 | Const columns skipped in hash computation; different const values across blocks route incorrectly |
| 7 | MEDIUM | viceberg_delete_sink.cpp:478-480 | UUID truncated to 8 chars; collision risk in concurrent environments |
| 8 | MEDIUM | IcebergTransaction.java:523-534 | getUpdateCnt() returns misleading counts for mixed INSERT+DELETE operations |
| 9 | LOW | Multiple BE files | Missing compile_check_begin.h/compile_check_end.h in new source files |
| 10 | LOW | viceberg_delete_sink.h:112 | Unused member _writers; dead code |
See inline comments for details on the critical and high severity issues.
```cpp
op_values[row] = kUpdateDeleteOperation;
for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) {
    mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row);
}
```
CRITICAL: Use-after-free (self-referential insert_from)
When op == kUpdateOperation, this line duplicates a row from mutable_columns[col_idx] back into the same column:

```cpp
mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row);
```

For ColumnString, insert_from calls chars.resize(new_size), which can reallocate the internal buffer, and then reads from src.chars[offset]. But src IS *this, so the source pointer is now dangling. This is a classic self-referential container insertion bug that causes use-after-free / data corruption.

Fix: clone each row's data into a temporary Block or row holder first, then append from the temporary. Alternatively, collect all update rows' indices first, then bulk-duplicate them from a snapshot of the original columns.
```cpp
op_values[new_row_idx] = kUpdateInsertOperation;
const uint32_t insert_channel =
        _insert_random ? _next_rr_channel() : insert_hashes[row];
_channel_ids.push_back(insert_channel);
```
CRITICAL: Out-of-bounds vector access when _insert_random is true
insert_hashes is only populated when has_insert && !_insert_random (line 171), while this line reaches insert_hashes[row] through the ternary's false branch whenever op == kUpdateOperation.

The C++ conditional operator is guaranteed to evaluate only the selected operand, and _insert_random is a member variable that does not change during the loop, so insert_hashes[row] is never actually evaluated when _insert_random is true. The code is therefore technically safe today, but fragile: any refactoring that hoists the hash lookup out of the ternary, or evaluates it eagerly, turns this into an out-of-bounds read on an empty vector. Consider guarding with DCHECK(_insert_random || row < insert_hashes.size()), or populating insert_hashes unconditionally.
HIGH: NullPointerException on empty Iceberg table
table.currentSnapshot() returns null for a newly created table with no data, so this line will throw an NPE:

```java
this.startingSnapshotId = table.currentSnapshot().snapshotId();
```

Note that getSnapshotIdIfPresent() (line 587) already handles this case correctly. Consider using:

```java
this.startingSnapshotId = getSnapshotIdIfPresent(table);
```

```java
this.table = IcebergUtils.getIcebergTable(dorisTable);
this.baseSnapshotId = getSnapshotIdIfPresent(table);
// // Verify table format version (must be v2+ for delete support)
// String formatVersionStr = table.properties().get("format-version");
```
HIGH: Format version v2 check is commented out
Position delete files require Iceberg table format version >= 2. This validation is commented out. Operating against a v1 table will lead to runtime errors from the Iceberg library when attempting to create position delete files, or potentially silent data corruption.
This check should be active, or at minimum there should be a clear comment explaining why it's deferred (e.g., if the Iceberg library itself validates this).
```java
        }
    }
    return dataRows > 0 ? dataRows : deleteRows;
}
```
MEDIUM: getUpdateCnt() returns misleading row counts
For a MERGE/UPDATE that produces both data files (inserts) and delete files:

```java
return dataRows > 0 ? dataRows : deleteRows;
```

This reports only insert rows for mixed operations. For an UPDATE of 100 rows (which produces 100 position deletes plus 100 new inserts), the user sees "100 rows affected" while the 100 deletes are invisible; for a pure DELETE, only delete rows are reported. The semantics should be documented, or consider returning Math.max(dataRows, deleteRows) or a separate count for each operation type.
```cpp
_partition_fields[i].transformer->apply(*block, results[i]);
const auto& [column, is_const] = unpack_if_const(transformed.column);
if (is_const) {
    continue;
```
MEDIUM: Const columns skipped in hash computation
When a partition column is const (same value for all rows in a block), its hash contribution is skipped entirely (continue). If different blocks have different const values for the same partition column, rows that should go to different partitions will hash identically, causing incorrect data routing.
For example: block A has partition_col = 'a' (const), block B has partition_col = 'b' (const). Both blocks will produce the same hash values since the partition column's contribution is skipped.
Consider materializing const columns before hashing, or computing the const value's hash contribution once and applying it to all rows.
```cpp
}

std::string VIcebergDeleteSink::_generate_delete_file_path(
        const std::string& referenced_data_file) {
```
MEDIUM: UUID truncated to 8 characters — collision risk
The generated delete file name uses uuid.substr(0, 8) which is only 8 hex characters (32 bits of entropy). Combined with std::hash % 10000000, there's a non-trivial collision risk in high-throughput concurrent environments where many delete files are written simultaneously.
Consider using the full UUID or at least a significantly longer substring (e.g., 16+ characters).
morningman left a comment:
The following test scenarios should be added:
- Error handling when executing DELETE/UPDATE on Iceberg v1 tables
- Conflict detection for concurrent DELETE/UPDATE operations
- DELETE/UPDATE with subqueries
- UPDATE SET clause with complex expressions
- MERGE INTO edge cases: matched-only, not-matched-only, multiple WHEN clauses
- DELETE/UPDATE after Schema Evolution
```java
        + "to exclude the impact of dangling delete files."})
public boolean ignoreIcebergDanglingDelete = false;

@VariableMgr.VarAttr(name = ENABLE_ICEBERG_MERGE_PARTITIONING,
```
Remove the Chinese description.
```java
)
public boolean showHiddenColumns = false;

// Internal variable, kept for compatibility; actually controlled by ConnectContext.needIcebergRowId
```
```cpp
if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
    SCOPED_TIMER(_write_delete_files_timer);
    RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
```
All file deletions are held in memory and only written to disk at close(). This is a potential memory issue when there are many delete records.
```java
if (targetAlias.isPresent()) {
    targetPlan = new LogicalSubQueryAlias<>(targetAlias.get(), targetPlan);
}
return new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
```
Consider using INNER JOIN when there is no WHEN NOT MATCHED clause, for better performance.
```java
    return icebergRowIdTargetTableId;
}

/** @deprecated Use setIcebergRowIdTargetTableId instead. Kept for backward compat. */
```
This is a newly added method and marked as @deprecated?
```cpp
namespace doris {

namespace {
constexpr int8_t kInsertOperation = 1;
```
Both viceberg_merge_sink and merge_partitioner define these (kInsertOperation, _is_delete_op, ...); consider consolidating them.
```cpp
if (_need_row_id_column) {
    if (auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get())) {
        parquet_reader->set_iceberg_rowid_params(_current_file_path, _partition_spec_id,
```
set_iceberg_rowid_params can be called in IcebergParquetReader / IcebergOrcReader init_reader.
```cpp
    partition_data_json = table_desc.partition_data_json;
}

set_current_file_info(file_path, partition_spec_id, partition_data_json);
```
The set_current_file_info method seems somewhat unnecessary.
be/src/format/column_processor.h (Outdated)

```cpp
/// the value is backfilled at runtime (e.g. Iceberg V3 _row_id).
/// - INTERNAL: Read from the data file but consumed internally by the TableReader
///   and removed from the output block (e.g. Hive ACID system columns).
enum class ColumnCategory {
```
```cpp
if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
    _current_range.table_format_params.table_format_type == "iceberg") {
    if (auto* iceberg_reader = dynamic_cast<IcebergTableReader*>(_cur_reader.get())) {
        iceberg_reader->set_row_id_column_position(_iceberg_rowid_column_pos);
```
Move this to _get_next_reader or _init_parquet_reader.
```cpp
if (_t_sink.iceberg_merge_sink.__isset.schema_json) {
    try {
        std::unique_ptr<iceberg::Schema> schema =
                iceberg::SchemaParser::from_json(_t_sink.iceberg_merge_sink.schema_json);
```
It seems that VIcebergTableWriter::open will perform a SchemaParser::from_json check again.
```cpp
partition_data_col.reserve(num_rows);

for (size_t i = 0; i < num_rows; ++i) {
    file_path_col.insert_data(file_path.data(), file_path.size());
```
```cpp
if (_position_delete_ordered_rowids == nullptr) {
    return;
}
auto start = _row_reader->getRowNumber();
```
Is this for performance or correctness reasons?
```diff
 @Override
 public List<Column> getBaseSchema(boolean full) {
-    return getFullSchema();
+    List<Column> schema = getFullSchema();
```
I don't quite understand this logic. I guess you want to express:
fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java

```java
public List<Column> getBaseSchema(boolean full)
```
What problem does this PR solve?
Release note
[feature](iceberg) Implements Iceberg UPDATE, DELETE, and MERGE INTO.

Supports DELETE, UPDATE, and MERGE INTO targeting Iceberg tables:
- New FE commands and plan nodes (IcebergDeleteCommand, IcebergMergeCommand, IcebergUpdateCommand, Logical/PhysicalIcebergDeleteSink, Logical/PhysicalIcebergMergeSink).
- Commits through IcebergTransaction.
- Conflict detection (IcebergConflictDetectionFilterUtils).
- Reads metadata columns (_file_path, _pos) via IcebergMetadataColumn to accurately locate the records being deleted.
- New VIcebergDeleteSink and VIcebergMergeSink sink operators.
- New VIcebergDeleteFileWriter mechanism to support writing Position Delete files natively inside Doris BE.
- MergePartitioner and IcebergPartitionFunction to properly align input rows with Iceberg's partitioning constraints during writes.
- Extends FileScanner and GenericReader to natively support row-level operations (like generating $row_id), feeding precise coordinates for DML modifications.
- FE plan tests (IcebergDDLAndDMLPlanTest).
- BE unit tests (viceberg_merge_sink_test.cpp, viceberg_delete_sink_test.cpp, etc.).
- Regression tests (regression-test/suites/external_table_p0/iceberg/dml/*).

Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)