Skip to content

Commit f8e9db6

Browse files
authored
Shrink TransitEventBuffer in backend when frontend unbounded queue is explicitly shrunk (#734)
1 parent f6ba07c commit f8e9db6

File tree

4 files changed

+57
-5
lines changed

4 files changed

+57
-5
lines changed

include/quill/backend/BackendOptions.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ struct BackendOptions
6464
* If a frontend threads continuously push messages to the queue (e.g., logging in a loop),
6565
* no logs can ever be processed.
6666
*
67-
* When the soft limit is reached (default: 800), the backend worker thread will try to process
68-
* a batch of cached transit events all at once
67+
* When the soft limit is reached the backend worker thread will try to process a batch of cached
68+
* transit events all at once
6969
*
7070
* The frontend queues are emptied on each iteration, so the actual popped messages
7171
* can be much greater than the transit_events_soft_limit.

include/quill/backend/BackendWorker.h

+23
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ class BackendWorker
319319
{
320320
_cleanup_invalidated_thread_contexts();
321321
_cleanup_invalidated_loggers();
322+
_try_shrink_empty_transit_event_buffers();
322323

323324
// There is nothing left to do, and we can let this thread sleep for a while
324325
// buffer events are 0 here and also all the producer queues are empty
@@ -1162,6 +1163,13 @@ class BackendWorker
11621163

11631164
if (read_result.allocation)
11641165
{
1166+
if ((read_result.new_capacity < read_result.previous_capacity) && thread_context->_transit_event_buffer)
1167+
{
1168+
// The user explicitly requested to shrink the queue, indicating a preference for low memory
1169+
// usage. To align with this intent, we also request shrinking the backend buffer.
1170+
thread_context->_transit_event_buffer->request_shrink();
1171+
}
1172+
11651173
// When allocation_info has a value it means that the queue has re-allocated
11661174
if (_options.error_notifier)
11671175
{
@@ -1428,6 +1436,21 @@ class BackendWorker
14281436
}
14291437
}
14301438

1439+
/**
1440+
* Shrinks empty TransitEvent buffers. This is triggered only when the user explicitly
1441+
* requests shrinking of the unbounded frontend queue to optimize memory usage.
1442+
*/
1443+
QUILL_ATTRIBUTE_HOT void _try_shrink_empty_transit_event_buffers()
1444+
{
1445+
for (ThreadContext* thread_context : _active_thread_contexts_cache)
1446+
{
1447+
if (thread_context->_transit_event_buffer)
1448+
{
1449+
thread_context->_transit_event_buffer->try_shrink();
1450+
}
1451+
}
1452+
}
1453+
14311454
/**
14321455
* This function takes an `format_args_store` containing multiple arguments and formats them into
14331456
* a single string using a generated format string. Due to limitations in the ability to

include/quill/backend/TransitEventBuffer.h

+24-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ class TransitEventBuffer
2020
{
2121
public:
2222
explicit TransitEventBuffer(size_t initial_capacity)
23-
: _capacity(next_power_of_two(initial_capacity)),
23+
: _initial_capacity(next_power_of_two(initial_capacity)),
24+
_capacity(_initial_capacity),
2425
_storage(std::make_unique<TransitEvent[]>(_capacity)),
2526
_mask(_capacity - 1u)
2627
{
@@ -97,6 +98,26 @@ class TransitEventBuffer
9798
return _reader_pos == _writer_pos;
9899
}
99100

101+
void request_shrink() noexcept { _shrink_requested = true; }
102+
103+
void try_shrink()
104+
{
105+
// we only shrink empty buffers
106+
if (_shrink_requested && empty())
107+
{
108+
if (_capacity > _initial_capacity)
109+
{
110+
_storage = std::make_unique<TransitEvent[]>(_initial_capacity);
111+
_capacity = _initial_capacity;
112+
_mask = _capacity - 1;
113+
_writer_pos = 0;
114+
_reader_pos = 0;
115+
}
116+
117+
_shrink_requested = false;
118+
}
119+
}
120+
100121
private:
101122
void _expand()
102123
{
@@ -120,11 +141,13 @@ class TransitEventBuffer
120141
_reader_pos = 0;
121142
}
122143

144+
size_t _initial_capacity;
123145
size_t _capacity;
124146
std::unique_ptr<TransitEvent[]> _storage;
125147
size_t _mask;
126148
size_t _reader_pos{0};
127149
size_t _writer_pos{0};
150+
bool _shrink_requested{false};
128151
};
129152

130153
} // namespace detail

test/integration_tests/ShrinkThreadLocalQueueTest.cpp

+8-2
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ using CustomLogger = LoggerImpl<CustomFrontendOptions>;
2929
TEST_CASE("shrink_thread_local_queue")
3030
{
3131
static constexpr size_t number_of_messages = 5000;
32-
static constexpr size_t iterations = 5;
32+
static constexpr size_t iterations = 6;
3333
static constexpr char const* filename = "shrink_thread_local_queue.log";
3434
static std::string const logger_name = "logger";
3535

36-
// Start the logging backend thread
3736
Backend::start();
3837

3938
// just for testing - call before logging anything
@@ -69,6 +68,13 @@ TEST_CASE("shrink_thread_local_queue")
6968

7069
CustomFrontend::shrink_thread_local_queue(16 * 1024);
7170
REQUIRE_EQ(CustomFrontend::get_thread_local_queue_capacity(), 16 * 1024);
71+
72+
if (iter % 2 == 0)
73+
{
74+
// flush the log so that the backend goes idle and also backend shrink is tested
75+
logger->flush_log();
76+
std::this_thread::sleep_for(std::chrono::milliseconds{1});
77+
}
7278
}
7379

7480
logger->flush_log();

0 commit comments

Comments
 (0)