Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6097651
tmp
arthurpassos Oct 7, 2025
43e9459
tmp2 - just in case of disaster recovery
arthurpassos Oct 9, 2025
659b309
able to export partition using two different replicas and upload comm…
arthurpassos Oct 10, 2025
d3bb820
checkpoint
arthurpassos Oct 13, 2025
444e0ee
some changes
arthurpassos Oct 13, 2025
35c6cca
add a silly test
arthurpassos Oct 13, 2025
b884fd3
hold parts references to prevent deletion
arthurpassos Oct 13, 2025
c7493cb
fix a few tests
arthurpassos Oct 13, 2025
f4f9d52
try to fix integ test failure and fix failure handling
arthurpassos Oct 14, 2025
91c7ec2
a few fixes
arthurpassos Oct 14, 2025
69cd83f
make dest storage id part of the key
arthurpassos Oct 15, 2025
62cb51f
add system.replicated_partition_exports
arthurpassos Oct 16, 2025
54c2dfb
add exception to replicated systems table
arthurpassos Oct 17, 2025
7b3a7c9
add the replica that caused the exception
arthurpassos Oct 17, 2025
3f3983c
export_merge_tree_partition_force_export
arthurpassos Oct 17, 2025
b89cd5e
almost done with kill export partition
arthurpassos Oct 21, 2025
bb04fd9
working kill export, update next idx upon lock and lock as many parts…
arthurpassos Oct 22, 2025
55e7b94
fix conflicts
arthurpassos Oct 22, 2025
abe14f3
rmv from system.exports
arthurpassos Oct 23, 2025
e225798
add no fasttest
arthurpassos Oct 23, 2025
0ca5e28
some adjustments
arthurpassos Oct 23, 2025
63c48ce
silly change to force cicd rebuild
arthurpassos Oct 23, 2025
ee00ebb
remove kind of dead code
arthurpassos Oct 28, 2025
5c61bd6
small tweaks for demo
arthurpassos Oct 28, 2025
d609d04
todo comment
arthurpassos Oct 28, 2025
e2b221a
Merge branch 'antalya-25.8' into export_replicated_mt_partition
arthurpassos Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ enum class AccessType : uint8_t
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PARTITION, "ALTER EXPORT PARTITION, EXPORT PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
Expand Down
2 changes: 1 addition & 1 deletion src/Common/ZooKeeper/ZooKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ class ZooKeeper
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);

using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, false>;

using MultiTryGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, true>;

template <typename TIter>
MultiGetChildrenResponse
getChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6873,6 +6873,9 @@ Use roaring bitmap for iceberg positional deletes.
)", 0) \
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
\
/* ####################################################### */ \
Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::EXPORT_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, command.to_database, command.to_table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::FETCH_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);
Expand Down
74 changes: 74 additions & 0 deletions src/Interpreters/InterpreterKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,77 @@ BlockIO InterpreterKillQueryQuery::execute()

break;
}
case ASTKillQueryQuery::Type::ExportPartition:
{
Block exports_block = getSelectResult(
"source_database, source_table, transaction_id, destination_database, destination_table, partition_id",
"system.replicated_partition_exports");
if (exports_block.empty())
return res_io;

const ColumnString & src_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_database").column);
const ColumnString & src_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_table").column);
const ColumnString & dst_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_database").column);
const ColumnString & dst_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_table").column);
const ColumnString & tx_col = typeid_cast<const ColumnString &>(*exports_block.getByName("transaction_id").column);

auto header = exports_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});

MutableColumns res_columns = header.cloneEmptyColumns();
auto table_id = StorageID::createEmpty();
AccessRightsElements required_access_rights;
auto access = getContext()->getAccess();
bool access_denied = false;

for (size_t i = 0; i < exports_block.rows(); ++i)
{
const auto src_database = src_db_col.getDataAt(i).toString();
const auto src_table = src_tbl_col.getDataAt(i).toString();
const auto dst_database = dst_db_col.getDataAt(i).toString();
const auto dst_table = dst_tbl_col.getDataAt(i).toString();

table_id = StorageID{src_database, src_table};
auto transaction_id = tx_col.getDataAt(i).toString();

CancellationCode code = CancellationCode::Unknown;
if (!query.test)
{
auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (!storage)
code = CancellationCode::NotFound;
else
{
ASTAlterCommand alter_command{};
alter_command.type = ASTAlterCommand::EXPORT_PARTITION;
alter_command.move_destination_type = DataDestinationType::TABLE;
alter_command.from_database = src_database;
alter_command.from_table = src_table;
alter_command.to_database = dst_database;
alter_command.to_table = dst_table;

required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand(
alter_command, table_id.database_name, table_id.table_name);
if (!access->isGranted(required_access_rights))
{
access_denied = true;
continue;
}
code = storage->killExportPartition(transaction_id);
}
}

insertResultRow(i, code, exports_block, header, res_columns);
}

if (res_columns[0]->empty() && access_denied)
throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to kill export partition. "
"To execute this query, it's necessary to have the grant {}", required_access_rights.toString());

res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(std::make_shared<const Block>(header.cloneWithColumns(std::move(res_columns))))));

break;
}
case ASTKillQueryQuery::Type::Mutation:
{
Block mutations_block = getSelectResult("database, table, mutation_id, command", "system.mutations");
Expand Down Expand Up @@ -462,6 +533,9 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster
| AccessType::ALTER_MATERIALIZE_COLUMN
| AccessType::ALTER_MATERIALIZE_TTL
);
/// todo arthur think about this
else if (query.type == ASTKillQueryQuery::Type::ExportPartition)
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION);
return required_access;
}

Expand Down
11 changes: 11 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,17 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
}

}
else if (type == ASTAlterCommand::EXPORT_PARTITION)
{
ostr << "EXPORT PARTITION ";
partition->format(ostr, settings, state, frame);
ostr << " TO TABLE ";
if (!to_database.empty())
{
ostr << backQuoteIfNeed(to_database) << ".";
}
ostr << backQuoteIfNeed(to_table);
}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ASTAlterCommand : public IAST
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,
EXPORT_PARTITION,

DELETE,
UPDATE,
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ void ASTKillQueryQuery::formatQueryImpl(WriteBuffer & ostr, const FormatSettings
case Type::Transaction:
ostr << "TRANSACTION";
break;
case Type::ExportPartition:
ostr << "EXPORT PARTITION";
break;
}

formatOnCluster(ostr, settings);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTKillQueryQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluste
{
Query, /// KILL QUERY
Mutation, /// KILL MUTATION
ExportPartition, /// KILL EXPORT_PARTITION
PartMoveToShard, /// KILL PART_MOVE_TO_SHARD
Transaction, /// KILL TRANSACTION
};
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ namespace DB
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
17 changes: 17 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_part(Keyword::EXPORT_PART);
ParserKeyword s_export_partition(Keyword::EXPORT_PARTITION);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
Expand Down Expand Up @@ -553,6 +554,22 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_export_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PARTITION;

if (!s_to_table.ignore(pos, expected))
{
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_move_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ParserKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword p_kill{Keyword::KILL};
ParserKeyword p_query{Keyword::QUERY};
ParserKeyword p_mutation{Keyword::MUTATION};
ParserKeyword p_export_partition{Keyword::EXPORT_PARTITION};
ParserKeyword p_part_move_to_shard{Keyword::PART_MOVE_TO_SHARD};
ParserKeyword p_transaction{Keyword::TRANSACTION};
ParserKeyword p_on{Keyword::ON};
Expand All @@ -33,6 +34,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
query->type = ASTKillQueryQuery::Type::Query;
else if (p_mutation.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::Mutation;
else if (p_export_partition.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::ExportPartition;
else if (p_part_move_to_shard.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::PartMoveToShard;
else if (p_transaction.ignore(pos, expected))
Expand Down
68 changes: 68 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#pragma once

#include <base/types.h>
#include <Interpreters/StorageID.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Parser.h>

namespace DB
{

struct ExportReplicatedMergeTreePartitionManifest
{
String transaction_id;
String partition_id;
String destination_database;
String destination_table;
String source_replica;
size_t number_of_parts;
std::vector<String> parts;
time_t create_time;

std::string toJsonString() const
{
Poco::JSON::Object json;
json.set("transaction_id", transaction_id);
json.set("partition_id", partition_id);
json.set("destination_database", destination_database);
json.set("destination_table", destination_table);
json.set("source_replica", source_replica);
json.set("number_of_parts", number_of_parts);

Poco::JSON::Array::Ptr parts_array = new Poco::JSON::Array();
for (const auto & part : parts)
parts_array->add(part);
json.set("parts", parts_array);

json.set("create_time", create_time);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

static ExportReplicatedMergeTreePartitionManifest fromJsonString(const std::string & json_string)
{
Poco::JSON::Parser parser;
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
chassert(json);

ExportReplicatedMergeTreePartitionManifest manifest;
manifest.transaction_id = json->getValue<String>("transaction_id");
manifest.partition_id = json->getValue<String>("partition_id");
manifest.destination_database = json->getValue<String>("destination_database");
manifest.destination_table = json->getValue<String>("destination_table");
manifest.source_replica = json->getValue<String>("source_replica");
manifest.number_of_parts = json->getValue<size_t>("number_of_parts");

auto parts_array = json->getArray("parts");
for (size_t i = 0; i < parts_array->size(); ++i)
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));

manifest.create_time = json->getValue<time_t>("create_time");
return manifest;
}
};

}
21 changes: 21 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>

namespace DB
{
struct ExportReplicatedMergeTreePartitionTaskEntry
{
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
ExportReplicatedMergeTreePartitionManifest manifest;

std::size_t parts_to_do;
/// References to the parts that should be exported
/// This is used to prevent the parts from being deleted before finishing the export operation
/// It does not mean this replica will export all the parts
/// There is also a chance this replica does not contain a given part and it is totally ok.
std::vector<DataPartPtr> part_references;
};

}
5 changes: 5 additions & 0 deletions src/Storages/IStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ CancellationCode IStorage::killPartMoveToShard(const UUID & /*task_uuid*/)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part moves between shards are not supported by storage {}", getName());
}

CancellationCode IStorage::killExportPartition(const String & /*transaction_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Export partition is not supported by storage {}", getName());
}

StorageID IStorage::getStorageID() const
{
std::lock_guard lock(id_mutex);
Expand Down
12 changes: 12 additions & 0 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,15 @@ It is currently only implemented in StorageObjectStorage.
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}

virtual void commitExportPartitionTransaction(
const String & /* transaction_id */,
const String & /* partition_id */,
const Strings & /* exported_paths */,
ContextPtr /* local_context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "commitExportPartitionTransaction is not implemented for storage type {}", getName());
}


/** Writes the data to a table in distributed manner.
Expand Down Expand Up @@ -582,6 +591,9 @@ It is currently only implemented in StorageObjectStorage.

virtual void setMutationCSN(const String & /*mutation_id*/, UInt64 /*csn*/);

/// Cancel a replicated partition export by transaction id.
virtual CancellationCode killExportPartition(const String & /*transaction_id*/);

/// Cancel a part move to shard.
virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/);

Expand Down
8 changes: 8 additions & 0 deletions src/Storages/MergeTree/ExportList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ UInt64 ExportsListElement::getPeakMemoryUsage() const
return thread_group->memory_tracker.getPeak();
}

void ExportsList::remove(const StorageID & source_table_id, const StorageID & destination_table_id, const String & part_name)
{
std::erase_if(entries, [source_table_id, destination_table_id, part_name](const auto & entry)
{
return entry.source_table_id == source_table_id && entry.destination_table_id == destination_table_id && entry.part_name == part_name;
});
}

}
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ExportList.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class ExportsList final : public BackgroundProcessList<ExportsListElement, Expor
ExportsList()
: Parent(CurrentMetrics::Export)
{}

void remove(const StorageID & source_table_id, const StorageID & destination_table_id, const String & part_name);
};

using ExportsListEntry = BackgroundProcessListEntry<ExportsListElement, ExportInfo>;
Expand Down
Loading
Loading