Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions include/fluent-bit/flb_atomic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2026 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_ATOMIC_H
#define FLB_ATOMIC_H

/*
* Minimal relaxed-atomic helpers for scalar values that are read and written
* by more than one thread (e.g. counters and one-shot status fields shared
* between a threaded input worker and the main engine).
*
* Aligned word-sized loads/stores are already atomic on every platform Fluent
* Bit targets, but accessing them from multiple threads with plain operators is
* a C-level data race (undefined behavior, and flagged by ThreadSanitizer).
* These helpers make such accesses well defined. Relaxed ordering is used on
* purpose: callers only require atomicity of the individual value, not ordering
* relative to other memory (for ordered hand-offs use a mutex instead).
*
* The helpers are type-generic (int, size_t, uint64_t, ...).
*/

#if defined(__GNUC__) || defined(__clang__)

#define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED)
#define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED)

#elif defined(_MSC_VER)

/*
* MSVC backend: the Interlocked intrinsics are atomic (full barrier, which is
* stronger than the relaxed ordering we need but always correct). The helpers
* dispatch on the operand width so they work for both 32-bit and 64-bit scalars
* on 32-bit and 64-bit targets.
*/
#include <intrin.h>

static __forceinline long long flb_atomic_load_n(volatile void *ptr, size_t width)
{
#ifdef _WIN64
if (width == 8) {
return (long long) _InterlockedOr64((volatile __int64 *) ptr, 0);
}
#endif
(void) width;
return (long long) _InterlockedOr((volatile long *) ptr, 0);
}

static __forceinline void flb_atomic_store_n(volatile void *ptr, long long val,
size_t width)
{
#ifdef _WIN64
if (width == 8) {
(void) _InterlockedExchange64((volatile __int64 *) ptr, (__int64) val);
return;
}
#endif
(void) width;
(void) _InterlockedExchange((volatile long *) ptr, (long) val);
}

static __forceinline long long flb_atomic_fetch_add_n(volatile void *ptr,
long long val, size_t width)
{
#ifdef _WIN64
if (width == 8) {
return (long long) _InterlockedExchangeAdd64((volatile __int64 *) ptr,
(__int64) val);
}
#endif
(void) width;
return (long long) _InterlockedExchangeAdd((volatile long *) ptr, (long) val);
}

#define flb_atomic_load(ptr) flb_atomic_load_n((ptr), sizeof(*(ptr)))
#define flb_atomic_store(ptr, val) flb_atomic_store_n((ptr), (long long) (val), \
sizeof(*(ptr)))
#define flb_atomic_fetch_add(ptr, v) flb_atomic_fetch_add_n((ptr), (long long) (v), \
sizeof(*(ptr)))

#else
#error "flb_atomic.h: no atomic backend available for this compiler"
#endif

#endif
12 changes: 9 additions & 3 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_atomic.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_engine_dispatch.h>
#include <fluent-bit/flb_network.h>
Expand Down Expand Up @@ -1123,6 +1124,14 @@ int flb_engine_start(struct flb_config *config)
return -1;
}

/*
* Publish the supervisor grace window before signaling startup: the start
* notification is the happens-before edge the main thread synchronizes on,
* so storing grace_input afterwards could let it observe a stale value.
*/
flb_atomic_store(&config->grace_input, config->grace / 2);
flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input);

/* Signal that we have started */
flb_engine_started(config);

Expand All @@ -1134,9 +1143,6 @@ int flb_engine_start(struct flb_config *config)
return -2;
}

config->grace_input = config->grace / 2;
flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input);

while (1) {
rb_flush_flag = FLB_FALSE;

Expand Down
11 changes: 11 additions & 0 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,17 @@ int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config)

mk_list_foreach(head, &config->inputs) {
ins = mk_list_entry(head, struct flb_input_instance, _head);

/*
* Collectors of a threaded input are registered and dispatched in the
* input's own thread/event loop (see flb_input_thread.c), never through
* this main-thread handler. Skipping them avoids a benign data race with
* the worker thread that concurrently initializes those collector fds.
*/
if (flb_input_is_threaded(ins)) {
continue;
}

mk_list_foreach(head_coll, &ins->collectors) {
collector = mk_list_entry(head_coll, struct flb_input_collector, _head);
if (collector->fd_event == fd) {
Expand Down
12 changes: 9 additions & 3 deletions src/flb_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_version.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_atomic.h>
#include <msgpack.h>

static int id_exists(int id, struct flb_metrics *metrics)
Expand Down Expand Up @@ -181,7 +182,12 @@ int flb_metrics_sum(int id, size_t val, struct flb_metrics *metrics)
return -1;
}

m->val += val;
/*
* The counter is summed from the owning input/output worker thread while the
* metrics exporter reads it from the main engine thread; use a relaxed
* atomic so the access is well defined (see flb_metrics_dump_values()).
*/
flb_atomic_fetch_add(&m->val, val);
return 0;
}

Expand Down Expand Up @@ -214,7 +220,7 @@ int flb_metrics_print(struct flb_metrics *metrics)

mk_list_foreach(head, &metrics->list) {
m = mk_list_entry(head, struct flb_metric, _head);
printf(", '%s' => %lu", m->title, m->val);
printf(", '%s' => %lu", m->title, (unsigned long) flb_atomic_load(&m->val));
}
printf("\n");

Expand All @@ -240,7 +246,7 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size,
m = mk_list_entry(head, struct flb_metric, _head);
msgpack_pack_str(&mp_pck, flb_sds_len(m->title));
msgpack_pack_str_body(&mp_pck, m->title, flb_sds_len(m->title));
msgpack_pack_uint64(&mp_pck, m->val);
msgpack_pack_uint64(&mp_pck, flb_atomic_load(&m->val));
}

*out_buf = mp_sbuf.data;
Expand Down
8 changes: 5 additions & 3 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include <fluent-bit/flb_reload.h>
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/flb_supervisor.h>
#include <fluent-bit/flb_atomic.h>

#ifdef FLB_HAVE_MTRACE
#include <mcheck.h>
Expand Down Expand Up @@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv)
ctx = flb_context_get();

if (ctx != NULL && ctx->config != NULL) {
/* grace_input is published by the engine thread (flb_engine_start) */
flb_supervisor_child_update_grace(ctx->config->grace,
ctx->config->grace_input);
flb_atomic_load(&ctx->config->grace_input));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

#ifdef FLB_HAVE_CHUNK_TRACE
Expand Down Expand Up @@ -1514,7 +1516,7 @@ static int flb_main_run(int argc, char **argv)
if (supervisor_reload_notified == FLB_FALSE &&
ctx != NULL && ctx->config != NULL) {
flb_supervisor_child_signal_shutdown(ctx->config->grace,
ctx->config->grace_input);
flb_atomic_load(&ctx->config->grace_input));
supervisor_reload_notified = FLB_TRUE;
}

Expand All @@ -1526,7 +1528,7 @@ static int flb_main_run(int argc, char **argv)
supervisor_reload_notified = FLB_FALSE;
if (ctx != NULL && ctx->config != NULL) {
flb_supervisor_child_update_grace(ctx->config->grace,
ctx->config->grace_input);
flb_atomic_load(&ctx->config->grace_input));
}
}
else {
Expand Down
Loading