From a07c53bbc982dea1400687da778176a1eb5338fd Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Oct 2025 17:46:49 +0900 Subject: [PATCH 01/10] config: Add dead letter queue related config Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_config.h | 7 +++++++ src/flb_config.c | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1e8f2f0783..3276fe835e7 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -251,6 +251,10 @@ struct flb_config { char *storage_type; /* global storage type */ int storage_inherit; /* apply storage type to inputs */ + /* DLQ for non-retriable output failures */ + int storage_keep_rejected; /* 0/1 */ + char *storage_rejected_path; /* relative to storage_path, default "rejected" */ + /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB struct mk_list sqldb_list; @@ -411,6 +415,9 @@ enum conf_type { #define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" #define FLB_CONF_STORAGE_TYPE "storage.type" #define FLB_CONF_STORAGE_INHERIT "storage.inherit" +/* Storage DLQ */ +#define FLB_CONF_STORAGE_KEEP_REJECTED "storage.keep.rejected" +#define FLB_CONF_STORAGE_REJECTED_PATH "storage.rejected.path" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/src/flb_config.c b/src/flb_config.c index ddfdd010a1c..e61ff331d0f 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -163,6 +163,13 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_INHERIT, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_inherit)}, + /* Storage / DLQ */ + {FLB_CONF_STORAGE_KEEP_REJECTED, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, storage_keep_rejected)}, + {FLB_CONF_STORAGE_REJECTED_PATH, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, storage_rejected_path)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, @@ -312,6 +319,7 @@ struct flb_config *flb_config_init() config->storage_type = NULL; config->storage_inherit = FLB_FALSE; config->storage_bl_flush_on_shutdown = FLB_FALSE; + config->storage_rejected_path = NULL; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; config->json_escape_unicode = FLB_TRUE; @@ -573,6 +581,9 @@ void flb_config_exit(struct flb_config *config) if (config->storage_bl_mem_limit) { flb_free(config->storage_bl_mem_limit); } + if (config->storage_rejected_path) { + flb_free(config->storage_rejected_path); + } #ifdef FLB_HAVE_STREAM_PROCESSOR if (config->stream_processor_file) { From 32bb91b2ddd74a9df475bb4018cd4d772922185c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Oct 2025 18:40:46 +0900 Subject: [PATCH 02/10] config: storage: Implement dlq for filesystem chunks Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_config.h | 1 + include/fluent-bit/flb_storage.h | 7 ++ src/flb_storage.c | 111 +++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 3276fe835e7..7709d318387 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -254,6 +254,7 @@ struct flb_config { /* DLQ for non-retriable output failures */ int storage_keep_rejected; /* 0/1 */ char *storage_rejected_path; /* relative to storage_path, default "rejected" */ + void *storage_rejected_stream; /* NULL until first use */ /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 57b50e19016..e4d3884d5fb 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -85,4 +85,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); +/* DLQ */ +int flb_storage_quarantine_chunk(struct flb_config *ctx, + struct cio_chunk *ch, + const char *tag, + int status_code, + const char *out_name); + #endif diff --git a/src/flb_storage.c b/src/flb_storage.c index 0f0ff93f1ed..9cce791f170 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -747,6 +747,117 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch *fs_chunks = storage_st.chunks_fs; } + +static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx) +{ +#ifdef CIO_HAVE_BACKEND_FILESYSTEM + struct cio_stream *st; + const char *name; + + if (!ctx || !ctx->cio) { + return NULL; + } + if (!ctx->storage_keep_rejected || !ctx->storage_path) { + return NULL; + } + + if (ctx->storage_rejected_stream) { + return ctx->storage_rejected_stream; + } + + name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + + st = cio_stream_get(ctx->cio, name); + if (!st) { + st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS); + } + if (!st) { + flb_warn("[storage] failed to create rejected stream '%s'", name); + return NULL; + } + + ctx->storage_rejected_stream = st; + return st; +#else + FLB_UNUSED(ctx); + return NULL; +#endif +} + +int flb_storage_quarantine_chunk(struct flb_config *ctx, + struct cio_chunk *src, + const char *tag, + int status_code, + const char *out_name) +{ +#ifdef CIO_HAVE_BACKEND_FILESYSTEM + struct cio_stream *dlq; + void *buf = NULL; + size_t size = 0; + int err = 0; + char name[256]; + struct cio_chunk *dst; + + if (!ctx || !src) { + return -1; + } + dlq = get_or_create_rejected_stream(ctx); + if (!dlq) { + return -1; + } + + if (cio_chunk_is_up(src) != CIO_TRUE) { + if (cio_chunk_up_force(src) != CIO_OK) { + flb_warn("[storage] cannot bring chunk up to copy into DLQ"); + return -1; + } + } + + if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) { + flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size); + return -1; + } + + /* Compose a simple, unique-ish file name */ + snprintf(name, sizeof(name), + "%s_%d_%s_%p.flb", + tag ? tag : "no-tag", + status_code, + out_name ? out_name : "out", + (void *) src); + + /* Create + write the DLQ copy */ + dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err); + if (!dst) { + flb_warn("[storage] DLQ open failed (err=%d)", err); + flb_free(buf); + return -1; + } + if (cio_chunk_write(dst, buf, size) != CIO_OK || + cio_chunk_sync(dst) != CIO_OK) { + flb_warn("[storage] DLQ write/sync failed"); + cio_chunk_close(dst, CIO_TRUE); + flb_free(buf); + return -1; + } + + cio_chunk_close(dst, CIO_FALSE); + flb_free(buf); + + flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size); + + return 0; +#else + FLB_UNUSED(ctx); + FLB_UNUSED(src); + FLB_UNUSED(tag); + FLB_UNUSED(status_code); + FLB_UNUSED(out_name); + + return -1; +#endif +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio; From 268b178fd508527cf783e4eae80a2dadefd344da Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Oct 2025 18:58:40 +0900 Subject: [PATCH 03/10] engine: Add a capability to handle dead letter queue for preserving invalid chunks for later verifications Signed-off-by: Hiroshi Hatake --- src/flb_engine.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/src/flb_engine.c b/src/flb_engine.c index d7fcd7a6223..c2ac94b414a 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -231,6 +231,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance ((double)ins->total_limit_size)); } +static void handle_dlq_if_available(struct flb_config *config, + struct flb_task *task, + struct flb_output_instance *ins, + int status_code /* pass 0 if unknown */) +{ + const char *tag_buf = NULL; + int tag_len = 0; + flb_sds_t tag_sds = NULL; + const char *tag = NULL; + const char *out = NULL; + struct flb_input_chunk *ic; + struct cio_chunk *cio_ch; + + if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) { + return; + } + + ic = (struct flb_input_chunk *) task->ic; + + if (!ic || !ic->chunk) { + return; + } + + /* Obtain tag from the input chunk API (no direct field available) */ + if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) { + tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */ + tag = tag_sds; + } + else { + /* Fallback: use input instance name */ + tag = flb_input_name(task->i_ins); + } + + out = flb_output_name(ins); + cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */ + + /* Copy bytes into DLQ stream (filesystem) */ + (void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out); + + if (tag_sds) { + flb_sds_destroy(tag_sds); + } +} + static inline int handle_output_event(uint64_t ts, struct flb_config *config, uint64_t val) @@ -353,6 +397,8 @@ static inline int handle_output_event(uint64_t ts, } else if (ret == FLB_RETRY) { if (ins->retry_limit == FLB_OUT_RETRY_NONE) { + handle_dlq_if_available(config, task, ins, 0); + /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, 1, (char *[]) {name}); @@ -388,6 +434,8 @@ static inline int handle_output_event(uint64_t ts, * - It reached the maximum number of re-tries */ + handle_dlq_if_available(config, task, ins, 0); + /* cmetrics */ cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, @@ -429,6 +477,8 @@ static inline int handle_output_event(uint64_t ts, * memory available or we ran out of file descriptors. */ if (retry_seconds == -1) { + handle_dlq_if_available(config, task, ins, 0); + flb_warn("[engine] retry for chunk '%s' could not be scheduled: " "input=%s > output=%s", flb_input_chunk_get_name(task->ic), @@ -465,6 +515,7 @@ static inline int handle_output_event(uint64_t ts, } } else if (ret == FLB_ERROR) { + handle_dlq_if_available(config, task, ins, 0); /* cmetrics */ cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, From 73e3e7d3da158dc759ba4909955b82c7046b8445 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Oct 2025 19:54:50 +0900 Subject: [PATCH 04/10] storage: Use correct visibility of struct flb_input_instance Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_storage.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index e4d3884d5fb..854216b1038 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -72,6 +72,8 @@ static inline char *flb_storage_get_type(int type) return NULL; } +struct flb_input_instance; + int flb_storage_create(struct flb_config *ctx); int flb_storage_input_create(struct cio_ctx *cio, struct flb_input_instance *in); From 9273eec5869e8e7a08b8b4cb9eab17f02e2fb258 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Oct 2025 19:55:47 +0900 Subject: [PATCH 05/10] storage: tests: internal: Add DLQ internal tests Signed-off-by: Hiroshi Hatake --- tests/internal/CMakeLists.txt | 1 + tests/internal/storage_dlq.c | 393 ++++++++++++++++++++++++++++++++++ 2 files changed, 394 insertions(+) create mode 100644 tests/internal/storage_dlq.c diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index a270f1dc3c8..2ba9dc65f87 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -53,6 +53,7 @@ set(UNIT_TESTS_FILES storage_inherit.c unicode.c opentelemetry.c + storage_dlq.c ) # TLS helpers diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c new file mode 100644 index 00000000000..7bd6851e966 --- /dev/null +++ b/tests/internal/storage_dlq.c @@ -0,0 +1,393 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "flb_tests_internal.h" + +static int mkpath(const char *p) { + if (mkdir(p, 0777) == 0) { + return 0; + } + if (errno == EEXIST) { + return 0; + } + return -1; +} + +static void tmpdir_for(char *out, size_t n, const char *name) { + snprintf(out, n, "/tmp/flb-dlq-%s-%ld", name, (long) getpid()); + mkpath(out); +} + +/* helper: open a DLQ chunk by basename and return its content copy */ +static int read_dlq_chunk_content(struct flb_config *ctx, + const char *rejected_stream_name, + const char *chunk_basename, + void **out_buf, size_t *out_size) +{ + int err = 0; + struct cio_stream *st; + struct cio_chunk *ch; + + *out_buf = NULL; + *out_size = 0; + + st = cio_stream_get(ctx->cio, rejected_stream_name); + if (!st) { + st = cio_stream_create(ctx->cio, rejected_stream_name, FLB_STORAGE_FS); + if (!st) { return -1; } + } + + /* Open existing DLQ file by name */ + ch = cio_chunk_open(ctx->cio, st, chunk_basename, CIO_OPEN, 0, &err); + if (!ch) { + return -1; + } + + /* ensure it's readable */ + if (cio_chunk_is_up(ch) != CIO_TRUE) { + if (cio_chunk_up_force(ch) != CIO_OK) { + cio_chunk_close(ch, CIO_FALSE); + return -1; + } + } + + if (cio_chunk_get_content_copy(ch, out_buf, out_size) != CIO_OK) { + cio_chunk_close(ch, CIO_FALSE); + return -1; + } + + cio_chunk_close(ch, CIO_FALSE); + return 0; +} + +/* tiny binary “contains” (since memmem is non-portable) */ +static int buf_contains(const void *hay, size_t hlen, + const void *needle, size_t nlen) +{ + if (nlen == 0 || hlen < nlen) return 0; + const unsigned char *h = (const unsigned char *) hay; + const unsigned char *n = (const unsigned char *) needle; + + for (size_t i = 0; i + nlen <= hlen; i++) { + if (h[i] == n[0] && memcmp(h + i, n, nlen) == 0) { + return 1; + } + } + return 0; +} + +/* find the most recent *.flb file in dir; write full path into out */ +static int find_latest_flb(const char *dir, char *out, size_t out_sz) +{ + struct dirent *e; + time_t best_t = 0; + char best_path[1024] = {0}; + struct stat st; + char full[1024]; + size_t len = 0; + DIR *d = opendir(dir); + + if (!d) { + return -1; + } + + while ((e = readdir(d)) != NULL) { + len = strlen(e->d_name); + if (len < 5) { + continue; + } + if (strcmp(e->d_name + (len - 4), ".flb") != 0) { + continue; + } + + snprintf(full, sizeof(full), "%s/%s", dir, e->d_name); + if (stat(full, &st) == 0) { + if (st.st_mtime >= best_t) { + best_t = st.st_mtime; + strncpy(best_path, full, sizeof(best_path) - 1); + } + } + } + closedir(d); + + if (best_path[0] == '\0') { + return -1; + } + strncpy(out, best_path, out_sz - 1); + out[out_sz-1] = '\0'; + return 0; +} + +static void free_ctx(struct flb_config *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->cio) { + cio_destroy(ctx->cio); + ctx->cio = NULL; + } + + flb_config_exit(ctx); +} + +static const char *get_dlq_stream_name(struct flb_config *ctx) +{ + if (ctx->storage_rejected_stream) { + return ((struct cio_stream *)ctx->storage_rejected_stream)->name; + } + return ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; +} + +static void delete_all_chunks_in_stream(struct cio_ctx *cio, struct cio_stream *st) +{ + struct mk_list *head; + struct mk_list *tmp; + struct cio_chunk *ch; + + if (!cio || !st) { + return; + } + + mk_list_foreach_safe(head, tmp, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + + char *name_copy = flb_strdup(ch->name); + if (!name_copy) { + continue; + } + + cio_chunk_close(ch, CIO_FALSE); + + (void) cio_chunk_delete(cio, st, name_copy); + + flb_free(name_copy); + } +} + +static void rmdir_stream_dir(const char *root, const char *stream_name) +{ + if (!root || !stream_name) { + return; + } + + char path[1024]; + snprintf(path, sizeof(path), "%s/%s", root, stream_name); + path[sizeof(path)-1] = '\0'; + + /* Best-effort: ignore errors */ + (void) rmdir(path); +} + +/* Minimal POSIX rm -rf for the whole temp tree after CIO is gone */ +static void rm_rf_best_effort(const char *root) +{ + DIR *d; + struct dirent *e; + char p[1024]; + struct stat st; + + if (!root) { return; } + + d = opendir(root); + if (!d) { + (void) rmdir(root); + return; + } + while ((e = readdir(d)) != NULL) { + if (!strcmp(e->d_name, ".") || !strcmp(e->d_name, "..")) { + continue; + } + snprintf(p, sizeof(p), "%s/%s", root, e->d_name); + if (lstat(p, &st) != 0) { + continue; + } + if (S_ISDIR(st.st_mode)) { + rm_rf_best_effort(p); + } + else { + (void) unlink(p); + } + } + closedir(d); + (void) rmdir(root); +} + +static void test_cleanup_with_cio(struct flb_config *ctx, const char *root) +{ + if (ctx && ctx->cio) { + struct cio_stream *st_in = cio_stream_get(ctx->cio, "in_tail"); + struct cio_stream *st_dlq = cio_stream_get(ctx->cio, get_dlq_stream_name(ctx)); + + delete_all_chunks_in_stream(ctx->cio, st_in); + delete_all_chunks_in_stream(ctx->cio, st_dlq); + + rmdir_stream_dir(root, "in_tail"); + rmdir_stream_dir(root, get_dlq_stream_name(ctx)); + } + + free_ctx(ctx); + + rm_rf_best_effort(root); +} + +static struct flb_config *make_ctx_fs(const char *root, const char *rejected) +{ + struct cio_options opts; + struct flb_config *ctx = flb_config_init(); + TEST_CHECK(ctx != NULL); + + ctx->storage_path = flb_strdup(root); + ctx->storage_keep_rejected = FLB_TRUE; + ctx->storage_rejected_path = flb_strdup(rejected); + + cio_options_init(&opts); + opts.root_path = ctx->storage_path; + opts.flags = CIO_OPEN | CIO_CHECKSUM; + opts.log_cb = NULL; + + ctx->cio = cio_create(&opts); + TEST_CHECK(ctx->cio != NULL); + + /* mimic engine behavior: load + qsort */ + TEST_CHECK(cio_load(ctx->cio, NULL) == 0); + cio_qsort(ctx->cio, NULL); + + return ctx; +} + +static struct cio_chunk *make_src_chunk(struct flb_config *ctx, + int storage_type, /* FLB_STORAGE_FS */ + const char *stream_name, + const char *file_name, + const char *payload) +{ + int err = 0; + int cio_type = storage_type; + struct cio_stream *st = NULL; + struct cio_chunk *ch = NULL; + + st = cio_stream_get(ctx->cio, stream_name); + if (!st) { + st = cio_stream_create(ctx->cio, stream_name, cio_type); + } + TEST_CHECK(st != NULL); + + ch = cio_chunk_open(ctx->cio, st, file_name, CIO_OPEN, 0, &err); + TEST_CHECK(ch != NULL); + + TEST_CHECK(cio_chunk_write(ch, payload, strlen(payload)) == CIO_OK); + TEST_CHECK(cio_chunk_sync(ch) == CIO_OK); + + return ch; +} + +static void test_dlq_copy_from_fs_chunk(void) +{ + char root[256], rejdir[256], latest[1024]; + struct cio_chunk *src = NULL; + struct flb_config *ctx = NULL; + int rc; + const char *payload = + "{\"time\":\"2024-09-03 14:51:05.064735+00:00\",\"msg\":\"oops FS\"}\n"; + char latest_copy[1024]; + void *content = NULL; + size_t content_size = 0; + char *base = NULL; + + tmpdir_for(root, sizeof(root), "fs"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "t0-0-0000000000.000000000.flb", + payload); + + rc = flb_storage_quarantine_chunk(ctx, src, + "kube.var.log.containers.test", + 400, "http"); + TEST_CHECK(rc == 0); + + TEST_CHECK(find_latest_flb(rejdir, latest, sizeof(latest)) == 0); + + /* get just the filename (basename) */ + strncpy(latest_copy, latest, sizeof(latest_copy)-1); + latest_copy[sizeof(latest_copy)-1] = '\0'; + base = basename(latest_copy); + + TEST_CHECK(read_dlq_chunk_content(ctx, "rejected", base, &content, &content_size) == 0); + TEST_CHECK(content != NULL); + TEST_CHECK(content_size > 0); + TEST_CHECK(buf_contains(content, content_size, payload, strlen(payload)) == 1); + + flb_free(content); + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +static void test_dlq_disabled_no_copy(void) +{ + char root[256], rejdir[256], latest[1024]; + struct cio_chunk *src = NULL; + struct flb_config *ctx = NULL; + struct cio_options opts; + int rc; + const char *payload = "{\"msg\":\"should not be copied\"}\n"; + + tmpdir_for(root, sizeof(root), "disabled"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + /* DLQ disabled */ + ctx = flb_config_init(); + TEST_CHECK(ctx != NULL); + + ctx->storage_path = flb_strdup(root); + ctx->storage_keep_rejected = FLB_FALSE; + ctx->storage_rejected_path = flb_strdup("rejected"); + + cio_options_init(&opts); + opts.root_path = ctx->storage_path; + opts.flags = CIO_OPEN; + ctx->cio = cio_create(&opts); + TEST_CHECK(ctx->cio != NULL); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "t1-0.flb", + payload); + + /* Attempt to copy: should fail because DLQ is disabled */ + rc = flb_storage_quarantine_chunk(ctx, src, + "tag", 400, "out"); + TEST_CHECK(rc != 0); + + TEST_CHECK(find_latest_flb(rejdir, latest, sizeof(latest)) != 0); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +TEST_LIST = { + { "dlq_copy_from_fs_chunk", test_dlq_copy_from_fs_chunk }, + { "dlq_disabled_no_copy", test_dlq_disabled_no_copy }, + { NULL, NULL } +}; From 74398c8c31f5302f75225143b075749414c548d8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 9 Oct 2025 17:14:31 +0900 Subject: [PATCH 06/10] storage: tests: internal: Make to be able to compile on Windows Signed-off-by: Hiroshi Hatake --- tests/internal/storage_dlq.c | 170 ++++++++++++++++++++++++++++++----- 1 file changed, 148 insertions(+), 22 deletions(-) diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c index 7bd6851e966..8cfe536aae7 100644 --- a/tests/internal/storage_dlq.c +++ b/tests/internal/storage_dlq.c @@ -17,18 +17,49 @@ #include "flb_tests_internal.h" +#ifdef _WIN32 +# define FLB_UNLINK _unlink +# define FLB_RMDIR _rmdir +#else +# define FLB_UNLINK unlink +# define FLB_RMDIR rmdir +#endif + static int mkpath(const char *p) { +#if FLB_SYSTEM_WINDOWS + if (_mkdir(p) == 0) { + return 0; + } +#else if (mkdir(p, 0777) == 0) { return 0; } +#endif if (errno == EEXIST) { return 0; } return -1; } -static void tmpdir_for(char *out, size_t n, const char *name) { +static void join_path(char *out, size_t cap, const char *a, const char *b) +{ +#ifdef _WIN32 + _snprintf(out, cap, "%s\\%s", a, b); +#else + snprintf(out, cap, "%s/%s", a, b); +#endif + out[cap - 1] = '\0'; +} + +static void tmpdir_for(char *out, size_t n, const char *name) +{ +#ifdef _WIN32 + DWORD pid = GetCurrentProcessId(); + _snprintf(out, n, "C:\\Windows\\Temp\\flb-dlq-%s-%lu", name, (unsigned long) pid); +#else snprintf(out, n, "/tmp/flb-dlq-%s-%ld", name, (long) getpid()); +#endif + out[n-1] = '\0'; mkpath(out); } @@ -90,23 +121,59 @@ static int buf_contains(const void *hay, size_t hlen, return 0; } -/* find the most recent *.flb file in dir; write full path into out */ -static int find_latest_flb(const char *dir, char *out, size_t out_sz) +#if FLB_SYSTEM_WINDOWS +static int find_latest_flb_win32(const char *dir, char *out, size_t out_sz) { + WIN32_FIND_DATAA ffd; + HANDLE h = INVALID_HANDLE_VALUE; + char pattern[1024]; + ULONGLONG best_ts = 0ULL; + char best_name[MAX_PATH] = {0}; + + _snprintf(pattern, sizeof(pattern), "%s\\*.flb", dir); + pattern[sizeof(pattern)-1] = '\0'; + + h = FindFirstFileA(pattern, &ffd); + if (h == INVALID_HANDLE_VALUE) { + return -1; + } + + do { + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + ULONGLONG ts = (((ULONGLONG)ffd.ftLastWriteTime.dwHighDateTime) << 32) | + (ULONGLONG)ffd.ftLastWriteTime.dwLowDateTime; + if (ts >= best_ts) { + best_ts = ts; + strncpy(best_name, ffd.cFileName, sizeof(best_name)-1); + best_name[sizeof(best_name)-1] = '\0'; + } + } while (FindNextFileA(h, &ffd)); + + FindClose(h); + + if (best_name[0] == '\0') { + return -1; + } + + join_path(out, out_sz, dir, best_name); + return 0; +} +#else +static int find_latest_flb_unix(const char *dir, char *out, size_t out_sz) +{ + DIR *d = opendir(dir); struct dirent *e; time_t best_t = 0; char best_path[1024] = {0}; struct stat st; char full[1024]; - size_t len = 0; - DIR *d = opendir(dir); - if (!d) { - return -1; - } + if (!d) return -1; while ((e = readdir(d)) != NULL) { - len = strlen(e->d_name); + size_t len = strlen(e->d_name); if (len < 5) { continue; } @@ -114,11 +181,11 @@ static int find_latest_flb(const char *dir, char *out, size_t out_sz) continue; } - snprintf(full, sizeof(full), "%s/%s", dir, e->d_name); + join_path(full, sizeof(full), dir, e->d_name); if (stat(full, &st) == 0) { if (st.st_mtime >= best_t) { best_t = st.st_mtime; - strncpy(best_path, full, sizeof(best_path) - 1); + strncpy(best_path, full, sizeof(best_path)-1); } } } @@ -131,6 +198,17 @@ static int find_latest_flb(const char *dir, char *out, size_t out_sz) out[out_sz-1] = '\0'; return 0; } +#endif + +/* find the most recent *.flb file in dir; write full path into out */ +static int find_latest_flb(const char *dir, char *out, size_t out_sz) +{ +#if FLB_SYSTEM_WINDOWS + return find_latest_flb_win32(dir, out, out_sz); +#else + return find_latest_flb_unix(dir, out, out_sz); +#endif +} static void free_ctx(struct flb_config *ctx) { @@ -195,37 +273,85 @@ static void rmdir_stream_dir(const char *root, const char *stream_name) } /* Minimal POSIX rm -rf for the whole temp tree after CIO is gone */ -static void rm_rf_best_effort(const char *root) +#if FLB_SYSTEM_WINDOWS +static void rm_rf_best_effort_win32(const char *root) +{ + WIN32_FIND_DATAA ffd; + HANDLE h = INVALID_HANDLE_VALUE; + char pattern[1024], p[1024]; + + _snprintf(pattern, sizeof(pattern), "%s\\*", + root ? root : ""); + pattern[sizeof(pattern)-1] = '\0'; + + h = FindFirstFileA(pattern, &ffd); + if (h == INVALID_HANDLE_VALUE) { + /* try removing root itself */ + (void) FLB_RMDIR(root); + return; + } + + do { + const char *name = ffd.cFileName; + if (!strcmp(name, ".") || !strcmp(name, "..")) continue; + + join_path(p, sizeof(p), root, name); + + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + rm_rf_best_effort_win32(p); + } + else { + /* clear read-only if needed */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_READONLY) { + SetFileAttributesA(p, + ffd.dwFileAttributes & ~FILE_ATTRIBUTE_READONLY); + } + (void) DeleteFileA(p); + } + } while (FindNextFileA(h, &ffd)); + + FindClose(h); + (void) FLB_RMDIR(root); +} +#else +static void rm_rf_best_effort_unix(const char *root) { - DIR *d; + DIR *d = opendir(root); struct dirent *e; char p[1024]; struct stat st; - if (!root) { return; } - - d = opendir(root); if (!d) { - (void) rmdir(root); + (void) FLB_RMDIR(root); return; } while ((e = readdir(d)) != NULL) { - if (!strcmp(e->d_name, ".") || !strcmp(e->d_name, "..")) { + if (!strcmp(e->d_name, ".") || !strcmp(e->d_name, "..")) { continue; } - snprintf(p, sizeof(p), "%s/%s", root, e->d_name); + join_path(p, sizeof(p), root, e->d_name); if (lstat(p, &st) != 0) { continue; } if (S_ISDIR(st.st_mode)) { - rm_rf_best_effort(p); + rm_rf_best_effort_unix(p); } else { - (void) unlink(p); + (void) FLB_UNLINK(p); } } closedir(d); - (void) rmdir(root); + (void) FLB_RMDIR(root); +} +#endif + +static void rm_rf_best_effort(const char *root) +{ +#if FLB_SYSTEM_WINDOWS + rm_rf_best_effort_win32(root); +#else + rm_rf_best_effort_unix(root); +#endif } static void test_cleanup_with_cio(struct flb_config *ctx, const char *root) From 110953ab74166df7141ad991388d6107cfda6634 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 9 Oct 2025 17:29:31 +0900 Subject: [PATCH 07/10] storage: tests: internal: Fix a compilation error on CentOS 7 Signed-off-by: Hiroshi Hatake --- tests/internal/storage_dlq.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c index 8cfe536aae7..b20358e869c 100644 --- a/tests/internal/storage_dlq.c +++ b/tests/internal/storage_dlq.c @@ -109,11 +109,12 @@ static int read_dlq_chunk_content(struct flb_config *ctx, static int buf_contains(const void *hay, size_t hlen, const void *needle, size_t nlen) { + size_t i; if (nlen == 0 || hlen < nlen) return 0; const unsigned char *h = (const unsigned char *) hay; const unsigned char *n = (const unsigned char *) needle; - for (size_t i = 0; i + nlen <= hlen; i++) { + for (i = 0; i + nlen <= hlen; i++) { if (h[i] == n[0] && memcmp(h + i, n, nlen) == 0) { return 1; } From 120c3aa19c64d0dfdb10e713b0a059da27bddf12 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 10 Oct 2025 14:40:39 +0900 Subject: [PATCH 08/10] storage: Restore the original state of chunks Signed-off-by: Hiroshi Hatake --- src/flb_storage.c | 48 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/flb_storage.c b/src/flb_storage.c index 9cce791f170..9f854be63ec 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -747,6 +747,24 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch *fs_chunks = storage_st.chunks_fs; } +/* Replace '/', '\\' and ':' with '_' to make filename components safe */ +static inline void sanitize_name_component(const char *in, char *out, size_t out_sz) +{ + size_t i; + + if (out_sz == 0) { + return; + } + + if (!in) { + in = "no-tag"; + } + + for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) { + out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i]; + } + out[i] = '\0'; +} static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx) { @@ -793,10 +811,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, #ifdef CIO_HAVE_BACKEND_FILESYSTEM struct cio_stream *dlq; void *buf = NULL; + int was_up = 0; size_t size = 0; int err = 0; char name[256]; struct cio_chunk *dst; + char safe_tag[128]; + char safe_out[64]; if (!ctx || !src) { return -1; @@ -806,26 +827,28 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, return -1; } - if (cio_chunk_is_up(src) != CIO_TRUE) { + /* Remember original state and bring the chunk up if needed */ + was_up = (cio_chunk_is_up(src) == CIO_TRUE); + if (!was_up) { if (cio_chunk_up_force(src) != CIO_OK) { flb_warn("[storage] cannot bring chunk up to copy into DLQ"); return -1; } } + sanitize_name_component(tag, safe_tag, sizeof(safe_tag)); + sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out)); + + /* Compose a simple, unique-ish file name with sanitized pieces */ + snprintf(name, sizeof(name), + "%s_%d_%s_%p.flb", + safe_tag, status_code, safe_out, (void *) src); + if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) { flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size); return -1; } - /* Compose a simple, unique-ish file name */ - snprintf(name, sizeof(name), - "%s_%d_%s_%p.flb", - tag ? tag : "no-tag", - status_code, - out_name ? out_name : "out", - (void *) src); - /* Create + write the DLQ copy */ dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err); if (!dst) { @@ -846,6 +869,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size); + /* Restore original state if we brought the chunk up */ + if (!was_up) { + if (cio_chunk_down(src) != CIO_OK) { + flb_debug("[storage] failed to bring chunk back down after DLQ copy"); + } + } + return 0; #else FLB_UNUSED(ctx); From 249a59bf2de0badce1357156782076612cad7ab6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 10 Oct 2025 15:05:19 +0900 Subject: [PATCH 09/10] storage: Restore status of chunks for error paths Signed-off-by: Hiroshi Hatake --- src/flb_storage.c | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/flb_storage.c b/src/flb_storage.c index 9f854be63ec..ae619e57cc6 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -802,6 +802,17 @@ static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx) #endif } +static inline int flb_storage_chunk_restore_state(struct cio_chunk *src, int was_up, int ret_val) +{ + if (!was_up) { + if (cio_chunk_down(src) != CIO_OK) { + flb_debug("[storage] failed to bring chunk back down"); + } + } + + return ret_val; +} + int flb_storage_quarantine_chunk(struct flb_config *ctx, struct cio_chunk *src, const char *tag, @@ -846,7 +857,7 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) { flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size); - return -1; + return flb_storage_chunk_restore_state(src, was_up, -1); } /* Create + write the DLQ copy */ @@ -854,14 +865,14 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, if (!dst) { flb_warn("[storage] DLQ open failed (err=%d)", err); flb_free(buf); - return -1; + return flb_storage_chunk_restore_state(src, was_up, -1); } if (cio_chunk_write(dst, buf, size) != CIO_OK || cio_chunk_sync(dst) != CIO_OK) { flb_warn("[storage] DLQ write/sync failed"); cio_chunk_close(dst, CIO_TRUE); flb_free(buf); - return -1; + return flb_storage_chunk_restore_state(src, was_up, -1); } cio_chunk_close(dst, CIO_FALSE); @@ -869,14 +880,7 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx, flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size); - /* Restore original state if we brought the chunk up */ - if (!was_up) { - if (cio_chunk_down(src) != CIO_OK) { - flb_debug("[storage] failed to bring chunk back down after DLQ copy"); - } - } - - return 0; + return flb_storage_chunk_restore_state(src, was_up, 0); #else FLB_UNUSED(ctx); FLB_UNUSED(src); From caca20960acd34080d9d0c6069f6846c913a01fe Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 10 Oct 2025 17:15:54 +0900 Subject: [PATCH 10/10] storage: internal: tests: Confirm the initial state of chunks Signed-off-by: Hiroshi Hatake --- tests/internal/storage_dlq.c | 73 ++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c index b20358e869c..bca4d269079 100644 --- a/tests/internal/storage_dlq.c +++ b/tests/internal/storage_dlq.c @@ -513,8 +513,81 @@ static void test_dlq_disabled_no_copy(void) test_cleanup_with_cio(ctx, root); } +static void test_dlq_restores_chunk_state_when_initially_down(void) +{ + char root[256], rejdir[256]; + struct flb_config *ctx = NULL; + struct cio_chunk *src = NULL; + int rc; + const char *payload = "{\"msg\":\"state-restore-down\"}\n"; + + tmpdir_for(root, sizeof(root), "restore-down"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + /* Create the chunk */ + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "restore-down-0-0000000000.000000000.flb", + payload); + TEST_CHECK(src != NULL); + + if (cio_chunk_is_up(src) == CIO_TRUE) { + TEST_CHECK(cio_chunk_down(src) == CIO_OK); + } + TEST_CHECK(cio_chunk_is_up(src) != CIO_TRUE); + + rc = flb_storage_quarantine_chunk(ctx, src, + "tag.down", 500, "out_http"); + TEST_CHECK(rc == 0); + + TEST_CHECK(cio_chunk_is_up(src) != CIO_TRUE); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +static void test_dlq_preserves_chunk_state_when_initially_up(void) +{ + char root[256], rejdir[256]; + struct flb_config *ctx = NULL; + struct cio_chunk *src = NULL; + int rc; + const char *payload = "{\"msg\":\"state-preserve-up\"}\n"; + + tmpdir_for(root, sizeof(root), "preserve-up"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "preserve_in", + "preserve-up-0-0000000000.000000000.flb", + payload); + TEST_CHECK(src != NULL); + + if (cio_chunk_is_up(src) != CIO_TRUE) { + TEST_CHECK(cio_chunk_up_force(src) == CIO_OK); + } + TEST_CHECK(cio_chunk_is_up(src) == CIO_TRUE); + + rc = flb_storage_quarantine_chunk(ctx, src, + "tag.up", 502, "out_es"); + TEST_CHECK(rc == 0); + + TEST_CHECK(cio_chunk_is_up(src) == CIO_TRUE); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + TEST_LIST = { { "dlq_copy_from_fs_chunk", test_dlq_copy_from_fs_chunk }, { "dlq_disabled_no_copy", test_dlq_disabled_no_copy }, + { "dlq_restores_chunk_state_when_initially_down", test_dlq_restores_chunk_state_when_initially_down }, + { "dlq_preserves_chunk_state_when_initially_up", test_dlq_preserves_chunk_state_when_initially_up }, { NULL, NULL } };