Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx)
static int in_fw_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
int state_backup;
struct flb_connection *connection;
struct fw_conn *conn;
struct flb_in_fw_config *ctx;

ctx = in_context;

state_backup = ctx->state;
ctx->state = FW_INSTANCE_STATE_ACCEPTING_CLIENT;

connection = flb_downstream_conn_get(ctx->downstream);

if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");
ctx->state = state_backup;

return -1;
}

if (!config->is_ingestion_active) {
flb_downstream_conn_release(connection);
ctx->state = state_backup;

return -1;
}

if(ctx->is_paused) {
flb_downstream_conn_release(connection);
flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
Comment on lines 126 to 155

This comment was marked as off-topic.

ctx->state = state_backup;

return -1;
}

Expand All @@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins,
conn = fw_conn_add(connection, ctx);
if (!conn) {
flb_downstream_conn_release(connection);
ctx->state = state_backup;

return -1;
}

ctx->state = state_backup;

if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
fw_conn_del_all(ctx);
Comment on lines +171 to +174

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restoring state before the pause check prevents deferred cleanup

After pausing, in_fw_pause skips fw_conn_del_all() if a connection is being accepted and sets ctx->state to PAUSED so that the acceptor can clean up once it finishes. However, in in_fw_collect the code unconditionally restores the previous state before checking for FW_INSTANCE_STATE_PAUSED. Because state_backup is almost always RUNNING, the fw_conn_del_all() path never runs and the state is reset back to running even when a pause happened during the accept. This leaves already-accepted connections open and the plugin marked as running despite being paused, so ingestion continues instead of being torn down.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a valid problem, because in in_fw_collect there's an if-statement that ensures that the connections are closed when the is_paused boolean is set:

if(ctx->is_paused) {
    flb_downstream_conn_release(connection);
    flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
    ctx->state = state_backup;
    return -1;
}

}

return 0;
}

Expand Down Expand Up @@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins,
return -1;
}

ctx->state = FW_INSTANCE_STATE_RUNNING;
ctx->coll_fd = -1;
ctx->ins = ins;
mk_list_init(&ctx->connections);
Expand Down Expand Up @@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config)
return;
}

fw_conn_del_all(ctx);
if (ctx->state == FW_INSTANCE_STATE_RUNNING) {
fw_conn_del_all(ctx);
}
Comment on lines +407 to +409
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

State check under mutex but state write outside creates inconsistency.

Line 407 reads ctx->state under conn_mutex protection, but line 431 writes ctx->state after the mutex has been released. This breaks the synchronization discipline:

  • in_fw_pause reads state under mutex to decide whether to delete connections
  • fw_conn_event reads/writes state without mutex (fw_conn.c lines 141–148)
  • in_fw_collect reads/writes state without mutex (lines 134–175)

Result: The mutex protects is_paused and the fw_conn_del_all call, but not the state machine itself, defeating the purpose of the state check at line 407.

Move the state assignment inside the mutex-protected region:

         ret = pthread_mutex_lock(&ctx->conn_mutex);
         if (ret != 0) {
             flb_plg_error(ctx->ins, "cannot lock collector mutex");
             return;
         }
 
         if (ctx->state == FW_INSTANCE_STATE_RUNNING) {
             fw_conn_del_all(ctx);
         }
 
         ctx->is_paused = FLB_TRUE;
+        ctx->state = FW_INSTANCE_STATE_PAUSED;
         ret = pthread_mutex_unlock(&ctx->conn_mutex);
         if (ret != 0) {
             flb_plg_error(ctx->ins, "cannot unlock collector mutex");
             return;
         }
     }
 
     /*
      * If the plugin is paused AND the ingestion not longer active,
      * it means we are in a shutdown phase. This plugin can safetly
      * close the socket server collector.
      *
      * This socket stop is a workaround since the server API will be
      * refactored shortly.
      */
     if (config->is_ingestion_active == FLB_FALSE) {
         fw_conn_del_all(ctx);
     }
-
-    ctx->state = FW_INSTANCE_STATE_PAUSED;

Also update fw_conn_event and in_fw_collect to acquire the mutex around all state accesses (see previous comments).

Also applies to: 431-431

🤖 Prompt for AI Agents
In plugins/in_forward/fw.c around lines 407-409 (and the state write at 431),
the code reads ctx->state while holding conn_mutex but writes ctx->state after
releasing it, causing race conditions; move the state assignment (the write at
line 431) inside the conn_mutex-protected block so the read/check and write are
atomic under the same mutex, and ensure fw_conn_del_all remains inside that
lock; additionally update fw_conn_event and in_fw_collect (their state
read/write sites referenced in the review) to acquire conn_mutex around every
access to ctx->state (both reads and writes) so the state machine is
consistently protected by the mutex.


ctx->is_paused = FLB_TRUE;
ret = pthread_mutex_unlock(&ctx->conn_mutex);
if (ret != 0) {
Expand All @@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config)
if (config->is_ingestion_active == FLB_FALSE) {
fw_conn_del_all(ctx);
}

ctx->state = FW_INSTANCE_STATE_PAUSED;
}

static void in_fw_resume(void *data, struct flb_config *config) {
Expand All @@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) {
flb_plg_error(ctx->ins, "cannot unlock collector mutex");
return;
}

ctx->state = FW_INSTANCE_STATE_RUNNING;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

State assignment should be under mutex protection.

Line 454 writes ctx->state outside the mutex-protected region (lines 441–452). This is inconsistent with in_fw_pause line 407 which reads state under mutex, and creates the same data race described in the previous comment.

Move the state assignment inside the mutex:

         ret = pthread_mutex_lock(&ctx->conn_mutex);
         if (ret != 0) {
             flb_plg_error(ctx->ins, "cannot lock collector mutex");
             return;
         }
 
         ctx->is_paused = FLB_FALSE;
+        ctx->state = FW_INSTANCE_STATE_RUNNING;
         ret = pthread_mutex_unlock(&ctx->conn_mutex);
         if (ret != 0) {
             flb_plg_error(ctx->ins, "cannot unlock collector mutex");
             return;
         }
-
-        ctx->state = FW_INSTANCE_STATE_RUNNING;
     }
🤖 Prompt for AI Agents
In plugins/in_forward/fw.c around lines 441 to 454, the assignment ctx->state =
FW_INSTANCE_STATE_RUNNING at line 454 is performed outside the mutex-protected
region (lines 441–452), causing a data race with in_fw_pause which reads state
under the mutex; move the state assignment into the existing mutex-protected
block (acquire ctx->conn_mutex before writing, set ctx->state =
FW_INSTANCE_STATE_RUNNING inside that block) and then release the mutex so all
reads/writes of ctx->state are consistently protected.

}
}

Expand Down
8 changes: 8 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

/*
 * Values stored in flb_in_fw_config.state.
 *
 * Declared as an enum (like the FW_HANDSHAKE_* constants in this header)
 * so the related values are grouped and visible to a debugger. The numeric
 * values are unchanged.
 */
enum {
    /* Set by in_fw_init() and in_fw_resume(). */
    FW_INSTANCE_STATE_RUNNING           = 0,

    /* Set by in_fw_collect() while it obtains and registers a connection. */
    FW_INSTANCE_STATE_ACCEPTING_CLIENT  = 1,

    /* Set by fw_conn_event() around the call to fw_conn_event_internal(). */
    FW_INSTANCE_STATE_PROCESSING_PACKET = 2,

    /*
     * Set by in_fw_pause(). in_fw_collect() and fw_conn_event() call
     * fw_conn_del_all() when they observe this value after their own work.
     */
    FW_INSTANCE_STATE_PAUSED            = 3
};
Comment on lines +28 to +31
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

State field lacks synchronization primitives or access discipline.

The new state field will be accessed concurrently by multiple threads (event handlers in fw_conn_event, collector in in_fw_collect, pause/resume callbacks in fw.c). However, no synchronization mechanism protects these accesses:

  • fw_conn.c lines 141–143: reads and writes ctx->state without holding ctx->conn_mutex
  • fw.c line 407: reads ctx->state under mutex, but fw_conn.c line 143 writes it without mutex
  • fw.c line 431: writes ctx->state outside the mutex-protected region

This creates data races where one thread's state transition can be lost or observed partially by another thread.

Apply one of these approaches:

Option 1: Use existing conn_mutex consistently

 struct flb_in_fw_config {
     ...
     struct flb_log_event_encoder *log_encoder;
 
     pthread_mutex_t conn_mutex;
 
+    /* Protected by conn_mutex */
     int state;
     
     /* Plugin is paused */
     int is_paused;
 };

Then ensure all state reads/writes in fw.c and fw_conn.c acquire conn_mutex first.

Option 2: Use C11 atomics

+#include <stdatomic.h>
+
 struct flb_in_fw_config {
     ...
     pthread_mutex_t conn_mutex;
 
-    int state;
+    _Atomic int state;

Then replace direct assignments with atomic_store/atomic_load with appropriate memory order (likely memory_order_seq_cst for simplicity).

Also applies to: 85-86

🤖 Prompt for AI Agents
In plugins/in_forward/fw.h around lines 28–31 (and also apply to lines 85–86),
the new state field is accessed concurrently without synchronization causing
data races; pick one approach and apply it consistently: either (Option 1)
protect every read/write of ctx->state across fw.c and fw_conn.c by acquiring
ctx->conn_mutex (wrap reads/writes in the mutex scope, ensure all call sites
including fw_conn.c lines ~141–143, fw.c lines ~407 and ~431 use the mutex), or
(Option 2) make the state an atomic (use C11 _Atomic type) and replace direct
assignments/reads with atomic_store/atomic_load using memory_order_seq_cst (the
strongest ordering; relax only with justification) and update headers and all access sites
accordingly so all state accesses are synchronized.



enum {
FW_HANDSHAKE_HELO = 1,
FW_HANDSHAKE_PINGPONG = 2,
Expand Down Expand Up @@ -76,6 +82,8 @@ struct flb_in_fw_config {

pthread_mutex_t conn_mutex;

int state;

/* Plugin is paused */
int is_paused;
};
Expand Down
37 changes: 32 additions & 5 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
#include "fw_prot.h"
#include "fw_conn.h"

/* Callback invoked every time an event is triggered for a connection */
int fw_conn_event(void *data)
static int fw_conn_event_internal(struct flb_connection *connection)
{
int ret;
int bytes;
Expand All @@ -39,9 +38,6 @@ int fw_conn_event(void *data)
struct fw_conn *conn;
struct mk_event *event;
struct flb_in_fw_config *ctx;
struct flb_connection *connection;

connection = (struct flb_connection *) data;

conn = connection->user_data;

Expand Down Expand Up @@ -127,6 +123,37 @@ int fw_conn_event(void *data)
return 0;
}

/* Callback invoked every time an event is triggered for a connection */
int fw_conn_event(void *data)
{
struct flb_in_fw_config *ctx;
struct fw_conn *conn;
int result;
struct flb_connection *connection;
int state_backup;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

state_backup = ctx->state;

ctx->state = FW_INSTANCE_STATE_PROCESSING_PACKET;
Comment on lines +141 to +143

This comment was marked as off-topic.


result = fw_conn_event_internal(connection);

if (ctx->state == FW_INSTANCE_STATE_PROCESSING_PACKET) {
ctx->state = state_backup;
}
else if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
fw_conn_del_all(ctx);
}
Comment on lines +150 to +152

This comment was marked as off-topic.


return result;
}

/* Create a new Forward request instance */
struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx)
{
Expand Down
Loading