Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 101 additions & 9 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi

ctx->timer_created = FLB_FALSE;
ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
flb_plg_debug(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
}

flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -943,6 +943,63 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
return -1;
}

/*
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
*/
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_sds_t cluster_endpoint = NULL;
const char *prefix = "ingest-";
const char *schema_end = strstr(ctx->ingestion_endpoint, "://");
const char *hostname_start = schema_end ? schema_end + 3 : ctx->ingestion_endpoint;

/* Check if hostname starts with "ingest-" prefix */
if (strncmp(hostname_start, prefix, strlen(prefix)) == 0) {
/* Create cluster endpoint by removing "ingest-" prefix from hostname */
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
if (!cluster_endpoint) {
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
flb_upstream_destroy(ctx->u);
ctx->u = NULL;
return -1;
}

/* Find the position in our copy and remove the prefix */
char *copy_hostname = strstr(cluster_endpoint, "://");
if (copy_hostname) {
copy_hostname += 3;
/* Verify the prefix is still at the expected position */
if (strncmp(copy_hostname, prefix, strlen(prefix)) == 0) {
memmove(copy_hostname, copy_hostname + strlen(prefix),
strlen(copy_hostname + strlen(prefix)) + 1);
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - strlen(prefix));
}
}

/* Create upstream connection to cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
flb_sds_destroy(cluster_endpoint);
flb_upstream_destroy(ctx->u);
ctx->u = NULL;
return -1;
}

flb_sds_destroy(cluster_endpoint);
} else {
flb_plg_warn(ctx->ins, "ingestion endpoint hostname does not start with 'ingest-' prefix, using as cluster endpoint");
/* Use ingestion endpoint directly as cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed");
flb_upstream_destroy(ctx->u);
ctx->u = NULL;
return -1;
}
}
}

flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));

/* Create oauth2 context */
Expand Down Expand Up @@ -1396,7 +1453,11 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
}
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);

/* Load or refresh ingestion resources */
/*
* Load ingestion resources regardless of streaming mode.
* This is required because streaming ingestion may fall back to queued ingestion
* when payload size exceeds limits, and queued ingestion requires these resources.
*/
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
if (ret != 0) {
Expand All @@ -1405,13 +1466,34 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
goto error;
}

/* Perform queued ingestion to Kusto */
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
ret = FLB_RETRY;
goto error;
/* Check if streaming ingestion is enabled */
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
/*
* Perform streaming ingestion to Kusto.
* Note: kusto streaming ingestion may automatically fall back to queued ingestion
* if the payload size exceeds limits ie uncompressed payload size > 4MB.
*/
flb_plg_debug(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, json_size);

if (ret != 0) {
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");
ret = FLB_RETRY;
goto error;
} else {
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully");
}
} else {
flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)");

/* Perform queued ingestion to Kusto */
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
ret = FLB_RETRY;
goto error;
}
}

ret = FLB_OK;
Expand Down Expand Up @@ -1501,6 +1583,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

if (ctx->u_cluster) {
flb_upstream_destroy(ctx->u_cluster);
ctx->u_cluster = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
Expand Down Expand Up @@ -1565,6 +1652,11 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, compression_enabled),
"Enable HTTP payload compression (gzip)."
"The default is true."},
{FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, streaming_ingestion_enabled),
"Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. "
"Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering."
"The default is false (uses queued ingestion)."},
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
"Set the azure kusto ingestion resources refresh interval"
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct flb_azure_kusto {
/* compress payload */
int compression_enabled;

/* streaming ingestion mode */
int streaming_ingestion_enabled;

int ingestion_resources_refresh_interval;

/* records configuration */
Expand Down Expand Up @@ -167,6 +170,9 @@ struct flb_azure_kusto {
/* Upstream connection to the backend server */
struct flb_upstream *u;

/* Upstream connection to the main Kusto cluster for streaming ingestion */
struct flb_upstream *u_cluster;

struct flb_upstream *imds_upstream;

/* Fluent Bit context */
Expand All @@ -179,4 +185,4 @@ struct flb_azure_kusto {
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);

#endif
#endif
18 changes: 18 additions & 0 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* Validate mutual exclusivity between buffering and streaming ingestion
* Prefer queued ingestion when buffering is explicitly enabled */
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
ctx->streaming_ingestion_enabled = FLB_FALSE;
flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
}

/* Log ingestion mode selection */
if (ctx->streaming_ingestion_enabled) {
flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)");
} else {
if (ctx->buffering_enabled) {
flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues");
} else {
flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues");
}
}

/* Create oauth2 context */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {
Expand Down
147 changes: 146 additions & 1 deletion plugins/out_azure_kusto/azure_kusto_ingest.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,151 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t
return ret;
}

int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size,
size_t uncompressed_size)
{
int ret = -1;
struct flb_connection *u_conn;
struct flb_http_client *c;
flb_sds_t uri = NULL;
flb_sds_t token = NULL;
size_t resp_size;
time_t now;
struct tm tm;
char tmp[64];
int len;

flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, uncompressed: %zu bytes, payload: %zu bytes, db: %s, table: %s, compression: %s",
(int)tag_len, tag, uncompressed_size, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");

/*
* Size validation per Azure Kusto documentation:
* https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
* "Data size limit: The data size limit for a streaming ingestion request is 4 MB."
* This limit applies to the uncompressed data size.
*/
if (uncompressed_size > KUSTO_STREAMING_MAX_UNCOMPRESSED_SIZE) {
flb_plg_warn(ctx->ins, "[STREAMING_INGESTION] Uncompressed data size %zu bytes exceeds 4MB limit - falling back to queued ingestion",
uncompressed_size);

/* Fallback to queued ingestion */
ret = azure_kusto_queued_ingestion(ctx, tag, tag_len, payload, payload_size, NULL);
if (ret == 0) {
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Successfully fell back to queued ingestion");
} else {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Fallback to queued ingestion failed");
}
return ret;
}

now = time(NULL);
gmtime_r(&now, &tm);
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);

/* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
return -1;
}

u_conn = flb_upstream_conn_get(ctx->u_cluster);
if (!u_conn) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection");
return -1;
}

/* Get authentication token */
token = get_azure_kusto_token(ctx);
if (!token) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token");
flb_upstream_conn_release(u_conn);
return -1;
}

/* Build the streaming ingestion URI */
uri = flb_sds_create_size(256);
if (!uri) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer");
flb_sds_destroy(token);
flb_upstream_conn_release(u_conn);
return -1;
}

/* Create the streaming ingestion URI */
if (ctx->ingestion_mapping_reference) {
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s",
ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference);
} else {
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON",
ctx->database_name, ctx->table_name);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified");
}

/* Create HTTP client for streaming ingestion */
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
NULL, 0, NULL, 0);

if (c) {
/* Add required headers for streaming ingestion */
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
flb_http_add_header(c, "x-ms-date", 9, tmp, len);
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);

/* Set Content-Type based on whether compression is enabled */
if (ctx->compression_enabled) {
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4);
} else {
flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31);
}

/* Send the HTTP request */
ret = flb_http_do(c, &resp_size);

flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size);

if (ret == 0) {
/* Check for successful HTTP status codes */
if (c->resp.status == 200 || c->resp.status == 204) {
ret = 0;
} else {
ret = -1;
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);

if (c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s",
c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload);
}
}
} else {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret);
}

flb_http_client_destroy(c);
} else {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context");
}

/* Cleanup */
if (uri) {
flb_sds_destroy(uri);
}
if (token) {
flb_sds_destroy(token);
}
flb_upstream_conn_release(u_conn);

return ret;
}
Comment on lines +520 to +662
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Streaming: accept all 2xx; add retryable fallback (413/429/5xx).

Only 200/204 are treated as success; add 202 and, safer, any 2xx. Also fall back to queued ingestion on classic retryable statuses.

-            /* Check for successful HTTP status codes */
-            if (c->resp.status == 200 || c->resp.status == 204) {
+            /* Treat any 2xx as success */
+            if (c->resp.status >= 200 && c->resp.status < 300) {
                 ret = 0;
             } else {
                 ret = -1;
                 flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);
 
                 if (c->resp.payload_size > 0) {
                     flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s",
                                 c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload);
                 }
+
+                /* Optional: automatic fallback on retryable statuses */
+                if (c->resp.status == 413 || c->resp.status == 429 ||
+                    (c->resp.status >= 500 && c->resp.status < 600)) {
+                    flb_plg_warn(ctx->ins, "[STREAMING_INGESTION] Status %i -> attempting queued ingestion fallback", c->resp.status);
+                    /* destroy client before fallback return path */
+                    flb_http_client_destroy(c);
+                    c = NULL;
+                    /* Fall back to queued ingestion */
+                    ret = azure_kusto_queued_ingestion(ctx, tag, tag_len, payload, payload_size, NULL);
+                    if (ret == 0) {
+                        flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Fallback to queued ingestion succeeded");
+                    }
+                    else {
+                        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Fallback to queued ingestion failed");
+                    }
+                    /* Ensure we free local resources before returning */
+                    if (uri) { flb_sds_destroy(uri); }
+                    if (token) { flb_sds_destroy(token); }
+                    flb_upstream_conn_release(u_conn);
+                    return ret;
+                }
             }



/* Function to generate a random alphanumeric string */
void generate_random_string(char *str, size_t length)
{
Expand Down Expand Up @@ -658,4 +803,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
}

return ret;
}
}
9 changes: 8 additions & 1 deletion plugins/out_azure_kusto/azure_kusto_ingest.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@
#include "azure_kusto.h"
#include "azure_kusto_store.h"

/* Kusto Streaming Ingestion Limit (per Azure documentation) */
#define KUSTO_STREAMING_MAX_UNCOMPRESSED_SIZE (4 * 1024 * 1024) /* 4 MB */

int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file);

#endif
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size,
size_t uncompressed_size);

#endif
Loading