Introduce a streaming read_parquet node #574
Conversation
I added some comments. Should we also add some C++ tests?
First pass, looks good.
It would be good to have some basic C++ tests.
Added C++ tests, ready for another look.
    std::shared_ptr<Channel> ch_out,
    std::ptrdiff_t max_tickets,
    cudf::io::parquet_reader_options options,
    cudf::size_type num_rows_per_chunk
This num_rows_per_chunk argument is the only "knob" for tuning the chunk size, which is understandable.
We may have trouble using this variation of the read_parquet node in cudf-polars unless we implement dynamic partitioning, or we force the metadata sampling to gather all footer metadata before the streaming network is constructed. Recall that the statically-partitioned version of cudf-polars needs to know how many chunks the Scan operation will produce ahead of time.
We are already planning to do dynamic partitioning in the future, so temporarily collecting extra/redundant parquet metadata up-front probably isn't a big deal. Even so, I'm wondering if there is a more general way to configure the chunk size here?
For example, it would be great if we could set an upper limit on the row count per chunk, but could also ask for "N chunks per file" or "N files per chunk" (to temporarily satisfy the statically-partitioned case). Even if we do focus on a row-count-based configuration, it may make sense to align chunks with row-group boundaries as long as the row count doesn't exceed the upper limit. (A rough sketch of what such a configuration could look like follows.)
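For concreteness, one hypothetical shape for such a configuration. All names here are illustrative and do not exist in rapidsmpf or cudf; this is only a sketch of the options discussed above:

```cpp
// Illustrative only: a hypothetical way to express the chunking policy
// discussed above. None of these names are part of the actual API.
#include <cstdint>
#include <optional>
#include <variant>

// Static alternatives the statically-partitioned cudf-polars case could use.
struct ChunksPerFile {
    int n;  // split every file into n chunks
};
struct FilesPerChunk {
    int n;  // concatenate n files into one chunk
};

struct ChunkingPolicy {
    // Upper bound on the row count of any emitted chunk.
    std::int64_t max_rows_per_chunk;
    // Optional static partitioning hint; when absent, chunks could instead be
    // aligned to row-group boundaries, subject to max_rows_per_chunk.
    std::optional<std::variant<ChunksPerFile, FilesPerChunk>> static_hint;
};
```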
> Recall that the statically-partitioned version of cudf-polars needs to know how many chunks the Scan operation will produce ahead of time.
I still struggle to understand this statement. I understand that the output of a shuffle needs to specify ahead of time the number of partitions it is going to use, but the scan is an input.
> We are already planning to do dynamic partitioning in the future, so temporarily collecting extra/redundant parquet metadata up-front probably isn't a big deal. Even so, I'm wondering if there is a more general way to configure the chunk size here?
I am very happy to try different chunking strategies but they have to be amenable to the multi-threaded read approach I have here.
> I still struggle to understand this statement. I understand that the output of a shuffle needs to specify ahead of time the number of partitions it is going to use, but the scan is an input.
Yeah, I'm hoping that this is mostly true. The only immediate problem I can think of is if the "plan" tells us we will get a single chunk, but the IO node actually produces 2 or more. Since we cannot dynamically inject a concatenation or shuffle yet, we will have an issue if we run into an operation that is not supported for multiple partitions.
> I am very happy to try different chunking strategies but they have to be amenable to the multi-threaded read approach I have here.
My naive hypothesis is that row-group aligned reads would be easier to do well, but I could be wrong.
This is ready for another look. I've updated things so that the read_parquet node sends chunks into the channel "in-order". I will do some benchmarking tomorrow to check everything still looks good.
Looks very good
OK, I did this and it looks as before for the examples I have, so I think this is good.
Had some questions mostly.
    RAPIDSMPF_EXPECTS(
        files.size() < std::numeric_limits<int>::max(), "Trying to read too many files"
    );
    // TODO: Handle case where multiple ranks are reading from a single file.
nit, shall we open an issue to track these TODOs?
We advertise in the type stubs that these are generics so we should probably allow them to be used as such.
It is convenient if we send zero-sized chunks so that we don't need to have out-of-band approaches to communicating the metadata (and hence shape) of any inserted chunks.
The provided options are collective across the participating ranks. The input file list will be split (approximately evenly) across ranks.
The caller can control the number of rows that are read per chunk sent to the output channel, as well as the number of producer tasks that may be suspended, waiting, after having read a chunk. Increasing the number of producer tasks can increase pipelining and latency-hiding opportunities for the execution scheduler, at the cost of higher memory pressure.
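For orientation, a rough usage sketch. The options builder below is standard cudf API; the node invocation itself is hypothetical (the full constructor signature is not reproduced in this excerpt), so it appears only as a comment:

```cpp
#include <cudf/io/parquet.hpp>

#include <string>
#include <vector>

// Options are collective: every rank constructs them the same way, and the
// node itself splits the file list (approximately evenly) across ranks.
auto make_options(std::vector<std::string> const& files)
{
    return cudf::io::parquet_reader_options::builder(cudf::io::source_info{files})
        .build();
}

// Hypothetical invocation (names and argument order illustrative only):
//   read_parquet(ch_out,
//                /*max_tickets=*/4,               // up to 4 producers may hold a finished chunk
//                make_options(files),
//                /*num_rows_per_chunk=*/1'000'000);
```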
To ensure that, in this multi-producer setup, chunks are still inserted into the output channel in increasing sequence-number order, we implement a
Lineariser utility that should always be used when sending from multiple producers. It is created with one input channel per producer; each producer sends its sequence of chunks into its input channel in order, and the Lineariser buffers them and sends them to the output in global total order.
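As a rough illustration of the idea, here is a minimal single-threaded sketch with illustrative names and a string payload standing in for a chunk; the actual Lineariser works with channels and asynchronous producers:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

class LineariserSketch {
  public:
    explicit LineariserSketch(std::function<void(std::string)> emit)
        : emit_{std::move(emit)} {}

    // Called by any producer. Each producer inserts its own chunks in order,
    // but different producers may interleave arbitrarily.
    void insert(std::uint64_t seq, std::string chunk)
    {
        buffer_.emplace(seq, std::move(chunk));
        // Flush the longest run of consecutive sequence numbers we now hold,
        // so the output only ever sees chunks in global total order.
        for (auto it = buffer_.find(next_); it != buffer_.end(); it = buffer_.find(next_)) {
            emit_(std::move(it->second));
            buffer_.erase(it);
            ++next_;
        }
    }

  private:
    std::function<void(std::string)> emit_;
    std::map<std::uint64_t, std::string> buffer_;  // out-of-order chunks awaiting their turn
    std::uint64_t next_{0};                        // next sequence number to emit
};
```

The key invariant is the same as described above: producers never reorder their own chunks, so buffering by sequence number and draining consecutive runs is enough to restore the global order.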