Skip to content

Commit e399465

Browse files
committed
Expose read_parquet node to python
1 parent 7ed516e commit e399465

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed

python/rapidsmpf/rapidsmpf/streaming/cudf/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# SPDX-License-Identifier: Apache-2.0
44
# =================================================================================
55

6-
set(cython_modules partition.pyx table_chunk.pyx)
6+
set(cython_modules parquet.pyx partition.pyx table_chunk.pyx)
77

88
rapids_cython_create_modules(
99
CXX
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from pylibcudf.io.parquet import ParquetReaderOptions
5+
6+
from rapidsmpf.streaming.core.channel import Channel
7+
from rapidsmpf.streaming.core.context import Context
8+
from rapidsmpf.streaming.core.node import CppNode
9+
from rapidsmpf.streaming.cudf.table_chunk import TableChunk
10+
11+
def read_parquet(
12+
ctx: Context,
13+
ch_out: Channel[TableChunk],
14+
max_tickets: int,
15+
options: ParquetReaderOptions,
16+
num_rows_per_chunk: int,
17+
) -> CppNode: ...
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from libc.stddef cimport ptrdiff_t
5+
from libcpp.memory cimport make_unique, shared_ptr
6+
from libcpp.utility cimport move
7+
from pylibcudf.io.parquet cimport ParquetReaderOptions
8+
from pylibcudf.libcudf.io.parquet cimport parquet_reader_options
9+
from pylibcudf.libcudf.types cimport size_type
10+
11+
from rapidsmpf.streaming.core.channel cimport Channel, cpp_Channel
12+
from rapidsmpf.streaming.core.context cimport Context, cpp_Context
13+
from rapidsmpf.streaming.core.node cimport CppNode, cpp_Node
14+
15+
16+
cdef extern from "<rapidsmpf/streaming/cudf/parquet.hpp>" nogil:
17+
cdef cpp_Node cpp_read_parquet \
18+
"rapidsmpf::streaming::node::read_parquet"(
19+
shared_ptr[cpp_Context] ctx,
20+
shared_ptr[cpp_Channel] ch_out,
21+
ptrdiff_t max_tickets,
22+
parquet_reader_options options,
23+
size_type num_rows_per_chunk,
24+
) except +
25+
26+
27+
def read_parquet(
28+
Context ctx not None,
29+
Channel ch_out not None,
30+
ptrdiff_t max_tickets,
31+
ParquetReaderOptions options not None,
32+
size_type num_rows_per_chunk
33+
):
34+
"""
35+
Create a streaming node to read from parquet.
36+
37+
Parameters
38+
----------
39+
ctx
40+
Streaming execution context
41+
ch_out
42+
Output channel to receive the TableChunks.
43+
max_tickets
44+
Maximum number of tasks that may be suspended having read a chunk.
45+
options
46+
Reader options
47+
"""
48+
cdef cpp_Node _ret
49+
with nogil:
50+
_ret = cpp_read_parquet(
51+
ctx._handle,
52+
ch_out._handle,
53+
max_tickets,
54+
options.c_obj,
55+
num_rows_per_chunk,
56+
)
57+
return CppNode.from_handle(
58+
make_unique[cpp_Node](move(_ret)), owner=None
59+
)

0 commit comments

Comments
 (0)