diff --git a/plugins/in_syslog/syslog.c b/plugins/in_syslog/syslog.c index 793ea0a0c1d..45d0fa2b0ea 100644 --- a/plugins/in_syslog/syslog.c +++ b/plugins/in_syslog/syslog.c @@ -244,7 +244,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key), "Key where the source address will be injected" }, - + { + FLB_CONFIG_MAP_STR, "format", (char *) NULL, + 0, FLB_TRUE, offsetof(struct flb_syslog, format_str), + "Format of TCP framing: newline (default) or octet_counting (RFC 6587)" + }, /* EOF */ {0} diff --git a/plugins/in_syslog/syslog.h b/plugins/in_syslog/syslog.h index 7e42daa260d..aa6900ec82a 100644 --- a/plugins/in_syslog/syslog.h +++ b/plugins/in_syslog/syslog.h @@ -33,6 +33,10 @@ /* 32KB chunk size */ #define FLB_SYSLOG_CHUNK "32768" +/* TCP framing */ +#define FLB_SYSLOG_FRAME_NEWLINE 0 +#define FLB_SYSLOG_FRAME_OCTET_COUNTING 1 + struct syslog_conn; /* Context / Config*/ @@ -67,6 +71,10 @@ struct flb_syslog { flb_sds_t raw_message_key; flb_sds_t source_address_key; + /* TCP framing */ + flb_sds_t format_str; + int frame_type; + int dgram_mode_flag; int collector_id; struct mk_event *collector_event; diff --git a/plugins/in_syslog/syslog_conf.c b/plugins/in_syslog/syslog_conf.c index a20d78c6a12..a2ab053f79a 100644 --- a/plugins/in_syslog/syslog_conf.c +++ b/plugins/in_syslog/syslog_conf.c @@ -107,6 +107,19 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins, ctx->mode = FLB_SYSLOG_UNIX_UDP; } + /* TCP Frame type (only applies to stream modes; default newline) */ + ctx->frame_type = FLB_SYSLOG_FRAME_NEWLINE; + if (ctx->format_str != NULL) { + if (strcasecmp(ctx->format_str, "octet_counting") == 0 || + strcasecmp(ctx->format_str, "octet") == 0) { + ctx->frame_type = FLB_SYSLOG_FRAME_OCTET_COUNTING; + } + else if (strcasecmp(ctx->format_str, "newline") != 0) { + flb_plg_warn(ins, "[in_syslog] unknown frame '%s', using 'newline'", + ctx->format_str); + } + } + /* Check if TCP mode was requested */ if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) { /* Listen interface (if not set, defaults to 0.0.0.0:5140) */ diff --git a/plugins/in_syslog/syslog_conn.c b/plugins/in_syslog/syslog_conn.c index eec1ab4da3b..1768dba9445 100644 --- a/plugins/in_syslog/syslog_conn.c +++ b/plugins/in_syslog/syslog_conn.c @@ -178,6 +178,8 @@ struct syslog_conn *syslog_conn_add(struct flb_connection *connection, conn->ins = ctx->ins; conn->buf_len = 0; conn->buf_parsed = 0; + conn->frame_expected_len = 0; + conn->frame_have_len = 0; /* Allocate read buffer */ conn->buf_data = flb_malloc(ctx->buffer_chunk_size); diff --git a/plugins/in_syslog/syslog_conn.h b/plugins/in_syslog/syslog_conn.h index 5604da68cb9..9e20b37c418 100644 --- a/plugins/in_syslog/syslog_conn.h +++ b/plugins/in_syslog/syslog_conn.h @@ -35,6 +35,9 @@ struct syslog_conn { size_t buf_size; /* Buffer size */ size_t buf_len; /* Buffer length */ size_t buf_parsed; /* Parsed buffer (offset) */ + /* Octet-counting framing state */ + size_t frame_expected_len; /* remaining message bytes needed */ + int frame_have_len; /* 0 = need length, 1 = have length */ struct flb_input_instance *ins; /* Parent plugin instance */ struct flb_syslog *ctx; /* Plugin configuration context */ struct flb_connection *connection; diff --git a/plugins/in_syslog/syslog_prot.c b/plugins/in_syslog/syslog_prot.c index 37a005345f6..512ae251286 100644 --- a/plugins/in_syslog/syslog_prot.c +++ b/plugins/in_syslog/syslog_prot.c @@ -218,21 +218,55 @@ int syslog_prot_process(struct syslog_conn *conn) flb_log_event_encoder_reset(ctx->log_encoder); - /* Always parse while some remaining bytes exists */ + /* Always parse while some remaining bytes exist */ while (eof < end) { - /* Lookup the ending byte */ - eof = p = conn->buf_data + conn->buf_parsed; - while (*eof != '\n' && *eof != '\0' && eof < end) { - eof++; + if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) { + /* newline framing (current behavior) */ + eof = p = conn->buf_data + conn->buf_parsed; + while (*eof != '\n' && *eof != '\0' && eof < end) { + eof++; + } + /* Incomplete message */ + if (eof == end || (*eof != '\n' && *eof != '\0')) { + break; + } + len = (eof - p); } - - /* Incomplete message */ - if (eof == end || (*eof != '\n' && *eof != '\0')) { - break; + else { + /* RFC 6587 octet-counting framing: SP */ + p = conn->buf_data + conn->buf_parsed; + + if (!conn->frame_have_len) { + char *sp = p; + size_t n = 0; + while (sp < end && *sp >= '0' && *sp <= '9') { + if (n > SIZE_MAX / 10) { + n = SIZE_MAX; + break; + } + n = n * 10 + (size_t)(*sp - '0'); + sp++; + } + if (sp == end) { + break; + } + if (*sp != ' ') { + flb_plg_warn(ctx->ins, "invalid octet-counting length"); + return -1; + } + conn->buf_parsed += (sp - p) + 1; + conn->frame_expected_len = n; + conn->frame_have_len = 1; + p = conn->buf_data + conn->buf_parsed; + end = conn->buf_data + conn->buf_len; + } + if ((size_t)(end - p) < conn->frame_expected_len) { + break; + } + len = (int)conn->frame_expected_len; } /* No data ? */ - len = (eof - p); if (len == 0) { consume_bytes(conn->buf_data, 1, conn->buf_len); conn->buf_len--; @@ -266,7 +300,18 @@ int syslog_prot_process(struct syslog_conn *conn) flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p); } - conn->buf_parsed += len + 1; + if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) { + conn->buf_parsed += len + 1; + } + else { + conn->buf_parsed += len; + conn->frame_expected_len = 0; + conn->frame_have_len = 0; + if (conn->buf_parsed < conn->buf_len && + conn->buf_data[conn->buf_parsed] == '\n') { + conn->buf_parsed += 1; + } + } end = conn->buf_data + conn->buf_len; eof = conn->buf_data + conn->buf_parsed; } diff --git a/plugins/in_syslog/syslog_prot.h b/plugins/in_syslog/syslog_prot.h index 9b28c474d18..5d47e3cc2a5 100644 --- a/plugins/in_syslog/syslog_prot.h +++ b/plugins/in_syslog/syslog_prot.h @@ -22,6 +22,8 @@ #include +#include + #include "syslog.h" #define FLB_MAP_EXPAND_SUCCESS 0 diff --git a/tests/runtime/in_syslog.c b/tests/runtime/in_syslog.c index 532bf069415..26911924705 100644 --- a/tests/runtime/in_syslog.c +++ b/tests/runtime/in_syslog.c @@ -305,6 +305,65 @@ static int init_udp(char *in_host, int in_port, struct sockaddr_in *addr) return fd; } +/* Copy src into dst, stripping a single trailing '\n' if present; return len */ +static size_t rstrip_nl_copy(char *dst, size_t dstsz, const char *src) +{ + size_t n = strlen(src); + if (n > 0 && src[n - 1] == '\n') { + n -= 1; + } + if (n + 1 > dstsz) { + n = dstsz - 1; + } + memcpy(dst, src, n); + dst[n] = '\0'; + return n; +} + +/* Build one octet-counted frame into 'out' as: " " + msg [+ '\n' if add_lf] */ +/* Returns total bytes written (excluding terminal '\0' in 'out') */ +static size_t build_octet_frame(char *out, size_t outsz, + const char *msg, int add_lf) +{ + char tmp[2048]; + size_t mlen = 0; + char hdr[64]; + int hlen = 0; + size_t need = 0; + + mlen = rstrip_nl_copy(tmp, sizeof(tmp), msg); + hlen = snprintf(hdr, sizeof(hdr), "%zu ", mlen); + need = (size_t)hlen + mlen + (add_lf ? 1 : 0); + + if (need + 1 > outsz) { + /* truncate conservatively if buffer too small (shouldn't happen in tests) */ + need = outsz - 1; + add_lf = 0; + if ((size_t)hlen > need) { + hlen = (int)need; + } + } + + memcpy(out, hdr, hlen); + memcpy(out + hlen, tmp, mlen); + if (add_lf) { + out[hlen + mlen] = '\n'; + } + out[need] = '\0'; + return need; +} + +/* Build two consecutive octet-counted frames into one buffer */ +static size_t build_two_frames(char *out, size_t outsz, + const char *msg1, const char *msg2, + int add_lf_for_each) +{ + size_t off = 0; + off += build_octet_frame(out + off, outsz - off, msg1, add_lf_for_each); + off += build_octet_frame(out + off, outsz - off, msg2, add_lf_for_each); + return off; +} + void flb_test_syslog_tcp() { struct flb_lib_out_cb cb_data; @@ -1002,6 +1061,276 @@ void flb_test_syslog_rfc3164() test_ctx_destroy(ctx); } +void flb_test_syslog_tcp_octet_counting() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + struct str_list expected = { + .size = sizeof(RFC5424_EXPECTED_STRS_1)/sizeof(char*), + .lists = &RFC5424_EXPECTED_STRS_1[0], + }; + + char frame[4096]; + size_t fsize = 0; + + fsize = build_octet_frame(frame, sizeof(frame), RFC5424_EXAMPLE_1, /*add_lf=*/0); + clear_output_num(); + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "mode", "tcp", + "format", "octet_counting", + "parser", PARSER_NAME_RFC5424, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + w_size = send(fd, frame, fsize, 0); + if (!TEST_CHECK(w_size == (ssize_t)fsize)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + flb_time_msleep(500); + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs (octet_counting single)"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + +/* -------- TCP + RFC6587 octet-counting: frame with trailing LF -------- */ +void flb_test_syslog_tcp_octet_counting_lf() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + struct str_list expected = { + .size = sizeof(RFC5424_EXPECTED_STRS_1)/sizeof(char*), + .lists = &RFC5424_EXPECTED_STRS_1[0], + }; + + char frame[4096]; + size_t fsize = 0; + + fsize = build_octet_frame(frame, sizeof(frame), RFC5424_EXAMPLE_1, /*add_lf=*/1); + clear_output_num(); + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "mode", "tcp", + "format", "octet_counting", + "parser", PARSER_NAME_RFC5424, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + w_size = send(fd, frame, fsize, 0); + if (!TEST_CHECK(w_size == (ssize_t)fsize)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + flb_time_msleep(500); + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs (octet_counting + LF)"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + +/* -------- TCP + RFC6587 octet-counting: fragmented send (header then body) -------- */ +void flb_test_syslog_tcp_octet_counting_fragmented() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + struct str_list expected = { + .size = sizeof(RFC5424_EXPECTED_STRS_1)/sizeof(char*), + .lists = &RFC5424_EXPECTED_STRS_1[0], + }; + + char msg[2048]; + size_t mlen = 0; + char hdr[64]; + int hlen = 0; + + mlen = rstrip_nl_copy(msg, sizeof(msg), RFC5424_EXAMPLE_1); + hlen = snprintf(hdr, sizeof(hdr), "%zu ", mlen); + + clear_output_num(); + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "mode", "tcp", + "format", "octet_counting", + "parser", PARSER_NAME_RFC5424, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* Send header only first */ + w_size = send(fd, hdr, (size_t)hlen, 0); + if (!TEST_CHECK(w_size == hlen)) { + TEST_MSG("failed to send header, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + /* Give the input a moment to hit 'need more bytes' path */ + flb_time_msleep(50); + + /* Now send body */ + w_size = send(fd, msg, mlen, 0); + if (!TEST_CHECK(w_size == (ssize_t)mlen)) { + TEST_MSG("failed to send body, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + flb_time_msleep(500); + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs (octet_counting fragmented)"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + +/* -------- TCP + RFC6587 octet-counting: two frames back-to-back -------- */ +void flb_test_syslog_tcp_octet_counting_multi() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + struct str_list expected = { + .size = sizeof(RFC5424_EXPECTED_STRS_1)/sizeof(char*), + .lists = &RFC5424_EXPECTED_STRS_1[0], + }; + + char frames[8192]; + size_t fsize = 0; + + fsize = build_two_frames(frames, sizeof(frames), + RFC5424_EXAMPLE_1, RFC5424_EXAMPLE_1, + /*add_lf_for_each=*/0); + + clear_output_num(); + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "mode", "tcp", + "format", "octet_counting", + "parser", PARSER_NAME_RFC5424, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + w_size = send(fd, frames, fsize, 0); + if (!TEST_CHECK(w_size == (ssize_t)fsize)) { + TEST_MSG("failed to send frames, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + flb_time_msleep(500); + num = get_output_num(); + if (!TEST_CHECK(num >= 2)) { + TEST_MSG("expected at least 2 outputs (octet_counting multi), got %d", num); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"syslog_tcp", flb_test_syslog_tcp}, {"syslog_udp", flb_test_syslog_udp}, @@ -1020,6 +1349,9 @@ TEST_LIST = { {"syslog_udp_unix", flb_test_syslog_udp_unix}, #endif #endif + {"syslog_tcp_octet_counting", flb_test_syslog_tcp_octet_counting}, + {"syslog_tcp_octet_counting_lf", flb_test_syslog_tcp_octet_counting_lf}, + {"syslog_tcp_octet_counting_fragmented", flb_test_syslog_tcp_octet_counting_fragmented}, + {"syslog_tcp_octet_counting_multi", flb_test_syslog_tcp_octet_counting_multi}, {NULL, NULL} }; -