Skip to content

[Draft] Export MergeTree part to Parquet #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: antalya
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
}
break;
}
case ASTAlterCommand::EXPORT_PART:
{
required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::REPLACE_PARTITION:
{
required_access.emplace_back(AccessType::SELECT, command.from_database, command.from_table);
Expand Down
23 changes: 23 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
ostr << quoteString(move_destination_name);
}
}
else if (type == ASTAlterCommand::EXPORT_PART)
{
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(ostr, settings, state, frame);
ostr << " TO ";
switch (move_destination_type)
{
case DataDestinationType::TABLE:
ostr << "TABLE ";
if (!to_database.empty())
{
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table)
<< (settings.hilite ? hilite_none : "");
return;
default:
break;
}

}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST
FREEZE_ALL,
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,

DELETE,
UPDATE,
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
18 changes: 18 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION);
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_part(Keyword::EXPORT_PARTITION);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
Expand Down Expand Up @@ -554,6 +555,23 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<const String &>();
}
}
else if (s_export_part.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PART;
// command->part = true;

if (!s_to_table.ignore(pos, expected))
{
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_add_constraint.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))
Expand Down
60 changes: 34 additions & 26 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,10 @@
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
Expand All @@ -45,46 +31,62 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTAlterQuery.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/Statistics/ConditionSelectivityEstimator.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/Statistics/ConditionSelectivityEstimator.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>

#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
Expand Down Expand Up @@ -5594,6 +5596,12 @@ Pipe MergeTreeData::alterPartition(
}
break;

case PartitionCommand::EXPORT_PART:
{
exportPartitionToTable(command, query_context);
break;
}

case PartitionCommand::DROP_DETACHED_PARTITION:
dropDetached(command.partition, command.part, query_context);
break;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,8 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Moves partition to specified Table
void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context);

/// Exports the partition described by the command to the command's destination table.
/// This base implementation does no work and unconditionally throws NOT_IMPLEMENTED;
/// storages that support export are expected to override it.
virtual void exportPartitionToTable(const PartitionCommand & /*command*/, ContextPtr /*query_context*/)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "export not implemented");
}

/// Checks that Partition could be dropped right now
/// Otherwise - throws an exception with detailed information.
/// We do not use mutex because it is not very important that the size could change during the operation.
Expand Down
72 changes: 72 additions & 0 deletions src/Storages/MergeTree/exportMTPartToStorage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/exportMTPartToStorage.h>


namespace DB
{

/// Reads all physical columns of `data_part` and pushes them into `dst_storage_sink`,
/// executing the resulting pipeline to completion before returning.
///
/// source_data       table that owns the part; provides the metadata, storage and mutations snapshots.
/// data_part         the part whose rows are exported.
/// dst_storage_sink  sink the read pipeline is completed with.
/// context           used to build the read step and to derive pipeline / optimization settings.
void exportMTPartToStorage(const MergeTreeData & source_data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context)
{
    auto metadata_snapshot = source_data.getInMemoryMetadataPtr();
    Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
    StorageSnapshotPtr storage_snapshot = source_data.getStorageSnapshot(metadata_snapshot, context);

    /// Alter conversions are computed from the table's current metadata version
    /// down to the metadata version the part was written with.
    MergeTreeData::IMutationsSnapshot::Params params
    {
        .metadata_version = metadata_snapshot->getMetadataVersion(),
        .min_part_metadata_version = data_part->getMetadataVersion(),
    };

    auto mutations_snapshot = source_data.getMutationsSnapshot(params);

    auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
        data_part,
        mutations_snapshot,
        metadata_snapshot,
        context);

    QueryPlan plan;

    /// TODO(arthur): confirm that `Merge` is the right sequential-source type for an export read.
    const MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Merge;

    const bool apply_deleted_mask = true;
    const bool read_with_direct_io = false;
    const bool prefetch = false;

    createReadFromPartStep(
        read_type,
        plan,
        source_data,
        storage_snapshot,
        data_part,
        alter_conversions,
        columns_to_read,
        nullptr,
        apply_deleted_mask,
        std::nullopt,
        read_with_direct_io,
        prefetch,
        context,
        /// Use a meaningful logger name so messages emitted by the read step can be attributed.
        getLogger("exportMTPartToStorage"));

    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings);

    QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));

    /// Attach the destination sink as the pipeline's terminal step and run it synchronously.
    pipeline.complete(std::move(dst_storage_sink));

    CompletedPipelineExecutor executor(pipeline);
    executor.execute();
}

}
10 changes: 10 additions & 0 deletions src/Storages/MergeTree/exportMTPartToStorage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

#include <Storages/MergeTree/MergeTreeData.h>

namespace DB
{

/// Reads the physical columns of `data_part` (a part of the MergeTree table `data`) and writes them
/// into `dst_storage_sink`. The read pipeline is executed to completion before the function returns.
/// `context` is used to build the read step and to derive pipeline and optimization settings.
void exportMTPartToStorage(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context);

}
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/S3/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(con
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings[Setting::s3_truncate_on_insert],
.create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert],
. create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert],
.schema_inference_use_cache = settings[Setting::schema_inference_use_cache_for_s3],
.schema_inference_mode = settings[Setting::schema_inference_mode],
.skip_empty_files = settings[Setting::s3_skip_empty_files],
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ SinkToStoragePtr StorageObjectStorage::write(
configuration->getPath());
}

// todo arthur continue from here
if (configuration->withGlobsIgnorePartitionWildcard())
{
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
Expand Down
11 changes: 11 additions & 0 deletions src/Storages/PartitionCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.part = command_ast->part;
return res;
}
if (command_ast->type == ASTAlterCommand::EXPORT_PART)
{
PartitionCommand res;
res.type = EXPORT_PART;
res.partition = command_ast->partition->clone();
res.part = command_ast->part;
res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE;
res.to_database = command_ast->to_database;
res.to_table = command_ast->to_table;
return res;
}
if (command_ast->type == ASTAlterCommand::MOVE_PARTITION)
{
PartitionCommand res;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/PartitionCommands.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct PartitionCommand
UNFREEZE_ALL_PARTITIONS,
UNFREEZE_PARTITION,
REPLACE_PARTITION,
EXPORT_PART,
};

Type type = UNKNOWN;
Expand Down
38 changes: 38 additions & 0 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include <Common/ProfileEventsScope.h>
#include <Common/escapeForFileName.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/MergeTree/exportMTPartToStorage.h>


namespace DB
Expand Down Expand Up @@ -2447,6 +2449,42 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
}
}

/*
* For now, this function is meant to be used when exporting to different formats (i.e, the case where data needs to be re-encoded / serialized)
* For the cases where this is not necessary, there are way more optimal ways of doing that, such as hard links implemented by `movePartitionToTable`
* */
void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context)
{
String dest_database = query_context->resolveDatabase(command.to_database);
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);

/// The target table and the source table are the same.
if (dest_storage->getStorageID() == this->getStorageID())
return;

bool async_insert = areAsynchronousInsertsEnabled();

auto query = std::make_shared<ASTInsertQuery>();

String partition_id = getPartitionIDFromQuery(command.partition, getContext());
auto src_parts = getVisibleDataPartsVectorInPartition(getContext(), partition_id);

if (src_parts.empty())
{
return;
}

auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]);
auto lock2 = dest_storage->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]);
auto merges_blocker = stopMergesAndWait();

for (const auto & data_part : src_parts)
{
auto sink = dest_storage->write(query, getInMemoryMetadataPtr(), getContext(), async_insert);
exportMTPartToStorage(*this, data_part, sink, query_context);
}
}

ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class StorageMergeTree final : public MergeTreeData

void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) override;
void exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) override;
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Update mutation entries after part mutation execution. May reset old
/// errors if mutation was successful. Otherwise update last_failed* fields
Expand Down
Loading