Skip to content

Commit d138a91

Browse files
Expanding conditional routing example (#11237)
* expanding on routes example Signed-off-by: Anton Rubin <[email protected]> * expanding on routes example Signed-off-by: Anton Rubin <[email protected]> * Update pipelines.md Signed-off-by: AntonEliatra <[email protected]> * Update pipelines.md Signed-off-by: AntonEliatra <[email protected]> * Apply suggestions from code review Signed-off-by: kolchfa-aws <[email protected]> --------- Signed-off-by: Anton Rubin <[email protected]> Signed-off-by: AntonEliatra <[email protected]> Signed-off-by: kolchfa-aws <[email protected]> Co-authored-by: kolchfa-aws <[email protected]>
1 parent f7b3296 commit d138a91

File tree

1 file changed

+65
-13
lines changed

1 file changed

+65
-13
lines changed

_data-prepper/pipelines/pipelines.md

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,31 +65,83 @@ If a pipeline component fails to process and send an event, then the source rece
6565

6666
Pipelines also support conditional routing, which enables the routing of events to different sinks based on specific conditions. To add conditional routing, specify a list of named routes using the `route` component and assign specific routes to sinks using the `routes` property. Any sink with the `routes` property only accepts events matching at least one of the routing conditions.
6767

68-
In the following example pipeline, `application-logs` is a named route with a condition set to `/log_type == "application"`. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Data Prepper routes events satisfying this condition to the first OpenSearch sink. By default, Data Prepper routes all events to sinks without a defined route, as shown in the third OpenSearch sink of the given pipeline:
68+
In the following pipeline, routes are defined at the pipeline level under `route`. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Two named routes are declared:
69+
70+
- `errors: /level == "ERROR"`
71+
72+
- `slow_requests: /latency_ms != null and /latency_ms >= 1000`
73+
74+
Each OpenSearch sink can opt in to one or more routes using the `routes:` setting. Events that satisfy a route's condition are delivered to the sinks that reference that route. For example, the first sink receives events matching `errors`, and the second sink receives events matching `slow_requests`.
75+
76+
By default, any sink without a `routes:` list receives all events, regardless of whether they matched other routes. In the following example, the third sink has no `routes:` setting, so it receives all events, including those already routed to the first two sinks:
6977

7078
```yml
71-
conditional-routing-sample-pipeline:
79+
routes-demo-pipeline:
7280
source:
7381
http:
74-
processor:
82+
path: /logs
83+
ssl: false
84+
7585
route:
76-
- application-logs: '/log_type == "application"'
77-
- http-logs: '/log_type == "apache"'
86+
- errors: '/level == "ERROR"'
87+
- slow_requests: '/latency_ms != null and /latency_ms >= 1000'
88+
7889
sink:
90+
# 1) Only events matching the "errors" route
7991
- opensearch:
80-
hosts: [ "https://opensearch:9200" ]
81-
index: application_logs
82-
routes: [application-logs]
92+
hosts: ["https://opensearch:9200"]
93+
insecure: true
94+
username: admin
95+
password: admin_pass
96+
index_type: custom
97+
index: routed-errors-%{yyyy.MM.dd}
98+
routes: [errors]
99+
100+
# 2) Only events matching the "slow_requests" route
83101
- opensearch:
84-
hosts: [ "https://opensearch:9200" ]
85-
index: http_logs
86-
routes: [http-logs]
102+
hosts: ["https://opensearch:9200"]
103+
insecure: true
104+
username: admin
105+
password: admin_pass
106+
index_type: custom
107+
index: routed-slow-%{yyyy.MM.dd}
108+
routes: [slow_requests]
109+
110+
# 3) All events
87111
- opensearch:
88-
hosts: [ "https://opensearch:9200" ]
89-
index: all_logs
112+
hosts: ["https://opensearch:9200"]
113+
insecure: true
114+
username: admin
115+
password: admin_pass
116+
index_type: custom
117+
index: routed-other-%{yyyy.MM.dd}
90118
```
91119
{% include copy.html %}
92120

121+
You can test this pipeline using the following command:
122+
123+
```bash
124+
curl -sS -X POST "http://localhost:2021/logs" \
125+
-H "Content-Type: application/json" \
126+
-d '[
127+
{"level":"ERROR","message":"DB connection failed","latency_ms":120},
128+
{"level":"INFO","message":"GET /api/items","latency_ms":1500},
129+
{"level":"INFO","message":"health check ok","latency_ms":42}
130+
]'
131+
```
132+
{% include copy.html %}
133+
134+
The documents are stored in the corresponding indexes:
135+
136+
```
137+
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
138+
...
139+
green open routed-other-2025.10.14 IBZTXO3ySBGky0tIHRaRmg 1 1 3 0 5.4kb 5.4kb
140+
green open routed-slow-2025.10.14 J-hzZ9m8RkWvpMKC_oQLVQ 1 1 1 0 5kb 5kb
141+
green open routed-errors-2025.10.14 v3r7JzPfQVOS8dWOBF1o2w 1 1 1 0 5kb 5kb
142+
...
143+
```
144+
93145
## Next steps
94146

95147
- See [Common uses cases]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/common-use-cases/) for example configurations.

0 commit comments

Comments
 (0)