From 9d56e0a669fe80d13d4ab345f7c303d9148a51a8 Mon Sep 17 00:00:00 2001 From: Anton Rubin Date: Tue, 14 Oct 2025 16:31:06 +0100 Subject: [PATCH 1/3] expanding on routes example Signed-off-by: Anton Rubin --- _data-prepper/pipelines/pipelines.md | 80 +++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 13 deletions(-) diff --git a/_data-prepper/pipelines/pipelines.md b/_data-prepper/pipelines/pipelines.md index 98e344ff602..9f04611b787 100644 --- a/_data-prepper/pipelines/pipelines.md +++ b/_data-prepper/pipelines/pipelines.md @@ -65,31 +65,85 @@ If a pipeline component fails to process and send an event, then the source rece Pipelines also support conditional routing, which enables the routing of events to different sinks based on specific conditions. To add conditional routing, specify a list of named routes using the `route` component and assign specific routes to sinks using the `routes` property. Any sink with the `routes` property only accepts events matching at least one of the routing conditions. -In the following example pipeline, `application-logs` is a named route with a condition set to `/log_type == "application"`. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Data Prepper routes events satisfying this condition to the first OpenSearch sink. By default, Data Prepper routes all events to sinks without a defined route, as shown in the third OpenSearch sink of the given pipeline: +In the following pipeline, routes are defined at the pipeline level under route. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Two named routes are declared: + +- `errors: /level == "ERROR"` + +- `slow_requests: /latency_ms != null and /latency_ms >= 1000` + +Each OpenSearch sink can opt in to one or more routes using the `routes:` setting. Events that satisfy a route’s condition are delivered to the sinks that reference that route, for example, the first sink receives events matching errors, and the second sink receives events matching slow_requests. + +By default, any sink without a `routes:` list receives all events, regardless of whether they matched other routes. In this example, the third sink has no `routes:` setting, so it acts as a catch-all and receives every event, including those already routed to the first two sinks. ```yml -conditional-routing-sample-pipeline: +routes-demo-pipeline: source: http: - processor: + path: /logs + ssl: false + + # Define routes at the pipeline level (not in processor) route: - - application-logs: '/log_type == "application"' - - http-logs: '/log_type == "apache"' + - errors: '/level == "ERROR"' + - slow_requests: '/latency_ms != null and /latency_ms >= 1000' + # (no catch-all route name; any sink without `routes:` becomes the default) + sink: + # 1) Only events matching the "errors" route - opensearch: - hosts: [ "https://opensearch:9200" ] - index: application_logs - routes: [application-logs] + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "routed-errors-%{yyyy.MM.dd}" + routes: [errors] + + # 2) Only events matching the "slow_requests" route - opensearch: - hosts: [ "https://opensearch:9200" ] - index: http_logs - routes: [http-logs] + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "routed-slow-%{yyyy.MM.dd}" + routes: [slow_requests] + + # 3) All events - opensearch: - hosts: [ "https://opensearch:9200" ] - index: all_logs + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "routed-other-%{yyyy.MM.dd}" ``` {% include copy.html %} +You can test this pipeline using the following command: + +```bash +curl -sS -X POST "http://localhost:2021/logs" \ + -H "Content-Type: application/json" \ + -d '[ + {"level":"ERROR","message":"DB connection failed","latency_ms":120}, + {"level":"INFO","message":"GET /api/items","latency_ms":1500}, + {"level":"INFO","message":"health check ok","latency_ms":42} + ]' +``` +{% include copy.html %} + +The documents are indexed into the corresponding index: + +``` +health status index uuid pri rep docs.count docs.deleted store.size pri.store.size +... +green open routed-other-2025.10.14 IBZTXO3ySBGky0tIHRaRmg 1 1 3 0 5.4kb 5.4kb +green open routed-slow-2025.10.14 J-hzZ9m8RkWvpMKC_oQLVQ 1 1 1 0 5kb 5kb +green open routed-errors-2025.10.14 v3r7JzPfQVOS8dWOBF1o2w 1 1 1 0 5kb 5kb +... +``` + ## Next steps - See [Common uses cases]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/common-use-cases/) for example configurations. From 9b050ca725f1910f7ecaefcbffa2c095bba68762 Mon Sep 17 00:00:00 2001 From: Anton Rubin Date: Tue, 14 Oct 2025 16:34:11 +0100 Subject: [PATCH 2/3] expanding on routes example Signed-off-by: Anton Rubin --- _data-prepper/pipelines/pipelines.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/_data-prepper/pipelines/pipelines.md b/_data-prepper/pipelines/pipelines.md index 9f04611b787..6b866e84942 100644 --- a/_data-prepper/pipelines/pipelines.md +++ b/_data-prepper/pipelines/pipelines.md @@ -82,11 +82,9 @@ routes-demo-pipeline: path: /logs ssl: false - # Define routes at the pipeline level (not in processor) route: - errors: '/level == "ERROR"' - - slow_requests: '/latency_ms != null and /latency_ms >= 1000' - # (no catch-all route name; any sink without `routes:` becomes the default) + - slow_requests: '/latency_ms != null and /latency_ms >= 1000' sink: # 1) Only events matching the "errors" route From bd3a95e5672dc35da4c6d9352fb2a16142540e98 Mon Sep 17 00:00:00 2001 From: AntonEliatra Date: Wed, 15 Oct 2025 10:28:22 +0100 Subject: [PATCH 3/3] Update pipelines.md Signed-off-by: AntonEliatra --- _data-prepper/pipelines/pipelines.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/_data-prepper/pipelines/pipelines.md b/_data-prepper/pipelines/pipelines.md index 6b866e84942..9403cd08dc1 100644 --- a/_data-prepper/pipelines/pipelines.md +++ b/_data-prepper/pipelines/pipelines.md @@ -73,7 +73,7 @@ In the following pipeline, routes are defined at the pipeline level under route. Each OpenSearch sink can opt in to one or more routes using the `routes:` setting. Events that satisfy a route’s condition are delivered to the sinks that reference that route, for example, the first sink receives events matching errors, and the second sink receives events matching slow_requests. -By default, any sink without a `routes:` list receives all events, regardless of whether they matched other routes. In this example, the third sink has no `routes:` setting, so it acts as a catch-all and receives every event, including those already routed to the first two sinks. +By default, any sink without a `routes:` list receives all events, regardless of whether they matched other routes. In the following example, the third sink has no `routes:` setting, so it acts as a catch-all and receives every event, including those already routed to the first two sinks: ```yml routes-demo-pipeline: @@ -131,7 +131,7 @@ curl -sS -X POST "http://localhost:2021/logs" \ ``` {% include copy.html %} -The documents are indexed into the corresponding index: +The documents are stored in the corresponding indexes: ``` health status index uuid pri rep docs.count docs.deleted store.size pri.store.size