Skip to content

Commit b59a17f

Browse files
committed
[native] Support the execution of calling distributed procedures
1 parent 220ed80 commit b59a17f

File tree

6 files changed

+101
-1
lines changed

6 files changed

+101
-1
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,34 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
270270
typeParser);
271271
}
272272

273+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
274+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
275+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
276+
const TypeParser& typeParser) const {
277+
auto icebergDistributedProcedureHandle =
278+
std::dynamic_pointer_cast<protocol::iceberg::IcebergDistributedProcedureHandle>(
279+
executeProcedureHandle->handle.connectorHandle);
280+
281+
VELOX_CHECK_NOT_NULL(
282+
icebergDistributedProcedureHandle,
283+
"Unexpected call distributed procedure handle type {}",
284+
executeProcedureHandle->handle.connectorHandle->_type);
285+
286+
const auto inputColumns =
287+
toHiveColumns(icebergDistributedProcedureHandle->inputColumns, typeParser);
288+
289+
return std::make_unique<
290+
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
291+
inputColumns,
292+
std::make_shared<velox::connector::hive::LocationHandle>(
293+
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
294+
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
295+
velox::connector::hive::LocationHandle::TableType::kExisting),
296+
toVeloxFileFormat(icebergDistributedProcedureHandle->fileFormat),
297+
std::optional(
298+
toFileCompressionKind(icebergDistributedProcedureHandle->compressionCodec)));
299+
}
300+
273301
std::unique_ptr<protocol::ConnectorProtocol>
274302
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
275303
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
3939
const TypeParser& typeParser,
4040
velox::connector::ColumnHandleMap& assignments) const final;
4141

42+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle> toVeloxInsertTableHandle(
43+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
44+
const TypeParser& typeParser) const final;
45+
4246
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
4347
const final;
4448

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ class PrestoToVeloxConnector {
106106
return {};
107107
}
108108

109+
[[nodiscard]] virtual std::unique_ptr<
110+
velox::connector::ConnectorInsertTableHandle>
111+
toVeloxInsertTableHandle(
112+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
113+
const TypeParser& typeParser) const {
114+
return {};
115+
}
116+
109117
[[nodiscard]] std::unique_ptr<velox::core::PartitionFunctionSpec>
110118
createVeloxPartitionFunctionSpec(
111119
const protocol::ConnectorPartitioningHandle* partitioningHandle,

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,57 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15411541
sourceVeloxPlan);
15421542
}
15431543

1544+
std::shared_ptr<const core::TableWriteNode>
1545+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1546+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
1547+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1548+
const protocol::TaskId& taskId) {
1549+
const auto executeProcedureHandle = std::dynamic_pointer_cast<protocol::ExecuteProcedureHandle>(tableWriteInfo->writerTarget);
1550+
1551+
if (!executeProcedureHandle) {
1552+
VELOX_UNSUPPORTED(
1553+
"Unsupported execute procedure handle: {}",
1554+
toJsonString(tableWriteInfo->writerTarget));
1555+
}
1556+
1557+
std::string connectorId = executeProcedureHandle->handle.connectorId;
1558+
auto& connector =
1559+
getPrestoToVeloxConnector(executeProcedureHandle->handle.connectorHandle->_type);
1560+
auto veloxHandle = connector.toVeloxInsertTableHandle(
1561+
executeProcedureHandle.get(), typeParser_);
1562+
auto connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
1563+
1564+
if (!connectorInsertHandle) {
1565+
VELOX_UNSUPPORTED(
1566+
"Unsupported execute procedure handle: {}",
1567+
toJsonString(tableWriteInfo->writerTarget));
1568+
}
1569+
1570+
auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
1571+
connectorId, connectorInsertHandle);
1572+
1573+
const auto outputType = toRowType(
1574+
generateOutputVariables(
1575+
{node->rowCountVariable,
1576+
node->fragmentVariable,
1577+
node->tableCommitContextVariable},
1578+
nullptr),
1579+
typeParser_);
1580+
const auto sourceVeloxPlan =
1581+
toVeloxQueryPlan(node->source, tableWriteInfo, taskId);
1582+
1583+
return std::make_shared<core::TableWriteNode>(
1584+
node->id,
1585+
toRowType(node->columns, typeParser_),
1586+
node->columnNames,
1587+
std::nullopt,
1588+
std::move(insertTableHandle),
1589+
node->partitioningScheme != nullptr,
1590+
outputType,
1591+
getCommitStrategy(),
1592+
sourceVeloxPlan);
1593+
}
1594+
15441595
std::shared_ptr<const core::TableWriteNode>
15451596
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15461597
const std::shared_ptr<const protocol::DeleteNode>& node,
@@ -1929,6 +1980,10 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
19291980
std::dynamic_pointer_cast<const protocol::TableWriterNode>(node)) {
19301981
return toVeloxQueryPlan(tableWriter, tableWriteInfo, taskId);
19311982
}
1983+
if (auto callDistributedProcedure =
1984+
std::dynamic_pointer_cast<const protocol::CallDistributedProcedureNode>(node)) {
1985+
return toVeloxQueryPlan(callDistributedProcedure, tableWriteInfo, taskId);
1986+
}
19321987
if (auto deleteNode =
19331988
std::dynamic_pointer_cast<const protocol::DeleteNode>(node)) {
19341989
return toVeloxQueryPlan(deleteNode, tableWriteInfo, taskId);

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ class VeloxQueryPlanConverterBase {
160160
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
161161
const protocol::TaskId& taskId);
162162

163+
std::shared_ptr<const velox::core::TableWriteNode> toVeloxQueryPlan(
164+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
165+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
166+
const protocol::TaskId& taskId);
167+
163168
std::shared_ptr<const velox::core::TableWriteMergeNode> toVeloxQueryPlan(
164169
const std::shared_ptr<const protocol::TableWriterMergeNode>& node,
165170
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
2727
IcebergSplit,
2828
NotImplemented,
2929
hive::HiveTransactionHandle,
30-
NotImplemented,
30+
IcebergDistributedProcedureHandle,
3131
NotImplemented,
3232
NotImplemented>;
3333

0 commit comments

Comments
 (0)