Skip to content

Commit bf150d8

Browse files
Merge branch 'feature/add-streaming-support' of github.com:tanmaya-panda1/fluent-bit into feature/add-streaming-support
2 parents 6a2579e + 5706e9d commit bf150d8

File tree

2 files changed

+29
-13
lines changed

2 files changed

+29
-13
lines changed

plugins/out_azure_kusto/azure_kusto.c

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
949949
*/
950950
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
951951
flb_sds_t cluster_endpoint = NULL;
952-
952+
953953
/* Check if ingestion endpoint contains "ingest-" prefix */
954954
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
955955
/* Create cluster endpoint by removing "ingest-" prefix */
@@ -958,25 +958,25 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
958958
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
959959
return -1;
960960
}
961-
961+
962962
/* Replace "ingest-" with empty string to get cluster endpoint */
963963
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
964964
if (ingest_pos) {
965965
/* Move the rest of the string to remove "ingest-" */
966966
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
967967
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
968968
}
969-
969+
970970
flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);
971-
971+
972972
/* Create upstream connection to cluster endpoint */
973973
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
974974
if (!ctx->u_cluster) {
975975
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
976976
flb_sds_destroy(cluster_endpoint);
977977
return -1;
978978
}
979-
979+
980980
flb_sds_destroy(cluster_endpoint);
981981
} else {
982982
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
@@ -987,7 +987,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
987987
return -1;
988988
}
989989
}
990-
990+
991991
flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
992992
}
993993

@@ -1446,15 +1446,31 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
14461446

14471447
/* Check if streaming ingestion is enabled */
14481448
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
1449+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
1450+
1451+
/* Check payload size limit for streaming ingestion (4MB) */
1452+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
1453+
if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
1454+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
1455+
ret = FLB_ERROR;
1456+
goto error;
1457+
}
1458+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size);
1459+
14491460
/* Perform streaming ingestion to Kusto */
1461+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
14501462
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
1451-
1463+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret);
1464+
14521465
if (ret != 0) {
1453-
flb_plg_error(ctx->ins, "streaming ingestion failed, will retry");
1466+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");
14541467
ret = FLB_RETRY;
14551468
goto error;
1469+
} else {
1470+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully");
14561471
}
14571472
} else {
1473+
flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)");
14581474
/* Load or refresh ingestion resources for queued ingestion */
14591475
ret = azure_kusto_load_ingestion_resources(ctx, config);
14601476
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);

plugins/out_azure_kusto/azure_kusto_ingest.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
531531
char tmp[64];
532532
int len;
533533

534-
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
534+
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
535535
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");
536536

537537
now = time(NULL);
@@ -544,7 +544,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
544544
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
545545
return -1;
546546
}
547-
547+
548548
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint");
549549
u_conn = flb_upstream_conn_get(ctx->u_cluster);
550550
if (!u_conn) {
@@ -588,7 +588,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
588588

589589
/* Create HTTP client for streaming ingestion */
590590
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request");
591-
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
591+
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
592592
NULL, 0, NULL, 0);
593593

594594
if (c) {
@@ -600,7 +600,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
600600
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
601601
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
602602
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);
603-
603+
604604
/* Set Content-Type based on whether compression is enabled */
605605
if (ctx->compression_enabled) {
606606
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
@@ -616,7 +616,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
616616

617617
/* Send the HTTP request */
618618
ret = flb_http_do(c, &resp_size);
619-
619+
620620
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size);
621621

622622
if (ret == 0) {

0 commit comments

Comments
 (0)