Skip to content

Commit 310371e

Browse files
authored
Add unbounded_queue_max_capacity to FrontEndOptions
1 parent 7dc6d90 commit 310371e

23 files changed

+134
-230
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@
9393
allowing huge page allocation to be attempted with a fallback to normal pages if unavailable. If you are using a
9494
custom `FrontendOptions` type, you will need to update it to use the new
9595
flag. ([#707](https://github.com/odygrd/quill/issues/707))
96+
- Previously, `QueueType::UnboundedDropping` and `QueueType::UnboundedBlocking` could grow up to 2 GB in size. This
97+
limit is now configurable via `FrontendOptions::unbounded_queue_max_capacity`, which defaults to 2 GB.
98+
- `QueueType::UnboundedUnlimited` has been removed, as the same behavior can now be achieved by setting
99+
`FrontendOptions::unbounded_queue_max_capacity` to the maximum value.
96100
- The `ConsoleSink` constructor now optionally accepts a `ConsoleSinkConfig`, similar to other sinks. If no
97101
`ConsoleSinkConfig` is provided, a default one is used, logging to `stdout` with `ColourMode::Automatic`. For example:
98102
```c++

benchmarks/hot_path_latency/quill_hot_path_rdtsc_clock.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
struct FrontendOptions
1414
{
1515
static constexpr quill::QueueType queue_type = quill::QueueType::UnboundedBlocking;
16-
static constexpr uint32_t initial_queue_capacity = 131'072;
16+
static constexpr size_t initial_queue_capacity = 131'072;
1717
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
18+
static constexpr size_t unbounded_queue_max_capacity = 2ull * 1024 * 1024 * 1024; // 2 GiB
1819
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
1920
};
2021

benchmarks/hot_path_latency/quill_hot_path_system_clock.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
struct FrontendOptions
1414
{
1515
static constexpr quill::QueueType queue_type = quill::QueueType::UnboundedBlocking;
16-
static constexpr uint32_t initial_queue_capacity = 131'072;
16+
static constexpr size_t initial_queue_capacity = 131'072;
1717
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
18+
static constexpr size_t unbounded_queue_max_capacity = 2ull * 1024 * 1024 * 1024; // 2 GiB
1819
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
1920
};
2021

docs/frontend_options.rst

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ Each frontend thread operates with its own queue, which can be configured with o
99

1010
- **UnboundedBlocking**: Starts with a small initial capacity. The queue reallocates up to 2GB and then blocks further log messages.
1111
- **UnboundedDropping**: Starts with a small initial capacity. The queue reallocates up to 2GB and then discards log messages.
12-
- **UnboundedUnlimited**: Starts with a small initial capacity and reallocates without limit. This queue never blocks or drops log messages.
1312
- **BoundedBlocking**: Has a fixed capacity and never reallocates. It blocks log messages when the limit is reached.
1413
- **BoundedDropping**: Has a fixed capacity and never reallocates. It discards log messages when the limit is reached.
1514

examples/bounded_dropping_queue_frontend.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ struct CustomFrontendOptions
2424
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedDropping;
2525

2626
// Set small capacity to demonstrate dropping messages in this example
27-
static constexpr uint32_t initial_queue_capacity = 256;
27+
static constexpr size_t initial_queue_capacity = 256;
2828

2929
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
3030
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;

examples/custom_frontend_options.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
struct CustomFrontendOptions
1515
{
1616
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedDropping;
17-
static constexpr uint32_t initial_queue_capacity = 131'072;
17+
static constexpr size_t initial_queue_capacity = 131'072;
1818
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
1919
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
2020
};

include/quill/Logger.h

+4-10
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ class LoggerImpl : public detail::LoggerBase
5151
using frontend_options_t = TFrontendOptions;
5252

5353
static constexpr bool using_unbounded_queue =
54-
(frontend_options_t::queue_type == QueueType::UnboundedUnlimited) ||
5554
(frontend_options_t::queue_type == QueueType::UnboundedBlocking) ||
5655
(frontend_options_t::queue_type == QueueType::UnboundedDropping);
5756

@@ -140,13 +139,8 @@ class LoggerImpl : public detail::LoggerBase
140139

141140
std::byte* write_buffer = _prepare_write_buffer(total_size);
142141

143-
if constexpr (frontend_options_t::queue_type == QueueType::UnboundedUnlimited)
144-
{
145-
assert(write_buffer &&
146-
"Unbounded unlimited queue will always allocate and have enough capacity");
147-
}
148-
else if constexpr ((frontend_options_t::queue_type == QueueType::BoundedDropping) ||
149-
(frontend_options_t::queue_type == QueueType::UnboundedDropping))
142+
if constexpr ((frontend_options_t::queue_type == QueueType::BoundedDropping) ||
143+
(frontend_options_t::queue_type == QueueType::UnboundedDropping))
150144
{
151145
if (QUILL_UNLIKELY(write_buffer == nullptr))
152146
{
@@ -370,10 +364,10 @@ class LoggerImpl : public detail::LoggerBase
370364
// MSVC doesn't like the template keyword, but every other compiler requires it
371365
#if defined(_MSC_VER)
372366
return thread_context->get_spsc_queue<frontend_options_t::queue_type>().prepare_write(
373-
total_size, frontend_options_t::queue_type);
367+
total_size, frontend_options_t::unbounded_queue_max_capacity);
374368
#else
375369
return thread_context->get_spsc_queue<frontend_options_t::queue_type>()
376-
.template prepare_write<frontend_options_t::queue_type>(total_size);
370+
.template prepare_write<frontend_options_t::unbounded_queue_max_capacity>(total_size);
377371
#endif
378372
}
379373
else

include/quill/core/BoundedSPSCQueue.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class BoundedSPSCQueueImpl
267267
mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
268268
}
269269
#endif
270-
270+
271271
if (mem == MAP_FAILED)
272272
{
273273
QUILL_THROW(QuillError{std::string{"mmap failed. errno: "} + std::to_string(errno) +

include/quill/core/Common.h

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ enum class QueueType
5050
{
5151
UnboundedBlocking,
5252
UnboundedDropping,
53-
UnboundedUnlimited,
5453
BoundedBlocking,
5554
BoundedDropping
5655
};

include/quill/core/FrontendOptions.h

+11-7
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ struct FrontendOptions
1717
{
1818
/**
1919
* Each frontend thread has its own queue, which can be configured with various options:
20-
* - UnboundedBlocking: Starts with initial_queue_capacity and reallocates up to 2GB, then blocks.
21-
* - UnboundedDropping: Starts with initial_queue_capacity and reallocates up to 2GB, then drops log messages.
22-
* - UnboundedUnlimited: Starts with initial_queue_capacity and reallocates without limit, subsequent queues are reallocated as needed. Never blocks or drops.
20+
* - UnboundedBlocking: Starts with initial_queue_capacity and reallocates up to unbounded_queue_max_capacity, then blocks.
21+
* - UnboundedDropping: Starts with initial_queue_capacity and reallocates up to unbounded_queue_max_capacity, then drops log messages.
2322
* - BoundedBlocking: Starts with initial_queue_capacity and never reallocates; blocks when the limit is reached.
2423
* - BoundedDropping: Starts with initial_queue_capacity and never reallocates; drops log messages when the limit is reached.
2524
*
@@ -28,21 +27,26 @@ struct FrontendOptions
2827
static constexpr QueueType queue_type = QueueType::UnboundedBlocking;
2928

3029
/**
31-
* Initial capacity of the queue. Used for UnboundedBlocking, UnboundedDropping, and
32-
* UnboundedUnlimited. Also serves as the capacity for BoundedBlocking and BoundedDropping.
30+
* Initial capacity of the queue.
3331
*/
34-
static constexpr uint32_t initial_queue_capacity = 128 * 1024; // 128 KiB
32+
static constexpr size_t initial_queue_capacity = 128u * 1024u; // 128 KiB
3533

3634
/**
3735
* Interval for retrying when using BoundedBlocking or UnboundedBlocking.
3836
* Applicable only when using BoundedBlocking or UnboundedBlocking.
3937
*/
4038
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
4139

40+
/**
41+
* Maximum capacity for unbounded queues (UnboundedBlocking, UnboundedDropping).
42+
* This defines the maximum size to which the queue can grow before blocking or dropping messages.
43+
*/
44+
static constexpr size_t unbounded_queue_max_capacity = 2ull * 1024u * 1024u * 1024u; // 2 GiB
45+
4246
/**
4347
* Enables huge pages on the frontend queues to reduce TLB misses. Available only for Linux.
4448
*/
45-
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
49+
static constexpr HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never;
4650
};
4751

4852
QUILL_END_NAMESPACE

include/quill/core/ThreadContextManager.h

+9-20
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,17 @@ class ThreadContext
5959

6060
public:
6161
/***/
62-
ThreadContext(QueueType queue_type, uint32_t initial_spsc_queue_capacity, HugePagesPolicy huge_pages_policy)
62+
ThreadContext(QueueType queue_type, size_t initial_queue_capacity, HugePagesPolicy huge_pages_policy)
6363
: _queue_type(queue_type)
6464
{
6565
if (has_unbounded_queue_type())
6666
{
6767
new (&_spsc_queue_union.unbounded_spsc_queue)
68-
UnboundedSPSCQueue{initial_spsc_queue_capacity, huge_pages_policy};
68+
UnboundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
6969
}
7070
else if (has_bounded_queue_type())
7171
{
72-
new (&_spsc_queue_union.bounded_spsc_queue)
73-
BoundedSPSCQueue{initial_spsc_queue_capacity, huge_pages_policy};
72+
new (&_spsc_queue_union.bounded_spsc_queue) BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
7473
}
7574
}
7675

@@ -93,16 +92,11 @@ class ThreadContext
9392

9493
/***/
9594
template <QueueType queue_type>
96-
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
97-
std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedUnlimited) ||
98-
(queue_type == QueueType::UnboundedDropping),
99-
UnboundedSPSCQueue, BoundedSPSCQueue>&
100-
get_spsc_queue() noexcept
95+
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
10196
{
10297
assert((_queue_type == queue_type) && "ThreadContext queue_type mismatch");
10398

104-
if constexpr ((queue_type == QueueType::UnboundedBlocking) ||
105-
(queue_type == QueueType::UnboundedUnlimited) || (queue_type == QueueType::UnboundedDropping))
99+
if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
106100
{
107101
return _spsc_queue_union.unbounded_spsc_queue;
108102
}
@@ -114,16 +108,12 @@ class ThreadContext
114108

115109
/***/
116110
template <QueueType queue_type>
117-
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
118-
std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedUnlimited) ||
119-
(queue_type == QueueType::UnboundedDropping),
120-
UnboundedSPSCQueue, BoundedSPSCQueue> const&
121-
get_spsc_queue() const noexcept
111+
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue> const& get_spsc_queue()
112+
const noexcept
122113
{
123114
assert((_queue_type == queue_type) && "ThreadContext queue_type mismatch");
124115

125-
if constexpr ((queue_type == QueueType::UnboundedBlocking) ||
126-
(queue_type == QueueType::UnboundedUnlimited) || (queue_type == QueueType::UnboundedDropping))
116+
if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
127117
{
128118
return _spsc_queue_union.unbounded_spsc_queue;
129119
}
@@ -148,8 +138,7 @@ class ThreadContext
148138
/***/
149139
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_unbounded_queue_type() const noexcept
150140
{
151-
return (_queue_type == QueueType::UnboundedBlocking) ||
152-
(_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::UnboundedUnlimited);
141+
return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
153142
}
154143

155144
/***/

include/quill/core/UnboundedSPSCQueue.h

+26-38
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ class UnboundedSPSCQueue
107107
*/
108108
#if defined(_MSC_VER)
109109
// MSVC doesn't like this as template <QueueType queue_type> when called from Logger, while it compiles on MSVC there will be false positives from clang-tidy
110-
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes, QueueType queue_type)
110+
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes, size_t unbounded_queue_max_capacity)
111111
#else
112-
template <QueueType queue_type>
112+
template <size_t unbounded_queue_max_capacity>
113113
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes)
114114
#endif
115115
{
@@ -122,9 +122,9 @@ class UnboundedSPSCQueue
122122
}
123123

124124
#if defined(_MSC_VER)
125-
return _handle_full_queue(nbytes, queue_type);
125+
return _handle_full_queue(nbytes, unbounded_queue_max_capacity);
126126
#else
127-
return _handle_full_queue<queue_type>(nbytes);
127+
return _handle_full_queue<unbounded_queue_max_capacity>(nbytes);
128128
#endif
129129
}
130130

@@ -242,57 +242,45 @@ class UnboundedSPSCQueue
242242
return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) == nullptr);
243243
}
244244

245-
246245
private:
247246
/***/
248247
#if defined(_MSC_VER)
249-
QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes, QueueType queue_type)
248+
QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes, size_t unbounded_queue_max_capacity)
250249
#else
251-
template <QueueType queue_type>
250+
template <size_t unbounded_queue_max_capacity>
252251
QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes)
253252
#endif
254253
{
255254
// Then it means the queue doesn't have enough size
256255
size_t capacity = _producer->bounded_queue.capacity() * 2ull;
257-
while (capacity < (nbytes + 1))
256+
while (capacity < nbytes)
258257
{
259258
capacity = capacity * 2ull;
260259
}
261260

262-
#if defined(_MSC_VER)
263-
if ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
264-
#else
265-
if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
266-
#endif
261+
if (QUILL_UNLIKELY(capacity > unbounded_queue_max_capacity))
267262
{
268-
size_t constexpr max_bounded_queue_size = 2ull * 1024 * 1024 * 1024; // 2 GB
269-
270-
if (QUILL_UNLIKELY(capacity > max_bounded_queue_size))
263+
if (nbytes > unbounded_queue_max_capacity)
271264
{
272-
if (nbytes > max_bounded_queue_size)
273-
{
274-
QUILL_THROW(QuillError{
275-
"Logging single messages larger than 2 GB is not supported with the current queue "
276-
"type. For UnboundedBlocking or UnboundedDropping queues, this limitation applies.\n"
277-
"To log single messages larger than 2 GB, consider using the UnboundedUnlimited queue "
278-
"type.\n"
279-
"Message size: " +
280-
std::to_string(nbytes) +
281-
" bytes\n"
282-
"Required queue capacity: " +
283-
std::to_string(capacity) +
284-
" bytes\n"
285-
"Maximum allowed queue capacity: " +
286-
std::to_string(max_bounded_queue_size) + " bytes"});
287-
}
288-
289-
// we reached the max_bounded_queue_size we won't be allocating more
290-
// instead return nullptr to block or drop
291-
return nullptr;
265+
QUILL_THROW(
266+
QuillError{"Logging single messages larger than the configured maximum queue capacity "
267+
"is not possible.\n"
268+
"To log single messages exceeding this limit, consider increasing "
269+
"FrontendOptions::unbounded_queue_max_capacity.\n"
270+
"Message size: " +
271+
std::to_string(nbytes) +
272+
" bytes\n"
273+
"Required queue capacity: " +
274+
std::to_string(capacity) +
275+
" bytes\n"
276+
"Configured maximum queue capacity: " +
277+
std::to_string(unbounded_queue_max_capacity) + " bytes"});
292278
}
293-
}
294279

295-
// else the UnboundedUnlimited queue has no limits
280+
// we reached the unbounded_queue_max_capacity we won't be allocating more
281+
// instead return nullptr to block or drop
282+
return nullptr;
283+
}
296284

297285
// commit previous write to the old queue before switching
298286
_producer->bounded_queue.commit_write();

include/quill/sinks/JsonSink.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ class JsonFileSink : public detail::JsonSink<FileSink>
141141
public:
142142
JsonFileSink(fs::path const& filename, FileSinkConfig const& config,
143143
FileEventNotifier file_event_notifier = FileEventNotifier{}, bool do_fopen = true)
144-
: detail::JsonSink<FileSink>(filename, static_cast<FileSinkConfig const&>(config), std::move(file_event_notifier), do_fopen)
144+
: detail::JsonSink<FileSink>(filename, static_cast<FileSinkConfig const&>(config),
145+
std::move(file_event_notifier), do_fopen)
145146
{
146147
}
147148

include/quill/sinks/SyslogSink.h

+4-6
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,10 @@ class SyslogSink : public quill::Sink
154154
/**
155155
* @brief Writes a formatted log message to the stream
156156
*/
157-
void write_log(MacroMetadata const* /* log_metadata */,
158-
uint64_t /* log_timestamp */, std::string_view /* thread_id */,
159-
std::string_view /* thread_name */, std::string const& /* process_id */,
160-
std::string_view /* logger_name */, LogLevel log_level,
161-
std::string_view /* log_level_description */,
162-
std::string_view /* log_level_short_code */,
157+
void write_log(MacroMetadata const* /* log_metadata */, uint64_t /* log_timestamp */,
158+
std::string_view /* thread_id */, std::string_view /* thread_name */,
159+
std::string const& /* process_id */, std::string_view /* logger_name */, LogLevel log_level,
160+
std::string_view /* log_level_description */, std::string_view /* log_level_short_code */,
163161
std::vector<std::pair<std::string, std::string>> const* /* named_args */,
164162
std::string_view log_message, std::string_view log_statement) override
165163
{

test/integration_tests/BoundedBlockingQueueTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using namespace quill;
1515
struct CustomFrontendOptions
1616
{
1717
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedBlocking;
18-
static constexpr uint32_t initial_queue_capacity = 131'072;
18+
static constexpr size_t initial_queue_capacity = 131'072;
1919
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
2020
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
2121
};

test/integration_tests/BoundedDroppingQueueDropMessagesTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using namespace quill;
1515
struct CustomFrontendOptions
1616
{
1717
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedDropping;
18-
static constexpr uint32_t initial_queue_capacity = 1024;
18+
static constexpr size_t initial_queue_capacity = 1024;
1919
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
2020
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
2121
};

test/integration_tests/BoundedDroppingQueueTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using namespace quill;
1515
struct CustomFrontendOptions
1616
{
1717
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedDropping;
18-
static constexpr uint32_t initial_queue_capacity = 131'072;
18+
static constexpr size_t initial_queue_capacity = 131'072;
1919
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
2020
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
2121
};

test/integration_tests/CsvWritingCustomFrontendTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct OrderCsvSchema
1717
struct CustomFrontendOptions
1818
{
1919
static constexpr quill::QueueType queue_type = quill::QueueType::BoundedBlocking;
20-
static constexpr uint32_t initial_queue_capacity = 131'072;
20+
static constexpr size_t initial_queue_capacity = 131'072;
2121
static constexpr uint32_t blocking_queue_retry_interval_ns = 800;
2222
static constexpr quill::HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never;
2323
};

0 commit comments

Comments
 (0)