Add CUDA streams to cudf-polars #20291
Conversation
I haven't finished flagging the non-trivial changes, but I have to run. I'll finish that up later.
-    @functools.cached_property
-    def obj_scalar(self) -> plc.Scalar:
+    def obj_scalar(self, stream: Stream) -> plc.Scalar:
Previously, `Column.obj_scalar` used `@functools.cached_property`. Now we need it to be a method so that we can pass in a stream for the `plc.copying.get_element` call. To retain the caching behavior, we store the result on the instance at `self._obj_scalar`. Similar story for `Column.nan_count`.
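As a sketch, the change looks roughly like this (simplified; it assumes the `Column` constructor initializes `self._obj_scalar = None`, which is an assumption of this sketch rather than a quote from the PR):

def obj_scalar(self, stream: Stream) -> plc.Scalar:
    # A manual cache slot replaces @functools.cached_property so that
    # a stream can be threaded through to get_element.
    if self._obj_scalar is None:
        self._obj_scalar = plc.copying.get_element(self.obj, 0, stream=stream)
    return self._obj_scalar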
         # preprocessed into pylibcudf requests.
         child = self.children[0]
-        return self.op(child.evaluate(df, context=context))
+        return self.op(child.evaluate(df, context=context), stream=df.stream)
Note: all our reduction functions (`self.op`) now accept a `stream`. This is where we pass it in. The column is valid on `df.stream` since it's from `child.evaluate(df)`.
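As a sketch, one of these reduction ops now looks something like the following (a hypothetical op, assuming the stream-accepting pylibcudf signatures this PR relies on):

import pylibcudf as plc
from rmm.pylibrmm.stream import Stream

def _max(column: plc.Column, *, stream: Stream) -> plc.Scalar:
    # The caller guarantees `column` is valid on `stream` (df.stream
    # here), so the reduction is enqueued on the same stream with no
    # extra synchronization.
    return plc.reduce.reduce(
        column, plc.aggregation.max(), column.type(), stream=stream
    )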
-            py_val=False, dtype=self.dtype.plc_type
+            py_val=False, dtype=self.dtype.plc_type, stream=df.stream
         ),
+        stream=df.stream,
Note: here's one of the spots where we call `self._distinct`, which now accepts a `stream`. Note how `column`, `source_value`, and `target_value` are all valid on `df.stream`.
+        # The scalars in self.preceding and self.following are constructed on the
+        # stream dedicated to building offset/period scalars. We need to join
+        # our stream into its stream.
+        stream = get_joined_cuda_stream(
+            upstreams=(df.stream, get_stream_for_offset_windows())
+        )
This `get_stream_for_offset_windows` counts as non-trivial.

All the way down in `cudf_polars.utils.windows.duration_to_scalar`, we make some `plc.Scalar`s from Python scalars:

return plc.Scalar.from_py(
    value, plc.DataType(plc.TypeId.DURATION_NANOSECONDS), stream=stream
)

The callers of this function (`RollingWindow.__init__` -> `offsets_to_windows`) don't have a stream at hand; we aren't in the context of an `IR.do_evaluate` node, for example. We really don't want to use the default stream, because IIUC any usage of the default stream will force unnecessary synchronizations across the device.

However, we still need a stable, unique stream so that the users of the output (this block of code) have something to synchronize with.

My solution: a singleton stream dedicated to getting the offset windows. This provides a way for `duration_to_scalar` and the ultimate user of its output to synchronize without being directly tied to each other in a call stack.

I'm very open to other suggestions.

Similar story for the other `get_stream_for_*` functions.
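A minimal sketch of the singleton pattern (assuming rmm's Python `Stream`, whose no-argument constructor creates a new non-default stream):

import functools

from rmm.pylibrmm.stream import Stream

@functools.cache
def get_stream_for_offset_windows() -> Stream:
    # One process-wide stream reserved for building offset/period
    # scalars; producers and consumers synchronize through it without
    # sharing a call stack.
    return Stream()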
We could refactor so that `offsets_to_windows` is only called in `do_evaluate`. That's probably saner anyway.
I looked at both calling `offsets_to_windows` only in `do_evaluate` and just creating a stream and synchronizing.

`offsets_to_windows` is also called via `rewrite_rolling` via `_translate_ir` for `pl_ir.GroupBy`. Threading a stream all the way through there looks challenging.

So I'll plan to create a stream in `offsets_to_windows` and synchronize it after creating the scalars on that stream.
These stream singletons have all been removed in:
- af6b7e0 - Remove `get_stream_for_conditional_join_predicate`
- 61ad556 - Remove `get_stream_for_stats`
- 84d4fda - Remove stream singleton for `duration_to_scalar`

Instead of a stream singleton and joining streams, we just create a temporary stream and synchronize it before the function returns.
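A sketch of the replacement pattern in `duration_to_scalar` (simplified; assumes rmm's Python `Stream`):

import pylibcudf as plc
from rmm.pylibrmm.stream import Stream

def duration_to_scalar(value: int) -> plc.Scalar:
    # Build the scalar on a fresh, private stream...
    stream = Stream()
    scalar = plc.Scalar.from_py(
        value, plc.DataType(plc.TypeId.DURATION_NANOSECONDS), stream=stream
    )
    # ...and synchronize before returning, so callers can use the
    # result on any stream without knowing about this one.
    stream.synchronize()
    return scalar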
     ) -> DataFrame:
         """Evaluate and return a dataframe."""
         keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
+        # The scalars in preceding and following are constructed on the
See https://github.com/rapidsai/cudf/pull/20291/files#r2437563679 for context.
Let's similarly defer the construction of these scalars to `do_evaluate` if we can.
Or, as noted for the `ConditionalJoin` case, we can construct in `__init__` and sync.
     def __init__(self, predicate: expr.Expr):
         self.predicate = predicate
-        ast_result = to_ast(predicate)
+        ast_result = to_ast(
See https://github.com/rapidsai/cudf/pull/20291/files#r2437563679 for context, but this one is for `to_ast`. In this `Predicate.__init__` we again don't have a stream at hand that we can use.
For things we do in `__init__`, I think we're better off launching on some stream and then syncing that stream.
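A sketch of that approach for `Predicate.__init__` (simplified; `self.ast_predicate` is a hypothetical attribute name, and the `stream` keyword on `to_ast` is the one this PR adds):

def __init__(self, predicate: expr.Expr):
    self.predicate = predicate
    # Launch the AST/scalar construction on a private stream...
    stream = Stream()
    ast_result = to_ast(predicate, stream=stream)
    # ...then sync before __init__ returns, so the result is valid
    # everywhere without threading a stream through translation.
    stream.synchronize()
    self.ast_predicate = ast_result  # hypothetical attribute name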
         by = options["by"]
+        stream = get_joined_cuda_stream(upstreams=(df.stream, sort_boundaries.stream))
Note: `df` and `sort_boundaries` are potentially on different streams, so we join them here.
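For intuition, "joining" can be sketched with CUDA events (illustrative only, written against CuPy's stream/event API rather than the helper in this PR):

import cupy as cp

def get_joined_cuda_stream(upstreams) -> cp.cuda.Stream:
    joined = cp.cuda.Stream(non_blocking=True)
    for upstream in upstreams:
        event = cp.cuda.Event()
        event.record(upstream)    # capture the upstream's current frontier
        joined.wait_event(event)  # order joined work after that frontier
    return joined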
-            stream=DEFAULT_STREAM,
+            stream=stream,
         )
+        # TODO: figure out handoff with rapidsmpf
A bit more context here: we need to look into the pylibcudf methods called by rapidsmpf and ensure that they all accept a `stream` argument, and that we have a way to pass that through.
rapidsmpf's `BufferResource` has a `stream_pool` that would ideally be used (it wraps `rmm::cuda_stream_pool`). It might take a bit of plumbing to make that available everywhere that needs a new stream.
This adds CUDA streams to `cudf_polars.dsl.expressions.aggregation`. Streams are still missing from some `cudf_polars.containers.Column` calls in this file, but all the directly pylibcudf calls should be covered. Split off rapidsai#20291.
The remaining non-trivial changes.
Description
This adds CUDA streams to all pylibcudf calls in cudf-polars.
At the moment, we continue to use the default stream for all operations; the change is that we now pass it explicitly. A future PR will update things to use non-default streams.
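In other words, a call that previously relied on the default stream implicitly now names it (a sketch; `tables` stands in for a list of `plc.Table`s from earlier work, and the `stream=` keyword assumes the pylibcudf signatures this PR targets):

import pylibcudf as plc
from rmm.pylibrmm.stream import DEFAULT_STREAM

# Before: plc.concatenate.concatenate(tables)
# After: same work on the same stream, but stated explicitly so a
# later PR can substitute a non-default stream.
result = plc.concatenate.concatenate(tables, stream=DEFAULT_STREAM)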
As far as I can tell, this should get all the pylibcudf calls in cudf-polars. It's a lot of code to review. Unfortunately, it mixes many trivial changes (adding `stream=stream` in a bunch of spots) with a handful of non-trivial changes. I'll comment inline on all the non-trivial changes. I'm more than happy to break those changes out into their own PR (but it gets complicated: the changes to `Column.nan_count`, for example, force the change to `broadcast` and `aggregation.py`...).

Closes #20239

Part of #20228