
Commit 5ba1f11

Accept multiple ingest pipelines in Filebeat (elastic#8914)
Motivated by elastic#8852 (comment).

Starting with 6.5.0, Elasticsearch Ingest Pipelines have gained the ability to:

- run sub-pipelines via the [`pipeline` processor](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/pipeline-processor.html), and
- conditionally run processors via an [`if` field](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/ingest-processors.html).

These abilities combined present the opportunity for a fileset to ingest the same _logical_ information presented in different formats, e.g. plaintext vs. JSON versions of the same log files. Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally delegates further processing of that log entry, depending on the format, to another pipeline.

This PR allows filesets to specify one or more ingest pipelines via the `ingest_pipeline` property in their `manifest.yml`. If more than one ingest pipeline is specified, the first one is taken to be the entry point ingest pipeline.

#### Example with multiple pipelines

```yaml
ingest_pipeline:
  - pipeline-ze-boss.json
  - pipeline-plain.json
  - pipeline-json.json
```

#### Example with a single pipeline

_This is just to show that the existing functionality will continue to work as-is._

```yaml
ingest_pipeline: pipeline.json
```

Now, if the root pipeline wants to delegate processing to another pipeline, it must use a `pipeline` processor to do so. This processor's `name` field needs to reference the other pipeline by its name. To ensure correct referencing, the `name` field must be specified as follows:

```json
{
  "pipeline": {
    "name": "{< IngestPipeline "pipeline-plain" >}"
  }
}
```

This ensures that the specified name is correctly converted to the corresponding name in Elasticsearch, since Filebeat prefixes its "raw" ingest pipeline names with `filebeat-<version>-<module>-<fileset>-` when loading them into Elasticsearch.
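As a rough illustration of that naming convention (the version, module, and fileset values below are hypothetical, and the `.json` extension is assumed to be dropped from the raw names), the pipelines from the multiple-pipeline example above would end up in Elasticsearch under IDs along these lines:

```
# Hypothetical IDs for Filebeat 6.6.0, module "foo", fileset "multi"
filebeat-6.6.0-foo-multi-pipeline-ze-boss   # entry point pipeline
filebeat-6.6.0-foo-multi-pipeline-plain
filebeat-6.6.0-foo-multi-pipeline-json
```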
1 parent 11a1917 commit 5ba1f11

17 files changed: +479 −61 lines changed

CHANGELOG-developer.asciidoc (+1)
@@ -72,3 +72,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only.
 - Simplified exporting of dashboards. {pull}7730[7730]
 - Update Beats to use go 1.11.2 {pull}8746[8746]
 - Allow/Merge fields.yml overrides {pull}9188[9188]
+- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914]

docs/devguide/modules-dev-guide.asciidoc (+75)
@@ -229,6 +229,24 @@ This example selects the ingest pipeline file based on the value of the
 resolve to `ingest/with_plugins.json` (assuming the variable value isn't
 overridden at runtime.)

+In 6.6 and later, you can specify multiple ingest pipelines.
+
+[source,yaml]
+----
+ingest_pipeline:
+  - ingest/main.json
+  - ingest/plain_logs.json
+  - ingest/json_logs.json
+----
+
+When multiple ingest pipelines are specified, the first one in the list is
+considered to be the entry point pipeline.
+
+One reason for using multiple pipelines might be to send all logs harvested
+by this fileset to the entry point pipeline and have it delegate different parts of
+the processing to other pipelines. You can read details about setting
+this up in <<ingest-json-entry-point-pipeline, the `ingest/*.json` section>>.
+
 [float]
 ==== config/*.yml

@@ -336,6 +354,63 @@ Note that you should follow the convention of naming of fields prefixed with the
 module and fileset name: `{module}.{fileset}.field`, e.g.
 `nginx.access.remote_ip`. Also, please review our <<event-conventions>>.

+[[ingest-json-entry-point-pipeline]]
+In 6.6 and later, ingest pipelines can use the
+{ref}/conditionals-with-multiple-pipelines.html[`pipeline` processor] to delegate
+parts of the processing to other pipelines.
+
+This can be useful if you want a fileset to ingest the same _logical_ information
+presented in different formats, e.g. csv vs. json versions of the same log files.
+Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally
+delegates further processing of that log entry, depending on the format, to another
+pipeline.
+
+["source","json",subs="callouts"]
+----
+{
+    "processors": [
+        {
+            "grok": {
+                "field": "message",
+                "patterns": [
+                    "^%{CHAR:first_char}"
+                ],
+                "pattern_definitions": {
+                    "CHAR": "."
+                }
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char == '{'",
+                "name": "{< IngestPipeline "json-log-processing-pipeline" >}" <1>
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char != '{'",
+                "name": "{< IngestPipeline "plain-log-processing-pipeline" >}"
+            }
+        }
+    ]
+}
+----
+<1> Use the `IngestPipeline` template function to resolve the name. This function converts the
+specified name into the fully qualified pipeline ID that is stored in Elasticsearch.
+
+In order for the above pipeline to work, Filebeat must load the entry point pipeline
+as well as any sub-pipelines into Elasticsearch. You can tell Filebeat to do
+so by specifying all the necessary pipelines for the fileset in its `manifest.yml`
+file. The first pipeline in the list is considered to be the entry point pipeline.
+
+[source,yaml]
+----
+ingest_pipeline:
+  - ingest/main.json
+  - ingest/plain_logs.json
+  - ingest/json_logs.json
+----
+
 While developing the pipeline definition, we recommend making use of the
 {elasticsearch}/simulate-pipeline-api.html[Simulate Pipeline API] for testing
 and quick iteration.
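The guide above recommends the Simulate Pipeline API for iterating on pipeline definitions. A minimal, console-style request might look like the following sketch; the inline pipeline and sample documents are illustrative only:

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["^%{CHAR:first_char}"],
          "pattern_definitions": { "CHAR": "." }
        }
      }
    ]
  },
  "docs": [
    { "_source": { "message": "{\"level\":\"info\",\"msg\":\"a json log line\"}" } },
    { "_source": { "message": "a plain text log line" } }
  ]
}
```

Note that the simulate request takes a single inline pipeline definition, so any sub-pipelines referenced via `pipeline` processors would need to already be loaded into the cluster for delegation to be exercised.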
@@ -0,0 +1,8 @@
+- module: foo
+  # Fileset with multiple pipelines
+  multi:
+    enabled: true
+
+  # Fileset with multiple pipelines with the last one being bad
+  multibad:
+    enabled: true
@@ -0,0 +1,8 @@
+type: log
+paths:
+  - /tmp
+exclude_files: [".gz$"]
+
+fields:
+  service.name: "foo"
+fields_under_root: true
@@ -0,0 +1,10 @@
+{
+    "processors": [
+        {
+            "rename": {
+                "field": "json",
+                "target_field": "log.meta"
+            }
+        }
+    ]
+}
@@ -0,0 +1,27 @@
+{
+    "processors": [
+        {
+            "grok": {
+                "field": "message",
+                "patterns": [
+                    "^%{CHAR:first_char}"
+                ],
+                "pattern_definitions": {
+                    "CHAR": "."
+                }
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char == '{'",
+                "name": "{< IngestPipeline "json_logs" >}"
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char != '{'",
+                "name": "{< IngestPipeline "plain_logs" >}"
+            }
+        }
+    ]
+}
@@ -0,0 +1,12 @@
+{
+    "processors": [
+        {
+            "grok": {
+                "field": "message",
+                "patterns": [
+                    "^%{DATA:some_data}"
+                ]
+            }
+        }
+    ]
+}
@@ -0,0 +1,8 @@
+module_version: 1.0
+
+ingest_pipeline:
+  - ingest/pipeline.json
+  - ingest/json_logs.json
+  - ingest/plain_logs.json
+
+input: config/multi.yml
@@ -0,0 +1,8 @@
+type: log
+paths:
+  - /tmp
+exclude_files: [".gz$"]
+
+fields:
+  service.name: "foo"
+fields_under_root: true
@@ -0,0 +1,10 @@
+{
+    "processors": [
+        {
+            "rename": {
+                "field": "json",
+                "target_field": "log.meta"
+            }
+        }
+    ]
+}
@@ -0,0 +1,27 @@
+{
+    "processors": [
+        {
+            "grok": {
+                "field": "message",
+                "patterns": [
+                    "^%{CHAR:first_char}"
+                ],
+                "pattern_definitions": {
+                    "CHAR": "."
+                }
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char == '{'",
+                "name": "{< IngestPipeline "json_logs" >}"
+            }
+        },
+        {
+            "pipeline": {
+                "if": "ctx.first_char != '{'",
+                "name": "{< IngestPipeline "plain_logs" >}"
+            }
+        }
+    ]
+}
@@ -0,0 +1,12 @@
+{
+    "processors": [
+        {
+            "invalid_processor": {
+                "field": "message",
+                "patterns": [
+                    "^%{DATA:some_data}"
+                ]
+            }
+        }
+    ]
+}
@@ -0,0 +1,8 @@
+module_version: 1.0
+
+ingest_pipeline:
+  - ingest/pipeline.json
+  - ingest/json_logs.json
+  - ingest/plain_logs_bad.json
+
+input: config/multi.yml
