Skip to content

Commit 5706e9d

Browse files
modified endpoint
1 parent 10c0a98 commit 5706e9d

File tree

3 files changed

+65
-5
lines changed

3 files changed

+65
-5
lines changed

plugins/out_azure_kusto/azure_kusto.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
943943
return -1;
944944
}
945945

946+
/*
947+
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
948+
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
949+
*/
950+
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
951+
flb_sds_t cluster_endpoint = NULL;
952+
953+
/* Check if ingestion endpoint contains "ingest-" prefix */
954+
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
955+
/* Create cluster endpoint by removing "ingest-" prefix */
956+
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
957+
if (!cluster_endpoint) {
958+
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
959+
return -1;
960+
}
961+
962+
/* Replace "ingest-" with empty string to get cluster endpoint */
963+
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
964+
if (ingest_pos) {
965+
/* Move the rest of the string to remove "ingest-" */
966+
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
967+
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
968+
}
969+
970+
flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);
971+
972+
/* Create upstream connection to cluster endpoint */
973+
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
974+
if (!ctx->u_cluster) {
975+
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
976+
flb_sds_destroy(cluster_endpoint);
977+
return -1;
978+
}
979+
980+
flb_sds_destroy(cluster_endpoint);
981+
} else {
982+
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
983+
/* Use ingestion endpoint directly as cluster endpoint */
984+
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
985+
if (!ctx->u_cluster) {
986+
flb_plg_error(ctx->ins, "cluster upstream creation failed");
987+
return -1;
988+
}
989+
}
990+
991+
flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
992+
}
993+
946994
flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
947995

948996
/* Create oauth2 context */
@@ -1529,6 +1577,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
15291577
ctx->u = NULL;
15301578
}
15311579

1580+
if (ctx->u_cluster) {
1581+
flb_upstream_destroy(ctx->u_cluster);
1582+
ctx->u_cluster = NULL;
1583+
}
1584+
15321585
pthread_mutex_destroy(&ctx->resources_mutex);
15331586
pthread_mutex_destroy(&ctx->token_mutex);
15341587
pthread_mutex_destroy(&ctx->blob_mutex);

plugins/out_azure_kusto/azure_kusto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ struct flb_azure_kusto {
170170
/* Upstream connection to the backend server */
171171
struct flb_upstream *u;
172172

173+
/* Upstream connection to the main Kusto cluster for streaming ingestion */
174+
struct flb_upstream *u_cluster;
175+
173176
struct flb_upstream *imds_upstream;
174177

175178
/* Fluent Bit context */

plugins/out_azure_kusto/azure_kusto_ingest.c

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -544,12 +544,16 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
544544
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
545545
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp);
546546

547-
/* Get upstream connection to the main Kusto engine (not ingestion endpoint) */
548-
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to endpoint: %s",
549-
ctx->ingestion_endpoint ? ctx->ingestion_endpoint : "NULL");
550-
u_conn = flb_upstream_conn_get(ctx->u);
547+
/* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */
548+
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint");
549+
if (!ctx->u_cluster) {
550+
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Cluster upstream not available - streaming ingestion requires cluster endpoint");
551+
return -1;
552+
}
553+
554+
u_conn = flb_upstream_conn_get(ctx->u_cluster);
551555
if (!u_conn) {
552-
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to get upstream connection");
556+
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to get cluster upstream connection");
553557
return -1;
554558
}
555559
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection");

0 commit comments

Comments
 (0)