Skip to content

Commit 1f6e189

Browse files
authored
Merge pull request #675 from Altinity/feature/antalya-iceberg-table-all-storages
Generalize engine definition for Iceberg tables
2 parents 8a0da57 + e2ad619 commit 1f6e189

22 files changed

+843
-200
lines changed

src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h

+8-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
7171
{
7272
public:
7373
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
74-
size_t size() const override { return arguments ? arguments->size() : 0; }
75-
std::unique_ptr<Argument> at(size_t n) const override { return std::make_unique<ArgumentTreeNode>(arguments->at(n).get()); }
74+
size_t size() const override
75+
{ /// size withous skipped indexes
76+
return arguments ? arguments->size() - skippedSize() : 0;
77+
}
78+
std::unique_ptr<Argument> at(size_t n) const override
79+
{ /// n is relative index, some can be skipped
80+
return std::make_unique<ArgumentTreeNode>(arguments->at(getRealIndex(n)).get());
81+
}
7682
private:
7783
const QueryTreeNodes * arguments = nullptr;
7884
};

src/Databases/Iceberg/DatabaseIceberg.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
230230

231231
/// with_table_structure = false: because there will be
232232
/// no table structure in table definition AST.
233-
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, storage_settings.get());
233+
configuration->initialize(args, context_, /* with_table_structure */false, storage_settings.get());
234234

235235
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
236236

src/Disks/DiskType.cpp

+42-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace ErrorCodes
99
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
1010
}
1111

12-
MetadataStorageType metadataTypeFromString(const String & type)
12+
MetadataStorageType metadataTypeFromString(const std::string & type)
1313
{
1414
auto check_type = Poco::toLower(type);
1515
if (check_type == "local")
@@ -53,23 +53,47 @@ std::string DataSourceDescription::toString() const
5353
case DataSourceType::RAM:
5454
return "memory";
5555
case DataSourceType::ObjectStorage:
56-
{
57-
switch (object_storage_type)
58-
{
59-
case ObjectStorageType::S3:
60-
return "s3";
61-
case ObjectStorageType::HDFS:
62-
return "hdfs";
63-
case ObjectStorageType::Azure:
64-
return "azure_blob_storage";
65-
case ObjectStorageType::Local:
66-
return "local_blob_storage";
67-
case ObjectStorageType::Web:
68-
return "web";
69-
case ObjectStorageType::None:
70-
return "none";
71-
}
72-
}
56+
return DB::toString(object_storage_type);
7357
}
7458
}
59+
60+
ObjectStorageType objectStorageTypeFromString(const std::string & type)
61+
{
62+
auto check_type = Poco::toLower(type);
63+
if (check_type == "s3")
64+
return ObjectStorageType::S3;
65+
if (check_type == "hdfs")
66+
return ObjectStorageType::HDFS;
67+
if (check_type == "azure_blob_storage" || check_type == "azure")
68+
return ObjectStorageType::Azure;
69+
if (check_type == "local_blob_storage" || check_type == "local")
70+
return ObjectStorageType::Local;
71+
if (check_type == "web")
72+
return ObjectStorageType::Web;
73+
if (check_type == "none")
74+
return ObjectStorageType::None;
75+
76+
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
77+
"Unknown object storage type: {}", type);
78+
}
79+
80+
std::string toString(ObjectStorageType type)
81+
{
82+
switch (type)
83+
{
84+
case ObjectStorageType::S3:
85+
return "s3";
86+
case ObjectStorageType::HDFS:
87+
return "hdfs";
88+
case ObjectStorageType::Azure:
89+
return "azure_blob_storage";
90+
case ObjectStorageType::Local:
91+
return "local_blob_storage";
92+
case ObjectStorageType::Web:
93+
return "web";
94+
case ObjectStorageType::None:
95+
return "none";
96+
}
97+
}
98+
7599
}

src/Disks/DiskType.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ enum class MetadataStorageType : uint8_t
3434
Memory,
3535
};
3636

37-
MetadataStorageType metadataTypeFromString(const String & type);
38-
String toString(DataSourceType data_source_type);
37+
MetadataStorageType metadataTypeFromString(const std::string & type);
38+
39+
ObjectStorageType objectStorageTypeFromString(const std::string & type);
40+
std::string toString(ObjectStorageType type);
3941

4042
struct DataSourceDescription
4143
{

src/Parsers/FunctionSecretArgumentsFinder.h

+98-10
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
#include <Common/KnownObjectNames.h>
44
#include <Common/re2.h>
55
#include <Common/maskURIPassword.h>
6+
#include <Common/NamedCollections/NamedCollections.h>
7+
#include <Common/NamedCollections/NamedCollectionsFactory.h>
68
#include <Core/QualifiedTableName.h>
79
#include <base/defines.h>
810
#include <boost/algorithm/string/predicate.hpp>
11+
#include <Poco/String.h>
912

1013

1114
namespace DB
@@ -29,6 +32,21 @@ class AbstractFunction
2932
virtual ~Arguments() = default;
3033
virtual size_t size() const = 0;
3134
virtual std::unique_ptr<Argument> at(size_t n) const = 0;
35+
void skipArgument(size_t n) { skipped_indexes.insert(n); }
36+
void unskipArguments() { skipped_indexes.clear(); }
37+
size_t getRealIndex(size_t n) const
38+
{
39+
for (auto idx : skipped_indexes)
40+
{
41+
if (n < idx)
42+
break;
43+
++n;
44+
}
45+
return n;
46+
}
47+
size_t skippedSize() const { return skipped_indexes.size(); }
48+
private:
49+
std::set<size_t> skipped_indexes;
3250
};
3351

3452
virtual ~AbstractFunction() = default;
@@ -75,14 +93,15 @@ class FunctionSecretArgumentsFinder
7593
{
7694
if (index >= function->arguments->size())
7795
return;
96+
auto real_index = function->arguments->getRealIndex(index);
7897
if (!result.count)
7998
{
80-
result.start = index;
99+
result.start = real_index;
81100
result.are_named = argument_is_named;
82101
}
83-
chassert(index >= result.start); /// We always check arguments consecutively
102+
chassert(real_index >= result.start); /// We always check arguments consecutively
84103
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
85-
result.count = index + 1 - result.start;
104+
result.count = real_index + 1 - result.start;
86105
if (!argument_is_named)
87106
result.are_named = false;
88107
}
@@ -100,14 +119,18 @@ class FunctionSecretArgumentsFinder
100119
{
101120
findMongoDBSecretArguments();
102121
}
122+
else if (function->name() == "iceberg")
123+
{
124+
findIcebergFunctionSecretArguments();
125+
}
103126
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
104-
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
127+
(function->name() == "deltaLake") || (function->name() == "hudi") ||
105128
(function->name() == "gcs") || (function->name() == "icebergS3"))
106129
{
107130
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
108131
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
109132
}
110-
else if (function->name() == "s3Cluster")
133+
else if ((function->name() == "s3Cluster") || (function->name() == "icebergS3Cluster"))
111134
{
112135
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
113136
findS3FunctionSecretArguments(/* is_cluster_function= */ true);
@@ -117,7 +140,7 @@ class FunctionSecretArgumentsFinder
117140
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
118141
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ false);
119142
}
120-
else if (function->name() == "azureBlobStorageCluster")
143+
else if ((function->name() == "azureBlobStorageCluster") || (function->name() == "icebergAzureCluster"))
121144
{
122145
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
123146
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ true);
@@ -218,11 +241,18 @@ class FunctionSecretArgumentsFinder
218241
findSecretNamedArgument("secret_access_key", 1);
219242
return;
220243
}
244+
if (is_cluster_function && isNamedCollectionName(1))
245+
{
246+
/// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...)
247+
findSecretNamedArgument("secret_access_key", 2);
248+
return;
249+
}
221250

222251
/// We should check other arguments first because we don't need to do any replacement in case of
223252
/// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
224253
/// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
225254
size_t count = excludeS3OrURLNestedMaps();
255+
226256
if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4))
227257
{
228258
String second_arg;
@@ -287,6 +317,48 @@ class FunctionSecretArgumentsFinder
287317
markSecretArgument(url_arg_idx + 4);
288318
}
289319

320+
std::string findIcebergStorageType()
321+
{
322+
std::string storage_type = "s3";
323+
324+
size_t count = function->arguments->size();
325+
if (!count)
326+
return storage_type;
327+
328+
auto storage_type_idx = findNamedArgument(&storage_type, "storage_type");
329+
if (storage_type_idx != -1)
330+
{
331+
storage_type = Poco::toLower(storage_type);
332+
function->arguments->skipArgument(storage_type_idx);
333+
}
334+
else if (isNamedCollectionName(0))
335+
{
336+
std::string collection_name;
337+
if (function->arguments->at(0)->tryGetString(&collection_name, true))
338+
{
339+
NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name);
340+
if (collection && collection->has("storage_type"))
341+
{
342+
storage_type = Poco::toLower(collection->get<std::string>("storage_type"));
343+
}
344+
}
345+
}
346+
347+
return storage_type;
348+
}
349+
350+
void findIcebergFunctionSecretArguments()
351+
{
352+
auto storage_type = findIcebergStorageType();
353+
354+
if (storage_type == "s3")
355+
findS3FunctionSecretArguments(false);
356+
else if (storage_type == "azure")
357+
findAzureBlobStorageFunctionSecretArguments(false);
358+
359+
function->arguments->unskipArguments();
360+
}
361+
290362
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
291363
{
292364
String url_arg;
@@ -310,7 +382,7 @@ class FunctionSecretArgumentsFinder
310382
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
311383
{
312384
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
313-
result.start = url_arg_idx;
385+
result.start = function->arguments->getRealIndex(url_arg_idx);
314386
result.are_named = argument_is_named;
315387
result.count = 1;
316388
result.replacement = url_arg;
@@ -458,6 +530,7 @@ class FunctionSecretArgumentsFinder
458530
void findTableEngineSecretArguments()
459531
{
460532
const String & engine_name = function->name();
533+
461534
if (engine_name == "ExternalDistributed")
462535
{
463536
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
@@ -475,10 +548,13 @@ class FunctionSecretArgumentsFinder
475548
{
476549
findMongoDBSecretArguments();
477550
}
551+
else if (engine_name == "Iceberg")
552+
{
553+
findIcebergTableEngineSecretArguments();
554+
}
478555
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS")
479556
|| (engine_name == "DeltaLake") || (engine_name == "Hudi")
480-
|| (engine_name == "Iceberg") || (engine_name == "IcebergS3")
481-
|| (engine_name == "S3Queue"))
557+
|| (engine_name == "IcebergS3") || (engine_name == "S3Queue"))
482558
{
483559
/// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...)
484560
findS3TableEngineSecretArguments();
@@ -487,7 +563,7 @@ class FunctionSecretArgumentsFinder
487563
{
488564
findURLSecretArguments();
489565
}
490-
else if (engine_name == "AzureBlobStorage")
566+
else if ((engine_name == "AzureBlobStorage") || (engine_name == "IcebergAzure"))
491567
{
492568
findAzureBlobStorageTableEngineSecretArguments();
493569
}
@@ -579,6 +655,18 @@ class FunctionSecretArgumentsFinder
579655
markSecretArgument(url_arg_idx + 4);
580656
}
581657

658+
void findIcebergTableEngineSecretArguments()
659+
{
660+
auto storage_type = findIcebergStorageType();
661+
662+
if (storage_type == "s3")
663+
findS3TableEngineSecretArguments();
664+
else if (storage_type == "azure")
665+
findAzureBlobStorageTableEngineSecretArguments();
666+
667+
function->arguments->unskipArguments();
668+
}
669+
582670
void findDatabaseEngineSecretArguments()
583671
{
584672
const String & engine_name = function->name();

src/Parsers/FunctionSecretArgumentsFinderAST.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ class FunctionAST : public AbstractFunction
5454
{
5555
public:
5656
explicit ArgumentsAST(const ASTs * arguments_) : arguments(arguments_) {}
57-
size_t size() const override { return arguments ? arguments->size() : 0; }
57+
size_t size() const override
58+
{ /// size withous skipped indexes
59+
return arguments ? arguments->size() - skippedSize() : 0;
60+
}
5861
std::unique_ptr<Argument> at(size_t n) const override
59-
{
60-
return std::make_unique<ArgumentAST>(arguments->at(n).get());
62+
{ /// n is relative index, some can be skipped
63+
return std::make_unique<ArgumentAST>(arguments->at(getRealIndex(n)).get());
6164
}
6265
private:
6366
const ASTs * arguments = nullptr;

src/Storages/ObjectStorage/Azure/Configuration.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {
5454
"account_key",
5555
"connection_string",
5656
"storage_account_url",
57+
"storage_type",
5758
};
5859

5960
void StorageAzureConfiguration::check(ContextPtr context) const

0 commit comments

Comments
 (0)