Skip to content

Commit de9bf1e

Browse files
authored
Merge pull request #42 from mhaseeb123/fea/cudf-parquet-connector
Add a new `ParquetConnector` and `ParquetDataSource` to read Parquet tables via cuDF backend
2 parents 1962974 + 18f6c35 commit de9bf1e

20 files changed

+1934
-8
lines changed

velox/experimental/cudf/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
add_subdirectory(exec)
16+
add_subdirectory(connectors)
1617
add_subdirectory(vector)
1718

1819
if(VELOX_BUILD_TESTING)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
add_subdirectory(parquet)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
add_library(velox_cudf_parquet_reader_config ParquetReaderConfig.cpp)
16+
17+
set_target_properties(
18+
velox_cudf_parquet_reader_config
19+
PROPERTIES CUDA_ARCHITECTURES native)
20+
21+
target_link_libraries(
22+
velox_cudf_parquet_reader_config velox_core velox_exception cudf::cudf)
23+
24+
add_library(
25+
velox_cudf_parquet_connector OBJECT
26+
ParquetReaderConfig.cpp
27+
ParquetConnector.cpp
28+
ParquetConnectorSplit.cpp
29+
ParquetDataSource.cpp
30+
ParquetTableHandle.cpp)
31+
32+
set_target_properties(
33+
velox_cudf_parquet_connector
34+
PROPERTIES CUDA_ARCHITECTURES native)
35+
36+
target_link_libraries(
37+
velox_cudf_parquet_connector
38+
PRIVATE
39+
cudf::cudf
40+
velox_cudf_exec
41+
velox_common_io
42+
velox_connector
43+
velox_type_tz
44+
velox_gcs)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/experimental/cudf/connectors/parquet/ParquetConnector.h"
18+
#include "velox/experimental/cudf/connectors/parquet/ParquetDataSource.h"
19+
20+
namespace facebook::velox::cudf_velox::connector::parquet {
21+
22+
using namespace facebook::velox::connector;
23+
24+
ParquetConnector::ParquetConnector(
25+
const std::string& id,
26+
std::shared_ptr<const facebook::velox::config::ConfigBase> config,
27+
folly::Executor* executor)
28+
: Connector(id),
29+
ParquetReaderConfig_(std::make_shared<ParquetReaderConfig>(config)),
30+
executor_(executor) {
31+
LOG(INFO) << "cudf::Parquet connector " << connectorId() << " created.";
32+
}
33+
34+
std::unique_ptr<DataSource> ParquetConnector::createDataSource(
35+
const std::shared_ptr<const RowType>& outputType,
36+
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
37+
const std::unordered_map<std::string, std::shared_ptr<ColumnHandle>>&
38+
columnHandles,
39+
ConnectorQueryCtx* connectorQueryCtx) {
40+
return std::make_unique<ParquetDataSource>(
41+
outputType,
42+
tableHandle,
43+
columnHandles,
44+
executor_,
45+
connectorQueryCtx,
46+
ParquetReaderConfig_);
47+
}
48+
49+
std::shared_ptr<Connector> ParquetConnectorFactory::newConnector(
50+
const std::string& id,
51+
std::shared_ptr<const facebook::velox::config::ConfigBase> config,
52+
folly::Executor* executor) {
53+
return std::make_shared<ParquetConnector>(id, config, executor);
54+
}
55+
56+
} // namespace facebook::velox::cudf_velox::connector::parquet
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/connectors/Connector.h"
20+
#include "velox/experimental/cudf/connectors/parquet/ParquetDataSource.h"
21+
#include "velox/experimental/cudf/connectors/parquet/ParquetReaderConfig.h"
22+
#include "velox/experimental/cudf/connectors/parquet/ParquetTableHandle.h"
23+
24+
#include <cudf/io/parquet.hpp>
25+
#include <cudf/io/types.hpp>
26+
#include <cudf/types.hpp>
27+
28+
namespace facebook::velox::cudf_velox::connector::parquet {
29+
30+
using namespace facebook::velox::connector;
31+
using namespace facebook::velox::config;
32+
33+
class ParquetConnector final : public Connector {
34+
public:
35+
ParquetConnector(
36+
const std::string& id,
37+
std::shared_ptr<const ConfigBase> config,
38+
folly::Executor* executor);
39+
40+
std::unique_ptr<DataSource> createDataSource(
41+
const std::shared_ptr<const RowType>& outputType,
42+
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
43+
const std::unordered_map<std::string, std::shared_ptr<ColumnHandle>>&
44+
columnHandles,
45+
ConnectorQueryCtx* connectorQueryCtx) override final;
46+
47+
const std::shared_ptr<const ConfigBase>& connectorConfig() const override {
48+
return ParquetReaderConfig_->config();
49+
}
50+
51+
std::unique_ptr<DataSink> createDataSink(
52+
RowTypePtr /*inputType*/,
53+
std::shared_ptr<
54+
55+
ConnectorInsertTableHandle> /*connectorInsertTableHandle*/,
56+
ConnectorQueryCtx* /*connectorQueryCtx*/,
57+
CommitStrategy /*commitStrategy*/) override final {
58+
// TODO: Implement cudf parquet writer
59+
VELOX_NYI("cudf::ParquetConnector does not yet support data sink.");
60+
}
61+
62+
folly::Executor* executor() const override {
63+
return executor_;
64+
}
65+
66+
protected:
67+
const std::shared_ptr<ParquetReaderConfig> ParquetReaderConfig_;
68+
folly::Executor* executor_;
69+
};
70+
71+
class ParquetConnectorFactory : public ConnectorFactory {
72+
public:
73+
static constexpr const char* kParquetConnectorName = "parquet";
74+
75+
ParquetConnectorFactory() : ConnectorFactory(kParquetConnectorName) {}
76+
77+
explicit ParquetConnectorFactory(const char* connectorName)
78+
: ConnectorFactory(connectorName) {}
79+
80+
std::shared_ptr<Connector> newConnector(
81+
const std::string& id,
82+
std::shared_ptr<const ConfigBase> config,
83+
folly::Executor* executor = nullptr) override;
84+
};
85+
86+
} // namespace facebook::velox::cudf_velox::connector::parquet
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <string>
18+
19+
#include "velox/experimental/cudf/connectors/parquet/ParquetConnectorSplit.h"
20+
21+
namespace facebook::velox::cudf_velox::connector::parquet {
22+
23+
std::string ParquetConnectorSplit::toString() const {
24+
return fmt::format("Parquet: {}", filePath);
25+
}
26+
27+
std::string ParquetConnectorSplit::getFileName() const {
28+
const auto i = filePath.rfind('/');
29+
return i == std::string::npos ? filePath : filePath.substr(i + 1);
30+
}
31+
32+
// static
33+
std::shared_ptr<ParquetConnectorSplit> ParquetConnectorSplit::create(
34+
const folly::dynamic& obj) {
35+
const auto connectorId = obj["connectorId"].asString();
36+
const auto splitWeight = obj["splitWeight"].asInt();
37+
const auto filePath = obj["filePath"].asString();
38+
39+
return std::make_shared<ParquetConnectorSplit>(
40+
connectorId, filePath, splitWeight);
41+
}
42+
43+
} // namespace facebook::velox::cudf_velox::connector::parquet
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <string>
20+
21+
#include "velox/connectors/Connector.h"
22+
#include "velox/dwio/common/Options.h"
23+
24+
#include <cudf/io/types.hpp>
25+
26+
namespace facebook::velox::cudf_velox::connector::parquet {
27+
28+
struct ParquetConnectorSplit
29+
: public facebook::velox::connector::ConnectorSplit {
30+
const std::string filePath;
31+
const facebook::velox::dwio::common::FileFormat fileFormat{
32+
facebook::velox::dwio::common::FileFormat::PARQUET};
33+
const cudf::io::source_info cudfSourceInfo;
34+
35+
ParquetConnectorSplit(
36+
const std::string& connectorId,
37+
const std::string& _filePath,
38+
int64_t _splitWeight = 0)
39+
: facebook::velox::connector::ConnectorSplit(connectorId, _splitWeight),
40+
filePath(_filePath),
41+
cudfSourceInfo({filePath}) {}
42+
43+
std::string toString() const override;
44+
std::string getFileName() const;
45+
46+
const cudf::io::source_info& getCudfSourceInfo() const {
47+
return cudfSourceInfo;
48+
}
49+
50+
static std::shared_ptr<ParquetConnectorSplit> create(
51+
const folly::dynamic& obj);
52+
};
53+
54+
class ParquetConnectorSplitBuilder {
55+
public:
56+
explicit ParquetConnectorSplitBuilder(std::string filePath)
57+
: filePath_{std::move(filePath)} {}
58+
59+
ParquetConnectorSplitBuilder& splitWeight(int64_t splitWeight) {
60+
splitWeight_ = splitWeight;
61+
return *this;
62+
}
63+
64+
ParquetConnectorSplitBuilder& connectorId(const std::string& connectorId) {
65+
connectorId_ = connectorId;
66+
return *this;
67+
}
68+
69+
std::shared_ptr<ParquetConnectorSplit> build() const {
70+
return std::make_shared<ParquetConnectorSplit>(
71+
connectorId_, filePath_, splitWeight_);
72+
}
73+
74+
private:
75+
const std::string filePath_;
76+
std::string connectorId_;
77+
int64_t splitWeight_{0};
78+
};
79+
80+
} // namespace facebook::velox::cudf_velox::connector::parquet

0 commit comments

Comments
 (0)