Skip to content

Commit dad344b

Browse files
committed
address proper object storage
1 parent 0304ad8 commit dad344b

File tree

8 files changed

+52
-31
lines changed

8 files changed

+52
-31
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ Plan getPlan(
111111
IcebergHistory snapshots_info,
112112
const PersistentTableComponents & persistent_table_components,
113113
ObjectStoragePtr object_storage,
114+
std::map<String, DB::ObjectStoragePtr> secondary_storages,
114115
StorageObjectStorageConfigurationPtr configuration,
115116
ContextPtr context,
116117
CompressionMethod compression_method)
@@ -146,29 +147,33 @@ Plan getPlan(
146147
std::unordered_map<String, std::shared_ptr<ManifestFilePlan>> manifest_files;
147148
for (const auto & snapshot : snapshots_info)
148149
{
150+
auto [manifest_list_storage, key_in_storage] = resolveObjectStorageForPath("", snapshot.manifest_list_path, object_storage, secondary_storages, context);
151+
149152
auto manifest_list
150-
= getManifestList(object_storage, configuration, persistent_table_components, context, snapshot.manifest_list_path, log);
153+
= getManifestList(manifest_list_storage, configuration, persistent_table_components, context, key_in_storage, snapshot.manifest_list_path, log);
154+
151155
for (const auto & manifest_file : manifest_list)
152156
{
153-
plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path);
154-
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path))
155-
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id;
157+
plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path);
158+
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path))
159+
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id;
156160
auto manifest_file_content = getManifestFile(
157161
object_storage,
158162
configuration,
159163
persistent_table_components,
160164
context,
161165
log,
162166
manifest_file.manifest_file_path,
167+
manifest_file.manifest_file_absolute_path,
163168
manifest_file.added_sequence_number,
164169
manifest_file.added_snapshot_id);
165170

166-
if (!manifest_files.contains(manifest_file.manifest_file_path))
171+
if (!manifest_files.contains(manifest_file.manifest_file_absolute_path))
167172
{
168-
manifest_files[manifest_file.manifest_file_path] = std::make_shared<ManifestFilePlan>(current_schema);
169-
manifest_files[manifest_file.manifest_file_path]->path = manifest_file.manifest_file_path;
173+
manifest_files[manifest_file.manifest_file_absolute_path] = std::make_shared<ManifestFilePlan>(current_schema);
174+
manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path;
170175
}
171-
manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
176+
manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
172177
auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA);
173178
auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE);
174179
for (const auto & pos_delete_file : positional_delete_files)
@@ -182,17 +187,17 @@ Plan getPlan(
182187

183188
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file);
184189
std::shared_ptr<DataFilePlan> data_file_ptr;
185-
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path))
190+
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_absolute_path))
186191
{
187192
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
188193
.data_object_info = data_object_info,
189-
.manifest_list = manifest_files[manifest_file.manifest_file_path],
194+
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
190195
.patched_path = plan.generator.generateDataFileName()});
191-
plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr;
196+
plan.path_to_data_file[manifest_file.manifest_file_absolute_path] = data_file_ptr;
192197
}
193198
else
194199
{
195-
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path];
200+
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_absolute_path];
196201
}
197202
plan.partitions[partition_index].push_back(data_file_ptr);
198203
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
@@ -508,14 +513,15 @@ void compactIcebergTable(
508513
IcebergHistory snapshots_info,
509514
const PersistentTableComponents & persistent_table_components,
510515
ObjectStoragePtr object_storage_,
516+
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
511517
StorageObjectStorageConfigurationPtr configuration_,
512518
const std::optional<FormatSettings> & format_settings_,
513519
SharedHeader sample_block_,
514520
ContextPtr context_,
515521
CompressionMethod compression_method_)
516522
{
517523
auto plan
518-
= getPlan(std::move(snapshots_info), persistent_table_components, object_storage_, configuration_, context_, compression_method_);
524+
= getPlan(std::move(snapshots_info), persistent_table_components, object_storage_, secondary_storages_, configuration_, context_, compression_method_);
519525
if (plan.need_optimize)
520526
{
521527
auto old_files = getOldFiles(object_storage_, configuration_);

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ void compactIcebergTable(
1515
IcebergHistory snapshots_info,
1616
const PersistentTableComponents & persistent_table_components,
1717
DB::ObjectStoragePtr object_storage_,
18+
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
1819
DB::StorageObjectStorageConfigurationPtr configuration_,
1920
const std::optional<DB::FormatSettings> & format_settings_,
2021
DB::SharedHeader sample_block_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
131131
local_context,
132132
log,
133133
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_path,
134+
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
134135
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
135136
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id);
136137
internal_data_index = 0;

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -496,12 +496,16 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
496496
}
497497

498498
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
499+
500+
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath("", snapshot->getValue<String>(f_manifest_list), object_storage, secondary_storages, local_context);
501+
499502
relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
500503
getManifestList(
501-
object_storage,
504+
storage_to_use,
502505
configuration_ptr,
503506
persistent_components,
504-
local_context,
507+
local_context,
508+
key_in_storage,
505509
makeAbsolutePath(persistent_components.table_location, snapshot->getValue<String>(f_manifest_list)),
506510
log),
507511
relevant_snapshot_id,
@@ -540,6 +544,7 @@ bool IcebergMetadata::optimize(const StorageMetadataPtr & metadata_snapshot, Con
540544
snapshots_info,
541545
persistent_components,
542546
object_storage,
547+
secondary_storages,
543548
configuration_ptr,
544549
format_settings,
545550
sample_block,
@@ -910,6 +915,7 @@ std::optional<size_t> IcebergMetadata::totalRows(ContextPtr local_context) const
910915
local_context,
911916
log,
912917
manifest_list_entry.manifest_file_path,
918+
manifest_list_entry.manifest_file_absolute_path,
913919
manifest_list_entry.added_sequence_number,
914920
manifest_list_entry.added_snapshot_id);
915921
auto data_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA);
@@ -950,6 +956,7 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
950956
local_context,
951957
log,
952958
manifest_list_entry.manifest_file_path,
959+
manifest_list_entry.manifest_file_absolute_path,
953960
manifest_list_entry.added_sequence_number,
954961
manifest_list_entry.added_snapshot_id);
955962
auto count = manifest_file_ptr->getBytesCountInAllDataFilesExcludingDeleted();

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace DB
3232
struct ManifestFileCacheKey
3333
{
3434
String manifest_file_path;
35+
String manifest_file_absolute_path;
3536
Int64 added_sequence_number;
3637
Int64 added_snapshot_id;
3738
Iceberg::ManifestFileContentType content_type;
@@ -73,7 +74,7 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable
7374
size_t total_size = 0;
7475
for (const auto & entry: manifest_file_cache_keys)
7576
{
76-
total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_path.capacity();
77+
total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_absolute_path.capacity();
7778
}
7879
return total_size;
7980
}

src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct IcebergHistoryRecord
4646
Int64 parent_id;
4747
bool is_current_ancestor;
4848
String manifest_list_path;
49+
String manifest_list_absolute_path;
4950

5051
Int32 added_files = 0;
5152
Int32 added_records = 0;

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ Iceberg::ManifestFilePtr getManifestFile(
7373
const PersistentTableComponents & persistent_table_components,
7474
ContextPtr local_context,
7575
LoggerPtr log,
76-
const String & filename,
76+
const String & key_in_storage,
77+
const String & absolute_path,
7778
Int64 inherited_sequence_number,
7879
Int64 inherited_snapshot_id)
7980
{
@@ -84,33 +85,33 @@ Iceberg::ManifestFilePtr getManifestFile(
8485

8586
auto create_fn = [&, use_iceberg_metadata_cache]()
8687
{
87-
PathWithMetadata manifest_object_info(filename);
88+
PathWithMetadata manifest_object_info(key_in_storage, std::nullopt, absolute_path, object_storage);
8889

8990
auto read_settings = local_context->getReadSettings();
9091
/// Do not utilize filesystem cache if more precise cache enabled
9192
if (use_iceberg_metadata_cache)
9293
read_settings.enable_filesystem_cache = false;
9394

9495
auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings);
95-
Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context));
96+
Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), key_in_storage, getFormatSettings(local_context));
9697

9798
return std::make_shared<Iceberg::ManifestFileContent>(
9899
manifest_file_deserializer,
99-
filename,
100+
key_in_storage,
100101
persistent_table_components.format_version,
101102
configuration->getPathForRead().path,
102103
*persistent_table_components.schema_processor,
103104
inherited_sequence_number,
104105
inherited_snapshot_id,
105106
persistent_table_components.table_location,
106107
local_context,
107-
filename);
108+
absolute_path);
108109
};
109110

110111
if (use_iceberg_metadata_cache)
111112
{
112113
auto manifest_file = persistent_table_components.metadata_cache->getOrSetManifestFile(
113-
IcebergMetadataFilesCache::getKey(configuration, filename), create_fn);
114+
IcebergMetadataFilesCache::getKey(configuration, absolute_path), create_fn);
114115
return manifest_file;
115116
}
116117
return create_fn();
@@ -121,7 +122,8 @@ ManifestFileCacheKeys getManifestList(
121122
StorageObjectStorageConfigurationWeakPtr configuration,
122123
const PersistentTableComponents & persistent_table_components,
123124
ContextPtr local_context,
124-
const String & filename,
125+
const String & key_in_storage,
126+
const String & absolute_path,
125127
LoggerPtr log)
126128
{
127129
auto configuration_ptr = configuration.lock();
@@ -135,15 +137,15 @@ ManifestFileCacheKeys getManifestList(
135137

136138
auto create_fn = [&, use_iceberg_metadata_cache]()
137139
{
138-
StorageObjectStorage::ObjectInfo object_info(filename);
140+
PathWithMetadata object_info(key_in_storage, std::nullopt, absolute_path, object_storage);
139141

140142
auto read_settings = local_context->getReadSettings();
141143
/// Do not utilize filesystem cache if more precise cache enabled
142144
if (use_iceberg_metadata_cache)
143145
read_settings.enable_filesystem_cache = false;
144146

145147
auto manifest_list_buf = createReadBuffer(object_info, object_storage, local_context, log, read_settings);
146-
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context));
148+
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), key_in_storage, getFormatSettings(local_context));
147149

148150
ManifestFileCacheKeys manifest_file_cache_keys;
149151

@@ -152,7 +154,7 @@ ManifestFileCacheKeys getManifestList(
152154
manifest_list_deserializer.getMetadataContent(),
153155
DB::IcebergMetadataLogLevel::ManifestListMetadata,
154156
configuration_ptr->getRawPath().path,
155-
filename,
157+
key_in_storage,
156158
std::nullopt);
157159

158160
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
@@ -178,14 +180,14 @@ ManifestFileCacheKeys getManifestList(
178180
manifest_list_deserializer.getValueFromRowByName(i, f_content, TypeIndex::Int32).safeGet<Int32>());
179181
}
180182
manifest_file_cache_keys.emplace_back(
181-
manifest_file_name, added_sequence_number, added_snapshot_id.safeGet<Int64>(), content_type);
183+
file_path, manifest_file_name, added_sequence_number, added_snapshot_id.safeGet<Int64>(), content_type);
182184

183185
insertRowToLogTable(
184186
local_context,
185187
manifest_list_deserializer.getContent(i),
186188
DB::IcebergMetadataLogLevel::ManifestListEntry,
187189
configuration_ptr->getRawPath().path,
188-
filename,
190+
absolute_path,
189191
i);
190192
}
191193
/// We only return the list of {file name, seq number} for cache.
@@ -197,7 +199,7 @@ ManifestFileCacheKeys getManifestList(
197199
ManifestFileCacheKeys manifest_file_cache_keys;
198200
if (use_iceberg_metadata_cache)
199201
manifest_file_cache_keys = persistent_table_components.metadata_cache->getOrSetManifestFileCacheKeys(
200-
IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn);
202+
IcebergMetadataFilesCache::getKey(configuration_ptr, absolute_path), create_fn);
201203
else
202204
manifest_file_cache_keys = create_fn();
203205
return manifest_file_cache_keys;

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ Iceberg::ManifestFilePtr getManifestFile(
2929
const PersistentTableComponents & persistent_table_components,
3030
ContextPtr local_context,
3131
LoggerPtr log,
32-
const String & filename,
32+
const String & key_in_storage,
33+
const String & absolute_path,
3334
Int64 inherited_sequence_number,
3435
Int64 inherited_snapshot_id);
3536

@@ -39,7 +40,8 @@ ManifestFileCacheKeys getManifestList(
3940
StorageObjectStorageConfigurationWeakPtr configuration,
4041
const PersistentTableComponents & persistent_table_components,
4142
ContextPtr local_context,
42-
const String & filename,
43+
const String & key_in_storage,
44+
const String & absolute_path,
4345
LoggerPtr log);
4446

4547
std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object);

0 commit comments

Comments
 (0)