Skip to content

Commit 2226afe

Browse files
authored
Several fixes & improvements of C++ MonitorStage (nv-morpheus#2170)
- For some short-running pipelines, `MonitorStage` now ensures there's at least one line of progress bar output with throughput shown. - For pipelines need explicitly stopped with `Ctrl+C`, `MonitorStage` now ensures to output the progress bars again when the pipeline is completed, avoids the progress bars from being covered by other logs. - Using `microseconds` when calculating the throughput to avoid `inf` throughput - Each completed progress bar will turn into green with text `[Completed]` Closes nv-morpheus#2148 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - Yuchen Zhang (https://github.com/yczhang-nv) Approvers: - Will Killian (https://github.com/willkill07) URL: nv-morpheus#2170
1 parent 159fc7d commit 2226afe

File tree

2 files changed

+60
-36
lines changed

2 files changed

+60
-36
lines changed

python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp

+60-34
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,22 @@ class ProgressBarContextManager
104104
{
105105
std::lock_guard<std::mutex> lock(m_mutex);
106106

107-
// To avoid display_all() being executed after calling mark_pbar_as_completed() in some race conditions
107+
// If the progress bars needs to be updated after completion, move the cursor up to the beginning
108108
if (m_is_completed)
109109
{
110-
return;
110+
move_cursor_up(m_progress_bars.size());
111111
}
112112

113-
// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
114-
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
115-
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
116-
// The internal function here is used to manually enable the font display.
117-
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;
113+
display_all_impl();
118114

119-
for (auto& pbar : m_progress_bars)
115+
// If all the progress bars are completed, keep the cursor position as it is
116+
if (m_is_completed)
120117
{
121-
pbar->print_progress(true);
122-
m_stdout_os << termcolor::reset; // The font option only works for the current bar
123-
m_stdout_os << std::endl;
118+
return;
124119
}
125120

126-
// After each round of display, move cursor up ("\033[A") to the beginning of the first bar
127-
m_stdout_os << "\033[" << m_progress_bars.size() << "A" << std::flush;
121+
// Otherwise, move cursor up to the beginning after each round of display
122+
move_cursor_up(m_progress_bars.size());
128123
}
129124

130125
void mark_pbar_as_completed(size_t bar_id)
@@ -145,17 +140,9 @@ class ProgressBarContextManager
145140
}
146141
if (all_pbars_completed)
147142
{
148-
// Move the cursor down to the bottom of the last progress bar
149-
// Doing this here instead of the destructor to avoid a race condition with the pipeline's
150-
// "====Pipeline Complete====" log message.
151-
// Using a string stream to ensure other logs are not interleaved.
152-
std::ostringstream new_lines;
153-
for (std::size_t i = 0; i < m_progress_bars.size(); ++i)
154-
{
155-
new_lines << "\n";
156-
}
143+
// Display again when completed to avoid progress bars being covered by other logs
144+
display_all_impl();
157145

158-
m_stdout_os << new_lines.str() << std::flush;
159146
m_is_completed = true;
160147
}
161148
}
@@ -199,6 +186,34 @@ class ProgressBarContextManager
199186
return std::move(progress_bar);
200187
}
201188

189+
void display_all_impl()
190+
{
191+
// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
192+
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
193+
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
194+
// The internal function here is used to manually enable the font display.
195+
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;
196+
197+
for (auto& pbar : m_progress_bars)
198+
{
199+
pbar->print_progress(true);
200+
m_stdout_os << termcolor::reset; // The font option only works for the current bar
201+
m_stdout_os << std::endl;
202+
}
203+
}
204+
205+
void move_cursor_up(size_t lines)
206+
{
207+
// "\033[<n>A" means moving the cursor up for n lines
208+
m_stdout_os << "\033[" << lines << "A" << std::flush;
209+
}
210+
211+
void move_cursor_down(size_t lines)
212+
{
213+
// "\033[<n>B" means moving the cursor down for n lines
214+
m_stdout_os << "\033[" << lines << "B" << std::flush;
215+
}
216+
202217
indicators::DynamicProgress<indicators::IndeterminateProgressBar> m_dynamic_progress_bars;
203218
std::vector<std::unique_ptr<indicators::IndeterminateProgressBar>> m_progress_bars;
204219
std::mutex m_mutex;
@@ -227,8 +242,8 @@ class MonitorController
227242
* @param unit : the unit of message count
228243
* @param determine_count_fn : A function that computes the count for each incoming message
229244
*/
230-
MonitorController(const std::string& description,
231-
std::string unit = "messages",
245+
MonitorController(const std::string& description = "Progress",
246+
const std::string& unit = "messages",
232247
indicators::Color text_color = indicators::Color::cyan,
233248
indicators::FontStyle font_style = indicators::FontStyle::bold,
234249
std::optional<std::function<size_t(MessageT)>> determine_count_fn = std::nullopt);
@@ -239,23 +254,26 @@ class MonitorController
239254
void sink_on_completed();
240255

241256
private:
242-
static std::string format_duration(std::chrono::seconds duration);
243-
static std::string format_throughput(std::chrono::seconds duration, size_t count, const std::string& unit);
257+
static std::string format_duration(std::chrono::microseconds duration);
258+
static std::string format_throughput(std::chrono::microseconds duration, size_t count, const std::string& unit);
244259

245260
size_t m_bar_id;
261+
const std::string m_description;
246262
const std::string m_unit;
247263
std::optional<std::function<size_t(MessageT)>> m_determine_count_fn;
248264
size_t m_count{0};
249265
time_point_t m_start_time;
250266
bool m_is_started{false}; // Set to true after the first call to progress_sink()
267+
bool m_is_completed{false};
251268
};
252269

253270
template <typename MessageT>
254271
MonitorController<MessageT>::MonitorController(const std::string& description,
255-
std::string unit,
272+
const std::string& unit,
256273
indicators::Color text_color,
257274
indicators::FontStyle font_style,
258275
std::optional<std::function<size_t(MessageT)>> determine_count_fn) :
276+
m_description(std::move(description)),
259277
m_unit(std::move(unit)),
260278
m_determine_count_fn(determine_count_fn)
261279
{
@@ -268,7 +286,7 @@ MonitorController<MessageT>::MonitorController(const std::string& description,
268286
}
269287
}
270288

271-
m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(description, text_color, font_style);
289+
m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(m_description, text_color, font_style);
272290
}
273291

274292
template <typename MessageT>
@@ -280,13 +298,15 @@ MessageT MonitorController<MessageT>::progress_sink(MessageT msg)
280298
m_is_started = true;
281299
}
282300
m_count += (*m_determine_count_fn)(msg);
283-
auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - m_start_time);
301+
auto duration =
302+
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - m_start_time);
284303

285304
auto& manager = ProgressBarContextManager::get_instance();
286305
auto& pbar = manager.progress_bars()[m_bar_id];
287306

288307
// Update the progress bar
289308
pbar->set_option(indicators::option::PostfixText{format_throughput(duration, m_count, m_unit)});
309+
pbar->set_option(indicators::option::PrefixText{m_description});
290310
pbar->tick();
291311

292312
manager.display_all();
@@ -298,14 +318,19 @@ template <typename MessageT>
298318
void MonitorController<MessageT>::sink_on_completed()
299319
{
300320
auto& manager = ProgressBarContextManager::get_instance();
321+
auto& pbar = manager.progress_bars()[m_bar_id];
322+
323+
pbar->set_option(indicators::option::PrefixText{"[Completed]" + m_description});
324+
pbar->set_option(indicators::option::ForegroundColor{indicators::Color::green});
325+
301326
manager.mark_pbar_as_completed(m_bar_id);
302327
}
303328

304329
template <typename MessageT>
305-
std::string MonitorController<MessageT>::format_duration(std::chrono::seconds duration)
330+
std::string MonitorController<MessageT>::format_duration(std::chrono::microseconds duration)
306331
{
307332
auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
308-
auto seconds = duration - minutes;
333+
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration - minutes);
309334

310335
std::ostringstream oss;
311336
oss << std::setw(2) << std::setfill('0') << minutes.count() << "m:" << std::setw(2) << std::setfill('0')
@@ -314,11 +339,12 @@ std::string MonitorController<MessageT>::format_duration(std::chrono::seconds du
314339
}
315340

316341
template <typename MessageT>
317-
std::string MonitorController<MessageT>::format_throughput(std::chrono::seconds duration,
342+
std::string MonitorController<MessageT>::format_throughput(std::chrono::microseconds duration,
318343
size_t count,
319344
const std::string& unit)
320345
{
321-
double throughput = static_cast<double>(count) / duration.count();
346+
double time_in_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
347+
double throughput = static_cast<double>(count) / time_in_seconds;
322348
std::ostringstream oss;
323349
oss << count << " " << unit << " in " << format_duration(duration) << ", "
324350
<< "Throughput: " << std::fixed << std::setprecision(2) << throughput << " " << unit << "/s";

python/morpheus/morpheus/stages/general/monitor_stage.py

-2
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
150150
self._mc._font_style,
151151
self._mc._determine_count_fn)
152152

153-
node.launch_options.pe_count = self._config.num_threads
154-
155153
else:
156154
# Use a component so we track progress using the upstream progress engine. This will provide more accurate
157155
# results

0 commit comments

Comments
 (0)