Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},

{
FLB_CONFIG_MAP_STR, "frame", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, frame_str),
"TCP framing: newline (default) or octet_counting (RFC 6587)"
},

/* EOF */
{0}
Expand Down
8 changes: 8 additions & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
/* 32KB chunk size */
#define FLB_SYSLOG_CHUNK "32768"

/* TCP framing */
#define FLB_SYSLOG_FRAME_NEWLINE 0
#define FLB_SYSLOG_FRAME_OCTET_COUNTING 1

struct syslog_conn;

/* Context / Config*/
Expand Down Expand Up @@ -67,6 +71,10 @@ struct flb_syslog {
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

/* TCP framing */
flb_sds_t frame_str;
int frame_type;

int dgram_mode_flag;
int collector_id;
struct mk_event *collector_event;
Expand Down
13 changes: 13 additions & 0 deletions plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
ctx->mode = FLB_SYSLOG_UNIX_UDP;
}

/* TCP Frame type (only applies to stream modes; default newline) */
ctx->frame_type = FLB_SYSLOG_FRAME_NEWLINE;
if (ctx->frame_str != NULL) {
if (strcasecmp(ctx->frame_str, "octet_counting") == 0 ||
strcasecmp(ctx->frame_str, "octet") == 0) {
ctx->frame_type = FLB_SYSLOG_FRAME_OCTET_COUNTING;
}
else if (strcasecmp(ctx->frame_str, "newline") != 0) {
flb_plg_warn(ins, "[in_syslog] unknown frame '%s', using 'newline'",
ctx->frame_str);
}
}

/* Check if TCP mode was requested */
if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) {
/* Listen interface (if not set, defaults to 0.0.0.0:5140) */
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ struct syslog_conn *syslog_conn_add(struct flb_connection *connection,
conn->ins = ctx->ins;
conn->buf_len = 0;
conn->buf_parsed = 0;
conn->frame_expected_len = 0;
conn->frame_have_len = 0;

/* Allocate read buffer */
conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_syslog/syslog_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ struct syslog_conn {
size_t buf_size; /* Buffer size */
size_t buf_len; /* Buffer length */
size_t buf_parsed; /* Parsed buffer (offset) */
/* Octet-counting framing state */
size_t frame_expected_len; /* remaining message bytes needed */
int frame_have_len; /* 0 = need length, 1 = have length */
struct flb_input_instance *ins; /* Parent plugin instance */
struct flb_syslog *ctx; /* Plugin configuration context */
struct flb_connection *connection;
Expand Down
67 changes: 56 additions & 11 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,21 +218,55 @@ int syslog_prot_process(struct syslog_conn *conn)

flb_log_event_encoder_reset(ctx->log_encoder);

/* Always parse while some remaining bytes exists */
/* Always parse while some remaining bytes exist */
while (eof < end) {
/* Lookup the ending byte */
eof = p = conn->buf_data + conn->buf_parsed;
while (*eof != '\n' && *eof != '\0' && eof < end) {
eof++;
if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) {
/* newline framing (current behavior) */
eof = p = conn->buf_data + conn->buf_parsed;
while (*eof != '\n' && *eof != '\0' && eof < end) {
eof++;
}
/* Incomplete message */
if (eof == end || (*eof != '\n' && *eof != '\0')) {
break;
}
len = (eof - p);
}

/* Incomplete message */
if (eof == end || (*eof != '\n' && *eof != '\0')) {
break;
else {
/* RFC 6587 octet-counting framing: <len> SP <msg> */
p = conn->buf_data + conn->buf_parsed;

if (!conn->frame_have_len) {
char *sp = p;
size_t n = 0;
while (sp < end && *sp >= '0' && *sp <= '9') {
if (n > SIZE_MAX / 10) {
n = SIZE_MAX;
break;
}
n = n * 10 + (size_t)(*sp - '0');
sp++;
}
if (sp == end) {
break;
}
if (*sp != ' ') {
flb_plg_warn(ctx->ins, "invalid octet-counting length");
return -1;
}
conn->buf_parsed += (sp - p) + 1;
conn->frame_expected_len = n;
conn->frame_have_len = 1;
p = conn->buf_data + conn->buf_parsed;
end = conn->buf_data + conn->buf_len;
}
if ((size_t)(end - p) < conn->frame_expected_len) {
break;
}
len = (int)conn->frame_expected_len;
}

/* No data ? */
len = (eof - p);
if (len == 0) {
consume_bytes(conn->buf_data, 1, conn->buf_len);
conn->buf_len--;
Expand Down Expand Up @@ -266,7 +300,18 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p);
}

conn->buf_parsed += len + 1;
if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) {
conn->buf_parsed += len + 1;
}
else {
conn->buf_parsed += len;
conn->frame_expected_len = 0;
conn->frame_have_len = 0;
if (conn->buf_parsed < conn->buf_len &&
conn->buf_data[conn->buf_parsed] == '\n') {
conn->buf_parsed += 1;
}
}
end = conn->buf_data + conn->buf_len;
eof = conn->buf_data + conn->buf_parsed;
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fluent-bit/flb_info.h>

#include <stdint.h>

#include "syslog.h"

#define FLB_MAP_EXPAND_SUCCESS 0
Expand Down
Loading
Loading