Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1763,12 +1763,12 @@ Possible values:
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
Changes the behaviour of object storage cluster function or table.

ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table.

Restrictions:

- Only applied for JOIN subqueries.
- Only if the FROM section uses a object storage cluster function ot table.
- Only if the FROM section uses a object storage cluster function or table.

Possible values:

Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
});
Expand Down
2 changes: 1 addition & 1 deletion src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t

DECLARE_SETTING_ENUM(DistributedProductMode)

/// The setting for executing object storage cluster function ot table JOIN sections.
/// The setting for executing object storage cluster function or table JOIN sections.
enum class ObjectStorageClusterJoinMode : uint8_t
{
LOCAL, /// Convert to local query
Expand Down
2 changes: 1 addition & 1 deletion src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
if (table_node && table_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
Expand Down
147 changes: 98 additions & 49 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>

Expand Down Expand Up @@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}

bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
{
Expand All @@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito

auto node_type = node->getNodeType();

if (node_type == type)
if (types.contains(node_type))
{
passed_node = node;
passed_type = node_type;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }
std::optional<QueryTreeNodeType> getType() const { return passed_type; }

private:
QueryTreeNodeType type;
std::unordered_set<QueryTreeNodeType> types;
QueryTreeNodePtr passed_node;
std::optional<QueryTreeNodeType> passed_type;
};

/*
Expand Down Expand Up @@ -216,49 +223,70 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto modified_query_tree = query_tree->clone();
bool need_modify = false;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
auto info = getQueryTreeInfo(query_tree, context);

if (has_join)
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
auto modified_query_tree = query_tree->clone();

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
left_table_expression_searcher.visit(modified_query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

QueryTreeNodePtr query_tree_distributed;

auto & query_node = modified_query_tree->as<QueryNode &>();

if (info.has_join)
{
auto join_node = query_node.getJoinTree();
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
}
else
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node");
query_tree_distributed = cross_join_node->as<CrossJoinNode>()->getTableExpressions()[0]->clone();
}

// Find all used columns from the table function to make a proper projection list
// Need to do before changing WHERE condition
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
collector.visit(query_tree);
collector.visit(modified_query_tree);
const auto & columns = collector.getColumns();

auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.resolveProjectionColumns(columns);
auto column_nodes_to_select = std::make_shared<ListNode>();
column_nodes_to_select->getNodes().reserve(columns.size());
for (auto & column : columns)
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
query_node.getProjectionNode() = column_nodes_to_select;

// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
if (info.has_local_columns_in_where)
{
if (query_node.getPrewhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
if (query_node.getWhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
}

need_modify = true;
}
query_node.getOrderByNode() = std::make_shared<ListNode>();
query_node.getGroupByNode() = std::make_shared<ListNode>();

if (has_local_columns_in_where)
{
auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.getWhere() = {};
}
if (query_tree_distributed)
{
// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
}

if (need_modify)
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
Expand Down Expand Up @@ -492,38 +520,59 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
pipeline.init(std::move(pipe));
}

QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
QueryTreeInfo info;

if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context);
join_searcher.visit(query_tree);

SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
join_searcher.visit(query_info.query_tree);
if (join_searcher.getNode())
has_join = true;
if (join_searcher.getNode())
{
if (join_searcher.getType() == QueryTreeNodeType::JOIN)
info.has_join = true;
else
info.has_cross_join = true;
}

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_info.query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
left_table_expression_searcher.visit(query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");

auto & query_node = query_tree->as<QueryNode &>();
if (query_node.hasWhere() || query_node.hasPrewhere())
{
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand CollectUsedColumnsForSourceVisitor is meant to collect all columns for a given "source". As I understand, the source is provided in the constructor. But what makes it local? Doesn't the query_tree object you are passing represent the entire query tree? I don't get it, please educate me

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, query_tree contains all of them, but in different places.
Columns can be in the select list, in the WHERE condition, in the JOIN condition, etc.

SELECT table1.column1,... FROM table1 JOIN table2 ON table1.column2=table2.column2 WHERE table1.column3=...

and the collector traverses the tree and collects all of these cases into a single list.
All these columns need to be added to the select list of the left table subquery.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the columns can be in different places, but let's say there are no local columns. Wouldn't collector_where.getColumns() be non-empty regardless? Btw, what makes a table local or not local assuming you can't check for its existence on the remote node?

auto & query_node = query_info.query_tree->as<QueryNode &>();
if (query_node.hasPrewhere())
collector_where.visit(query_node.getPrewhere());
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// Can't use 'WHERE' on remote node if it contains columns from other sources
// SELECT x FROM datalake.table WHERE x IN local.table.
// Need to modify 'WHERE' on remote node if it contains columns from other sources
// because remote node might not have those sources.
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
info.has_local_columns_in_where = true;
}

return info;
}

QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];

if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

if (has_join || has_local_columns_in_where)
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}

Expand Down
10 changes: 8 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,14 @@ class IStorageCluster : public IStorage
LoggerPtr log;
String cluster_name;

mutable bool has_join = false;
mutable bool has_local_columns_in_where = false;
/// Flags describing a query tree, as computed by getQueryTreeInfo().
/// They are consulted when deciding whether the query must be rewritten before
/// being sent to cluster nodes and which processing stage to report.
struct QueryTreeInfo
{
    /// The join searcher ended on a node of type JOIN.
    bool has_join{false};

    /// The join searcher ended on a node of type CROSS_JOIN.
    bool has_cross_join{false};

    /// The WHERE / PREWHERE column collector returned a non-empty list
    /// (columns coming from other sources, per the comment at the call site).
    bool has_local_columns_in_where{false};
};

static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context);
};


Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ Chunk StorageObjectStorageSource::generate()
{
chunk.addColumn(constant_column.first,
constant_column.second.name_and_type.type->createColumnConst(
chunk.getNumRows(), constant_column.second.value));
chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst());
}

#if USE_PARQUET && USE_AWS_S3
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
return table_expression ? table_expression->table_function : nullptr;
}

/// Returns the `database_and_table_name` AST of the table expression extracted
/// from `query`, or nullptr when no table expression could be extracted.
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
{
    auto * table_expr = extractTableExpressionASTPtrFromSelectQuery(query);
    if (!table_expr)
        return nullptr;

    return table_expr->database_and_table_name;
}

ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
{
auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);
Expand Down
1 change: 1 addition & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct ASTTableExpression;

ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query);
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);

Expand Down
12 changes: 12 additions & 0 deletions tests/integration/test_database_iceberg/configs/cluster.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!-- Declares one cluster, cluster_simple, made of a single shard with a single replica at node1:9000. -->
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>
Loading
Loading