Commit a00dfcf

Add a streaming read_parquet node

1 parent 59f1583 · commit a00dfcf

File tree

3 files changed: +223 −0 lines changed

    cpp/CMakeLists.txt
    cpp/include/rapidsmpf/streaming/cudf/parquet.hpp
    cpp/src/streaming/cudf/parquet.cpp

cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions

@@ -174,6 +174,7 @@ if(RAPIDSMPF_HAVE_STREAMING)
     src/streaming/core/leaf_node.cpp
     src/streaming/core/node.cpp
     src/streaming/cudf/partition.cpp
+    src/streaming/cudf/parquet.cpp
     src/streaming/cudf/table_chunk.cpp
   )
 endif()
cpp/include/rapidsmpf/streaming/cudf/parquet.hpp

Lines changed: 58 additions & 0 deletions

/**
 * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
 * SPDX-License-Identifier: Apache-2.0
 */

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

#include <cudf/ast/expressions.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <rapidsmpf/streaming/core/channel.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/core/node.hpp>

namespace rapidsmpf::streaming::node {

/**
 * @brief A container for an expression with a stream.
 */
struct Filter {
    std::unique_ptr<cudf::ast::expression> expression;  ///< Filter expression.
    rmm::cuda_stream_view
        stream;  ///< CUDA stream any allocations in the expression are valid on.
};

/**
 * @brief Asynchronously read parquet files into an output channel.
 *
 * @param ctx The execution context to use.
 * @param ch_out Channel to which `TableChunk`s are sent.
 * @param max_tickets Maximum number of tickets used to throttle chunk production. Up to
 * this many tasks can be producing data simultaneously.
 * @param files Vector of file names to read from. These must all have the same schema.
 * @param columns Vector of column names to read from the files.
 * @param num_rows_per_chunk Target (maximum) number of rows in any sent `TableChunk`.
 * @param predicate Optional predicate to apply during the read.
 *
 * @return Streaming node representing the asynchronous read.
 */
Node read_parquet(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_out,
    std::ptrdiff_t max_tickets,
    // TODO: use parquet_reader_options, but that is more complicated...
    // See https://github.com/rapidsai/cudf/issues/20052
    std::vector<std::string> files,
    std::vector<std::string> columns,
    // TODO: use byte count, not row count?
    cudf::size_type num_rows_per_chunk,
    std::shared_ptr<Filter> predicate = nullptr
);

}  // namespace rapidsmpf::streaming::node
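
For orientation, a minimal usage sketch of this API follows. It is not part of the commit: how a Context and output Channel are obtained is application-specific and assumed here, the file and column names are placeholders, and the optional Filter predicate is omitted.

// Hypothetical usage sketch (not part of this commit). Assumes the application
// already holds a Context and an output Channel from the streaming core.
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <rapidsmpf/streaming/cudf/parquet.hpp>

namespace rs = rapidsmpf::streaming;

rs::Node build_scan(
    std::shared_ptr<rs::Context> ctx,   // assumed to come from the application
    std::shared_ptr<rs::Channel> ch_out
) {
    std::vector<std::string> files{"part-0.parquet", "part-1.parquet"};  // placeholders
    std::vector<std::string> columns{"a", "b"};  // placeholder column names
    // Allow at most 4 in-flight chunk-producing tasks and emit chunks of at
    // most one million rows; no predicate pushdown in this sketch.
    return rs::node::read_parquet(
        std::move(ctx), std::move(ch_out), 4, std::move(files), std::move(columns), 1'000'000
    );
}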

cpp/src/streaming/cudf/parquet.cpp

Lines changed: 164 additions & 0 deletions

/**
 * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
 * SPDX-License-Identifier: Apache-2.0
 */

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include <cudf/ast/expressions.hpp>
#include <cudf/io/datasource.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/parquet_metadata.hpp>
#include <cudf/io/types.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <rapidsmpf/cuda_stream.hpp>
#include <rapidsmpf/streaming/core/channel.hpp>
#include <rapidsmpf/streaming/core/node.hpp>
#include <rapidsmpf/streaming/cudf/parquet.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>

namespace rapidsmpf::streaming::node {
namespace {

/**
 * @brief Read a single chunk from a parquet source and send it to an output channel.
 *
 * @param ctx The execution context to use.
 * @param ch_out Channel to which `TableChunk`s are sent.
 * @param stream The stream on which to read the chunk.
 * @param source The `cudf::io::source_info` describing the data to read.
 * @param columns Named columns to read from the file.
 * @param skip_rows Number of rows to skip from the beginning of the file.
 * @param num_rows Number of rows to read.
 * @param predicate Optional predicate to apply during the read.
 * @param sequence_number The ordered chunk id used to reconstruct the original ordering
 * of the data.
 *
 * @note The caller is responsible for scheduling this coroutine onto a thread pool for
 * execution.
 *
 * @return Streaming node representing the asynchronous read of a chunk and its send to
 * the output channel.
 */
Node read_parquet_chunk(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<ThrottlingAdaptor> ch_out,
    rmm::cuda_stream_view stream,
    cudf::io::source_info source,
    std::vector<std::string> columns,
    std::int64_t skip_rows,
    cudf::size_type num_rows,
    std::shared_ptr<Filter> predicate,
    std::uint64_t sequence_number
) {
    auto ticket = co_await ch_out->acquire();
    auto builder = cudf::io::parquet_reader_options::builder(source)
                       .columns(columns)
                       .num_rows(num_rows)
                       .skip_rows(skip_rows);
    if (predicate != nullptr) {
        cuda_stream_join(stream, predicate->stream);
        builder.filter(*predicate->expression);
    }
    auto options = builder.build();
    auto result = std::make_unique<TableChunk>(
        sequence_number,
        cudf::io::read_parquet(options, stream, ctx->br()->device_mr()).tbl,
        stream
    );
    if (predicate != nullptr) {
        cuda_stream_join(predicate->stream, stream);
    }
    auto [_, receipt] = co_await ticket.send(std::move(result));
    // Move this coroutine to the back of the queue so that releasing the semaphore is
    // likely to happen on a different thread. Releasing the semaphore resumes any
    // waiters on the current thread, which is not what we typically want for throttled
    // reads: we want the next waiting read task to run on a different thread.
    co_await ctx->executor()->yield();
    co_await receipt;
}

}  // namespace

Node read_parquet(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_out,
    std::ptrdiff_t max_tickets,
    std::vector<std::string> files,
    std::vector<std::string> columns,
    cudf::size_type num_rows_per_chunk,
    std::shared_ptr<Filter> predicate
) {
    ShutdownAtExit c{ch_out};
    auto throttle = std::make_shared<ThrottlingAdaptor>(ch_out, max_tickets);
    co_await ctx->executor()->schedule();
    auto size = static_cast<std::size_t>(ctx->comm()->nranks());
    auto rank = static_cast<std::size_t>(ctx->comm()->rank());
    RAPIDSMPF_EXPECTS(
        files.size() < std::numeric_limits<int>::max(), "Trying to read too many files"
    );
    // Each rank reads a contiguous slice of the file list: floor(nfiles / nranks)
    // files, plus one extra on the first nfiles % nranks ranks.
    int files_per_rank =
        static_cast<int>(files.size() / size + (rank < (files.size() % size)));
    int file_offset = 0;
    for (auto i = std::size_t{0}; i < rank; i++) {
        file_offset +=
            static_cast<int>(files.size() / size + (i < (files.size() % size)));
    }
    files = std::vector(
        files.begin() + file_offset, files.begin() + file_offset + files_per_rank
    );
    int files_per_split = 1;
    // TODO: Handle the case where multiple ranks are reading from a single file.
    // TODO: We could be smarter here. Suppose we end up wanting one file per split,
    // but each file is marginally larger than num_rows_per_chunk; we would then
    // produce many small chunks.
    if (files_per_rank > 1) {
        // Figure out a guesstimated splitting.
        auto source = cudf::io::source_info(files[0]);
        auto metadata = cudf::io::read_parquet_metadata(source);
        auto const num_rows = metadata.num_rows();
        files_per_split =
            std::max(static_cast<int>(num_rows_per_chunk / num_rows), files_per_split);
    }
    std::vector<Node> read_tasks;
    // Sequence numbers must be unique across all chunks this rank produces, so the
    // counter lives outside the per-split loop.
    std::uint64_t sequence_number = 0;
    for (file_offset = 0; file_offset < files_per_rank; file_offset += files_per_split) {
        auto nfiles = std::min(files_per_split, files_per_rank - file_offset);
        std::vector<std::string> chunk;
        chunk.reserve(static_cast<std::size_t>(nfiles));
        std::ranges::move(
            files.begin() + file_offset,
            files.begin() + file_offset + nfiles,
            std::back_inserter(chunk)
        );
        auto source = cudf::io::source_info(std::move(chunk));
        std::int64_t skip_rows = 0;
        auto metadata = cudf::io::read_parquet_metadata(source);
        auto const source_num_rows = metadata.num_rows();
        while (skip_rows < source_num_rows) {
            auto num_rows = static_cast<cudf::size_type>(std::min(
                static_cast<std::int64_t>(num_rows_per_chunk), source_num_rows - skip_rows
            ));
            read_tasks.push_back(ctx->executor()->schedule(read_parquet_chunk(
                ctx,
                throttle,
                ctx->br()->stream_pool().get_stream(),
                source,
                columns,
                skip_rows,
                num_rows,
                predicate,
                // TODO: sequence number being correct relies on read_parquet_chunk
                // sending only one chunk.
                sequence_number++
            )));
            skip_rows += num_rows;
        }
    }
    co_await when_all_or_throw(std::move(read_tasks));
    co_await ch_out->drain(ctx->executor());
}
}  // namespace rapidsmpf::streaming::node
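
As a sanity check on the file-assignment arithmetic in read_parquet, here is a small self-contained sketch (illustrative only, not part of the commit) that reproduces the per-rank count and offset computation. With 10 files across 4 ranks it yields counts 3, 3, 2, 2 and offsets 0, 3, 6, 8.

// Standalone illustration of the contiguous per-rank file assignment used above.
#include <cstddef>
#include <iostream>

// Rank r gets floor(n / size) files, plus one extra if r < n % size.
std::size_t files_for_rank(std::size_t n, std::size_t size, std::size_t rank) {
    return n / size + (rank < n % size ? 1 : 0);
}

// Files are assigned contiguously, so each rank's offset is the prefix sum of
// the counts of all lower ranks.
std::size_t offset_for_rank(std::size_t n, std::size_t size, std::size_t rank) {
    std::size_t offset = 0;
    for (std::size_t i = 0; i < rank; ++i) {
        offset += files_for_rank(n, size, i);
    }
    return offset;
}

int main() {
    // 10 files across 4 ranks: counts 3, 3, 2, 2; offsets 0, 3, 6, 8.
    for (std::size_t rank = 0; rank < 4; ++rank) {
        std::cout << "rank " << rank << ": offset " << offset_for_rank(10, 4, rank)
                  << ", count " << files_for_rank(10, 4, rank) << "\n";
    }
}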
