
Commit 400fbad

update correlation related traversal
- add plan branches context traversal
- add resolving of un-resolved attributes (columns)
- add join spec transformer util API
- add documentation about the correlation design considerations

Signed-off-by: YANGDB <[email protected]>
1 parent 9b0251c commit 400fbad

16 files changed (+769, -166 lines)

docs/PPL-Correlation-command.md (+283)
@@ -0,0 +1,283 @@
## PPL Correlation Command

## Overview

In the past year the OpenSearch Observability & Security teams have been busy improving many aspects of data monitoring and visibility.
The key idea behind our work is to enable users to dig into their data and surface the hidden insights within the massive corpus of logs, events and observations.

One fundamental concept that supports this process is the ability to correlate different data sources according to common dimensions and a shared timeframe.
This subject is well documented, so this RFC will not dive into the necessity of correlation (the appendix refers to multiple related resources) but rather into structuring the linguistic support for such a capability.

![](https://user-images.githubusercontent.com/48943349/253685892-225e78e1-0942-46b0-8f67-97f9412a1c4c.png)

### Problem definition

In the appendix I’ll add some formal references to the problem domain in both Observability and Security, but the main takeaway is that such a capability is fundamental to the daily work of domain experts and SREs.
They face daily encounters with huge amounts of data arriving from different verticals (data sources) which share the same time-frames but are not synchronized in a formal manner.

The ability to intersect these different verticals according to the timeframe and similar dimensions enriches the data and allows the desired insight to surface.

**Example**

Let’s take the Observability domain, for which we have 3 distinct data sources:

*- Logs*
*- Metrics*
*- Traces*

Each data source may share many common dimensions, but to transition from one data source to another it is necessary to correctly correlate them.
According to the semantic naming conventions, logs, traces and metrics share common dimensions (such as IP address, port and trace id).

Let’s take the following examples:

**Log**
```
{
  "@timestamp": "2018-07-02T22:23:00.186Z",
  "aws": {
    "elb": {
      "backend": {
        "http": {
          "response": {
            "status_code": 500
          }
        },
        "ip": "10.0.0.1",
        "port": "80"
      },
      ...
      "target_port": [
        "10.0.0.1:80"
      ],
      "target_status_code": [
        "500"
      ],
      "traceId": "Root=1-58337262-36d228ad5d99923122bbe354",
      "type": "http"
    }
  },
  "cloud": {
    "provider": "aws"
  },
  "http": {
    "request": {
      ...
    },
    "communication": {
      "source": {
        "address": "192.168.131.39",
        "ip": "192.168.131.39",
        "port": 2817
      }
    }
  },
  "traceId": "Root=1-58337262-36d228ad5d99923122bbe354"
}
```
This is an AWS ELB log arriving from a service residing on AWS.
It shows that the `backend.http.response.status_code` was 500 - which is an error.

This may come up as part of a monitoring process or an alert triggered by some rule. Once this is identified, the next step is to collect as much data surrounding this event as possible, so that the investigation can be done in the most intelligent and thorough way.

The most obvious step would be to create a query that brings all the data related to that timeframe - but in many cases this is too much of a brute-force action.

The data may be too large to analyze, and most of the time would be spent filtering out the non-relevant data instead of actually trying to locate the root cause of the problem.

### **Suggested Correlation command**

The next approach allows searching in a much more fine-grained manner and further simplifies the analysis stage.

Let’s review the known facts - we have multiple dimensions that can be used to correlate data from other sources:

- **IP** - `"ip": "10.0.0.1" | "ip": "192.168.131.39"`
- **Port** - `"port": 2817 | "target_port": "10.0.0.1:80"`

Assuming we have the additional traces / metrics indices available, and using the fact that we know our schema structure (see the appendix for relevant schema references), we can generate a query for getting all relevant data bearing these dimensions during the same timeframe.

Here is a snippet of the trace index document that has the http information we would like to correlate with:
```
{
  "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
  "spanId": "55a698828fe06a42",
  "traceState": [],
  "parentSpanId": "",
  "name": "mysql",
  "kind": "CLIENT",
  "@timestamp": "2021-11-13T20:20:39+00:00",
  "events": [
    {
      "@timestamp": "2021-03-25T17:21:03+00:00",
      ...
    }
  ],
  "links": [
    {
      "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
      "spanId": "55a698828fe06a42w2",
      "droppedAttributesCount": 0
    }
  ],
  "resource": {
    "service@name": "database",
    "telemetry@sdk@name": "opentelemetry",
    "host@hostname": "ip-172-31-10-8.us-west-2.compute.internal"
  },
  "status": {
    ...
  },
  "attributes": {
    "http": {
      "user_agent": {
        "original": "Mozilla/5.0"
      },
      "network": {
        ...
      },
      "request": {
        ...
      },
      "response": {
        "status_code": "200",
        "body": {
          "size": 500
        }
      },
      "client": {
        "server": {
          "socket": {
            "address": "192.168.0.1",
            "domain": "example.com",
            "port": 80
          },
          "address": "192.168.0.1",
          "port": 80
        },
        "resend_count": 0,
        "url": {
          "full": "http://example.com"
        }
      },
      "server": {
        "route": "/index",
        "address": "192.168.0.2",
        "port": 8080,
        "socket": {
          ...
        },
        "client": {
          ...
        }
      },
      "url": {
        ...
      }
    }
  }
}
```
In the above document we can see both the `traceId` and the http client/server `ip`, which can be correlated with the ELB logs to better understand the system’s behaviour and condition.

### New Correlation Query Command

Here is the new command that would allow this type of investigation:

`source alb_logs, traces | where alb_logs.ip="10.0.0.1" AND alb_logs.cloud.provider="aws" |`
`correlate exact fields(traceId, ip) scope(@timestamp, 1D) mapping(alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId)`

Let’s break this down a bit:

`1. source alb_logs, traces` selects all the data sources that will be correlated to one another.

`2. where ip="10.0.0.1" AND cloud.provider="aws"` is the predicate clause that constrains the scope of the search corpus.

`3. correlate exact fields(traceId, ip)` expresses the correlation operation on the following list of fields:

`- ip` has an explicit filter condition, so it will be propagated into the correlation condition for all the data sources.
`- traceId` has no explicit filter, so the correlation will only match identical traceId’s across all the data sources.

The field names indicate the logical meaning of the function within the correlation command; the actual join condition is taken from the mapping statement described below.

The term `exact` means that the correlation statements require all the fields to match in order to fulfill the query statement.

An alternative is `approximate`, which will attempt to match on a best-effort basis and will not reject rows with only a partial match.

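
To make the shape of this clause concrete, here is a minimal, hypothetical Scala sketch of how a correlate clause (type, fields, scope and mappings) could be modeled. The names are illustrative only and are not the actual ppl-spark-integration classes.

```
sealed trait CorrelationType
case object Exact extends CorrelationType        // all listed fields must match
case object Approximate extends CorrelationType  // best-effort match, partial matches are kept

case class FieldMapping(leftField: String, rightField: String) // e.g. alb_logs.ip -> traces.attributes.http.server.address
case class Scope(timeField: String, span: String)              // e.g. Scope("@timestamp", "1D")

case class Correlation(
    correlationType: CorrelationType,
    fields: Seq[String],          // logical fields, e.g. Seq("traceId", "ip")
    scope: Scope,
    mappings: Seq[FieldMapping])  // one mapping per field per pair of sources

// The command from the example above, expressed with this model:
val example = Correlation(
  Exact,
  Seq("traceId", "ip"),
  Scope("@timestamp", "1D"),
  Seq(
    FieldMapping("alb_logs.ip", "traces.attributes.http.server.address"),
    FieldMapping("alb_logs.traceId", "traces.traceId")))
```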

### Addressing different field mapping

In cases where the same logical field (such as `ip`) may have a different mapping within the various data sources, an explicit mapping field path is expected.

The next syntax extends the correlation conditions to allow matching different field names with a similar logical meaning:

`alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId`

It is expected that for each `field` that participates in the correlation join, there is a relevant `mapping` statement that includes all the tables to be joined by this correlation command.

**Example:**

In our case there are 2 sources: `alb_logs, traces`
There are 2 fields: `traceId, ip`
There are 2 mapping statements: `alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId`

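
This requirement can be checked mechanically. As an illustration only (not an actual API), here is a small hypothetical Scala helper that verifies that every correlated field has a mapping statement referencing every participating source:

```
// Illustrative sketch: for every correlated field, its mapping statement must
// reference every source that takes part in the correlation command.
def mappingCoversAllSources(
    sources: Set[String],
    mappingsPerField: Map[String, Seq[(String, String)]]): Boolean =
  mappingsPerField.values.forall { pairs =>
    val referencedTables = pairs
      .flatMap { case (left, right) => Seq(left, right) }
      .map(_.takeWhile(_ != '.')) // the table name is the first path segment
      .toSet
    sources.subsetOf(referencedTables)
  }

// The example from this document: 2 sources, 2 fields, 2 mapping statements.
val ok = mappingCoversAllSources(
  Set("alb_logs", "traces"),
  Map(
    "ip"      -> Seq("alb_logs.ip" -> "traces.attributes.http.server.address"),
    "traceId" -> Seq("alb_logs.traceId" -> "traces.traceId")))
```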
### Scoping the correlation timeframes

In order to simplify the work that has to be done by the execution engine (driver), the scope statement was added to explicitly direct the join query to the time window it should cover for this search.

`scope(@timestamp, 1D)` - in this example, the scope of the search is focused on a daily basis, so correlations appearing within the same day are grouped together. This scoping mechanism simplifies and allows better control over the results, and enables an incremental search resolution based on the user’s needs.

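
As an illustration only (the precise scope semantics are defined by the execution driver), one possible reading of a `1D` scope is an equality on day buckets of the time field. A minimal Spark DataFrame sketch, assuming `alb_logs` and `traces` are registered tables and that field paths follow the mapping shown earlier:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.date_trunc

val spark = SparkSession.builder().appName("correlation-scope-sketch").getOrCreate()

// Assumed source tables (illustrative only).
val albLogs = spark.table("alb_logs")
val traces  = spark.table("traces")

// scope(@timestamp, 1D): only correlate rows that fall into the same day bucket.
val correlated = albLogs.join(
  traces,
  albLogs("traceId") === traces("traceId") &&
    albLogs("ip") === traces("attributes.http.server.address") &&
    date_trunc("day", albLogs("@timestamp")) === date_trunc("day", traces("@timestamp")),
  "inner")
```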

***Diagram***

These are the correlation conditions that explicitly state how the sources are going to be joined.

[Image: Screenshot 2023-10-06 at 12.23.59 PM.png]

* * *

## Supporting Drivers

The new correlation command is in fact a ‘hidden’ join command, therefore only the following PPL drivers support it:

- [ppl-spark](https://github.com/opensearch-project/opensearch-spark/tree/main/ppl-spark-integration)
  In this driver the `correlation` command is directly translated into the appropriate Catalyst Join logical plan.

**Example:**

*`source alb_logs, traces, metrics | where ip="10.0.0.1" AND cloud.provider="aws" | correlate exact on (ip, port) scope(@timestamp, 2018-07-02T22:23:00, 1 D)`*

**Logical Plan:**

```
'Project [*]
+- 'Join Inner, ('ip && 'port)
   :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
   +- 'UnresolvedRelation [alb_logs]
   +- 'Join Inner, ('ip && 'port)
      :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
      +- 'UnresolvedRelation [traces]
      +- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
      +- 'UnresolvedRelation [metrics]
```

The Catalyst engine will optimize this query according to the most efficient join ordering.

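
To give an intuition of what “translated into the appropriate Catalyst Join logical plan” means, here is a minimal, hypothetical sketch that assembles such a plan from the mapping pairs using Spark’s Catalyst API. It is a simplified illustration, not the actual join spec transformer referenced by this commit.

```
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan}

object CorrelationJoinSketch {

  // Fold the mapping pairs into a single conjunctive equi-join condition.
  def joinCondition(mappings: Seq[(String, String)]): Expression =
    mappings
      .map { case (left, right) =>
        EqualTo(UnresolvedAttribute(left), UnresolvedAttribute(right)): Expression
      }
      .reduce(And(_, _))

  // Join two (already filtered / time-scoped) source plans on the mapped fields.
  def correlate(
      left: LogicalPlan,
      right: LogicalPlan,
      mappings: Seq[(String, String)]): LogicalPlan =
    Join(left, right, Inner, Some(joinCondition(mappings)), JoinHint.NONE)
}

// Mirrors the alb_logs / traces example from this document:
val plan = CorrelationJoinSketch.correlate(
  UnresolvedRelation(Seq("alb_logs")),
  UnresolvedRelation(Seq("traces")),
  Seq(
    "alb_logs.ip"      -> "traces.attributes.http.server.address",
    "alb_logs.traceId" -> "traces.traceId"))
```

In this reading, each `mapping` pair becomes one equality predicate, and the `where` and `scope` filters are applied to each source plan before the join, matching the shape of the logical plan shown above.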
* * *

## Appendix

* Correlation concepts
  * https://github.com/opensearch-project/sql/issues/1583
  * https://github.com/opensearch-project/dashboards-observability/issues?q=is%3Aopen+is%3Aissue+label%3Ametrics
* Observability Correlation
  * https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/
  * https://github.com/opensearch-project/dashboards-observability/wiki/Observability-Future-Vision#data-correlation
* Security Correlation
  * [OpenSearch new correlation engine](https://opensearch.org/docs/latest/security-analytics/usage/correlation-graph/)
  * [ocsf](https://github.com/ocsf/)
* Simple schema
  * [correlation use cases](https://github.com/opensearch-project/dashboards-observability/wiki/Observability-Future-Vision#data-correlation)
  * [correlation mapping metadata](https://github.com/opensearch-project/opensearch-catalog/tree/main/docs/schema)

![](https://user-images.githubusercontent.com/48943349/274153824-9c6008e0-fdaf-434f-8e5d-4347cee66ac4.png)

integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala (+10, -18)

@@ -122,8 +122,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
     val index = flint.describeIndex(testIndex)
     index shouldBe defined
-    val optionJson = compact(render(
-      parse(index.get.metadata().getContent) \ "_meta" \ "options"))
+    val optionJson = compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options"))
     optionJson should matchJson("""
         | {
         |   "auto_refresh": "true",

@@ -321,8 +320,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
         |""".stripMargin)
 
     query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("year") === 2023))
+      useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023))
   }
 
   test("should not rewrite original query if filtering condition has disjunction") {

@@ -388,8 +386,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     // Prepare test table
     val testTable = "spark_catalog.default.data_type_table"
     val testIndex = getSkippingIndexName(testTable)
-    sql(
-      s"""
+    sql(s"""
        | CREATE TABLE $testTable
        | (
        |   boolean_col BOOLEAN,

@@ -408,8 +405,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
        | )
        | USING PARQUET
        |""".stripMargin)
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | VALUES (
        |   TRUE,

@@ -449,8 +445,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
     val index = flint.describeIndex(testIndex)
     index shouldBe defined
-    index.get.metadata().getContent should matchJson(
-      s"""{
+    index.get.metadata().getContent should matchJson(s"""{
        |   "_meta": {
        |     "name": "flint_spark_catalog_default_data_type_table_skipping_index",
        |     "version": "${current()}",

@@ -587,17 +582,15 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   test("can build skipping index for varchar and char and rewrite applicable query") {
     val testTable = "spark_catalog.default.varchar_char_table"
     val testIndex = getSkippingIndexName(testTable)
-    sql(
-      s"""
+    sql(s"""
        | CREATE TABLE $testTable
        | (
        |   varchar_col VARCHAR(20),
        |   char_col CHAR(20)
        | )
        | USING PARQUET
        |""".stripMargin)
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | VALUES (
        |   "sample varchar",

@@ -613,8 +606,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
       .create()
     flint.refreshIndex(testIndex, FULL)
 
-    val query = sql(
-      s"""
+    val query = sql(s"""
        | SELECT varchar_col, char_col
        | FROM $testTable
        | WHERE varchar_col = "sample varchar" AND char_col = "sample char"

@@ -624,8 +616,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     val paddedChar = "sample char".padTo(20, ' ')
     checkAnswer(query, Row("sample varchar", paddedChar))
     query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(hasIndexFilter(
-        col("varchar_col") === "sample varchar" && col("char_col") === paddedChar))
+      useFlintSparkSkippingFileIndex(
+        hasIndexFilter(col("varchar_col") === "sample varchar" && col("char_col") === paddedChar))
 
     flint.deleteIndex(testIndex)
   }