Skip to content

Commit 199ea1e

Browse files
authored
feat(native): Support basic writing to iceberg table (#26338)
Velox PR facebookincubator/velox#14723 This commit adds support for basic Iceberg table insertion in Prestissimo (native execution) and provides comprehensive test coverage. All tests use the native query runner to verify Prestissimo's Iceberg functionality Basic insertion does not include partition transform, metrics, sort order.
1 parent 0085f7f commit 199ea1e

File tree

10 files changed

+2296
-21
lines changed

10 files changed

+2296
-21
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h"
1717

1818
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
19+
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
1920
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
2021
#include "velox/type/fbhive/HiveTypeParser.h"
2122

@@ -274,4 +275,74 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
274275
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();
275276
}
276277

278+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
279+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
280+
const protocol::CreateHandle* createHandle,
281+
const TypeParser& typeParser) const {
282+
auto icebergOutputTableHandle =
283+
std::dynamic_pointer_cast<protocol::iceberg::IcebergOutputTableHandle>(
284+
createHandle->handle.connectorHandle);
285+
286+
VELOX_CHECK_NOT_NULL(
287+
icebergOutputTableHandle,
288+
"Unexpected output table handle type {}",
289+
createHandle->handle.connectorHandle->_type);
290+
291+
const auto inputColumns =
292+
toHiveColumns(icebergOutputTableHandle->inputColumns, typeParser);
293+
294+
return std::make_unique<
295+
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
296+
inputColumns,
297+
std::make_shared<velox::connector::hive::LocationHandle>(
298+
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
299+
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
300+
velox::connector::hive::LocationHandle::TableType::kNew),
301+
toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
302+
std::optional(
303+
toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
304+
}
305+
306+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
307+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
308+
const protocol::InsertHandle* insertHandle,
309+
const TypeParser& typeParser) const {
310+
auto icebergInsertTableHandle =
311+
std::dynamic_pointer_cast<protocol::iceberg::IcebergInsertTableHandle>(
312+
insertHandle->handle.connectorHandle);
313+
314+
VELOX_CHECK_NOT_NULL(
315+
icebergInsertTableHandle,
316+
"Unexpected insert table handle type {}",
317+
insertHandle->handle.connectorHandle->_type);
318+
319+
const auto inputColumns =
320+
toHiveColumns(icebergInsertTableHandle->inputColumns, typeParser);
321+
322+
return std::make_unique<
323+
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
324+
inputColumns,
325+
std::make_shared<velox::connector::hive::LocationHandle>(
326+
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
327+
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
328+
velox::connector::hive::LocationHandle::TableType::kExisting),
329+
toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
330+
std::optional(
331+
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
332+
}
333+
334+
std::vector<velox::connector::hive::HiveColumnHandlePtr>
335+
IcebergPrestoToVeloxConnector::toHiveColumns(
336+
const protocol::List<protocol::iceberg::IcebergColumnHandle>& inputColumns,
337+
const TypeParser& typeParser) const {
338+
std::vector<velox::connector::hive::HiveColumnHandlePtr> hiveColumns;
339+
hiveColumns.reserve(inputColumns.size());
340+
for (const auto& columnHandle : inputColumns) {
341+
hiveColumns.emplace_back(
342+
std::dynamic_pointer_cast<velox::connector::hive::HiveColumnHandle>(
343+
std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
344+
}
345+
return hiveColumns;
346+
}
347+
277348
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#pragma once
1616

1717
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
18+
#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"
1819

1920
namespace facebook::presto {
2021

@@ -40,6 +41,22 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
4041

4142
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
4243
const final;
44+
45+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
46+
toVeloxInsertTableHandle(
47+
const protocol::CreateHandle* createHandle,
48+
const TypeParser& typeParser) const final;
49+
50+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
51+
toVeloxInsertTableHandle(
52+
const protocol::InsertHandle* insertHandle,
53+
const TypeParser& typeParser) const final;
54+
55+
private:
56+
std::vector<velox::connector::hive::HiveColumnHandlePtr> toHiveColumns(
57+
const protocol::List<protocol::iceberg::IcebergColumnHandle>&
58+
inputColumns,
59+
const TypeParser& typeParser) const;
4360
};
4461

4562
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -153,26 +153,6 @@ dwio::common::FileFormat toFileFormat(
153153
}
154154
}
155155

156-
velox::common::CompressionKind toFileCompressionKind(
157-
const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) {
158-
switch (hiveCompressionCodec) {
159-
case protocol::hive::HiveCompressionCodec::SNAPPY:
160-
return velox::common::CompressionKind::CompressionKind_SNAPPY;
161-
case protocol::hive::HiveCompressionCodec::GZIP:
162-
return velox::common::CompressionKind::CompressionKind_GZIP;
163-
case protocol::hive::HiveCompressionCodec::LZ4:
164-
return velox::common::CompressionKind::CompressionKind_LZ4;
165-
case protocol::hive::HiveCompressionCodec::ZSTD:
166-
return velox::common::CompressionKind::CompressionKind_ZSTD;
167-
case protocol::hive::HiveCompressionCodec::NONE:
168-
return velox::common::CompressionKind::CompressionKind_NONE;
169-
default:
170-
VELOX_UNSUPPORTED(
171-
"Unsupported file compression format: {}.",
172-
toJsonString(hiveCompressionCodec));
173-
}
174-
}
175-
176156
velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind(
177157
protocol::hive::BucketFunctionType bucketFuncType) {
178158
switch (bucketFuncType) {
@@ -425,6 +405,26 @@ std::unique_ptr<connector::ConnectorTableHandle> toHiveTableHandle(
425405
finalTableParameters);
426406
}
427407

408+
velox::common::CompressionKind toFileCompressionKind(
409+
const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) {
410+
switch (hiveCompressionCodec) {
411+
case protocol::hive::HiveCompressionCodec::SNAPPY:
412+
return velox::common::CompressionKind::CompressionKind_SNAPPY;
413+
case protocol::hive::HiveCompressionCodec::GZIP:
414+
return velox::common::CompressionKind::CompressionKind_GZIP;
415+
case protocol::hive::HiveCompressionCodec::LZ4:
416+
return velox::common::CompressionKind::CompressionKind_LZ4;
417+
case protocol::hive::HiveCompressionCodec::ZSTD:
418+
return velox::common::CompressionKind::CompressionKind_ZSTD;
419+
case protocol::hive::HiveCompressionCodec::NONE:
420+
return velox::common::CompressionKind::CompressionKind_NONE;
421+
default:
422+
VELOX_UNSUPPORTED(
423+
"Unsupported file compression format: {}.",
424+
toJsonString(hiveCompressionCodec));
425+
}
426+
}
427+
428428
std::unique_ptr<velox::connector::ConnectorSplit>
429429
HivePrestoToVeloxConnector::toVeloxSplit(
430430
const protocol::ConnectorId& catalogId,

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
5252
const VeloxExprConverter& exprConverter,
5353
const TypeParser& typeParser);
5454

55+
velox::common::CompressionKind toFileCompressionKind(
56+
const protocol::hive::HiveCompressionCodec& hiveCompressionCodec);
57+
5558
class PrestoToVeloxConnector {
5659
public:
5760
virtual ~PrestoToVeloxConnector() = default;

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public enum QueryRunnerType
8484
public static final String REMOTE_FUNCTION_CATALOG_NAME = "remote";
8585
public static final String HIVE_DATA = "hive_data";
8686

87-
protected static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET";
87+
public static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET";
8888

8989
private static final Logger log = Logger.get(PrestoNativeQueryRunnerUtils.class);
9090
private static final String DEFAULT_STORAGE_FORMAT = "DWRF";

0 commit comments

Comments
 (0)