Skip to content

Commit 9b47a6d

Browse files
out_azure_kusto: add streaming ingestion support
Signed-off-by: Tanmaya Panda <[email protected]>
1 parent eb77790 commit 9b47a6d

File tree

5 files changed

+308
-19
lines changed

5 files changed

+308
-19
lines changed

plugins/out_azure_kusto/azure_kusto.c

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
912912

913913
ctx->timer_created = FLB_FALSE;
914914
ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
915-
flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
915+
flb_plg_debug(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
916916
}
917917

918918
flb_output_set_context(ins, ctx);
@@ -943,6 +943,63 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
943943
return -1;
944944
}
945945

946+
/*
947+
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
948+
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
949+
*/
950+
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
951+
flb_sds_t cluster_endpoint = NULL;
952+
const char *prefix = "ingest-";
953+
const char *schema_end = strstr(ctx->ingestion_endpoint, "://");
954+
const char *hostname_start = schema_end ? schema_end + 3 : ctx->ingestion_endpoint;
955+
956+
/* Check if hostname starts with "ingest-" prefix */
957+
if (strncmp(hostname_start, prefix, strlen(prefix)) == 0) {
958+
/* Create cluster endpoint by removing "ingest-" prefix from hostname */
959+
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
960+
if (!cluster_endpoint) {
961+
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
962+
flb_upstream_destroy(ctx->u);
963+
ctx->u = NULL;
964+
return -1;
965+
}
966+
967+
/* Find the position in our copy and remove the prefix */
968+
char *copy_hostname = strstr(cluster_endpoint, "://");
969+
if (copy_hostname) {
970+
copy_hostname += 3;
971+
/* Verify the prefix is still at the expected position */
972+
if (strncmp(copy_hostname, prefix, strlen(prefix)) == 0) {
973+
memmove(copy_hostname, copy_hostname + strlen(prefix),
974+
strlen(copy_hostname + strlen(prefix)) + 1);
975+
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - strlen(prefix));
976+
}
977+
}
978+
979+
/* Create upstream connection to cluster endpoint */
980+
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
981+
if (!ctx->u_cluster) {
982+
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
983+
flb_sds_destroy(cluster_endpoint);
984+
flb_upstream_destroy(ctx->u);
985+
ctx->u = NULL;
986+
return -1;
987+
}
988+
989+
flb_sds_destroy(cluster_endpoint);
990+
} else {
991+
flb_plg_warn(ctx->ins, "ingestion endpoint hostname does not start with 'ingest-' prefix, using as cluster endpoint");
992+
/* Use ingestion endpoint directly as cluster endpoint */
993+
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
994+
if (!ctx->u_cluster) {
995+
flb_plg_error(ctx->ins, "cluster upstream creation failed");
996+
flb_upstream_destroy(ctx->u);
997+
ctx->u = NULL;
998+
return -1;
999+
}
1000+
}
1001+
}
1002+
9461003
flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
9471004

9481005
/* Create oauth2 context */
@@ -1396,22 +1453,64 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
13961453
}
13971454
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);
13981455

1399-
/* Load or refresh ingestion resources */
1400-
ret = azure_kusto_load_ingestion_resources(ctx, config);
1401-
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1402-
if (ret != 0) {
1403-
flb_plg_error(ctx->ins, "cannot load ingestion resources");
1404-
ret = FLB_RETRY;
1405-
goto error;
1406-
}
1456+
/* Check if streaming ingestion is enabled */
1457+
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
1458+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
1459+
1460+
/*
1461+
* Azure Kusto streaming ingestion has TWO limits:
1462+
* 1. Uncompressed data: 4MB limit
1463+
* 2. Compressed data: 1MB limit
1464+
* We need to check both limits
1465+
*/
1466+
1467+
/* Check uncompressed data limit (4MB) */
1468+
if (json_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
1469+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Uncompressed payload size %zu bytes exceeds 4MB limit for streaming ingestion", json_size);
1470+
ret = FLB_ERROR;
1471+
goto error;
1472+
}
1473+
1474+
/* Check compressed data limit (1MB) if compression is enabled */
1475+
if (ctx->compression_enabled == FLB_TRUE && final_payload_size > 1048576) { /* 1MB = 1024 * 1024 */
1476+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Compressed payload size %zu bytes exceeds 1MB limit for streaming ingestion (uncompressed: %zu bytes)", final_payload_size, json_size);
1477+
ret = FLB_ERROR;
1478+
goto error;
1479+
}
1480+
1481+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Payload size checks passed - uncompressed: %zu bytes, compressed: %zu bytes",
1482+
json_size, final_payload_size);
14071483

1408-
/* Perform queued ingestion to Kusto */
1409-
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1410-
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1411-
if (ret != 0) {
1412-
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1413-
ret = FLB_RETRY;
1414-
goto error;
1484+
/* Perform streaming ingestion to Kusto */
1485+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
1486+
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
1487+
1488+
if (ret != 0) {
1489+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");
1490+
ret = FLB_RETRY;
1491+
goto error;
1492+
} else {
1493+
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully");
1494+
}
1495+
} else {
1496+
flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)");
1497+
/* Load or refresh ingestion resources for queued ingestion */
1498+
ret = azure_kusto_load_ingestion_resources(ctx, config);
1499+
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1500+
if (ret != 0) {
1501+
flb_plg_error(ctx->ins, "cannot load ingestion resources");
1502+
ret = FLB_RETRY;
1503+
goto error;
1504+
}
1505+
1506+
/* Perform queued ingestion to Kusto */
1507+
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1508+
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1509+
if (ret != 0) {
1510+
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1511+
ret = FLB_RETRY;
1512+
goto error;
1513+
}
14151514
}
14161515

14171516
ret = FLB_OK;
@@ -1501,6 +1600,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
15011600
ctx->u = NULL;
15021601
}
15031602

1603+
if (ctx->u_cluster) {
1604+
flb_upstream_destroy(ctx->u_cluster);
1605+
ctx->u_cluster = NULL;
1606+
}
1607+
15041608
pthread_mutex_destroy(&ctx->resources_mutex);
15051609
pthread_mutex_destroy(&ctx->token_mutex);
15061610
pthread_mutex_destroy(&ctx->blob_mutex);
@@ -1565,6 +1669,11 @@ static struct flb_config_map config_map[] = {
15651669
offsetof(struct flb_azure_kusto, compression_enabled),
15661670
"Enable HTTP payload compression (gzip)."
15671671
"The default is true."},
1672+
{FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE,
1673+
offsetof(struct flb_azure_kusto, streaming_ingestion_enabled),
1674+
"Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. "
1675+
"Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering."
1676+
"The default is false (uses queued ingestion)."},
15681677
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
15691678
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
15701679
"Set the azure kusto ingestion resources refresh interval"

plugins/out_azure_kusto/azure_kusto.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ struct flb_azure_kusto {
108108
/* compress payload */
109109
int compression_enabled;
110110

111+
/* streaming ingestion mode */
112+
int streaming_ingestion_enabled;
113+
111114
int ingestion_resources_refresh_interval;
112115

113116
/* records configuration */
@@ -167,6 +170,9 @@ struct flb_azure_kusto {
167170
/* Upstream connection to the backend server */
168171
struct flb_upstream *u;
169172

173+
/* Upstream connection to the main Kusto cluster for streaming ingestion */
174+
struct flb_upstream *u_cluster;
175+
170176
struct flb_upstream *imds_upstream;
171177

172178
/* Fluent Bit context */
@@ -179,4 +185,4 @@ struct flb_azure_kusto {
179185
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
180186
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);
181187

182-
#endif
188+
#endif

plugins/out_azure_kusto/azure_kusto_conf.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
796796
return NULL;
797797
}
798798

799+
/* Validate mutual exclusivity between buffering and streaming ingestion
800+
* Prefer queued ingestion when buffering is explicitly enabled */
801+
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
802+
ctx->streaming_ingestion_enabled = FLB_FALSE;
803+
flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
804+
}
805+
806+
/* Log ingestion mode selection */
807+
if (ctx->streaming_ingestion_enabled) {
808+
flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)");
809+
} else {
810+
if (ctx->buffering_enabled) {
811+
flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues");
812+
} else {
813+
flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues");
814+
}
815+
}
816+
799817
/* Create oauth2 context */
800818
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
801819
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {

plugins/out_azure_kusto/azure_kusto_ingest.c

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,155 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t
517517
return ret;
518518
}
519519

520+
/*
 * Send one payload to Kusto using streaming ingestion.
 *
 * The payload is POSTed to "/v1/rest/ingest/<database>/<table>" through the
 * cluster upstream (ctx->u_cluster) with streamFormat=MultiJSON and, when
 * ctx->ingestion_mapping_reference is set, a mappingName query parameter.
 *
 * If payload_size exceeds the limit that applies to the current compression
 * setting, the payload is handed to azure_kusto_queued_ingestion() instead
 * and that function's return value is returned as-is.
 *
 * ctx          : plugin context (database/table names, upstream, settings)
 * tag, tag_len : record tag; used for logging and passed to the fallback
 * payload      : request body, already compressed when compression is enabled
 * payload_size : size of payload in bytes
 *
 * Returns 0 when the server answers HTTP 200 or 204 (or when the queued
 * fallback returns 0); a non-zero value otherwise.
 */
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
                                    size_t tag_len, flb_sds_t payload, size_t payload_size)
{
    int ret = -1;
    int len;
    int uri_len;
    /* zero-initialised: it is logged even when flb_http_do() fails */
    size_t resp_size = 0;
    size_t size_limit;
    time_t now;
    struct tm tm;
    char tmp[64];
    flb_sds_t uri = NULL;
    flb_sds_t token = NULL;
    struct flb_connection *u_conn = NULL;
    struct flb_http_client *c = NULL;

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
                 (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");

    /*
     * Size validation (matching ManagedStreamingIngestClient behavior):
     * the limit depends on whether the payload has been compressed.
     */
    if (ctx->compression_enabled) {
        size_limit = KUSTO_STREAMING_MAX_COMPRESSED_SIZE;
    }
    else {
        size_limit = KUSTO_STREAMING_MAX_UNCOMPRESSED_SIZE;
    }

    if (payload_size > size_limit) {
        flb_plg_warn(ctx->ins, "[STREAMING_INGESTION] Payload size %zu bytes exceeds %s limit of %zu bytes - falling back to queued ingestion",
                     payload_size, ctx->compression_enabled ? "compressed" : "uncompressed", size_limit);

        /*
         * Fallback to queued ingestion.
         * NOTE(review): ingestion resources are not loaded anywhere in this
         * function before the queued path is used - confirm that
         * azure_kusto_queued_ingestion() can run without them.
         */
        ret = azure_kusto_queued_ingestion(ctx, tag, tag_len, payload, payload_size, NULL);
        if (ret == 0) {
            flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Successfully fell back to queued ingestion");
        }
        else {
            flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Fallback to queued ingestion failed");
        }
        return ret;
    }

    /* Timestamp sent in the x-ms-date header */
    now = time(NULL);
    gmtime_r(&now, &tm);
    len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);

    /* Streaming ingestion goes to the cluster endpoint, not the ingestion endpoint */
    if (!ctx->u_cluster) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
        return -1;
    }

    u_conn = flb_upstream_conn_get(ctx->u_cluster);
    if (!u_conn) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection");
        return -1;
    }

    /* Get authentication token (released in the cleanup section) */
    token = get_azure_kusto_token(ctx);
    if (!token) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token");
        goto cleanup;
    }

    /* Build the streaming ingestion URI */
    uri = flb_sds_create_size(256);
    if (!uri) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer");
        goto cleanup;
    }

    if (ctx->ingestion_mapping_reference) {
        uri_len = flb_sds_snprintf(&uri, flb_sds_alloc(uri),
                                   "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s",
                                   ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference);
    }
    else {
        uri_len = flb_sds_snprintf(&uri, flb_sds_alloc(uri),
                                   "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON",
                                   ctx->database_name, ctx->table_name);
        flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified");
    }

    /* Do not send a request with a URI that could not be formatted */
    if (uri_len < 0) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to build streaming ingestion URI");
        goto cleanup;
    }

    /* Create HTTP client for streaming ingestion */
    c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
                        NULL, 0, NULL, 0);
    if (!c) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context");
        goto cleanup;
    }

    /* Add required headers for streaming ingestion */
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
    flb_http_add_header(c, "Accept", 6, "application/json", 16);
    flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
    flb_http_add_header(c, "x-ms-date", 9, tmp, len);
    flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
    flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
    flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);

    /* Set Content-Type based on whether compression is enabled */
    if (ctx->compression_enabled) {
        flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
        flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4);
    }
    else {
        flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31);
    }

    /* Send the HTTP request */
    ret = flb_http_do(c, &resp_size);

    flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size);

    if (ret != 0) {
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret);
    }
    else if (c->resp.status != 200 && c->resp.status != 204) {
        /* The request went through but the server rejected it */
        ret = -1;
        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);

        if (c->resp.payload_size > 0) {
            flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s",
                          c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload);
        }
    }

cleanup:
    /* Single exit path: release whatever was acquired above */
    if (c) {
        flb_http_client_destroy(c);
    }
    if (uri) {
        flb_sds_destroy(uri);
    }
    if (token) {
        flb_sds_destroy(token);
    }
    if (u_conn) {
        flb_upstream_conn_release(u_conn);
    }

    return ret;
}
667+
668+
520669
/* Function to generate a random alphanumeric string */
521670
void generate_random_string(char *str, size_t length)
522671
{
@@ -658,4 +807,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
658807
}
659808

660809
return ret;
661-
}
810+
}

0 commit comments

Comments
 (0)