diff --git a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt index 389b4ca01a99b..694c13fbb00f6 100644 --- a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt @@ -12,6 +12,7 @@ add_library(presto_connectors IcebergPrestoToVeloxConnector.cpp PrestoToVeloxConnectorUtils.cpp + HivePrestoToVeloxConnector.cpp Registration.cpp PrestoToVeloxConnector.cpp SystemConnector.cpp) diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp new file mode 100644 index 0000000000000..29b570602dc82 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp @@ -0,0 +1,555 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" + +#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" +#include "presto_cpp/main/types/PrestoToVeloxExpr.h" +#include "presto_cpp/main/types/TypeParser.h" +#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h" + +#include +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/type/Filter.h" + +namespace facebook::presto { +using namespace velox; + +namespace { + +connector::hive::LocationHandle::TableType toTableType( + protocol::hive::TableType tableType) { + switch (tableType) { + case protocol::hive::TableType::NEW: + // Temporary tables are written and read by the SPI in a single pipeline. + // So they can be treated as New. They do not require Append or Overwrite + // semantics as applicable for regular tables. + case protocol::hive::TableType::TEMPORARY: + return connector::hive::LocationHandle::TableType::kNew; + case protocol::hive::TableType::EXISTING: + return connector::hive::LocationHandle::TableType::kExisting; + default: + VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); + } +} + +std::shared_ptr toLocationHandle( + const protocol::hive::LocationHandle& locationHandle) { + return std::make_shared( + locationHandle.targetPath, + locationHandle.writePath, + toTableType(locationHandle.tableType)); +} + +velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind( + protocol::hive::BucketFunctionType bucketFuncType) { + switch (bucketFuncType) { + case protocol::hive::BucketFunctionType::PRESTO_NATIVE: + return velox::connector::hive::HiveBucketProperty::Kind::kPrestoNative; + case protocol::hive::BucketFunctionType::HIVE_COMPATIBLE: + return velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible; + default: + VELOX_USER_FAIL( + "Unknown hive bucket function: {}", toJsonString(bucketFuncType)); + } +} + +dwio::common::FileFormat toFileFormat( + const protocol::hive::HiveStorageFormat storageFormat, + const char* usage) { + switch (storageFormat) { + case protocol::hive::HiveStorageFormat::DWRF: + return dwio::common::FileFormat::DWRF; + case protocol::hive::HiveStorageFormat::PARQUET: + return dwio::common::FileFormat::PARQUET; + case protocol::hive::HiveStorageFormat::ALPHA: + // This has been renamed in Velox from ALPHA to NIMBLE. + return dwio::common::FileFormat::NIMBLE; + case protocol::hive::HiveStorageFormat::TEXTFILE: + return dwio::common::FileFormat::TEXT; + default: + VELOX_UNSUPPORTED( + "Unsupported file format in {}: {}.", + usage, + toJsonString(storageFormat)); + } +} + +std::vector stringToTypes( + const std::shared_ptr>& typeStrings, + const TypeParser& typeParser) { + std::vector types; + types.reserve(typeStrings->size()); + for (const auto& typeString : *typeStrings) { + types.push_back(stringToType(typeString, typeParser)); + } + return types; +} + +core::SortOrder toSortOrder(protocol::hive::Order order) { + switch (order) { + case protocol::hive::Order::ASCENDING: + return core::SortOrder(true, true); + case protocol::hive::Order::DESCENDING: + return core::SortOrder(false, false); + default: + VELOX_USER_FAIL("Unknown sort order: {}", toJsonString(order)); + } +} + +std::shared_ptr toHiveSortingColumn( + const protocol::hive::SortingColumn& sortingColumn) { + return std::make_shared( + sortingColumn.columnName, toSortOrder(sortingColumn.order)); +} + +std::vector> +toHiveSortingColumns( + const protocol::List& sortedBy) { + std::vector> + sortingColumns; + sortingColumns.reserve(sortedBy.size()); + for (const auto& sortingColumn : sortedBy) { + sortingColumns.push_back(toHiveSortingColumn(sortingColumn)); + } + return sortingColumns; +} + +std::shared_ptr +toHiveBucketProperty( + const std::vector>& + inputColumns, + const std::shared_ptr& bucketProperty, + const TypeParser& typeParser) { + if (bucketProperty == nullptr) { + return nullptr; + } + + VELOX_USER_CHECK_GT( + bucketProperty->bucketCount, 0, "Bucket count must be a positive value"); + + VELOX_USER_CHECK( + !bucketProperty->bucketedBy.empty(), + "Bucketed columns must be set: {}", + toJsonString(*bucketProperty)); + + const velox::connector::hive::HiveBucketProperty::Kind kind = + toHiveBucketPropertyKind(bucketProperty->bucketFunctionType); + std::vector bucketedTypes; + if (kind == + velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible) { + VELOX_USER_CHECK_NULL( + bucketProperty->types, + "Unexpected bucketed types set for hive compatible bucket function: {}", + toJsonString(*bucketProperty)); + bucketedTypes.reserve(bucketProperty->bucketedBy.size()); + for (const auto& bucketedColumn : bucketProperty->bucketedBy) { + TypePtr bucketedType{nullptr}; + for (const auto& inputColumn : inputColumns) { + if (inputColumn->name() != bucketedColumn) { + continue; + } + VELOX_USER_CHECK_NOT_NULL(inputColumn->hiveType()); + bucketedType = inputColumn->hiveType(); + break; + } + VELOX_USER_CHECK_NOT_NULL( + bucketedType, "Bucketed column {} not found", bucketedColumn); + bucketedTypes.push_back(std::move(bucketedType)); + } + } else { + VELOX_USER_CHECK_EQ( + bucketProperty->types->size(), + bucketProperty->bucketedBy.size(), + "Bucketed types is not set properly for presto native bucket function: {}", + toJsonString(*bucketProperty)); + bucketedTypes = stringToTypes(bucketProperty->types, typeParser); + } + + const auto sortedBy = toHiveSortingColumns(bucketProperty->sortedBy); + + return std::make_shared( + toHiveBucketPropertyKind(bucketProperty->bucketFunctionType), + bucketProperty->bucketCount, + bucketProperty->bucketedBy, + bucketedTypes, + sortedBy); +} + +std::unique_ptr +toVeloxHiveColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) { + auto* hiveColumn = + dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + hiveColumn, "Unexpected column handle type {}", column->_type); + velox::type::fbhive::HiveTypeParser hiveTypeParser; + // TODO(spershin): Should we pass something different than 'typeSignature' + // to 'hiveType' argument of the 'HiveColumnHandle' constructor? + return std::make_unique( + hiveColumn->name, + toHiveColumnType(hiveColumn->columnType), + stringToType(hiveColumn->typeSignature, typeParser), + hiveTypeParser.parse(hiveColumn->hiveType), + toRequiredSubfields(hiveColumn->requiredSubfields)); +} + +velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( + const protocol::hive::BucketConversion& bucketConversion) { + velox::connector::hive::HiveBucketConversion veloxBucketConversion; + // Current table bucket count (new). + veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount; + // Partition bucket count (old). + veloxBucketConversion.partitionBucketCount = + bucketConversion.partitionBucketCount; + TypeParser typeParser; + for (const auto& column : bucketConversion.bucketColumnHandles) { + // Columns used as bucket input. + veloxBucketConversion.bucketColumnHandles.push_back( + toVeloxHiveColumnHandle(&column, typeParser)); + } + return veloxBucketConversion; +} + +std::unique_ptr toHiveTableHandle( + const protocol::TupleDomain& domainPredicate, + const std::shared_ptr& remainingPredicate, + bool isPushdownFilterEnabled, + const std::string& tableName, + const protocol::List& dataColumns, + const protocol::TableHandle& tableHandle, + const protocol::Map& tableParameters, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser) { + common::SubfieldFilters subfieldFilters; + auto domains = domainPredicate.domains; + for (const auto& domain : *domains) { + auto filter = domain.second; + subfieldFilters[common::Subfield(domain.first)] = + toFilter(domain.second, exprConverter, typeParser); + } + + auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate); + if (auto constant = std::dynamic_pointer_cast( + remainingFilter)) { + bool value = constant->value().value(); + VELOX_CHECK(value, "Unexpected always-false remaining predicate"); + + // Use null for always-true filter. + remainingFilter = nullptr; + } + + RowTypePtr finalDataColumns; + if (!dataColumns.empty()) { + std::vector names; + std::vector types; + velox::type::fbhive::HiveTypeParser hiveTypeParser; + names.reserve(dataColumns.size()); + types.reserve(dataColumns.size()); + for (auto& column : dataColumns) { + std::string name = column.name; + folly::toLowerAscii(name); + names.emplace_back(std::move(name)); + auto parsedType = hiveTypeParser.parse(column.type); + // The type from the metastore may have upper case letters + // in field names, convert them all to lower case to be + // compatible with Presto. + types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, parsedType->kind(), parsedType)); + } + finalDataColumns = ROW(std::move(names), std::move(types)); + } + + if (tableParameters.empty()) { + return std::make_unique( + tableHandle.connectorId, + tableName, + isPushdownFilterEnabled, + std::move(subfieldFilters), + remainingFilter, + finalDataColumns); + } + + std::unordered_map finalTableParameters = {}; + finalTableParameters.reserve(tableParameters.size()); + for (const auto& [key, value] : tableParameters) { + finalTableParameters[key] = value; + } + + return std::make_unique( + tableHandle.connectorId, + tableName, + isPushdownFilterEnabled, + std::move(subfieldFilters), + remainingFilter, + finalDataColumns, + finalTableParameters); +} + +} // namespace + +connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( + protocol::hive::ColumnType type) { + switch (type) { + case protocol::hive::ColumnType::PARTITION_KEY: + return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; + case protocol::hive::ColumnType::REGULAR: + return connector::hive::HiveColumnHandle::ColumnType::kRegular; + case protocol::hive::ColumnType::SYNTHESIZED: + return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; + default: + VELOX_UNSUPPORTED( + "Unsupported Hive column type: {}.", toJsonString(type)); + } +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const { + auto hiveSplit = + dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + hiveSplit, "Unexpected split type {}", connectorSplit->_type); + std::unordered_map> partitionKeys; + for (const auto& entry : hiveSplit->partitionKeys) { + partitionKeys.emplace( + entry.name, + entry.value == nullptr ? std::nullopt + : std::optional{*entry.value}); + } + std::unordered_map customSplitInfo; + for (const auto& [key, value] : hiveSplit->fileSplit.customSplitInfo) { + customSplitInfo[key] = value; + } + std::shared_ptr extraFileInfo; + if (hiveSplit->fileSplit.extraFileInfo) { + extraFileInfo = std::make_shared( + velox::encoding::Base64::decode(*hiveSplit->fileSplit.extraFileInfo)); + } + std::unordered_map serdeParameters; + serdeParameters.reserve(hiveSplit->storage.serdeParameters.size()); + for (const auto& [key, value] : hiveSplit->storage.serdeParameters) { + serdeParameters[key] = value; + } + std::unordered_map infoColumns = { + {"$path", hiveSplit->fileSplit.path}, + {"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)}, + {"$file_modified_time", + std::to_string(hiveSplit->fileSplit.fileModifiedTime)}, + }; + if (hiveSplit->tableBucketNumber) { + infoColumns["$bucket"] = std::to_string(*hiveSplit->tableBucketNumber); + } + auto veloxSplit = + std::make_unique( + catalogId, + hiveSplit->fileSplit.path, + toVeloxFileFormat(hiveSplit->storage.storageFormat), + hiveSplit->fileSplit.start, + hiveSplit->fileSplit.length, + partitionKeys, + hiveSplit->tableBucketNumber + ? std::optional(*hiveSplit->tableBucketNumber) + : std::nullopt, + customSplitInfo, + extraFileInfo, + serdeParameters, + hiveSplit->splitWeight, + splitContext->cacheable, + infoColumns); + if (hiveSplit->bucketConversion) { + VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); + veloxSplit->bucketConversion = + toVeloxBucketConversion(*hiveSplit->bucketConversion); + } + return veloxSplit; +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + return toVeloxHiveColumnHandle(column, typeParser); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + velox::connector::ColumnHandleMap& assignments) const { + auto addSynthesizedColumn = [&](const std::string& name, + protocol::hive::ColumnType columnType, + const protocol::ColumnHandle& column) { + if (toHiveColumnType(columnType) == + velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { + if (assignments.count(name) == 0) { + assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); + } + } + }; + auto hiveLayout = + std::dynamic_pointer_cast( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + hiveLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + for (const auto& entry : hiveLayout->partitionColumns) { + assignments.emplace(entry.name, toVeloxColumnHandle(&entry, typeParser)); + } + + // Add synthesized columns to the TableScanNode columnHandles as well. + for (const auto& entry : hiveLayout->predicateColumns) { + addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); + } + + auto hiveTableHandle = + std::dynamic_pointer_cast( + tableHandle.connectorHandle); + VELOX_CHECK_NOT_NULL( + hiveTableHandle, + "Unexpected table handle type {}", + tableHandle.connectorHandle->_type); + + // Use fully qualified name if available. + std::string tableName = hiveTableHandle->schemaName.empty() + ? hiveTableHandle->tableName + : fmt::format( + "{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName); + + return toHiveTableHandle( + hiveLayout->domainPredicate, + hiveLayout->remainingPredicate, + hiveLayout->pushdownFilterEnabled, + tableName, + hiveLayout->dataColumns, + tableHandle, + hiveLayout->tableParameters, + exprConverter, + typeParser); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser) const { + auto hiveOutputTableHandle = + std::dynamic_pointer_cast( + createHandle->handle.connectorHandle); + VELOX_CHECK_NOT_NULL( + hiveOutputTableHandle, + "Unexpected output table handle type {}", + createHandle->handle.connectorHandle->_type); + bool isPartitioned{false}; + const auto inputColumns = toHiveColumns( + hiveOutputTableHandle->inputColumns, typeParser, isPartitioned); + return std::make_unique( + inputColumns, + toLocationHandle(hiveOutputTableHandle->locationHandle), + toFileFormat(hiveOutputTableHandle->actualStorageFormat, "TableWrite"), + toHiveBucketProperty( + inputColumns, hiveOutputTableHandle->bucketProperty, typeParser), + std::optional( + toFileCompressionKind(hiveOutputTableHandle->compressionCodec))); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser) const { + auto hiveInsertTableHandle = + std::dynamic_pointer_cast( + insertHandle->handle.connectorHandle); + VELOX_CHECK_NOT_NULL( + hiveInsertTableHandle, + "Unexpected insert table handle type {}", + insertHandle->handle.connectorHandle->_type); + bool isPartitioned{false}; + const auto inputColumns = toHiveColumns( + hiveInsertTableHandle->inputColumns, typeParser, isPartitioned); + + const auto table = hiveInsertTableHandle->pageSinkMetadata.table; + VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); + return std::make_unique( + inputColumns, + toLocationHandle(hiveInsertTableHandle->locationHandle), + toFileFormat(hiveInsertTableHandle->actualStorageFormat, "TableWrite"), + toHiveBucketProperty( + inputColumns, hiveInsertTableHandle->bucketProperty, typeParser), + std::optional( + toFileCompressionKind(hiveInsertTableHandle->compressionCodec)), + std::unordered_map( + table->storage.serdeParameters.begin(), + table->storage.serdeParameters.end())); +} + +std::vector> +HivePrestoToVeloxConnector::toHiveColumns( + const protocol::List& inputColumns, + const TypeParser& typeParser, + bool& hasPartitionColumn) const { + hasPartitionColumn = false; + std::vector> + hiveColumns; + hiveColumns.reserve(inputColumns.size()); + for (const auto& columnHandle : inputColumns) { + hasPartitionColumn |= + columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY; + hiveColumns.emplace_back( + std::dynamic_pointer_cast( + std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); + } + return hiveColumns; +} + +std::unique_ptr +HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( + const protocol::ConnectorPartitioningHandle* partitioningHandle, + const std::vector& bucketToPartition, + const std::vector& channels, + const std::vector& constValues, + bool& effectivelyGather) const { + auto hivePartitioningHandle = + dynamic_cast( + partitioningHandle); + VELOX_CHECK_NOT_NULL( + hivePartitioningHandle, + "Unexpected partitioning handle type {}", + partitioningHandle->_type); + VELOX_USER_CHECK( + hivePartitioningHandle->bucketFunctionType == + protocol::hive::BucketFunctionType::HIVE_COMPATIBLE, + "Unsupported Hive bucket function type: {}", + toJsonString(hivePartitioningHandle->bucketFunctionType)); + effectivelyGather = hivePartitioningHandle->bucketCount == 1; + return std::make_unique( + hivePartitioningHandle->bucketCount, + bucketToPartition, + channels, + constValues); +} + +std::unique_ptr +HivePrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h new file mode 100644 index 0000000000000..34d85efddaed7 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.h @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/types/PrestoToVeloxExpr.h" +#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" +#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h" +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/core/PlanNode.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::presto { + +velox::connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( + protocol::hive::ColumnType type); + +class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit HivePrestoToVeloxConnector(std::string connectorName) + : PrestoToVeloxConnector(std::move(connectorName)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + velox::connector::ColumnHandleMap& assignments) const final; + + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser) const final; + + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser) const final; + + std::unique_ptr + createVeloxPartitionFunctionSpec( + const protocol::ConnectorPartitioningHandle* partitioningHandle, + const std::vector& bucketToPartition, + const std::vector& channels, + const std::vector& constValues, + bool& effectivelyGather) const final; + + std::unique_ptr createConnectorProtocol() + const final; + + private: + std::vector> + toHiveColumns( + const protocol::List& inputColumns, + const TypeParser& typeParser, + bool& hasPartitionColumn) const; +}; + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp index 303ee763d4a60..f5083ab45f2ab 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp @@ -14,6 +14,7 @@ #include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" #include "velox/connectors/hive/iceberg/IcebergDataSink.h" diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp index 49b3dcac7b449..a35841f3aa3ba 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp @@ -32,6 +32,7 @@ #include "velox/type/Filter.h" namespace facebook::presto { +using namespace velox; namespace { std::unordered_map>& @@ -68,597 +69,6 @@ const PrestoToVeloxConnector& getPrestoToVeloxConnector( return *(it->second); } -namespace { -using namespace velox; - -dwio::common::FileFormat toVeloxFileFormat( - const presto::protocol::hive::StorageFormat& format) { - if (format.inputFormat == "com.facebook.hive.orc.OrcInputFormat") { - return dwio::common::FileFormat::DWRF; - } else if ( - format.inputFormat == "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat") { - return dwio::common::FileFormat::ORC; - } else if ( - format.inputFormat == - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") { - return dwio::common::FileFormat::PARQUET; - } else if (format.inputFormat == "org.apache.hadoop.mapred.TextInputFormat") { - if (format.serDe == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - return dwio::common::FileFormat::TEXT; - } else if (format.serDe == "org.apache.hive.hcatalog.data.JsonSerDe") { - return dwio::common::FileFormat::JSON; - } - } else if ( - format.inputFormat == - "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") { - if (format.serDe == - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { - return dwio::common::FileFormat::PARQUET; - } - } else if (format.inputFormat == "com.facebook.alpha.AlphaInputFormat") { - // ALPHA has been renamed in Velox to NIMBLE. - return dwio::common::FileFormat::NIMBLE; - } - VELOX_UNSUPPORTED( - "Unsupported file format: {} {}", format.inputFormat, format.serDe); -} - -template -std::string toJsonString(const T& value) { - return ((json)value).dump(); -} - -connector::hive::LocationHandle::TableType toTableType( - protocol::hive::TableType tableType) { - switch (tableType) { - case protocol::hive::TableType::NEW: - // Temporary tables are written and read by the SPI in a single pipeline. - // So they can be treated as New. They do not require Append or Overwrite - // semantics as applicable for regular tables. - case protocol::hive::TableType::TEMPORARY: - return connector::hive::LocationHandle::TableType::kNew; - case protocol::hive::TableType::EXISTING: - return connector::hive::LocationHandle::TableType::kExisting; - default: - VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); - } -} - -std::shared_ptr toLocationHandle( - const protocol::hive::LocationHandle& locationHandle) { - return std::make_shared( - locationHandle.targetPath, - locationHandle.writePath, - toTableType(locationHandle.tableType)); -} - -dwio::common::FileFormat toFileFormat( - const protocol::hive::HiveStorageFormat storageFormat, - const char* usage) { - switch (storageFormat) { - case protocol::hive::HiveStorageFormat::DWRF: - return dwio::common::FileFormat::DWRF; - case protocol::hive::HiveStorageFormat::PARQUET: - return dwio::common::FileFormat::PARQUET; - case protocol::hive::HiveStorageFormat::ALPHA: - // This has been renamed in Velox from ALPHA to NIMBLE. - return dwio::common::FileFormat::NIMBLE; - case protocol::hive::HiveStorageFormat::TEXTFILE: - return dwio::common::FileFormat::TEXT; - default: - VELOX_UNSUPPORTED( - "Unsupported file format in {}: {}.", - usage, - toJsonString(storageFormat)); - } -} - -velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind( - protocol::hive::BucketFunctionType bucketFuncType) { - switch (bucketFuncType) { - case protocol::hive::BucketFunctionType::PRESTO_NATIVE: - return velox::connector::hive::HiveBucketProperty::Kind::kPrestoNative; - case protocol::hive::BucketFunctionType::HIVE_COMPATIBLE: - return velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible; - default: - VELOX_USER_FAIL( - "Unknown hive bucket function: {}", toJsonString(bucketFuncType)); - } -} - -std::vector stringToTypes( - const std::shared_ptr>& typeStrings, - const TypeParser& typeParser) { - std::vector types; - types.reserve(typeStrings->size()); - for (const auto& typeString : *typeStrings) { - types.push_back(stringToType(typeString, typeParser)); - } - return types; -} - -core::SortOrder toSortOrder(protocol::hive::Order order) { - switch (order) { - case protocol::hive::Order::ASCENDING: - return core::SortOrder(true, true); - case protocol::hive::Order::DESCENDING: - return core::SortOrder(false, false); - default: - VELOX_USER_FAIL("Unknown sort order: {}", toJsonString(order)); - } -} - -std::shared_ptr toHiveSortingColumn( - const protocol::hive::SortingColumn& sortingColumn) { - return std::make_shared( - sortingColumn.columnName, toSortOrder(sortingColumn.order)); -} - -std::vector> -toHiveSortingColumns( - const protocol::List& sortedBy) { - std::vector> - sortingColumns; - sortingColumns.reserve(sortedBy.size()); - for (const auto& sortingColumn : sortedBy) { - sortingColumns.push_back(toHiveSortingColumn(sortingColumn)); - } - return sortingColumns; -} - -std::shared_ptr -toHiveBucketProperty( - const std::vector>& - inputColumns, - const std::shared_ptr& bucketProperty, - const TypeParser& typeParser) { - if (bucketProperty == nullptr) { - return nullptr; - } - - VELOX_USER_CHECK_GT( - bucketProperty->bucketCount, 0, "Bucket count must be a positive value"); - - VELOX_USER_CHECK( - !bucketProperty->bucketedBy.empty(), - "Bucketed columns must be set: {}", - toJsonString(*bucketProperty)); - - const velox::connector::hive::HiveBucketProperty::Kind kind = - toHiveBucketPropertyKind(bucketProperty->bucketFunctionType); - std::vector bucketedTypes; - if (kind == - velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible) { - VELOX_USER_CHECK_NULL( - bucketProperty->types, - "Unexpected bucketed types set for hive compatible bucket function: {}", - toJsonString(*bucketProperty)); - bucketedTypes.reserve(bucketProperty->bucketedBy.size()); - for (const auto& bucketedColumn : bucketProperty->bucketedBy) { - TypePtr bucketedType{nullptr}; - for (const auto& inputColumn : inputColumns) { - if (inputColumn->name() != bucketedColumn) { - continue; - } - VELOX_USER_CHECK_NOT_NULL(inputColumn->hiveType()); - bucketedType = inputColumn->hiveType(); - break; - } - VELOX_USER_CHECK_NOT_NULL( - bucketedType, "Bucketed column {} not found", bucketedColumn); - bucketedTypes.push_back(std::move(bucketedType)); - } - } else { - VELOX_USER_CHECK_EQ( - bucketProperty->types->size(), - bucketProperty->bucketedBy.size(), - "Bucketed types is not set properly for presto native bucket function: {}", - toJsonString(*bucketProperty)); - bucketedTypes = stringToTypes(bucketProperty->types, typeParser); - } - - const auto sortedBy = toHiveSortingColumns(bucketProperty->sortedBy); - - return std::make_shared( - toHiveBucketPropertyKind(bucketProperty->bucketFunctionType), - bucketProperty->bucketCount, - bucketProperty->bucketedBy, - bucketedTypes, - sortedBy); -} - -std::unique_ptr -toVeloxHiveColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) { - auto* hiveColumn = - dynamic_cast(column); - VELOX_CHECK_NOT_NULL( - hiveColumn, "Unexpected column handle type {}", column->_type); - velox::type::fbhive::HiveTypeParser hiveTypeParser; - // TODO(spershin): Should we pass something different than 'typeSignature' - // to 'hiveType' argument of the 'HiveColumnHandle' constructor? - return std::make_unique( - hiveColumn->name, - toHiveColumnType(hiveColumn->columnType), - stringToType(hiveColumn->typeSignature, typeParser), - hiveTypeParser.parse(hiveColumn->hiveType), - toRequiredSubfields(hiveColumn->requiredSubfields)); -} - -velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( - const protocol::hive::BucketConversion& bucketConversion) { - velox::connector::hive::HiveBucketConversion veloxBucketConversion; - // Current table bucket count (new). - veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount; - // Partition bucket count (old). - veloxBucketConversion.partitionBucketCount = - bucketConversion.partitionBucketCount; - TypeParser typeParser; - for (const auto& column : bucketConversion.bucketColumnHandles) { - // Columns used as bucket input. - veloxBucketConversion.bucketColumnHandles.push_back( - toVeloxHiveColumnHandle(&column, typeParser)); - } - return veloxBucketConversion; -} - -} // namespace - -std::vector toRequiredSubfields( - const protocol::List& subfields) { - std::vector result; - result.reserve(subfields.size()); - for (auto& subfield : subfields) { - result.emplace_back(subfield); - } - return result; -} - -connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( - protocol::hive::ColumnType type) { - switch (type) { - case protocol::hive::ColumnType::PARTITION_KEY: - return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; - case protocol::hive::ColumnType::REGULAR: - return connector::hive::HiveColumnHandle::ColumnType::kRegular; - case protocol::hive::ColumnType::SYNTHESIZED: - return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; - default: - VELOX_UNSUPPORTED( - "Unsupported Hive column type: {}.", toJsonString(type)); - } -} - -std::unique_ptr toHiveTableHandle( - const protocol::TupleDomain& domainPredicate, - const std::shared_ptr& remainingPredicate, - bool isPushdownFilterEnabled, - const std::string& tableName, - const protocol::List& dataColumns, - const protocol::TableHandle& tableHandle, - const protocol::Map& tableParameters, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser) { - common::SubfieldFilters subfieldFilters; - auto domains = domainPredicate.domains; - for (const auto& domain : *domains) { - auto filter = domain.second; - subfieldFilters[common::Subfield(domain.first)] = - toFilter(domain.second, exprConverter, typeParser); - } - - auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate); - if (auto constant = std::dynamic_pointer_cast( - remainingFilter)) { - bool value = constant->value().value(); - VELOX_CHECK(value, "Unexpected always-false remaining predicate"); - - // Use null for always-true filter. - remainingFilter = nullptr; - } - - RowTypePtr finalDataColumns; - if (!dataColumns.empty()) { - std::vector names; - std::vector types; - velox::type::fbhive::HiveTypeParser hiveTypeParser; - names.reserve(dataColumns.size()); - types.reserve(dataColumns.size()); - for (auto& column : dataColumns) { - std::string name = column.name; - folly::toLowerAscii(name); - names.emplace_back(std::move(name)); - auto parsedType = hiveTypeParser.parse(column.type); - // The type from the metastore may have upper case letters - // in field names, convert them all to lower case to be - // compatible with Presto. - types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, parsedType->kind(), parsedType)); - } - finalDataColumns = ROW(std::move(names), std::move(types)); - } - - if (tableParameters.empty()) { - return std::make_unique( - tableHandle.connectorId, - tableName, - isPushdownFilterEnabled, - std::move(subfieldFilters), - remainingFilter, - finalDataColumns); - } - - std::unordered_map finalTableParameters = {}; - finalTableParameters.reserve(tableParameters.size()); - for (const auto& [key, value] : tableParameters) { - finalTableParameters[key] = value; - } - - return std::make_unique( - tableHandle.connectorId, - tableName, - isPushdownFilterEnabled, - std::move(subfieldFilters), - remainingFilter, - finalDataColumns, - finalTableParameters); -} - -velox::common::CompressionKind toFileCompressionKind( - const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) { - switch (hiveCompressionCodec) { - case protocol::hive::HiveCompressionCodec::SNAPPY: - return velox::common::CompressionKind::CompressionKind_SNAPPY; - case protocol::hive::HiveCompressionCodec::GZIP: - return velox::common::CompressionKind::CompressionKind_GZIP; - case protocol::hive::HiveCompressionCodec::LZ4: - return velox::common::CompressionKind::CompressionKind_LZ4; - case protocol::hive::HiveCompressionCodec::ZSTD: - return velox::common::CompressionKind::CompressionKind_ZSTD; - case protocol::hive::HiveCompressionCodec::NONE: - return velox::common::CompressionKind::CompressionKind_NONE; - default: - VELOX_UNSUPPORTED( - "Unsupported file compression format: {}.", - toJsonString(hiveCompressionCodec)); - } -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit, - const protocol::SplitContext* splitContext) const { - auto hiveSplit = - dynamic_cast(connectorSplit); - VELOX_CHECK_NOT_NULL( - hiveSplit, "Unexpected split type {}", connectorSplit->_type); - std::unordered_map> partitionKeys; - for (const auto& entry : hiveSplit->partitionKeys) { - partitionKeys.emplace( - entry.name, - entry.value == nullptr ? std::nullopt - : std::optional{*entry.value}); - } - std::unordered_map customSplitInfo; - for (const auto& [key, value] : hiveSplit->fileSplit.customSplitInfo) { - customSplitInfo[key] = value; - } - std::shared_ptr extraFileInfo; - if (hiveSplit->fileSplit.extraFileInfo) { - extraFileInfo = std::make_shared( - velox::encoding::Base64::decode(*hiveSplit->fileSplit.extraFileInfo)); - } - std::unordered_map serdeParameters; - serdeParameters.reserve(hiveSplit->storage.serdeParameters.size()); - for (const auto& [key, value] : hiveSplit->storage.serdeParameters) { - serdeParameters[key] = value; - } - std::unordered_map infoColumns = { - {"$path", hiveSplit->fileSplit.path}, - {"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)}, - {"$file_modified_time", - std::to_string(hiveSplit->fileSplit.fileModifiedTime)}, - }; - if (hiveSplit->tableBucketNumber) { - infoColumns["$bucket"] = std::to_string(*hiveSplit->tableBucketNumber); - } - auto veloxSplit = - std::make_unique( - catalogId, - hiveSplit->fileSplit.path, - toVeloxFileFormat(hiveSplit->storage.storageFormat), - hiveSplit->fileSplit.start, - hiveSplit->fileSplit.length, - partitionKeys, - hiveSplit->tableBucketNumber - ? std::optional(*hiveSplit->tableBucketNumber) - : std::nullopt, - customSplitInfo, - extraFileInfo, - serdeParameters, - hiveSplit->splitWeight, - splitContext->cacheable, - infoColumns); - if (hiveSplit->bucketConversion) { - VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); - veloxSplit->bucketConversion = - toVeloxBucketConversion(*hiveSplit->bucketConversion); - } - return veloxSplit; -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const { - return toVeloxHiveColumnHandle(column, typeParser); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - velox::connector::ColumnHandleMap& assignments) const { - auto addSynthesizedColumn = [&](const std::string& name, - protocol::hive::ColumnType columnType, - const protocol::ColumnHandle& column) { - if (toHiveColumnType(columnType) == - velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { - if (assignments.count(name) == 0) { - assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); - } - } - }; - auto hiveLayout = - std::dynamic_pointer_cast( - tableHandle.connectorTableLayout); - VELOX_CHECK_NOT_NULL( - hiveLayout, - "Unexpected layout type {}", - tableHandle.connectorTableLayout->_type); - for (const auto& entry : hiveLayout->partitionColumns) { - assignments.emplace(entry.name, toVeloxColumnHandle(&entry, typeParser)); - } - - // Add synthesized columns to the TableScanNode columnHandles as well. - for (const auto& entry : hiveLayout->predicateColumns) { - addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); - } - - auto hiveTableHandle = - std::dynamic_pointer_cast( - tableHandle.connectorHandle); - VELOX_CHECK_NOT_NULL( - hiveTableHandle, - "Unexpected table handle type {}", - tableHandle.connectorHandle->_type); - - // Use fully qualified name if available. - std::string tableName = hiveTableHandle->schemaName.empty() - ? hiveTableHandle->tableName - : fmt::format( - "{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName); - - return toHiveTableHandle( - hiveLayout->domainPredicate, - hiveLayout->remainingPredicate, - hiveLayout->pushdownFilterEnabled, - tableName, - hiveLayout->dataColumns, - tableHandle, - hiveLayout->tableParameters, - exprConverter, - typeParser); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxInsertTableHandle( - const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const { - auto hiveOutputTableHandle = - std::dynamic_pointer_cast( - createHandle->handle.connectorHandle); - VELOX_CHECK_NOT_NULL( - hiveOutputTableHandle, - "Unexpected output table handle type {}", - createHandle->handle.connectorHandle->_type); - bool isPartitioned{false}; - const auto inputColumns = toHiveColumns( - hiveOutputTableHandle->inputColumns, typeParser, isPartitioned); - return std::make_unique( - inputColumns, - toLocationHandle(hiveOutputTableHandle->locationHandle), - toFileFormat(hiveOutputTableHandle->actualStorageFormat, "TableWrite"), - toHiveBucketProperty( - inputColumns, hiveOutputTableHandle->bucketProperty, typeParser), - std::optional( - toFileCompressionKind(hiveOutputTableHandle->compressionCodec))); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxInsertTableHandle( - const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const { - auto hiveInsertTableHandle = - std::dynamic_pointer_cast( - insertHandle->handle.connectorHandle); - VELOX_CHECK_NOT_NULL( - hiveInsertTableHandle, - "Unexpected insert table handle type {}", - insertHandle->handle.connectorHandle->_type); - bool isPartitioned{false}; - const auto inputColumns = toHiveColumns( - hiveInsertTableHandle->inputColumns, typeParser, isPartitioned); - - const auto table = hiveInsertTableHandle->pageSinkMetadata.table; - VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); - return std::make_unique( - inputColumns, - toLocationHandle(hiveInsertTableHandle->locationHandle), - toFileFormat(hiveInsertTableHandle->actualStorageFormat, "TableWrite"), - toHiveBucketProperty( - inputColumns, hiveInsertTableHandle->bucketProperty, typeParser), - std::optional( - toFileCompressionKind(hiveInsertTableHandle->compressionCodec)), - std::unordered_map( - table->storage.serdeParameters.begin(), - table->storage.serdeParameters.end())); -} - -std::vector> -HivePrestoToVeloxConnector::toHiveColumns( - const protocol::List& inputColumns, - const TypeParser& typeParser, - bool& hasPartitionColumn) const { - hasPartitionColumn = false; - std::vector> - hiveColumns; - hiveColumns.reserve(inputColumns.size()); - for (const auto& columnHandle : inputColumns) { - hasPartitionColumn |= - columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY; - hiveColumns.emplace_back( - std::dynamic_pointer_cast( - std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); - } - return hiveColumns; -} - -std::unique_ptr -HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( - const protocol::ConnectorPartitioningHandle* partitioningHandle, - const std::vector& bucketToPartition, - const std::vector& channels, - const std::vector& constValues, - bool& effectivelyGather) const { - auto hivePartitioningHandle = - dynamic_cast( - partitioningHandle); - VELOX_CHECK_NOT_NULL( - hivePartitioningHandle, - "Unexpected partitioning handle type {}", - partitioningHandle->_type); - VELOX_USER_CHECK( - hivePartitioningHandle->bucketFunctionType == - protocol::hive::BucketFunctionType::HIVE_COMPATIBLE, - "Unsupported Hive bucket function type: {}", - toJsonString(hivePartitioningHandle->bucketFunctionType)); - effectivelyGather = hivePartitioningHandle->bucketCount == 1; - return std::make_unique( - hivePartitioningHandle->bucketCount, - bucketToPartition, - channels, - constValues); -} - -std::unique_ptr -HivePrestoToVeloxConnector::createConnectorProtocol() const { - return std::make_unique(); -} - std::unique_ptr TpchPrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h index b9b80b6d7ddc3..2c8fafa333951 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h @@ -35,26 +35,6 @@ void unregisterPrestoToVeloxConnector(const std::string& connectorName); const PrestoToVeloxConnector& getPrestoToVeloxConnector( const std::string& connectorName); -std::vector toRequiredSubfields( - const protocol::List& subfields); - -velox::connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( - protocol::hive::ColumnType type); - -std::unique_ptr toHiveTableHandle( - const protocol::TupleDomain& domainPredicate, - const std::shared_ptr& remainingPredicate, - bool isPushdownFilterEnabled, - const std::string& tableName, - const protocol::List& dataColumns, - const protocol::TableHandle& tableHandle, - const protocol::Map& tableParameters, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser); - -velox::common::CompressionKind toFileCompressionKind( - const protocol::hive::HiveCompressionCodec& hiveCompressionCodec); - class PrestoToVeloxConnector { public: virtual ~PrestoToVeloxConnector() = default; @@ -136,56 +116,6 @@ class PrestoToVeloxConnector { const std::string connectorName_; }; -class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { - public: - explicit HivePrestoToVeloxConnector(std::string connectorName) - : PrestoToVeloxConnector(std::move(connectorName)) {} - - std::unique_ptr toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit, - const protocol::SplitContext* splitContext) const final; - - std::unique_ptr toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const final; - - std::unique_ptr toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - velox::connector::ColumnHandleMap& assignments) - const final; - - std::unique_ptr - toVeloxInsertTableHandle( - const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const final; - - std::unique_ptr - toVeloxInsertTableHandle( - const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const final; - - std::unique_ptr - createVeloxPartitionFunctionSpec( - const protocol::ConnectorPartitioningHandle* partitioningHandle, - const std::vector& bucketToPartition, - const std::vector& channels, - const std::vector& constValues, - bool& effectivelyGather) const final; - - std::unique_ptr createConnectorProtocol() - const final; - - private: - std::vector> - toHiveColumns( - const protocol::List& inputColumns, - const TypeParser& typeParser, - bool& hasPartitionColumn) const; -}; - class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { public: explicit TpchPrestoToVeloxConnector(std::string connectorName) diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp index 5dbf73b9cb869..3845fa4ce7ad9 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp @@ -695,4 +695,66 @@ std::unique_ptr toFilter( VELOX_UNSUPPORTED("Unsupported filter found."); } +std::vector toRequiredSubfields( + const protocol::List& subfields) { + std::vector result; + result.reserve(subfields.size()); + for (auto& subfield : subfields) { + result.emplace_back(subfield); + } + return result; +} + +velox::common::CompressionKind toFileCompressionKind( + const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) { + switch (hiveCompressionCodec) { + case protocol::hive::HiveCompressionCodec::SNAPPY: + return velox::common::CompressionKind::CompressionKind_SNAPPY; + case protocol::hive::HiveCompressionCodec::GZIP: + return velox::common::CompressionKind::CompressionKind_GZIP; + case protocol::hive::HiveCompressionCodec::LZ4: + return velox::common::CompressionKind::CompressionKind_LZ4; + case protocol::hive::HiveCompressionCodec::ZSTD: + return velox::common::CompressionKind::CompressionKind_ZSTD; + case protocol::hive::HiveCompressionCodec::NONE: + return velox::common::CompressionKind::CompressionKind_NONE; + default: + VELOX_UNSUPPORTED( + "Unsupported file compression format: {}.", + toJsonString(hiveCompressionCodec)); + } +} + +dwio::common::FileFormat toVeloxFileFormat( + const presto::protocol::hive::StorageFormat& format) { + if (format.inputFormat == "com.facebook.hive.orc.OrcInputFormat") { + return dwio::common::FileFormat::DWRF; + } else if ( + format.inputFormat == "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat") { + return dwio::common::FileFormat::ORC; + } else if ( + format.inputFormat == + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") { + return dwio::common::FileFormat::PARQUET; + } else if (format.inputFormat == "org.apache.hadoop.mapred.TextInputFormat") { + if (format.serDe == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + return dwio::common::FileFormat::TEXT; + } else if (format.serDe == "org.apache.hive.hcatalog.data.JsonSerDe") { + return dwio::common::FileFormat::JSON; + } + } else if ( + format.inputFormat == + "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") { + if (format.serDe == + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { + return dwio::common::FileFormat::PARQUET; + } + } else if (format.inputFormat == "com.facebook.alpha.AlphaInputFormat") { + // ALPHA has been renamed in Velox to NIMBLE. + return dwio::common::FileFormat::NIMBLE; + } + VELOX_UNSUPPORTED( + "Unsupported file format: {} {}", format.inputFormat, format.serDe); +} + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h index ef701f87423a2..29adcc7c339f3 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h @@ -14,7 +14,9 @@ #pragma once #include "presto_cpp/main/types/PrestoToVeloxExpr.h" +#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" #include "presto_cpp/presto_protocol/core/presto_protocol_core.h" +#include "velox/dwio/common/Options.h" #include "velox/type/Filter.h" #include "velox/type/Type.h" @@ -32,4 +34,18 @@ std::unique_ptr toFilter( const VeloxExprConverter& exprConverter, const TypeParser& typeParser); +template +std::string toJsonString(const T& value) { + return ((json)value).dump(); +} + +std::vector toRequiredSubfields( + const protocol::List& subfields); + +velox::common::CompressionKind toFileCompressionKind( + const protocol::hive::HiveCompressionCodec& hiveCompressionCodec); + +velox::dwio::common::FileFormat toVeloxFileFormat( + const presto::protocol::hive::StorageFormat& format); + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/Registration.cpp b/presto-native-execution/presto_cpp/main/connectors/Registration.cpp index 267272a7a690f..5dc286d2b3b05 100644 --- a/presto-native-execution/presto_cpp/main/connectors/Registration.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/Registration.cpp @@ -13,6 +13,7 @@ */ #include "presto_cpp/main/connectors/Registration.h" #include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include "presto_cpp/main/connectors/SystemConnector.h" #ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR diff --git a/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp index b87a4df4b32f8..38b8150634d0f 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp @@ -16,7 +16,7 @@ #include "presto_cpp/main/thrift/ProtocolToThrift.h" #include "presto_cpp/presto_protocol/core/Duration.h" #include "presto_cpp/main/common/tests/test_json.h" -#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" using namespace facebook::presto; diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index 1b8c33ff04d92..fc8cd9575862b 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -20,6 +20,7 @@ #include "presto_cpp/main/TaskResource.h" #include "presto_cpp/main/common/tests/MutableConfigs.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include "presto_cpp/main/tests/HttpServerWrapper.h" #include "velox/common/base/Fs.h" #include "velox/common/base/tests/GTestUtils.h" diff --git a/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp index 43bc9743235bc..065ce62996e2e 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp @@ -17,6 +17,7 @@ #include "presto_cpp/main/thrift/ThriftIO.h" #include "presto_cpp/main/common/tests/test_json.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" using namespace facebook::presto; diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index e649d2425ed84..e507fe4d5cef5 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -74,11 +74,6 @@ RowTypePtr toRowType( return ROW(std::move(names), std::move(types)); } -template -std::string toJsonString(const T& value) { - return (static_cast(value)).dump(); -} - std::shared_ptr toColumnHandle( const protocol::ColumnHandle* column, const TypeParser& typeParser) { diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index b4a5bc4590b1e..f139db590a889 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -14,7 +14,7 @@ #include #include "presto_cpp/main/common/tests/test_json.h" -#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleRead.h" diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp index 5c3364aa7cced..1f9ced81d675a 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp @@ -12,7 +12,7 @@ * limitations under the License. */ -#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include #include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" #include "presto_cpp/main/types/PrestoToVeloxExpr.h" diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp index 9bcbf9b4f0542..e0ee5755e3bd5 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp @@ -13,7 +13,7 @@ */ #include "presto_cpp/main/types/PrestoToVeloxSplit.h" #include -#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" using namespace facebook::velox;