Skip to content

Commit 0aa3def

Browse files
committed
Write to Merge storage
1 parent 024a0b4 commit 0aa3def

File tree

5 files changed

+130
-4
lines changed

5 files changed

+130
-4
lines changed

src/Storages/StorageMerge.cpp

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ StorageMerge::StorageMerge(
142142
const String & source_database_name_or_regexp_,
143143
bool database_is_regexp_,
144144
const DBToTableSetMap & source_databases_and_tables_,
145+
const std::optional<String> & table_to_write_,
145146
ContextPtr context_)
146147
: IStorage(table_id_)
147148
, WithContext(context_->getGlobalContext())
@@ -158,6 +159,7 @@ StorageMerge::StorageMerge(
158159
storage_metadata.setComment(comment);
159160
setInMemoryMetadata(storage_metadata);
160161
setVirtuals(createVirtuals());
162+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
161163
}
162164

163165
StorageMerge::StorageMerge(
@@ -167,6 +169,7 @@ StorageMerge::StorageMerge(
167169
const String & source_database_name_or_regexp_,
168170
bool database_is_regexp_,
169171
const String & source_table_regexp_,
172+
const std::optional<String> & table_to_write_,
170173
ContextPtr context_)
171174
: IStorage(table_id_)
172175
, WithContext(context_->getGlobalContext())
@@ -183,6 +186,7 @@ StorageMerge::StorageMerge(
183186
storage_metadata.setComment(comment);
184187
setInMemoryMetadata(storage_metadata);
185188
setVirtuals(createVirtuals());
189+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
186190
}
187191

188192
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
@@ -1683,6 +1687,44 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
16831687
return first_table ? std::nullopt : std::make_optional(total_rows_or_bytes);
16841688
}
16851689

1690+
void StorageMerge::setTableToWrite(
1691+
const std::optional<String> & table_to_write_,
1692+
const String & source_database_name_or_regexp_,
1693+
bool database_is_regexp_)
1694+
{
1695+
if (!table_to_write_.has_value())
1696+
{
1697+
table_to_write = std::nullopt;
1698+
return;
1699+
}
1700+
1701+
auto qualified_name = QualifiedTableName::parseFromString(*table_to_write_);
1702+
1703+
if (qualified_name.database.empty())
1704+
{
1705+
if (database_is_regexp_)
1706+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument 'table_to_write' must contain database if 'db_name' is regular expression.");
1707+
1708+
qualified_name.database = source_database_name_or_regexp_;
1709+
}
1710+
1711+
table_to_write = qualified_name;
1712+
}
1713+
1714+
SinkToStoragePtr StorageMerge::write(
1715+
const ASTPtr & query,
1716+
const StorageMetadataPtr & metadata_snapshot,
1717+
ContextPtr context_,
1718+
bool async_insert)
1719+
{
1720+
if (!table_to_write.has_value())
1721+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not allowed in storage {} without described table to write.", getName());
1722+
1723+
auto database = DatabaseCatalog::instance().getDatabase(table_to_write->database);
1724+
auto table = database->getTable(table_to_write->table, context_);
1725+
return table->write(query, metadata_snapshot, context_, async_insert);
1726+
}
1727+
16861728
void registerStorageMerge(StorageFactory & factory)
16871729
{
16881730
factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
@@ -1693,10 +1735,12 @@ void registerStorageMerge(StorageFactory & factory)
16931735

16941736
ASTs & engine_args = args.engine_args;
16951737

1696-
if (engine_args.size() != 2)
1738+
size_t size = engine_args.size();
1739+
1740+
if (size < 2 || size > 3)
16971741
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
1698-
"Storage Merge requires exactly 2 parameters - name "
1699-
"of source database and regexp for table names.");
1742+
"Storage Merge requires 2 or 3 parameters - name "
1743+
"of source database, regexp for table names, and optional table name for writing.");
17001744

17011745
auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(engine_args[0], args.getLocalContext());
17021746

@@ -1708,8 +1752,15 @@ void registerStorageMerge(StorageFactory & factory)
17081752
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
17091753
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");
17101754

1755+
std::optional<String> table_to_write = std::nullopt;
1756+
if (size == 3)
1757+
{
1758+
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
1759+
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
1760+
}
1761+
17111762
return std::make_shared<StorageMerge>(
1712-
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, args.getContext());
1763+
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, table_to_write, args.getContext());
17131764
},
17141765
{
17151766
.supports_schema_inference = true

src/Storages/StorageMerge.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class StorageMerge final : public IStorage, WithContext
3030
const String & source_database_name_or_regexp_,
3131
bool database_is_regexp_,
3232
const DBToTableSetMap & source_databases_and_tables_,
33+
const std::optional<String> & table_to_write_,
3334
ContextPtr context_);
3435

3536
StorageMerge(
@@ -39,6 +40,7 @@ class StorageMerge final : public IStorage, WithContext
3940
const String & source_database_name_or_regexp_,
4041
bool database_is_regexp_,
4142
const String & source_table_regexp_,
43+
const std::optional<String> & table_to_write_,
4244
ContextPtr context_);
4345

4446
std::string getName() const override { return "Merge"; }
@@ -70,6 +72,12 @@ class StorageMerge final : public IStorage, WithContext
7072
size_t max_block_size,
7173
size_t num_streams) override;
7274

75+
SinkToStoragePtr write(
76+
const ASTPtr & query,
77+
const StorageMetadataPtr & metadata_snapshot,
78+
ContextPtr context,
79+
bool async_insert) override;
80+
7381
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
7482

7583
/// you need to add and remove columns in the sub-tables manually
@@ -119,6 +127,8 @@ class StorageMerge final : public IStorage, WithContext
119127

120128
DatabaseNameOrRegexp database_name_or_regexp;
121129

130+
std::optional<QualifiedTableName> table_to_write;
131+
122132
template <typename F>
123133
StoragePtr getFirstTable(F && predicate) const;
124134

@@ -136,6 +146,11 @@ class StorageMerge final : public IStorage, WithContext
136146
template <typename F>
137147
std::optional<UInt64> totalRowsOrBytes(F && func) const;
138148

149+
void setTableToWrite(
150+
const std::optional<String> & table_to_write_,
151+
const String & source_database_name_or_regexp_,
152+
bool database_is_regexp_);
153+
139154
friend class ReadFromMerge;
140155
};
141156

src/TableFunctions/TableFunctionMerge.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,15 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex
191191

192192
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
193193
{
194+
std::optional<std::string> table_to_write = std::nullopt;
194195
auto res = std::make_shared<StorageMerge>(
195196
StorageID(getDatabaseName(), table_name),
196197
ColumnsDescription{},
197198
String{},
198199
source_database_name_or_regexp,
199200
database_is_regexp,
200201
getSourceDatabasesAndTables(context),
202+
table_to_write,
201203
context);
202204

203205
res->startup();
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2 1
2+
2 2
3+
2 3
4+
1 1
5+
2 1
6+
2 2
7+
2 3
8+
1 1
9+
2 1
10+
2 2
11+
2 3
12+
1 1
13+
2 1
14+
2 2
15+
2 3
16+
1 1
17+
2 1
18+
2 2
19+
2 3
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
2+
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
3+
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
4+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
5+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
6+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
7+
DROP DATABASE IF EXISTS test03373_db;
8+
9+
CREATE DATABASE test03373_db;
10+
11+
CREATE TABLE test03373_db.test03373_table_1 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
12+
CREATE TABLE test03373_db.test03373_table_2 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
13+
14+
CREATE TABLE test03373_db.test03373_merge_ro (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+');
15+
16+
CREATE TABLE test03373_db.test03373_merge_wr_1 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_table_2);
17+
CREATE TABLE test03373_db.test03373_merge_wr_2 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_db.test03373_table_2);
18+
CREATE TABLE test03373_db.test03373_merge_wr_3 (key UInt32, value UInt32) ENGINE=Merge(REGEXP('test03373_.*'), 'test03373_table_\d+', test03373_db.test03373_table_2);
19+
20+
INSERT INTO test03373_db.test03373_table_1 VALUES (1,1);
21+
22+
INSERT INTO test03373_db.test03373_merge_wr_1 VALUES (2,1);
23+
INSERT INTO test03373_db.test03373_merge_wr_2 VALUES (2,2);
24+
INSERT INTO test03373_db.test03373_merge_wr_3 VALUES (2,3);
25+
26+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
27+
28+
SELECT * FROM test03373_db.test03373_merge_ro ORDER BY key, value;
29+
SELECT * FROM test03373_db.test03373_merge_wr_1 ORDER BY key, value;
30+
SELECT * FROM test03373_db.test03373_merge_wr_2 ORDER BY key, value;
31+
SELECT * FROM test03373_db.test03373_merge_wr_3 ORDER BY key, value;
32+
33+
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
34+
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
35+
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
36+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
37+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
38+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
39+
DROP DATABASE IF EXISTS test03373_db;

0 commit comments

Comments
 (0)