Skip to content

Commit 6a2579e

Browse files
out_azure_kusto: added streaming ingestion support
Signed-off-by: Tanmaya Panda <[email protected]>
1 parent eb77790 commit 6a2579e

File tree

5 files changed

+256
-18
lines changed

5 files changed

+256
-18
lines changed

plugins/out_azure_kusto/azure_kusto.c

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
943943
return -1;
944944
}
945945

946+
/*
947+
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
948+
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
949+
*/
950+
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
951+
flb_sds_t cluster_endpoint = NULL;
952+
953+
/* Check if ingestion endpoint contains "ingest-" prefix */
954+
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
955+
/* Create cluster endpoint by removing "ingest-" prefix */
956+
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
957+
if (!cluster_endpoint) {
958+
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
959+
return -1;
960+
}
961+
962+
/* Replace "ingest-" with empty string to get cluster endpoint */
963+
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
964+
if (ingest_pos) {
965+
/* Move the rest of the string to remove "ingest-" */
966+
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
967+
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
968+
}
969+
970+
flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);
971+
972+
/* Create upstream connection to cluster endpoint */
973+
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
974+
if (!ctx->u_cluster) {
975+
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
976+
flb_sds_destroy(cluster_endpoint);
977+
return -1;
978+
}
979+
980+
flb_sds_destroy(cluster_endpoint);
981+
} else {
982+
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
983+
/* Use ingestion endpoint directly as cluster endpoint */
984+
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
985+
if (!ctx->u_cluster) {
986+
flb_plg_error(ctx->ins, "cluster upstream creation failed");
987+
return -1;
988+
}
989+
}
990+
991+
flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
992+
}
993+
946994
flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
947995

948996
/* Create oauth2 context */
@@ -1396,22 +1444,34 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
13961444
}
13971445
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);
13981446

1399-
/* Load or refresh ingestion resources */
1400-
ret = azure_kusto_load_ingestion_resources(ctx, config);
1401-
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1402-
if (ret != 0) {
1403-
flb_plg_error(ctx->ins, "cannot load ingestion resources");
1404-
ret = FLB_RETRY;
1405-
goto error;
1406-
}
1447+
/* Check if streaming ingestion is enabled */
1448+
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
1449+
/* Perform streaming ingestion to Kusto */
1450+
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
1451+
1452+
if (ret != 0) {
1453+
flb_plg_error(ctx->ins, "streaming ingestion failed, will retry");
1454+
ret = FLB_RETRY;
1455+
goto error;
1456+
}
1457+
} else {
1458+
/* Load or refresh ingestion resources for queued ingestion */
1459+
ret = azure_kusto_load_ingestion_resources(ctx, config);
1460+
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1461+
if (ret != 0) {
1462+
flb_plg_error(ctx->ins, "cannot load ingestion resources");
1463+
ret = FLB_RETRY;
1464+
goto error;
1465+
}
14071466

1408-
/* Perform queued ingestion to Kusto */
1409-
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1410-
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1411-
if (ret != 0) {
1412-
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1413-
ret = FLB_RETRY;
1414-
goto error;
1467+
/* Perform queued ingestion to Kusto */
1468+
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1469+
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1470+
if (ret != 0) {
1471+
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1472+
ret = FLB_RETRY;
1473+
goto error;
1474+
}
14151475
}
14161476

14171477
ret = FLB_OK;
@@ -1501,6 +1561,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
15011561
ctx->u = NULL;
15021562
}
15031563

1564+
if (ctx->u_cluster) {
1565+
flb_upstream_destroy(ctx->u_cluster);
1566+
ctx->u_cluster = NULL;
1567+
}
1568+
15041569
pthread_mutex_destroy(&ctx->resources_mutex);
15051570
pthread_mutex_destroy(&ctx->token_mutex);
15061571
pthread_mutex_destroy(&ctx->blob_mutex);
@@ -1565,6 +1630,11 @@ static struct flb_config_map config_map[] = {
15651630
offsetof(struct flb_azure_kusto, compression_enabled),
15661631
"Enable HTTP payload compression (gzip)."
15671632
"The default is true."},
1633+
{FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE,
1634+
offsetof(struct flb_azure_kusto, streaming_ingestion_enabled),
1635+
"Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. "
1636+
"Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering."
1637+
"The default is false (uses queued ingestion)."},
15681638
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
15691639
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
15701640
"Set the azure kusto ingestion resources refresh interval"

plugins/out_azure_kusto/azure_kusto.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ struct flb_azure_kusto {
108108
/* compress payload */
109109
int compression_enabled;
110110

111+
/* streaming ingestion mode */
112+
int streaming_ingestion_enabled;
113+
111114
int ingestion_resources_refresh_interval;
112115

113116
/* records configuration */
@@ -167,6 +170,9 @@ struct flb_azure_kusto {
167170
/* Upstream connection to the backend server */
168171
struct flb_upstream *u;
169172

173+
/* Upstream connection to the main Kusto cluster for streaming ingestion */
174+
struct flb_upstream *u_cluster;
175+
170176
struct flb_upstream *imds_upstream;
171177

172178
/* Fluent Bit context */
@@ -179,4 +185,4 @@ struct flb_azure_kusto {
179185
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
180186
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);
181187

182-
#endif
188+
#endif

plugins/out_azure_kusto/azure_kusto_conf.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
796796
return NULL;
797797
}
798798

799+
/* Validate mutual exclusivity between buffering and streaming ingestion */
800+
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
801+
flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
802+
flb_azure_kusto_conf_destroy(ctx);
803+
return NULL;
804+
}
805+
806+
/* Log ingestion mode selection */
807+
if (ctx->streaming_ingestion_enabled) {
808+
flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)");
809+
} else {
810+
if (ctx->buffering_enabled) {
811+
flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues");
812+
} else {
813+
flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues");
814+
}
815+
}
816+
799817
/* Create oauth2 context */
800818
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
801819
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {

plugins/out_azure_kusto/azure_kusto_ingest.c

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,147 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t
517517
return ret;
518518
}
519519

520+
/*
 * Perform streaming ingestion: POST the payload directly to the Kusto
 * cluster upstream (ctx->u_cluster) at
 *   /v1/rest/ingest/<database>/<table>?streamFormat=MultiJSON[&mappingName=<ref>]
 *
 * Parameters:
 *   ctx           plugin context; database_name, table_name,
 *                 ingestion_mapping_reference, compression_enabled and
 *                 u_cluster are read from it.
 *   tag, tag_len  record tag, used for logging only.
 *   payload       request body; when ctx->compression_enabled is set the
 *                 request is sent with "Content-Encoding: gzip".
 *   payload_size  number of bytes in payload.
 *
 * Returns 0 when the server answers HTTP 200 or 204. Returns -1 on any local
 * failure or unexpected HTTP status, or the non-zero flb_http_do() result on
 * a transport-level failure.
 */
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
                                    size_t tag_len, flb_sds_t payload, size_t payload_size)
{
    int ret = -1;
    int len = 0;
    int sample_len;
    size_t resp_size = 0;   /* logged even when flb_http_do() fails */
    time_t now;
    struct tm tm;
    char tmp[64] = {0};
    flb_sds_t uri = NULL;
    flb_sds_t token = NULL;
    struct flb_connection *u_conn = NULL;
    struct flb_http_client *c = NULL;

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
                 (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");

    /* Build the x-ms-date value; on failure tmp stays empty and len stays 0 */
    now = time(NULL);
    if (gmtime_r(&now, &tm) != NULL) {
        len = (int) strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
    }
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp);

    /* Streaming ingestion talks to the cluster endpoint, not the ingestion endpoint */
    if (!ctx->u_cluster) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
        return -1;
    }

    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint");
    u_conn = flb_upstream_conn_get(ctx->u_cluster);
    if (!u_conn) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection");
        return -1;
    }
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection");

    /* Get authentication token */
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token");
    token = get_azure_kusto_token(ctx);
    if (!token) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token");
        goto cleanup;
    }
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token));

    /* Build the streaming ingestion URI */
    uri = flb_sds_create_size(256);
    if (!uri) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer");
        goto cleanup;
    }

    if (ctx->ingestion_mapping_reference) {
        ret = flb_sds_snprintf(&uri, flb_sds_alloc(uri),
                               "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s",
                               ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference);
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference);
    }
    else {
        ret = flb_sds_snprintf(&uri, flb_sds_alloc(uri),
                               "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON",
                               ctx->database_name, ctx->table_name);
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified");
    }
    /* A negative result means the URI could not be formatted; do not send it */
    if (ret < 0) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to build request URI");
        ret = -1;
        goto cleanup;
    }
    ret = -1;
    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri);

    /* Create HTTP client for streaming ingestion */
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request");
    c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
                        NULL, 0, NULL, 0);
    if (!c) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context");
        goto cleanup;
    }

    /* Add required headers for streaming ingestion */
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
    flb_http_add_header(c, "Accept", 6, "application/json", 16);
    flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
    if (len > 0) {
        /* only send the date header when the timestamp was actually formatted */
        flb_http_add_header(c, "x-ms-date", 9, tmp, len);
    }
    flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
    flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
    flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);

    /* Set Content-Type based on whether compression is enabled */
    if (ctx->compression_enabled) {
        flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
        flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4);
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload");
    }
    else {
        flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31);
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload");

        /*
         * Only sample textual payloads, and never read beyond payload_size:
         * the buffer is not guaranteed to be NUL-terminated and a compressed
         * payload is binary.
         */
        sample_len = payload_size < 200 ? (int) payload_size : 200;
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.*s",
                      sample_len, (char *) payload);
    }

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine");

    /* Send the HTTP request */
    ret = flb_http_do(c, &resp_size);

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size);

    if (ret != 0) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret);
    }
    else if (c->resp.status == 200 || c->resp.status == 204) {
        flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status);
    }
    else {
        ret = -1;
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);

        if (c->resp.payload_size > 0) {
            flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s",
                          c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload);
        }
    }

cleanup:
    /* Single exit path: release everything acquired above, in reverse order */
    flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources");
    if (c) {
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client");
        flb_http_client_destroy(c);
    }
    if (uri) {
        flb_sds_destroy(uri);
    }
    if (token) {
        flb_sds_destroy(token);
    }
    flb_upstream_conn_release(u_conn);

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret);
    return ret;
}
659+
660+
520661
/* Function to generate a random alphanumeric string */
521662
void generate_random_string(char *str, size_t length)
522663
{
@@ -658,4 +799,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
658799
}
659800

660801
return ret;
661-
}
802+
}

plugins/out_azure_kusto/azure_kusto_ingest.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@
2626
int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
2727
size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file);
2828

29-
#endif
29+
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
30+
size_t tag_len, flb_sds_t payload, size_t payload_size);
31+
32+
#endif

0 commit comments

Comments
 (0)