Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ struct flb_config {
char *storage_type; /* global storage type */
int storage_inherit; /* apply storage type to inputs */

/* DLQ for non-retriable output failures */
int storage_keep_rejected; /* 0/1 */
char *storage_rejected_path; /* relative to storage_path, default "rejected" */
void *storage_rejected_stream; /* NULL until first use */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
struct mk_list sqldb_list;
Expand Down Expand Up @@ -411,6 +416,9 @@ enum conf_type {
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_TYPE "storage.type"
#define FLB_CONF_STORAGE_INHERIT "storage.inherit"
/* Storage DLQ */
#define FLB_CONF_STORAGE_KEEP_REJECTED "storage.keep.rejected"
#define FLB_CONF_STORAGE_REJECTED_PATH "storage.rejected.path"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static inline char *flb_storage_get_type(int type)
return NULL;
}

struct flb_input_instance;

int flb_storage_create(struct flb_config *ctx);
int flb_storage_input_create(struct cio_ctx *cio,
struct flb_input_instance *in);
Expand All @@ -85,4 +87,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met

void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);

/* DLQ */
int flb_storage_quarantine_chunk(struct flb_config *ctx,
struct cio_chunk *ch,
const char *tag,
int status_code,
const char *out_name);

#endif
11 changes: 11 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_INHERIT,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_inherit)},
/* Storage / DLQ */
{FLB_CONF_STORAGE_KEEP_REJECTED,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_keep_rejected)},
{FLB_CONF_STORAGE_REJECTED_PATH,
FLB_CONF_TYPE_STR,
offsetof(struct flb_config, storage_rejected_path)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
Expand Down Expand Up @@ -312,6 +319,7 @@ struct flb_config *flb_config_init()
config->storage_type = NULL;
config->storage_inherit = FLB_FALSE;
config->storage_bl_flush_on_shutdown = FLB_FALSE;
config->storage_rejected_path = NULL;
config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
config->json_escape_unicode = FLB_TRUE;
Expand Down Expand Up @@ -573,6 +581,9 @@ void flb_config_exit(struct flb_config *config)
if (config->storage_bl_mem_limit) {
flb_free(config->storage_bl_mem_limit);
}
if (config->storage_rejected_path) {
flb_free(config->storage_rejected_path);
}

#ifdef FLB_HAVE_STREAM_PROCESSOR
if (config->stream_processor_file) {
Expand Down
51 changes: 51 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance
((double)ins->total_limit_size));
}

static void handle_dlq_if_available(struct flb_config *config,
struct flb_task *task,
struct flb_output_instance *ins,
int status_code /* pass 0 if unknown */)
{
const char *tag_buf = NULL;
int tag_len = 0;
flb_sds_t tag_sds = NULL;
const char *tag = NULL;
const char *out = NULL;
struct flb_input_chunk *ic;
struct cio_chunk *cio_ch;

if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) {
return;
}

ic = (struct flb_input_chunk *) task->ic;

if (!ic || !ic->chunk) {
return;
}

/* Obtain tag from the input chunk API (no direct field available) */
if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) {
tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */
tag = tag_sds;
}
else {
/* Fallback: use input instance name */
tag = flb_input_name(task->i_ins);
}

out = flb_output_name(ins);
cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */

/* Copy bytes into DLQ stream (filesystem) */
(void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out);

if (tag_sds) {
flb_sds_destroy(tag_sds);
}
}

static inline int handle_output_event(uint64_t ts,
struct flb_config *config,
uint64_t val)
Expand Down Expand Up @@ -353,6 +397,8 @@ static inline int handle_output_event(uint64_t ts,
}
else if (ret == FLB_RETRY) {
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
handle_dlq_if_available(config, task, ins, 0);

/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
Expand Down Expand Up @@ -388,6 +434,8 @@ static inline int handle_output_event(uint64_t ts,
* - It reached the maximum number of re-tries
*/

handle_dlq_if_available(config, task, ins, 0);

/* cmetrics */
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
Expand Down Expand Up @@ -429,6 +477,8 @@ static inline int handle_output_event(uint64_t ts,
* memory available or we ran out of file descriptors.
*/
if (retry_seconds == -1) {
handle_dlq_if_available(config, task, ins, 0);

flb_warn("[engine] retry for chunk '%s' could not be scheduled: "
"input=%s > output=%s",
flb_input_chunk_get_name(task->ic),
Expand Down Expand Up @@ -465,6 +515,7 @@ static inline int handle_output_event(uint64_t ts,
}
}
else if (ret == FLB_ERROR) {
handle_dlq_if_available(config, task, ins, 0);
/* cmetrics */
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
Expand Down
141 changes: 141 additions & 0 deletions src/flb_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,147 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
*fs_chunks = storage_st.chunks_fs;
}

/* Replace '/', '\\' and ':' with '_' to make filename components safe */
static inline void sanitize_name_component(const char *in, char *out, size_t out_sz)
{
size_t i;

if (out_sz == 0) {
return;
}

if (!in) {
in = "no-tag";
}

for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) {
out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i];
}
out[i] = '\0';
}

static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
{
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
struct cio_stream *st;
const char *name;

if (!ctx || !ctx->cio) {
return NULL;
}
if (!ctx->storage_keep_rejected || !ctx->storage_path) {
return NULL;
}

if (ctx->storage_rejected_stream) {
return ctx->storage_rejected_stream;
}

name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";

st = cio_stream_get(ctx->cio, name);
if (!st) {
st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS);
}
if (!st) {
flb_warn("[storage] failed to create rejected stream '%s'", name);
return NULL;
}

ctx->storage_rejected_stream = st;
return st;
#else
FLB_UNUSED(ctx);
return NULL;
#endif
}

int flb_storage_quarantine_chunk(struct flb_config *ctx,
struct cio_chunk *src,
const char *tag,
int status_code,
const char *out_name)
{
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
struct cio_stream *dlq;
void *buf = NULL;
int was_up = 0;
size_t size = 0;
int err = 0;
char name[256];
struct cio_chunk *dst;
char safe_tag[128];
char safe_out[64];

if (!ctx || !src) {
return -1;
}
dlq = get_or_create_rejected_stream(ctx);
if (!dlq) {
return -1;
}

/* Remember original state and bring the chunk up if needed */
was_up = (cio_chunk_is_up(src) == CIO_TRUE);
if (!was_up) {
if (cio_chunk_up_force(src) != CIO_OK) {
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
return -1;
}
}

sanitize_name_component(tag, safe_tag, sizeof(safe_tag));
sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out));

/* Compose a simple, unique-ish file name with sanitized pieces */
snprintf(name, sizeof(name),
"%s_%d_%s_%p.flb",
safe_tag, status_code, safe_out, (void *) src);

if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
return -1;
}

/* Create + write the DLQ copy */
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
if (!dst) {
flb_warn("[storage] DLQ open failed (err=%d)", err);
flb_free(buf);
return -1;
}
if (cio_chunk_write(dst, buf, size) != CIO_OK ||
cio_chunk_sync(dst) != CIO_OK) {
flb_warn("[storage] DLQ write/sync failed");
cio_chunk_close(dst, CIO_TRUE);
flb_free(buf);
return -1;
}

cio_chunk_close(dst, CIO_FALSE);
flb_free(buf);

flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);

/* Restore original state if we brought the chunk up */
if (!was_up) {
if (cio_chunk_down(src) != CIO_OK) {
flb_debug("[storage] failed to bring chunk back down after DLQ copy");
}
}

return 0;
#else
FLB_UNUSED(ctx);
FLB_UNUSED(src);
FLB_UNUSED(tag);
FLB_UNUSED(status_code);
FLB_UNUSED(out_name);

return -1;
#endif
}

void flb_storage_destroy(struct flb_config *ctx)
{
struct cio_ctx *cio;
Expand Down
1 change: 1 addition & 0 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(UNIT_TESTS_FILES
storage_inherit.c
unicode.c
opentelemetry.c
storage_dlq.c
)

# TLS helpers
Expand Down
Loading
Loading