File tree Expand file tree Collapse file tree 3 files changed +23
-8
lines changed
include/rapidsmpf/streaming/core Expand file tree Collapse file tree 3 files changed +23
-8
lines changed Original file line number Diff line number Diff line change @@ -17,6 +17,20 @@ namespace rapidsmpf::streaming {
1717 */
1818using Node = coro::task<void >;
1919
20+ /* *
21+ * @brief Produce a new coroutine that waits for completion of all tasks.
22+ *
23+ * This function schedules each node and returns a new coroutine to
24+ * await on after which all of them will have finished execution.
25+ *
26+ * @param nodes A vector of nodes to await.
27+ * @throws If any of the underlying tasks throws an exception it is
28+ * rethrown.
29+ * @return Coroutine representing the completion of all of the tasks
30+ * in `nodes`.
31+ */
32+ Node when_all_or_throw (std::vector<Node>&& nodes);
33+
2034/* *
2135 * @brief Runs a list of nodes concurrently and waits for all to complete.
2236 *
Original file line number Diff line number Diff line change 77
88namespace rapidsmpf ::streaming {
99
10- void run_streaming_pipeline (std::vector<Node> nodes) {
11- auto results = coro::sync_wait (coro::when_all (std::move (nodes)));
10+ Node when_all_or_throw (std::vector<Node>&& nodes) {
11+ auto results = co_await coro::when_all (std::move (nodes));
12+ // The node result itself is always `void` but we access it here to re-throw
13+ // possible unhandled exceptions.
1214 for (auto & result : results) {
13- // The node result itself is always `void` but we access it here to re-throw
14- // possible unhandled exceptions.
1515 result.return_value ();
1616 }
1717}
1818
19+ void run_streaming_pipeline (std::vector<Node> nodes) {
20+ coro::sync_wait (when_all_or_throw (std::move (nodes)));
21+ }
22+
1923} // namespace rapidsmpf::streaming
Original file line number Diff line number Diff line change @@ -78,10 +78,7 @@ Node shutdown(
7878 std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch, std::vector<Node>&& tasks
7979) {
8080 ShutdownAtExit c{ch};
81- auto results = co_await coro::when_all (std::move (tasks));
82- for (auto & r : results) {
83- r.return_value ();
84- }
81+ co_await when_all_or_throw (std::move (tasks));
8582 co_await ch->drain (ctx->executor ());
8683}
8784
You can’t perform that action at this time.
0 commit comments