Skip to content

Commit d449a9a

Browse files
committed
Autodetect table to write in Merge engine
1 parent 0aa3def commit d449a9a

File tree

5 files changed

+120
-3
lines changed

5 files changed

+120
-3
lines changed

src/Storages/StorageMerge.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ StorageMerge::StorageMerge(
143143
bool database_is_regexp_,
144144
const DBToTableSetMap & source_databases_and_tables_,
145145
const std::optional<String> & table_to_write_,
146+
bool table_to_write_auto_,
146147
ContextPtr context_)
147148
: IStorage(table_id_)
148149
, WithContext(context_->getGlobalContext())
@@ -151,6 +152,7 @@ StorageMerge::StorageMerge(
151152
database_is_regexp_,
152153
source_database_name_or_regexp_, {},
153154
source_databases_and_tables_)
155+
, table_to_write_auto(table_to_write_auto_)
154156
{
155157
StorageInMemoryMetadata storage_metadata;
156158
storage_metadata.setColumns(columns_.empty()
@@ -159,7 +161,8 @@ StorageMerge::StorageMerge(
159161
storage_metadata.setComment(comment);
160162
setInMemoryMetadata(storage_metadata);
161163
setVirtuals(createVirtuals());
162-
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
164+
if (!table_to_write_auto)
165+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
163166
}
164167

165168
StorageMerge::StorageMerge(
@@ -170,6 +173,7 @@ StorageMerge::StorageMerge(
170173
bool database_is_regexp_,
171174
const String & source_table_regexp_,
172175
const std::optional<String> & table_to_write_,
176+
bool table_to_write_auto_,
173177
ContextPtr context_)
174178
: IStorage(table_id_)
175179
, WithContext(context_->getGlobalContext())
@@ -178,6 +182,7 @@ StorageMerge::StorageMerge(
178182
database_is_regexp_,
179183
source_database_name_or_regexp_,
180184
source_table_regexp_, {})
185+
, table_to_write_auto(table_to_write_auto_)
181186
{
182187
StorageInMemoryMetadata storage_metadata;
183188
storage_metadata.setColumns(columns_.empty()
@@ -186,7 +191,8 @@ StorageMerge::StorageMerge(
186191
storage_metadata.setComment(comment);
187192
setInMemoryMetadata(storage_metadata);
188193
setVirtuals(createVirtuals());
189-
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
194+
if (!table_to_write_auto)
195+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
190196
}
191197

192198
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
@@ -277,6 +283,29 @@ void StorageMerge::forEachTable(F && func) const
277283
});
278284
}
279285

286+
template <typename F>
287+
void StorageMerge::forEachTableName(F && func) const
288+
{
289+
auto database_table_iterators = database_name_or_regexp.getDatabaseIterators(getContext());
290+
291+
for (auto & iterator : database_table_iterators)
292+
{
293+
while (iterator->isValid())
294+
{
295+
const auto & table = iterator->table();
296+
if (table.get() != this)
297+
{
298+
QualifiedTableName table_name;
299+
table_name.database = iterator->databaseName();
300+
table_name.table = iterator->name();
301+
func(table_name);
302+
}
303+
304+
iterator->next();
305+
}
306+
}
307+
}
308+
280309
bool StorageMerge::isRemote() const
281310
{
282311
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table && table->isRemote(); });
@@ -1717,6 +1746,18 @@ SinkToStoragePtr StorageMerge::write(
17171746
ContextPtr context_,
17181747
bool async_insert)
17191748
{
1749+
if (table_to_write_auto)
1750+
{
1751+
table_to_write = std::nullopt;
1752+
forEachTableName([&](const auto & table_name)
1753+
{
1754+
if (!table_to_write.has_value())
1755+
table_to_write = table_name;
1756+
else if (table_to_write->getFullName() < table_name.getFullName())
1757+
table_to_write = table_name;
1758+
});
1759+
}
1760+
17201761
if (!table_to_write.has_value())
17211762
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not allowed in storage {} without described table to write.", getName());
17221763

@@ -1753,14 +1794,23 @@ void registerStorageMerge(StorageFactory & factory)
17531794
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");
17541795

17551796
std::optional<String> table_to_write = std::nullopt;
1797+
bool table_to_write_auto = false;
17561798
if (size == 3)
17571799
{
1800+
bool is_identifier = engine_args[2]->as<ASTIdentifier>();
17581801
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
17591802
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
1803+
if (is_identifier && table_to_write == "auto")
1804+
{
1805+
if (is_regexp)
1806+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RegExp for database with auto table_to_write is forbidden.");
1807+
table_to_write_auto = true;
1808+
}
17601809
}
17611810

17621811
return std::make_shared<StorageMerge>(
1763-
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, table_to_write, args.getContext());
1812+
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp,
1813+
table_name_regexp, table_to_write, table_to_write_auto, args.getContext());
17641814
},
17651815
{
17661816
.supports_schema_inference = true

src/Storages/StorageMerge.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class StorageMerge final : public IStorage, WithContext
3131
bool database_is_regexp_,
3232
const DBToTableSetMap & source_databases_and_tables_,
3333
const std::optional<String> & table_to_write_,
34+
bool table_to_write_auto_,
3435
ContextPtr context_);
3536

3637
StorageMerge(
@@ -41,6 +42,7 @@ class StorageMerge final : public IStorage, WithContext
4142
bool database_is_regexp_,
4243
const String & source_table_regexp_,
4344
const std::optional<String> & table_to_write_,
45+
bool table_to_write_auto_,
4446
ContextPtr context_);
4547

4648
std::string getName() const override { return "Merge"; }
@@ -128,13 +130,17 @@ class StorageMerge final : public IStorage, WithContext
128130
DatabaseNameOrRegexp database_name_or_regexp;
129131

130132
std::optional<QualifiedTableName> table_to_write;
133+
bool table_to_write_auto = false;
131134

132135
template <typename F>
133136
StoragePtr getFirstTable(F && predicate) const;
134137

135138
template <typename F>
136139
void forEachTable(F && func) const;
137140

141+
template <typename F>
142+
void forEachTableName(F && func) const;
143+
138144
ColumnSizeByName getColumnSizes() const override;
139145

140146
ColumnsDescription getColumnsDescriptionFromSourceTables(size_t max_tables_to_look) const;

src/TableFunctions/TableFunctionMerge.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, Cont
200200
database_is_regexp,
201201
getSourceDatabasesAndTables(context),
202202
table_to_write,
203+
false,
203204
context);
204205

205206
res->startup();

tests/queries/0_stateless/03373_write_to_merge_table.reference

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,28 @@
1717
2 1
1818
2 2
1919
2 3
20+
1 1
21+
2 1
22+
2 2
23+
2 3
24+
4
25+
2 1
26+
2 2
27+
2 3
28+
3 1
29+
4
30+
2 1
31+
2 2
32+
2 3
33+
3 1
34+
1
35+
3 2
36+
4
37+
2 1
38+
2 2
39+
2 3
40+
3 1
41+
0
42+
2
43+
3 2
44+
3 3

tests/queries/0_stateless/03373_write_to_merge_table.sql

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
-- Tags: no-parallel
2+
13
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
24
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
5+
DROP TABLE IF EXISTS test03373_db.test03373_table_3;
6+
DROP TABLE IF EXISTS test03373_db.test03373_table_4;
37
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
48
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
59
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
610
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
11+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_auto;
712
DROP DATABASE IF EXISTS test03373_db;
813

914
CREATE DATABASE test03373_db;
@@ -17,6 +22,8 @@ CREATE TABLE test03373_db.test03373_merge_wr_1 (key UInt32, value UInt32) ENGINE
1722
CREATE TABLE test03373_db.test03373_merge_wr_2 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_db.test03373_table_2);
1823
CREATE TABLE test03373_db.test03373_merge_wr_3 (key UInt32, value UInt32) ENGINE=Merge(REGEXP('test03373_.*'), 'test03373_table_\d+', test03373_db.test03373_table_2);
1924

25+
CREATE TABLE test03373_db.test03373_merge_wr_auto (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', auto);
26+
2027
INSERT INTO test03373_db.test03373_table_1 VALUES (1,1);
2128

2229
INSERT INTO test03373_db.test03373_merge_wr_1 VALUES (2,1);
@@ -30,10 +37,38 @@ SELECT * FROM test03373_db.test03373_merge_wr_1 ORDER BY key, value;
3037
SELECT * FROM test03373_db.test03373_merge_wr_2 ORDER BY key, value;
3138
SELECT * FROM test03373_db.test03373_merge_wr_3 ORDER BY key, value;
3239

40+
SELECT * FROM test03373_db.test03373_merge_wr_auto ORDER BY key, value;
41+
42+
-- insert into test03373_table_2
43+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,1);
44+
SELECT count() FROM test03373_db.test03373_table_2;
45+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
46+
47+
CREATE TABLE test03373_db.test03373_table_4 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
48+
-- insert into test03373_table_4
49+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,2);
50+
SELECT count() FROM test03373_db.test03373_table_2;
51+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
52+
SELECT count() FROM test03373_db.test03373_table_4;
53+
SELECT * FROM test03373_db.test03373_table_4 ORDER BY key, value;
54+
55+
CREATE TABLE test03373_db.test03373_table_3 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
56+
-- insert into test03373_table_4
57+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,3);
58+
SELECT count() FROM test03373_db.test03373_table_2;
59+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
60+
SELECT count() FROM test03373_db.test03373_table_3;
61+
SELECT * FROM test03373_db.test03373_table_3 ORDER BY key, value;
62+
SELECT count() FROM test03373_db.test03373_table_4;
63+
SELECT * FROM test03373_db.test03373_table_4 ORDER BY key, value;
64+
3365
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
3466
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
67+
DROP TABLE IF EXISTS test03373_db.test03373_table_3;
68+
DROP TABLE IF EXISTS test03373_db.test03373_table_4;
3569
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
3670
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
3771
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
3872
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
73+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_auto;
3974
DROP DATABASE IF EXISTS test03373_db;

0 commit comments

Comments
 (0)