Skip to content

Commit 4fd3fd4

Browse files
azure_kusto:fixed code comments
Signed-off-by: Tanmaya Panda <[email protected]>
1 parent bf150d8 commit 4fd3fd4

File tree

4 files changed

+81
-46
lines changed

4 files changed

+81
-46
lines changed

plugins/out_azure_kusto/azure_kusto.c

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
912912

913913
ctx->timer_created = FLB_FALSE;
914914
ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
915-
flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
915+
flb_plg_debug(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
916916
}
917917

918918
flb_output_set_context(ins, ctx);
@@ -949,46 +949,55 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
949949
*/
950950
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
951951
flb_sds_t cluster_endpoint = NULL;
952-
953-
/* Check if ingestion endpoint contains "ingest-" prefix */
954-
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
955-
/* Create cluster endpoint by removing "ingest-" prefix */
952+
const char *prefix = "ingest-";
953+
const char *schema_end = strstr(ctx->ingestion_endpoint, "://");
954+
const char *hostname_start = schema_end ? schema_end + 3 : ctx->ingestion_endpoint;
955+
956+
/* Check if hostname starts with "ingest-" prefix */
957+
if (strncmp(hostname_start, prefix, strlen(prefix)) == 0) {
958+
/* Create cluster endpoint by removing "ingest-" prefix from hostname */
956959
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
957960
if (!cluster_endpoint) {
958961
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
962+
flb_upstream_destroy(ctx->u);
963+
ctx->u = NULL;
959964
return -1;
960965
}
961966

962-
/* Replace "ingest-" with empty string to get cluster endpoint */
963-
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
964-
if (ingest_pos) {
965-
/* Move the rest of the string to remove "ingest-" */
966-
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
967-
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
967+
/* Find the position in our copy and remove the prefix */
968+
char *copy_hostname = strstr(cluster_endpoint, "://");
969+
if (copy_hostname) {
970+
copy_hostname += 3;
971+
/* Verify the prefix is still at the expected position */
972+
if (strncmp(copy_hostname, prefix, strlen(prefix)) == 0) {
973+
memmove(copy_hostname, copy_hostname + strlen(prefix),
974+
strlen(copy_hostname + strlen(prefix)) + 1);
975+
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - strlen(prefix));
976+
}
968977
}
969978

970-
flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);
971-
972979
/* Create upstream connection to cluster endpoint */
973980
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
974981
if (!ctx->u_cluster) {
975982
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
976983
flb_sds_destroy(cluster_endpoint);
984+
flb_upstream_destroy(ctx->u);
985+
ctx->u = NULL;
977986
return -1;
978987
}
979988

980989
flb_sds_destroy(cluster_endpoint);
981990
} else {
982-
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
991+
flb_plg_warn(ctx->ins, "ingestion endpoint hostname does not start with 'ingest-' prefix, using as cluster endpoint");
983992
/* Use ingestion endpoint directly as cluster endpoint */
984993
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
985994
if (!ctx->u_cluster) {
986995
flb_plg_error(ctx->ins, "cluster upstream creation failed");
996+
flb_upstream_destroy(ctx->u);
997+
ctx->u = NULL;
987998
return -1;
988999
}
9891000
}
990-
991-
flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
9921001
}
9931002

9941003
flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
@@ -1446,21 +1455,35 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
14461455

14471456
/* Check if streaming ingestion is enabled */
14481457
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
1449-
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
1450-
1451-
/* Check payload size limit for streaming ingestion (4MB) */
1452-
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
1453-
if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
1454-
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
1458+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
1459+
1460+
/*
1461+
* Azure Kusto streaming ingestion has TWO limits:
1462+
* 1. Uncompressed data: 4MB limit
1463+
* 2. Compressed data: 1MB limit
1464+
* We need to check both limits
1465+
*/
1466+
1467+
/* Check uncompressed data limit (4MB) */
1468+
if (json_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
1469+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Uncompressed payload size %zu bytes exceeds 4MB limit for streaming ingestion", json_size);
1470+
ret = FLB_ERROR;
1471+
goto error;
1472+
}
1473+
1474+
/* Check compressed data limit (1MB) if compression is enabled */
1475+
if (ctx->compression_enabled == FLB_TRUE && final_payload_size > 1048576) { /* 1MB = 1024 * 1024 */
1476+
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Compressed payload size %zu bytes exceeds 1MB limit for streaming ingestion (uncompressed: %zu bytes)", final_payload_size, json_size);
14551477
ret = FLB_ERROR;
14561478
goto error;
14571479
}
1458-
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size);
1480+
1481+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Payload size checks passed - uncompressed: %zu bytes, compressed: %zu bytes",
1482+
json_size, final_payload_size);
14591483

14601484
/* Perform streaming ingestion to Kusto */
1461-
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
1485+
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
14621486
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
1463-
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret);
14641487

14651488
if (ret != 0) {
14661489
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");

plugins/out_azure_kusto/azure_kusto_conf.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -796,11 +796,11 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
796796
return NULL;
797797
}
798798

799-
/* Validate mutual exclusivity between buffering and streaming ingestion */
799+
/* Validate mutual exclusivity between buffering and streaming ingestion
800+
* Prefer queued ingestion when buffering is explicitly enabled */
800801
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
801-
flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
802-
flb_azure_kusto_conf_destroy(ctx);
803-
return NULL;
802+
ctx->streaming_ingestion_enabled = FLB_FALSE;
803+
flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
804804
}
805805

806806
/* Log ingestion mode selection */

plugins/out_azure_kusto/azure_kusto_ingest.c

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -530,38 +530,58 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
530530
struct tm tm;
531531
char tmp[64];
532532
int len;
533+
size_t size_limit;
533534

534535
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
535536
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");
536537

538+
/*
539+
* Size validation (matching ManagedStreamingIngestClient behavior)
540+
* Check payload size against limits before attempting streaming ingestion
541+
*/
542+
if (ctx->compression_enabled) {
543+
size_limit = KUSTO_STREAMING_MAX_COMPRESSED_SIZE;
544+
} else {
545+
size_limit = KUSTO_STREAMING_MAX_UNCOMPRESSED_SIZE;
546+
}
547+
548+
if (payload_size > size_limit) {
549+
flb_plg_warn(ctx->ins, "[STREAMING_INGESTION] Payload size %zu bytes exceeds %s limit of %zu bytes - falling back to queued ingestion",
550+
payload_size, ctx->compression_enabled ? "compressed" : "uncompressed", size_limit);
551+
552+
/* Fallback to queued ingestion */
553+
ret = azure_kusto_queued_ingestion(ctx, tag, tag_len, payload, payload_size, NULL);
554+
if (ret == 0) {
555+
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Successfully fell back to queued ingestion");
556+
} else {
557+
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Fallback to queued ingestion failed");
558+
}
559+
return ret;
560+
}
561+
537562
now = time(NULL);
538563
gmtime_r(&now, &tm);
539564
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
540-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp);
541565

542566
/* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */
543567
if (!ctx->u_cluster) {
544568
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
545569
return -1;
546570
}
547571

548-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint");
549572
u_conn = flb_upstream_conn_get(ctx->u_cluster);
550573
if (!u_conn) {
551574
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection");
552575
return -1;
553576
}
554-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection");
555577

556578
/* Get authentication token */
557-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token");
558579
token = get_azure_kusto_token(ctx);
559580
if (!token) {
560581
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token");
561582
flb_upstream_conn_release(u_conn);
562583
return -1;
563584
}
564-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token));
565585

566586
/* Build the streaming ingestion URI */
567587
uri = flb_sds_create_size(256);
@@ -577,17 +597,14 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
577597
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
578598
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s",
579599
ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference);
580-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference);
581600
} else {
582601
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
583602
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON",
584603
ctx->database_name, ctx->table_name);
585604
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified");
586605
}
587-
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri);
588606

589607
/* Create HTTP client for streaming ingestion */
590-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request");
591608
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
592609
NULL, 0, NULL, 0);
593610

@@ -605,15 +622,10 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
605622
if (ctx->compression_enabled) {
606623
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
607624
flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4);
608-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload");
609625
} else {
610626
flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31);
611-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload");
612627
}
613628

614-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload);
615-
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine");
616-
617629
/* Send the HTTP request */
618630
ret = flb_http_do(c, &resp_size);
619631

@@ -623,7 +635,6 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
623635
/* Check for successful HTTP status codes */
624636
if (c->resp.status == 200 || c->resp.status == 204) {
625637
ret = 0;
626-
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status);
627638
} else {
628639
ret = -1;
629640
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);
@@ -637,14 +648,12 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
637648
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret);
638649
}
639650

640-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client");
641651
flb_http_client_destroy(c);
642652
} else {
643653
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context");
644654
}
645655

646656
/* Cleanup */
647-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources");
648657
if (uri) {
649658
flb_sds_destroy(uri);
650659
}
@@ -653,7 +662,6 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
653662
}
654663
flb_upstream_conn_release(u_conn);
655664

656-
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret);
657665
return ret;
658666
}
659667

plugins/out_azure_kusto/azure_kusto_ingest.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
#include "azure_kusto.h"
2424
#include "azure_kusto_store.h"
2525

26+
/* Kusto Streaming Ingestion Limits (matching ManagedStreamingIngestClient behavior) */
27+
#define KUSTO_STREAMING_MAX_UNCOMPRESSED_SIZE (4 * 1024 * 1024) /* 4 MB */
28+
#define KUSTO_STREAMING_MAX_COMPRESSED_SIZE (1 * 1024 * 1024) /* 1 MB */
29+
2630
int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
2731
size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file);
2832

0 commit comments

Comments
 (0)