Skip to content

Commit db906b2

Browse files
committed
[native] Prepare for the actual support of distributed procedures
1 parent 1c0e43e commit db906b2

File tree

9 files changed

+102
-1
lines changed

9 files changed

+102
-1
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,14 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
269269
typeParser);
270270
}
271271

272+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
273+
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
274+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
275+
const TypeParser& typeParser) const {
276+
// TODO: requires data insertion support
277+
VELOX_FAIL("Not yet supported, requires data insertion support first");
278+
}
279+
272280
std::unique_ptr<protocol::ConnectorProtocol>
273281
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
274282
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
3838
const TypeParser& typeParser,
3939
velox::connector::ColumnHandleMap& assignments) const final;
4040

41+
std::unique_ptr<velox::connector::ConnectorInsertTableHandle> toVeloxInsertTableHandle(
42+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
43+
const TypeParser& typeParser) const final;
44+
4145
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
4246
const final;
4347
};

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,14 @@ class PrestoToVeloxConnector {
103103
return {};
104104
}
105105

106+
[[nodiscard]] virtual std::unique_ptr<
107+
velox::connector::ConnectorInsertTableHandle>
108+
toVeloxInsertTableHandle(
109+
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
110+
const TypeParser& typeParser) const {
111+
return {};
112+
}
113+
106114
[[nodiscard]] std::unique_ptr<velox::core::PartitionFunctionSpec>
107115
createVeloxPartitionFunctionSpec(
108116
const protocol::ConnectorPartitioningHandle* partitioningHandle,

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,57 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15411541
sourceVeloxPlan);
15421542
}
15431543

1544+
std::shared_ptr<const core::TableWriteNode>
1545+
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
1546+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
1547+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
1548+
const protocol::TaskId& taskId) {
1549+
const auto executeProcedureHandle = std::dynamic_pointer_cast<protocol::ExecuteProcedureHandle>(tableWriteInfo->writerTarget);
1550+
1551+
if (!executeProcedureHandle) {
1552+
VELOX_UNSUPPORTED(
1553+
"Unsupported execute procedure handle: {}",
1554+
toJsonString(tableWriteInfo->writerTarget));
1555+
}
1556+
1557+
std::string connectorId = executeProcedureHandle->handle.connectorId;
1558+
auto& connector =
1559+
getPrestoToVeloxConnector(executeProcedureHandle->handle.connectorHandle->_type);
1560+
auto veloxHandle = connector.toVeloxInsertTableHandle(
1561+
executeProcedureHandle.get(), typeParser_);
1562+
auto connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));
1563+
1564+
if (!connectorInsertHandle) {
1565+
VELOX_UNSUPPORTED(
1566+
"Unsupported execute procedure handle: {}",
1567+
toJsonString(tableWriteInfo->writerTarget));
1568+
}
1569+
1570+
auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
1571+
connectorId, connectorInsertHandle);
1572+
1573+
const auto outputType = toRowType(
1574+
generateOutputVariables(
1575+
{node->rowCountVariable,
1576+
node->fragmentVariable,
1577+
node->tableCommitContextVariable},
1578+
nullptr),
1579+
typeParser_);
1580+
const auto sourceVeloxPlan =
1581+
toVeloxQueryPlan(node->source, tableWriteInfo, taskId);
1582+
1583+
return std::make_shared<core::TableWriteNode>(
1584+
node->id,
1585+
toRowType(node->columns, typeParser_),
1586+
node->columnNames,
1587+
std::nullopt,
1588+
std::move(insertTableHandle),
1589+
node->partitioningScheme != nullptr,
1590+
outputType,
1591+
getCommitStrategy(),
1592+
sourceVeloxPlan);
1593+
}
1594+
15441595
std::shared_ptr<const core::TableWriteNode>
15451596
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
15461597
const std::shared_ptr<const protocol::DeleteNode>& node,
@@ -1929,6 +1980,10 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
19291980
std::dynamic_pointer_cast<const protocol::TableWriterNode>(node)) {
19301981
return toVeloxQueryPlan(tableWriter, tableWriteInfo, taskId);
19311982
}
1983+
if (auto callDistributedProcedure =
1984+
std::dynamic_pointer_cast<const protocol::CallDistributedProcedureNode>(node)) {
1985+
return toVeloxQueryPlan(callDistributedProcedure, tableWriteInfo, taskId);
1986+
}
19321987
if (auto deleteNode =
19331988
std::dynamic_pointer_cast<const protocol::DeleteNode>(node)) {
19341989
return toVeloxQueryPlan(deleteNode, tableWriteInfo, taskId);

presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ class VeloxQueryPlanConverterBase {
160160
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
161161
const protocol::TaskId& taskId);
162162

163+
std::shared_ptr<const velox::core::TableWriteNode> toVeloxQueryPlan(
164+
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
165+
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
166+
const protocol::TaskId& taskId);
167+
163168
std::shared_ptr<const velox::core::TableWriteMergeNode> toVeloxQueryPlan(
164169
const std::shared_ptr<const protocol::TableWriterMergeNode>& node,
165170
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,

presto-native-execution/presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ using HiveConnectorProtocol = ConnectorProtocolTemplate<
2626
HivePartitioningHandle,
2727
HiveTransactionHandle,
2828
NotImplemented,
29+
NotImplemented,
2930
NotImplemented>;
3031
} // namespace facebook::presto::protocol::hive

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
2727
IcebergSplit,
2828
NotImplemented,
2929
hive::HiveTransactionHandle,
30+
IcebergDistributedProcedureHandle,
3031
NotImplemented,
3132
NotImplemented>;
3233

presto-native-execution/presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ using TpchConnectorProtocol = ConnectorProtocolTemplate<
2929
TpchPartitioningHandle,
3030
TpchTransactionHandle,
3131
NotImplemented,
32+
NotImplemented,
3233
NotImplemented>;
3334

3435
} // namespace facebook::presto::protocol::tpch

presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ class ConnectorProtocol {
7575
const std::string& thrift,
7676
std::shared_ptr<ConnectorInsertTableHandle>& proto) const = 0;
7777

78+
virtual void to_json(
79+
json& j,
80+
const std::shared_ptr<ConnectorDistributedProcedureHandle>& p) const = 0;
81+
virtual void from_json(
82+
const json& j,
83+
std::shared_ptr<ConnectorDistributedProcedureHandle>& p) const = 0;
84+
7885
virtual void to_json(
7986
json& j,
8087
const std::shared_ptr<ConnectorOutputTableHandle>& p) const = 0;
@@ -117,7 +124,7 @@ class ConnectorProtocol {
117124
std::string& thrift) const = 0;
118125
virtual void deserialize(
119126
const std::string& thrift,
120-
std::shared_ptr<ConnectorTransactionHandle>& proto) const = 0;
127+
std::shared_ptr<ConnectorTransactionHandle>& proto) const = 0;
121128

122129
virtual void to_json(
123130
json& j,
@@ -153,6 +160,7 @@ template <
153160
typename ConnectorSplitType = NotImplemented,
154161
typename ConnectorPartitioningHandleType = NotImplemented,
155162
typename ConnectorTransactionHandleType = NotImplemented,
163+
typename ConnectorDistributedProcedureHandleType = NotImplemented,
156164
typename ConnectorDeleteTableHandleType = NotImplemented,
157165
typename ConnectorIndexHandleType = NotImplemented>
158166
class ConnectorProtocolTemplate final : public ConnectorProtocol {
@@ -221,6 +229,15 @@ class ConnectorProtocolTemplate final : public ConnectorProtocol {
221229
deserializeTemplate<ConnectorInsertTableHandleType>(thrift, proto);
222230
}
223231

232+
void to_json(json& j, const std::shared_ptr<ConnectorDistributedProcedureHandle>& p)
233+
const final {
234+
to_json_template<ConnectorDistributedProcedureHandleType>(j, p);
235+
}
236+
void from_json(const json& j, std::shared_ptr<ConnectorDistributedProcedureHandle>& p)
237+
const final {
238+
from_json_template<ConnectorDistributedProcedureHandleType>(j, p);
239+
}
240+
224241
void to_json(json& j, const std::shared_ptr<ConnectorOutputTableHandle>& p)
225242
const final {
226243
to_json_template<ConnectorOutputTableHandleType>(j, p);
@@ -406,6 +423,7 @@ using SystemConnectorProtocol = ConnectorProtocolTemplate<
406423
SystemPartitioningHandle,
407424
SystemTransactionHandle,
408425
NotImplemented,
426+
NotImplemented,
409427
NotImplemented>;
410428

411429
} // namespace facebook::presto::protocol

0 commit comments

Comments
 (0)