Skip to content

Commit 2308e23

Browse files
committed
Merge branch-24.03 into branch-24.06
2 parents 0701b13 + e1d0e9d commit 2308e23

File tree

10 files changed

+219
-78
lines changed

10 files changed

+219
-78
lines changed

.github/workflows/pr.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ jobs:
7070

7171
checks:
7272
needs: [prepare]
73-
if: ${{ ! fromJSON(needs.prepare.outputs.has_skip_ci_label) }}
73+
if: ${{ !fromJSON(needs.prepare.outputs.has_skip_ci_label) && fromJSON(needs.prepare.outputs.is_pr )}}
7474
secrets: inherit
7575
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
7676
with:

CHANGELOG.md

+38
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,41 @@
1+
# MRC 24.03.00 (7 Apr 2024)
2+
3+
## 🚨 Breaking Changes
4+
5+
- Update cast_from_pyobject to throw on unsupported types rather than returning null ([#451](https://github.com/nv-morpheus/MRC/pull/451)) [@dagardner-nv](https://github.com/dagardner-nv)
6+
- RAPIDS 24.02 Upgrade ([#433](https://github.com/nv-morpheus/MRC/pull/433)) [@cwharris](https://github.com/cwharris)
7+
8+
## 🐛 Bug Fixes
9+
10+
- Update CR year ([#460](https://github.com/nv-morpheus/MRC/pull/460)) [@dagardner-nv](https://github.com/dagardner-nv)
11+
- Removing the INFO log when creating an AsyncioRunnable ([#456](https://github.com/nv-morpheus/MRC/pull/456)) [@mdemoret-nv](https://github.com/mdemoret-nv)
12+
- Update cast_from_pyobject to throw on unsupported types rather than returning null ([#451](https://github.com/nv-morpheus/MRC/pull/451)) [@dagardner-nv](https://github.com/dagardner-nv)
13+
- Adopt updated builds of CI runners ([#442](https://github.com/nv-morpheus/MRC/pull/442)) [@dagardner-nv](https://github.com/dagardner-nv)
14+
- Update Conda channels to prioritize `conda-forge` over `nvidia` ([#436](https://github.com/nv-morpheus/MRC/pull/436)) [@cwharris](https://github.com/cwharris)
15+
- Remove redundant copy of libmrc_pymrc.so ([#429](https://github.com/nv-morpheus/MRC/pull/429)) [@dagardner-nv](https://github.com/dagardner-nv)
16+
- Unifying cmake exports name across all Morpheus repos ([#427](https://github.com/nv-morpheus/MRC/pull/427)) [@mdemoret-nv](https://github.com/mdemoret-nv)
17+
- Updating the workspace settings to remove deprecated python options ([#425](https://github.com/nv-morpheus/MRC/pull/425)) [@mdemoret-nv](https://github.com/mdemoret-nv)
18+
- Use `dependencies.yaml` to generate environment files ([#416](https://github.com/nv-morpheus/MRC/pull/416)) [@cwharris](https://github.com/cwharris)
19+
20+
## 📖 Documentation
21+
22+
- Update minimum requirements ([#467](https://github.com/nv-morpheus/MRC/pull/467)) [@dagardner-nv](https://github.com/dagardner-nv)
23+
24+
## 🚀 New Features
25+
26+
- Add maximum simultaneous tasks support to `TaskContainer` ([#464](https://github.com/nv-morpheus/MRC/pull/464)) [@cwharris](https://github.com/cwharris)
27+
- Add `TestScheduler` to support testing time-based coroutines without waiting for timeouts ([#453](https://github.com/nv-morpheus/MRC/pull/453)) [@cwharris](https://github.com/cwharris)
28+
- Adding RoundRobinRouter node type for distributing values to downstream nodes ([#449](https://github.com/nv-morpheus/MRC/pull/449)) [@mdemoret-nv](https://github.com/mdemoret-nv)
29+
- Add IoScheduler to enable epoll-based Task scheduling ([#448](https://github.com/nv-morpheus/MRC/pull/448)) [@cwharris](https://github.com/cwharris)
30+
- Update ops-bot.yaml ([#446](https://github.com/nv-morpheus/MRC/pull/446)) [@AyodeAwe](https://github.com/AyodeAwe)
31+
- RAPIDS 24.02 Upgrade ([#433](https://github.com/nv-morpheus/MRC/pull/433)) [@cwharris](https://github.com/cwharris)
32+
33+
## 🛠️ Improvements
34+
35+
- Update MRC to use CCCL instead of libcudacxx ([#444](https://github.com/nv-morpheus/MRC/pull/444)) [@cwharris](https://github.com/cwharris)
36+
- Optionally skip the CI pipeline if the PR contains the skip-ci label ([#426](https://github.com/nv-morpheus/MRC/pull/426)) [@dagardner-nv](https://github.com/dagardner-nv)
37+
- Add flake8, yapf, and isort pre-commit hooks. ([#420](https://github.com/nv-morpheus/MRC/pull/420)) [@cwharris](https://github.com/cwharris)
38+
139
# MRC 23.11.00 (30 Nov 2023)
240

341
## 🐛 Bug Fixes

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ MRC includes both Python and C++ bindings and supports installation via [conda](
3838

3939
### Prerequisites
4040

41-
- Pascal architecture (Compute capability 6.0) or better
42-
- NVIDIA driver `450.80.02` or higher
41+
- Volta architecture (Compute capability 7.0) or better
42+
- [CUDA 12.1](https://developer.nvidia.com/cuda-12-1-0-download-archive)
4343
- [conda or miniconda](https://conda.io/projects/conda/en/latest/user-guide/install/linux.html)
4444
- If using Docker:
4545
- [Docker](https://docs.docker.com/get-docker/)

cpp/mrc/benchmarks/bench_baselines.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "mrc/benchmarking/util.hpp"
2020

2121
#include <benchmark/benchmark.h>
22-
#include <nlohmann/json.hpp>
2322
#include <rxcpp/rx.hpp>
2423

2524
#include <chrono>

cpp/mrc/include/mrc/coroutines/task_container.hpp

+16-18
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
* SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
33
* SPDX-License-Identifier: Apache-2.0
44
*
55
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,15 +37,14 @@
3737
*/
3838

3939
#pragma once
40-
4140
#include "mrc/coroutines/task.hpp"
4241

43-
#include <atomic>
4442
#include <cstddef>
4543
#include <list>
4644
#include <memory>
4745
#include <mutex>
4846
#include <optional>
47+
#include <queue>
4948
#include <vector>
5049

5150
namespace mrc::coroutines {
@@ -60,7 +59,7 @@ class TaskContainer
6059
* @param e Tasks started in the container are scheduled onto this executor. For tasks created
6160
* from a coro::io_scheduler, this would usually be that coro::io_scheduler instance.
6261
*/
63-
TaskContainer(std::shared_ptr<Scheduler> e);
62+
TaskContainer(std::shared_ptr<Scheduler> e, std::size_t max_concurrent_tasks = 0);
6463

6564
TaskContainer(const TaskContainer&) = delete;
6665
TaskContainer(TaskContainer&&) = delete;
@@ -93,30 +92,20 @@ class TaskContainer
9392
*/
9493
auto garbage_collect() -> std::size_t;
9594

96-
/**
97-
* @return The number of tasks that are awaiting deletion.
98-
*/
99-
auto delete_task_size() const -> std::size_t;
100-
101-
/**
102-
* @return True if there are no tasks awaiting deletion.
103-
*/
104-
auto delete_tasks_empty() const -> bool;
105-
10695
/**
10796
* @return The number of active tasks in the container.
10897
*/
109-
auto size() const -> std::size_t;
98+
auto size() -> std::size_t;
11099

111100
/**
112101
* @return True if there are no active tasks in the container.
113102
*/
114-
auto empty() const -> bool;
103+
auto empty() -> bool;
115104

116105
/**
117106
* @return The capacity of this task manager before it will need to grow in size.
118107
*/
119-
auto capacity() const -> std::size_t;
108+
auto capacity() -> std::size_t;
120109

121110
/**
122111
* Will continue to garbage collect and yield until all tasks are complete. This method can be
@@ -138,6 +127,11 @@ class TaskContainer
138127
*/
139128
auto gc_internal() -> std::size_t;
140129

130+
/**
131+
* Starts the next taks in the queue if one is available and max concurrent tasks has not yet been met.
132+
*/
133+
void try_start_next_task(std::unique_lock<std::mutex> lock);
134+
141135
/**
142136
* Encapsulate the users tasks in a cleanup task which marks itself for deletion upon
143137
* completion. Simply co_await the users task until its completed and then mark the given
@@ -156,7 +150,7 @@ class TaskContainer
156150
/// thread pools for indeterminate lifetime requests.
157151
std::mutex m_mutex{};
158152
/// The number of alive tasks.
159-
std::atomic<std::size_t> m_size{};
153+
std::size_t m_size{};
160154
/// Maintains the lifetime of the tasks until they are completed.
161155
std::list<std::optional<Task<void>>> m_tasks{};
162156
/// The set of tasks that have completed and need to be deleted.
@@ -166,6 +160,10 @@ class TaskContainer
166160
std::shared_ptr<Scheduler> m_scheduler_lifetime{nullptr};
167161
/// This is used internally since io_scheduler cannot pass itself in as a shared_ptr.
168162
Scheduler* m_scheduler{nullptr};
163+
/// tasks to be processed in order of start
164+
std::queue<decltype(m_tasks.end())> m_next_tasks;
165+
/// maximum number of tasks to be run simultaneously
166+
std::size_t m_max_concurrent_tasks;
169167

170168
friend Scheduler;
171169
};

cpp/mrc/include/mrc/coroutines/test_scheduler.hpp

+7
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ class TestScheduler : public Scheduler
8080
*/
8181
mrc::coroutines::Task<> yield_until(std::chrono::time_point<std::chrono::steady_clock> time) override;
8282

83+
/**
84+
* Returns the time according to the scheduler. Time may be progressed by resume_next, resume_for, and resume_until.
85+
*
86+
* @return the current time according to the scheduler.
87+
*/
88+
std::chrono::time_point<std::chrono::steady_clock> time();
89+
8390
/**
8491
* Immediately resumes the next-in-queue coroutine handle.
8592
*

cpp/mrc/src/public/coroutines/task_container.cpp

+61-41
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030

3131
namespace mrc::coroutines {
3232

33-
TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e) :
33+
TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e, std::size_t max_concurrent_tasks) :
3434
m_scheduler_lifetime(std::move(e)),
35-
m_scheduler(m_scheduler_lifetime.get())
35+
m_scheduler(m_scheduler_lifetime.get()),
36+
m_max_concurrent_tasks(max_concurrent_tasks)
3637
{
3738
if (m_scheduler_lifetime == nullptr)
3839
{
@@ -43,17 +44,17 @@ TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e) :
4344
TaskContainer::~TaskContainer()
4445
{
4546
// This will hang the current thread.. but if tasks are not complete thats also pretty bad.
46-
while (!this->empty())
47+
while (not empty())
4748
{
48-
this->garbage_collect();
49+
garbage_collect();
4950
}
5051
}
5152

5253
auto TaskContainer::start(Task<void>&& user_task, GarbageCollectPolicy cleanup) -> void
5354
{
54-
m_size.fetch_add(1, std::memory_order::relaxed);
55+
auto lock = std::unique_lock(m_mutex);
5556

56-
std::scoped_lock lk{m_mutex};
57+
m_size += 1;
5758

5859
if (cleanup == GarbageCollectPolicy::yes)
5960
{
@@ -64,48 +65,42 @@ auto TaskContainer::start(Task<void>&& user_task, GarbageCollectPolicy cleanup)
6465
auto pos = m_tasks.emplace(m_tasks.end(), std::nullopt);
6566
auto task = make_cleanup_task(std::move(user_task), pos);
6667
*pos = std::move(task);
68+
m_next_tasks.push(pos);
6769

68-
// Start executing from the cleanup task to schedule the user's task onto the thread pool.
69-
pos->value().resume();
70+
auto current_task_count = m_size - m_next_tasks.size();
71+
72+
if (m_max_concurrent_tasks == 0 or current_task_count < m_max_concurrent_tasks)
73+
{
74+
try_start_next_task(std::move(lock));
75+
}
7076
}
7177

7278
auto TaskContainer::garbage_collect() -> std::size_t
7379
{
74-
std::scoped_lock lk{m_mutex};
80+
auto lock = std::scoped_lock(m_mutex);
7581
return gc_internal();
7682
}
7783

78-
auto TaskContainer::delete_task_size() const -> std::size_t
79-
{
80-
std::atomic_thread_fence(std::memory_order::acquire);
81-
return m_tasks_to_delete.size();
82-
}
83-
84-
auto TaskContainer::delete_tasks_empty() const -> bool
84+
auto TaskContainer::size() -> std::size_t
8585
{
86-
std::atomic_thread_fence(std::memory_order::acquire);
87-
return m_tasks_to_delete.empty();
86+
auto lock = std::scoped_lock(m_mutex);
87+
return m_size;
8888
}
8989

90-
auto TaskContainer::size() const -> std::size_t
91-
{
92-
return m_size.load(std::memory_order::relaxed);
93-
}
94-
95-
auto TaskContainer::empty() const -> bool
90+
auto TaskContainer::empty() -> bool
9691
{
9792
return size() == 0;
9893
}
9994

100-
auto TaskContainer::capacity() const -> std::size_t
95+
auto TaskContainer::capacity() -> std::size_t
10196
{
102-
std::atomic_thread_fence(std::memory_order::acquire);
97+
auto lock = std::scoped_lock(m_mutex);
10398
return m_tasks.size();
10499
}
105100

106101
auto TaskContainer::garbage_collect_and_yield_until_empty() -> Task<void>
107102
{
108-
while (!empty())
103+
while (not empty())
109104
{
110105
garbage_collect();
111106
co_await m_scheduler->yield();
@@ -115,22 +110,44 @@ auto TaskContainer::garbage_collect_and_yield_until_empty() -> Task<void>
115110
TaskContainer::TaskContainer(Scheduler& e) : m_scheduler(&e) {}
116111
auto TaskContainer::gc_internal() -> std::size_t
117112
{
118-
std::size_t deleted{0};
119-
if (!m_tasks_to_delete.empty())
113+
if (m_tasks_to_delete.empty())
114+
{
115+
return 0;
116+
}
117+
118+
std::size_t delete_count = m_tasks_to_delete.size();
119+
120+
for (const auto& pos : m_tasks_to_delete)
120121
{
121-
for (const auto& pos : m_tasks_to_delete)
122+
// Destroy the cleanup task and the user task.
123+
if (pos->has_value())
122124
{
123-
// Destroy the cleanup task and the user task.
124-
if (pos->has_value())
125-
{
126-
pos->value().destroy();
127-
}
128-
m_tasks.erase(pos);
125+
pos->value().destroy();
129126
}
130-
deleted = m_tasks_to_delete.size();
131-
m_tasks_to_delete.clear();
127+
128+
m_tasks.erase(pos);
129+
}
130+
131+
m_tasks_to_delete.clear();
132+
133+
return delete_count;
134+
}
135+
136+
void TaskContainer::try_start_next_task(std::unique_lock<std::mutex> lock)
137+
{
138+
if (m_next_tasks.empty())
139+
{
140+
// no tasks to process
141+
return;
132142
}
133-
return deleted;
143+
144+
auto pos = m_next_tasks.front();
145+
m_next_tasks.pop();
146+
147+
// release the lock before starting the task
148+
lock.unlock();
149+
150+
pos->value().resume();
134151
}
135152

136153
auto TaskContainer::make_cleanup_task(Task<void> user_task, task_position_t pos) -> Task<void>
@@ -155,11 +172,14 @@ auto TaskContainer::make_cleanup_task(Task<void> user_task, task_position_t pos)
155172
LOG(ERROR) << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n";
156173
}
157174

158-
std::scoped_lock lk{m_mutex};
175+
auto lock = std::unique_lock(m_mutex);
159176
m_tasks_to_delete.push_back(pos);
160177
// This has to be done within scope lock to make sure this coroutine task completes before the
161178
// task container object destructs -- if it was waiting on .empty() to become true.
162-
m_size.fetch_sub(1, std::memory_order::relaxed);
179+
m_size -= 1;
180+
181+
try_start_next_task(std::move(lock));
182+
163183
co_return;
164184
}
165185

cpp/mrc/src/public/coroutines/test_scheduler.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "mrc/coroutines/test_scheduler.hpp"
1919

20+
#include <chrono>
2021
#include <compare>
2122

2223
namespace mrc::coroutines {
@@ -56,8 +57,15 @@ mrc::coroutines::Task<> TestScheduler::yield_until(std::chrono::time_point<std::
5657
co_return co_await TestScheduler::Operation{this, time};
5758
}
5859

60+
std::chrono::time_point<std::chrono::steady_clock> TestScheduler::time()
61+
{
62+
return m_time;
63+
}
64+
5965
bool TestScheduler::resume_next()
6066
{
67+
using namespace std::chrono_literals;
68+
6169
if (m_queue.empty())
6270
{
6371
return false;
@@ -69,6 +77,11 @@ bool TestScheduler::resume_next()
6977

7078
m_time = handle.second;
7179

80+
if (not m_queue.empty())
81+
{
82+
m_time = m_queue.top().second;
83+
}
84+
7285
handle.first.resume();
7386

7487
return true;

0 commit comments

Comments
 (0)