Skip to content

Commit 665482c

Browse files
authored
Add shrink and capacity functions for unbounded queue
1 parent 64652d3 commit 665482c

File tree

7 files changed

+254
-6
lines changed

7 files changed

+254
-6
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@
134134
example: [sink_formatter_override](https://github.com/odygrd/quill/blob/master/examples/sink_formatter_override.cpp).
135135
- Added `Frontend::remove_logger_blocking(...)`, which blocks the caller thread until the specified logger is fully
136136
removed.
137+
- Added `Frontend::shrink_thread_local_queue(capacity)` and `CustomFrontend::get_thread_local_queue_capacity()`.
138+
These functions allow dynamic management of thread-local SPSC queues when using an unbounded queue configuration. They
139+
enable on-demand shrinking of a queue that has grown due to bursty logging, helping to reduce memory usage, although
140+
in typical scenarios they won't be required.
137141
- Added the `SyslogSink`, which logs messages to the system's syslog.
138142

139143
```c++

include/quill/Frontend.h

+58
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,64 @@ class FrontendImpl
5353
(void)spsc_queue_capacity;
5454
}
5555

56+
/**
57+
* Shrink the thread-local SPSC queue to the specified target capacity.
58+
*
59+
* This function helps manage memory usage by reducing the size of the thread-local queue.
60+
* In scenarios where a thread pool executes multiple jobs, one job might log a burst of messages
61+
* that causes the queue to grow significantly. Subsequent jobs may not require such a large capacity,
62+
* so you can call this function to explicitly shrink the queue to a smaller size.
63+
*
64+
* @note This function only applies when using the **UnboundedQueue** configuration. It will have no effect
65+
* if the BoundedQueue is enabled.
66+
* @note The function will only shrink the queue if the provided target capacity is smaller than the current
67+
* queue capacity. If the target capacity is greater than or equal to the current capacity, no change is made.
68+
* @warning The Logger object may maintain multiple thread-local queues. This function will only shrink the queue
69+
* associated with the calling thread, so it is important that the appropriate thread invokes it.
70+
*
71+
* @param capacity The desired new capacity for the thread-local SPSC queue.
72+
*/
73+
static void shrink_thread_local_queue(size_t capacity)
74+
{
75+
if constexpr (logger_t::using_unbounded_queue)
76+
{
77+
detail::get_local_thread_context<TFrontendOptions>()
78+
->template get_spsc_queue<TFrontendOptions::queue_type>()
79+
.shrink(capacity);
80+
}
81+
}
82+
83+
/**
84+
* Retrieve the current capacity of the thread-local SPSC queue.
85+
*
86+
* This function returns the capacity of the SPSC queue that belongs to the calling thread.
87+
* It is particularly useful for monitoring how much an UnboundedQueue has grown over time,
88+
* while for a BoundedQueue, the capacity remains constant.
89+
*
90+
* @note When using an UnboundedQueue, the function returns the capacity as determined by the producer,
91+
* reflecting the dynamic growth of the queue. For a BoundedQueue, the returned capacity is fixed.
92+
* @note Since the Logger object can maintain multiple thread-local queues, this function always returns
93+
* the capacity of the queue associated with the thread that calls it. Ensure that the correct thread
94+
* is invoking this function to check its own queue.
95+
*
96+
* @return The current capacity of the thread-local SPSC queue.
97+
*/
98+
QUILL_NODISCARD static size_t get_thread_local_queue_capacity() noexcept
99+
{
100+
if constexpr (logger_t::using_unbounded_queue)
101+
{
102+
return detail::get_local_thread_context<TFrontendOptions>()
103+
->template get_spsc_queue<TFrontendOptions::queue_type>()
104+
.producer_capacity();
105+
}
106+
else
107+
{
108+
return detail::get_local_thread_context<TFrontendOptions>()
109+
->template get_spsc_queue<TFrontendOptions::queue_type>()
110+
.capacity();
111+
}
112+
}
113+
56114
/**
57115
* @brief Creates a new sink or retrieves an existing one with the specified name.
58116
*

include/quill/Logger.h

+6-5
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ class LoggerImpl : public detail::LoggerBase
5050
public:
5151
using frontend_options_t = TFrontendOptions;
5252

53+
static constexpr bool using_unbounded_queue =
54+
(frontend_options_t::queue_type == QueueType::UnboundedUnlimited) ||
55+
(frontend_options_t::queue_type == QueueType::UnboundedBlocking) ||
56+
(frontend_options_t::queue_type == QueueType::UnboundedDropping);
57+
5358
/***/
5459
LoggerImpl(LoggerImpl const&) = delete;
5560
LoggerImpl& operator=(LoggerImpl const&) = delete;
@@ -360,11 +365,7 @@ class LoggerImpl : public detail::LoggerBase
360365
*/
361366
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _prepare_write_buffer(size_t total_size)
362367
{
363-
constexpr bool is_unbounded_queue = (frontend_options_t::queue_type == QueueType::UnboundedUnlimited) ||
364-
(frontend_options_t::queue_type == QueueType::UnboundedBlocking) ||
365-
(frontend_options_t::queue_type == QueueType::UnboundedDropping);
366-
367-
if constexpr (is_unbounded_queue)
368+
if constexpr (using_unbounded_queue)
368369
{
369370
// MSVC doesn't like the template keyword, but every other compiler requires it
370371
#if defined(_MSC_VER)

include/quill/core/UnboundedSPSCQueue.h

+38-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class UnboundedSPSCQueue
138138
}
139139

140140
/**
141-
* Commit the write to notify the consumer bytes are ready to read
141+
* Commit write to notify the consumer bytes are ready to read
142142
*/
143143
QUILL_ATTRIBUTE_HOT void commit_write() noexcept { _producer->bounded_queue.commit_write(); }
144144

@@ -151,6 +151,40 @@ class UnboundedSPSCQueue
151151
commit_write();
152152
}
153153

154+
/**
155+
* Return the current buffer's capacity
156+
* @note: producer only
157+
* @return capacity
158+
*/
159+
QUILL_NODISCARD size_t producer_capacity() const noexcept
160+
{
161+
return _producer->bounded_queue.capacity();
162+
}
163+
164+
/**
165+
* Shrinks the queue if capacity is a valid smaller power of 2.
166+
* @param capacity New target capacity.
167+
* @note: producer (frontend) is safe to call this - Do not call on the consumer (the backend worker)
168+
*/
169+
void shrink(size_t capacity)
170+
{
171+
if (capacity > (_producer->bounded_queue.capacity() >> 1))
172+
{
173+
// We should only shrink if the new capacity is less or at least equal to the previous_power_of_2
174+
return;
175+
}
176+
177+
// We want to shrink the queue, we will create a new queue with a smaller size
178+
// the consumer will switch to the newer queue after emptying and deallocating the older queue
179+
auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
180+
181+
// store the new node pointer as next in the current node
182+
_producer->next.store(next_node, std::memory_order_release);
183+
184+
// producer is now using the next node
185+
_producer = next_node;
186+
}
187+
154188
/**
155189
* Prepare to read from the buffer
156190
* @error_notifier a callback used for notifications to the user
@@ -193,19 +227,22 @@ class UnboundedSPSCQueue
193227

194228
/**
195229
* Return the current buffer's capacity
230+
* @note: consumer only
196231
* @return capacity
197232
*/
198233
QUILL_NODISCARD size_t capacity() const noexcept { return _consumer->bounded_queue.capacity(); }
199234

200235
/**
201236
* checks if the queue is empty
237+
* @note consumer only
202238
* @return true if empty, false otherwise
203239
*/
204240
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
205241
{
206242
return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) == nullptr);
207243
}
208244

245+
209246
private:
210247
/***/
211248
#if defined(_MSC_VER)

test/integration_tests/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ quill_add_test(TEST_RotatingSinkDailyRotation RotatingSinkDailyRotationTest.cpp)
8787
quill_add_test(TEST_RotatingSinkKeepOldest RotatingSinkKeepOldestTest.cpp)
8888
quill_add_test(TEST_RotatingSinkOverwriteOldest RotatingSinkOverwriteOldestTest.cpp)
8989
quill_add_test(TEST_RuntimeMetadata RuntimeMetadataTest.cpp)
90+
quill_add_test(TEST_ShrinkThreadLocalQueueTest ShrinkThreadLocalQueueTest.cpp)
9091
quill_add_test(TEST_SignalHandler SignalHandlerTest.cpp)
9192
quill_add_test(TEST_SignalHandlerLogger SignalHandlerLoggerTest.cpp)
9293
quill_add_test(TEST_SingleFrontendThreadMultipleLoggers SingleFrontendThreadMultipleLoggersTest.cpp)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include "doctest/doctest.h"
2+
3+
#include "misc/TestUtilities.h"
4+
#include "quill/Backend.h"
5+
#include "quill/Frontend.h"
6+
#include "quill/LogMacros.h"
7+
#include "quill/sinks/FileSink.h"
8+
9+
#include <cstdio>
10+
#include <string>
11+
#include <vector>
12+
13+
using namespace quill;
14+
15+
// Define custom Frontend Options
16+
struct CustomFrontendOptions
17+
{
18+
static constexpr quill::QueueType queue_type = quill::QueueType::UnboundedBlocking;
19+
static constexpr uint32_t initial_queue_capacity = 16 * 1024; // 16 KiB
20+
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
21+
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
22+
};
23+
24+
using CustomFrontend = FrontendImpl<CustomFrontendOptions>;
25+
using CustomLogger = LoggerImpl<CustomFrontendOptions>;
26+
27+
/***/
28+
TEST_CASE("shrink_thread_local_queue")
29+
{
30+
static constexpr size_t number_of_messages = 5000;
31+
static constexpr size_t iterations = 5;
32+
static constexpr char const* filename = "shrink_thread_local_queue.log";
33+
static std::string const logger_name = "logger";
34+
35+
// Start the logging backend thread
36+
Backend::start();
37+
38+
// just for testing - call before logging anything
39+
CustomFrontend::shrink_thread_local_queue(8 * 1024);
40+
REQUIRE_EQ(CustomFrontend::get_thread_local_queue_capacity(), 8 * 1024);
41+
42+
// Set writing logging to a file
43+
auto file_sink = CustomFrontend::create_or_get_sink<FileSink>(
44+
filename,
45+
[]()
46+
{
47+
FileSinkConfig cfg;
48+
cfg.set_open_mode('w');
49+
50+
// For this test only we use the default buffer size, it should not make any difference it is just for testing the default behaviour and code coverage
51+
cfg.set_write_buffer_size(0);
52+
53+
return cfg;
54+
}(),
55+
FileEventNotifier{});
56+
57+
CustomLogger* logger = CustomFrontend::create_or_get_logger(logger_name, std::move(file_sink));
58+
59+
REQUIRE_EQ(CustomFrontend::get_thread_local_queue_capacity(), 8 * 1024);
60+
61+
size_t cnt{0};
62+
for (size_t iter = 0; iter < iterations; ++iter)
63+
{
64+
for (size_t i = 0; i < number_of_messages; ++i)
65+
{
66+
LOG_INFO(logger, "This is message {}", cnt++);
67+
}
68+
69+
CustomFrontend::shrink_thread_local_queue(16 * 1024);
70+
REQUIRE_EQ(CustomFrontend::get_thread_local_queue_capacity(), 16 * 1024);
71+
}
72+
73+
logger->flush_log();
74+
CustomFrontend::remove_logger(logger);
75+
76+
// Wait until the backend thread stops for test stability
77+
Backend::stop();
78+
79+
// Read file and check
80+
std::vector<std::string> const file_contents = quill::testing::file_contents(filename);
81+
REQUIRE_EQ(file_contents.size(), number_of_messages * iterations);
82+
83+
for (size_t i = 0; i < number_of_messages * iterations; ++i)
84+
{
85+
std::string expected_string = logger_name + " This is message " + std::to_string(i);
86+
REQUIRE(quill::testing::file_contains(file_contents, expected_string));
87+
}
88+
89+
testing::remove_file(filename);
90+
}

test/unit_tests/UnboundedQueueTest.cpp

+57
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,63 @@ TEST_SUITE_BEGIN("UnboundedQueue");
99

1010
using namespace quill::detail;
1111

12+
TEST_CASE("unbounded_queue_shrink")
13+
{
14+
constexpr size_t CHUNK{256};
15+
constexpr size_t INITIAL_SIZE{1024};
16+
17+
UnboundedSPSCQueue buffer{INITIAL_SIZE};
18+
REQUIRE_EQ(buffer.producer_capacity(), INITIAL_SIZE);
19+
20+
// This queue will grow as we request 5 * 256
21+
for (uint32_t i = 0; i < 5; ++i)
22+
{
23+
#if defined(_MSC_VER)
24+
auto* write_buffer = buffer.prepare_write(CHUNK, quill::QueueType::UnboundedBlocking);
25+
#else
26+
auto* write_buffer = buffer.prepare_write<quill::QueueType::UnboundedBlocking>(CHUNK);
27+
#endif
28+
29+
REQUIRE(write_buffer);
30+
buffer.finish_write(CHUNK);
31+
buffer.commit_write();
32+
}
33+
34+
{
35+
UnboundedSPSCQueue::ReadResult res = buffer.prepare_read();
36+
REQUIRE(res.read_pos);
37+
buffer.finish_read(INITIAL_SIZE);
38+
buffer.commit_read();
39+
}
40+
41+
{
42+
// Now read again the next part from the allocated queue
43+
UnboundedSPSCQueue::ReadResult res = buffer.prepare_read();
44+
REQUIRE(res.read_pos);
45+
REQUIRE(res.allocation);
46+
REQUIRE_EQ(res.previous_capacity, INITIAL_SIZE);
47+
REQUIRE_EQ(res.new_capacity, INITIAL_SIZE * 2);
48+
buffer.finish_read(CHUNK);
49+
buffer.commit_read();
50+
}
51+
52+
// Shrink the queue
53+
REQUIRE_EQ(buffer.producer_capacity(), INITIAL_SIZE * 2);
54+
buffer.shrink(INITIAL_SIZE);
55+
REQUIRE_EQ(buffer.producer_capacity(), INITIAL_SIZE);
56+
57+
{
58+
// On next read we should see the new queue - old is gone
59+
UnboundedSPSCQueue::ReadResult res = buffer.prepare_read();
60+
REQUIRE_EQ(res.read_pos, nullptr);
61+
REQUIRE(res.allocation);
62+
REQUIRE_EQ(res.previous_capacity, INITIAL_SIZE * 2);
63+
REQUIRE_EQ(res.new_capacity, INITIAL_SIZE);
64+
buffer.finish_read(CHUNK);
65+
buffer.commit_read();
66+
}
67+
}
68+
1269
TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints")
1370
{
1471
UnboundedSPSCQueue buffer{1024};

0 commit comments

Comments
 (0)