Skip to content

Commit 35e2b51

Browse files
committed
Add OpenSearch Dashboards IT
Signed-off-by: Peng Huo <[email protected]>
1 parent eff717a commit 35e2b51

File tree

7 files changed

+294
-14
lines changed

7 files changed

+294
-14
lines changed

Diff for: flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala

+22-14
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ object FlintDataType {
3232

3333
// Metadata key under which an alias field stores the path of the field it refers to.
val METADATA_ALIAS_PATH_NAME: String = "aliasPath"
3434

35+
// OpenSearch mapping types that are left out of the generated Spark schema.
val UNSUPPORTED_OPENSEARCH_FIELD_TYPE: Set[String] = Set("geo_point")
36+
3537
/**
3638
* parse Flint metadata and extract properties to StructType.
3739
*/
@@ -48,25 +50,24 @@ object FlintDataType {
4850
}
4951
}
5052

51-
val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
52-
deserializeField(fieldName, fieldProperties)
53-
}.toSeq
53+
val fields: Seq[StructField] = normalProps
54+
.filter { case (_, fp) => isSupported(fp) }
55+
.map { case (fieldName, fieldProperties) =>
56+
deserializeField(fieldName, fieldProperties)
57+
}
58+
.toSeq
5459

5560
val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap
5661

57-
val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
62+
// Process alias fields: only include alias fields if the referenced field exists.
63+
val aliasFields: Seq[StructField] = aliasProps.flatMap { case (fieldName, fieldProperties) =>
5864
val aliasPath = (fieldProperties \ "path").extract[String]
59-
if (!normalFieldMap.contains(aliasPath)) {
60-
throw new IllegalStateException(
61-
s"Alias field [$fieldName] references undefined field [$aliasPath]")
65+
normalFieldMap.get(aliasPath).map { referencedField =>
66+
val metadataBuilder = new MetadataBuilder()
67+
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
68+
DataTypes
69+
.createStructField(fieldName, referencedField.dataType, true, metadataBuilder.build())
6270
}
63-
val metadataBuilder = new MetadataBuilder()
64-
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
65-
DataTypes.createStructField(
66-
fieldName,
67-
normalFieldMap(aliasPath).dataType,
68-
true,
69-
metadataBuilder.build())
7071
}.toSeq
7172

7273
StructType(fields ++ aliasFields)
@@ -112,6 +113,13 @@ object FlintDataType {
112113
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
113114
}
114115

116+
/**
 * Decide whether a field mapping may be turned into a schema field.
 *
 * @param fieldProperties
 *   the JSON properties of a single field mapping
 * @return
 *   false only when the mapping has a string `type` entry that is listed in
 *   UNSUPPORTED_OPENSEARCH_FIELD_TYPE; true in every other case, including mappings that carry
 *   no string `type` entry at all
 */
def isSupported(fieldProperties: JValue): Boolean = {
  val declaredType = fieldProperties \ "type"
  declaredType match {
    case JString(typeName) if UNSUPPORTED_OPENSEARCH_FIELD_TYPE.contains(typeName) => false
    case _ => true
  }
}
122+
115123
/**
116124
* parse format in flint metadata
117125
* @return
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2025-01-27T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}
2+
{"FlightNum":"X98CCZO","DestCountry":"IT","OriginWeather":"Clear","OriginCityName":"Cape Town","AvgTicketPrice":882.9826615595518,"DistanceMiles":5482.606664853586,"FlightDelay":false,"DestWeather":"Sunny","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"ZA","dayOfWeek":0,"DistanceKilometers":8823.40014044213,"timestamp":"2025-01-27T18:27:00","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":464.3894810759016,"Origin":"Cape Town International Airport","OriginLocation":{"lat":"-33.96480179","lon":"18.60169983"},"DestRegion":"IT-34","OriginAirportID":"CPT","OriginRegion":"SE-BD","DestCityName":"Venice","FlightTimeHour":7.73982468459836,"FlightDelayMin":0}
3+
{"FlightNum":"UFK2WIZ","DestCountry":"IT","OriginWeather":"Rain","OriginCityName":"Venice","AvgTicketPrice":190.6369038508356,"DistanceMiles":0,"FlightDelay":false,"DestWeather":"Cloudy","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":0,"timestamp":"2025-01-27T17:11:14","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":0,"Origin":"Venice Marco Polo Airport","OriginLocation":{"lat":"45.505299","lon":"12.3519"},"DestRegion":"IT-34","OriginAirportID":"VE05","OriginRegion":"IT-34","DestCityName":"Venice","FlightTimeHour":0,"FlightDelayMin":0}
4+
{"FlightNum":"EAYQW69","DestCountry":"IT","OriginWeather":"Thunder & Lightning","OriginCityName":"Naples","AvgTicketPrice":181.69421554118,"DistanceMiles":345.31943877289535,"FlightDelay":true,"DestWeather":"Clear","Dest":"Treviso-Sant'Angelo Airport","FlightDelayType":"Weather Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":555.7377668725265,"timestamp":"2025-01-27T10:33:28","DestLocation":{"lat":"45.648399","lon":"12.1944"},"DestAirportID":"TV01","Carrier":"OpenSearch Dashboards Airlines","Cancelled":true,"FlightTimeMin":222.74905899019436,"Origin":"Naples International Airport","OriginLocation":{"lat":"40.886002","lon":"14.2908"},"DestRegion":"IT-34","OriginAirportID":"NA01","OriginRegion":"IT-72","DestCityName":"Treviso","FlightTimeHour":3.712484316503239,"FlightDelayMin":180}
5+
{"FlightNum":"58U013N","DestCountry":"CN","OriginWeather":"Damaging Wind","OriginCityName":"Mexico City","AvgTicketPrice":730.041778346198,"DistanceMiles":8300.428124665925,"FlightDelay":false,"DestWeather":"Clear","Dest":"Xi'an Xianyang International Airport","FlightDelayType":"No Delay","OriginCountry":"MX","dayOfWeek":0,"DistanceKilometers":13358.24419986236,"timestamp":"2025-01-27T05:13:00","DestLocation":{"lat":"34.447102","lon":"108.751999"},"DestAirportID":"XIY","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":785.7790705801389,"Origin":"Licenciado Benito Juarez International Airport","OriginLocation":{"lat":"19.4363","lon":"-99.072098"},"DestRegion":"SE-BD","OriginAirportID":"AICM","OriginRegion":"MX-DIF","DestCityName":"Xi'an","FlightTimeHour":13.096317843002314,"FlightDelayMin":0}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
{
2+
"properties": {
3+
"AvgTicketPrice": {
4+
"type": "float"
5+
},
6+
"Cancelled": {
7+
"type": "boolean"
8+
},
9+
"Carrier": {
10+
"type": "keyword"
11+
},
12+
"Dest": {
13+
"type": "keyword"
14+
},
15+
"DestAirportID": {
16+
"type": "keyword"
17+
},
18+
"DestCityName": {
19+
"type": "keyword"
20+
},
21+
"DestCountry": {
22+
"type": "keyword"
23+
},
24+
"DestLocation": {
25+
"type": "geo_point"
26+
},
27+
"DestRegion": {
28+
"type": "keyword"
29+
},
30+
"DestWeather": {
31+
"type": "keyword"
32+
},
33+
"DistanceKilometers": {
34+
"type": "float"
35+
},
36+
"DistanceMiles": {
37+
"type": "float"
38+
},
39+
"FlightDelay": {
40+
"type": "boolean"
41+
},
42+
"FlightDelayMin": {
43+
"type": "integer"
44+
},
45+
"FlightDelayType": {
46+
"type": "keyword"
47+
},
48+
"FlightNum": {
49+
"type": "keyword"
50+
},
51+
"FlightTimeHour": {
52+
"type": "keyword"
53+
},
54+
"FlightTimeMin": {
55+
"type": "float"
56+
},
57+
"Origin": {
58+
"type": "keyword"
59+
},
60+
"OriginAirportID": {
61+
"type": "keyword"
62+
},
63+
"OriginCityName": {
64+
"type": "keyword"
65+
},
66+
"OriginCountry": {
67+
"type": "keyword"
68+
},
69+
"OriginLocation": {
70+
"type": "geo_point"
71+
},
72+
"OriginRegion": {
73+
"type": "keyword"
74+
},
75+
"OriginWeather": {
76+
"type": "keyword"
77+
},
78+
"dayOfWeek": {
79+
"type": "integer"
80+
},
81+
"timestamp": {
82+
"type": "date"
83+
}
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
{
2+
"tests": [
3+
{ "index": "flights" },
4+
{
5+
"do": {
6+
"sql": {
7+
"query": "SELECT COUNT(*) as count FROM dev.default.flights WHERE FlightDelay = true"
8+
},
9+
"match": [ "[1]" ]
10+
}
11+
},
12+
{
13+
"do": {
14+
"sql": {
15+
"query": "SELECT FlightDelay, COUNT(*) as count FROM dev.default.flights GROUP BY FlightDelay"
16+
},
17+
"match": [ "[false,4]", "[true,1]" ]
18+
}
19+
},
20+
{
21+
"do": {
22+
"sql": {
23+
"query": "SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev.default.flights GROUP BY bucket ORDER BY bucket"
24+
},
25+
"match": [ "[0,4]", "[180,1]" ]
26+
}
27+
},
28+
{
29+
"do": {
30+
"sql": {
31+
"query": "SELECT carrier, COUNT(*) as count FROM dev.default.flights GROUP BY Carrier ORDER BY count DESC"
32+
},
33+
"match": [ "[OpenSearch Dashboards Airlines,3]", "[Logstash Airways,2]" ]
34+
}
35+
},
36+
{
37+
"do": {
38+
"sql": {
39+
"query": "SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'),MAX(AvgTicketPrice) AS avg_ticket_price FROM dev.default.flights GROUP BY window(to_utc_timestamp(timestamp, 'UTC'),'1 day') ORDER BY window.start"
40+
},
41+
"match": [ "[2025-01-27 00:00:00,882.98267]" ]
42+
}
43+
},
44+
{
45+
"do": {
46+
"sql": {
47+
"query": "SELECT OriginCountry, DestCountry, cnt
48+
FROM (
49+
SELECT
50+
OriginCountry,
51+
DestCountry,
52+
cnt,
53+
ROW_NUMBER() OVER (PARTITION BY OriginCountry ORDER BY cnt DESC) AS dest_rn
54+
FROM (
55+
SELECT
56+
OriginCountry,
57+
DestCountry,
58+
COUNT(*) AS cnt
59+
FROM dev.default.flights
60+
WHERE OriginCountry IN (
61+
SELECT OriginCountry
62+
FROM (
63+
SELECT OriginCountry, COUNT(*) AS cnt
64+
FROM dev.default.flights
65+
GROUP BY OriginCountry
66+
ORDER BY cnt DESC
67+
LIMIT 5
68+
) AS top_origins
69+
)
70+
GROUP BY OriginCountry, DestCountry
71+
) AS grouped
72+
) AS ranked
73+
WHERE dest_rn <= 5
74+
ORDER BY cnt DESC;"
75+
},
76+
"match": [ "[2025-01-27 00:00:00,No Delay,2]", "[2025-01-27 08:00:00,Weather Delay,1]", "[2025-01-27 16:00:00,No Delay,2]" ]
77+
}
78+
}
79+
]
80+
}

Diff for: integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala

+1
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ trait OpenSearchCatalogSuite extends FlintSparkSuite {
2121
spark.conf.set(
2222
s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy",
2323
"wait_for")
24+
spark.conf.set("spark.sql.session.timeZone", "UTC")
2425
}
2526
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.apache.spark.opensearch.table
7+
8+
import org.opensearch.flint.spark.ppl.FlintPPLSuite
9+
10+
import org.apache.spark.sql.Row
11+
12+
/**
 * Integration suite for dashboards-style queries: each use case loads its bundled fixture into
 * an index, then every query of every check is executed and its result compared with the rows
 * expected for that check.
 */
class OpenSearchDashboardITSuite extends OpenSearchCatalogSuite with FlintPPLSuite {
  test("test dashboards queries") {
    for (useCase <- Seq(dashboards_sample_data_flights())) {
      withIndexName(useCase.index) {
        openSearchDashboardsIndex(useCase.useCaseName, useCase.index)
        for {
          scenario <- useCase.tests
          statement <- scenario.queries
        } {
          val result = spark.sql(statement)
          // Put the failing statement in front of the assertion message.
          withClue(s"Failed query: ${statement}\n") {
            checkAnswer(result, scenario.expected)
          }
        }
      }
    }
  }

  /** A group of queries that must all produce the same `expected` rows. */
  case class QueryTest(queries: Seq[String], expected: Seq[Row])

  /** One use case: fixture name, index to load it into, and the checks to run against it. */
  case class TestConfig(useCaseName: String, index: String, tests: Seq[QueryTest])

  /**
   * Checks for the `dashboards_sample_data_flights` fixture, loaded into the `flights` index.
   * Every check pairs a SQL statement with a `source=...` pipeline query and one expected result.
   */
  def dashboards_sample_data_flights(): TestConfig = {
    val table = "flights"

    // Airline Carrier
    val airlineCarrier = QueryTest(
      queries = Seq(
        s"""SELECT carrier, COUNT(*) as count
           |FROM dev.default.$table
           |GROUP BY Carrier ORDER BY count DESC""".stripMargin,
        s"""source=dev.default.$table
           || stats count() as count by Carrier
           || sort - count
           || fields Carrier, count""".stripMargin),
      expected = Seq(Row("OpenSearch Dashboards Airlines", 3), Row("Logstash Airways", 2)))

    // Average ticket price
    val averageTicketPrice = QueryTest(
      queries = Seq(
        s"""SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'), MAX(AvgTicketPrice) AS
           |avg_ticket_price
           | FROM dev.default.$table
           | GROUP BY window(timestamp,'1 day') ORDER BY window.start
           |""".stripMargin,
        s"""source=dev.default.$table
           || stats max(AvgTicketPrice) as avg by span(timestamp, 1d) as window
           || sort window.start
           || eval start = date_format(window.start, 'yyyy-MM-dd HH:mm:ss')
           || fields start, avg""".stripMargin),
      expected = Seq(Row("2025-01-27 00:00:00", 882.98267f)))

    // Total Flight Delays
    val totalFlightDelays = QueryTest(
      queries = Seq(
        s"SELECT COUNT(*) as count FROM dev.default.$table WHERE FlightDelay = true",
        s"""source=dev.default.$table | where FlightDelay=True | stats count()"""),
      expected = Seq(Row(1)))

    // Flight Delays
    val flightDelays = QueryTest(
      queries = Seq(
        s"SELECT FlightDelay, COUNT(*) as count FROM dev.default.$table GROUP BY FlightDelay",
        s"""source=dev.default.$table
           || stats count() as cnt by FlightDelay
           || fields FlightDelay, cnt""".stripMargin),
      expected = Seq(Row(false, 4), Row(true, 1)))

    // Delay Buckets
    val delayBuckets = QueryTest(
      queries = Seq(
        "SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count " +
          s"FROM dev.default.$table GROUP BY bucket ORDER BY bucket",
        s"""source=dev.default.$table
           || eval bucket=FLOOR(FlightDelayMin / 30) * 30
           || stats count() as cnt by bucket
           || sort bucket
           || fields bucket, cnt""".stripMargin),
      expected = Seq(Row(0, 4), Row(180, 1)))

    TestConfig(
      useCaseName = "dashboards_sample_data_flights",
      index = table,
      tests = Seq(
        airlineCarrier,
        averageTicketPrice,
        totalFlightDelays,
        flightDelays,
        delayBuckets))
  }
}

Diff for: integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.flint
77

8+
import scala.io.Source
9+
810
import org.apache.http.HttpHost
911
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
1012
import org.opensearch.action.bulk.BulkRequest
@@ -123,6 +125,16 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
123125
index(indexName, oneNodeSetting, mappings, docs)
124126
}
125127

128+
/**
 * Create `indexName` from the fixture files bundled for a use case.
 *
 * The mappings are read from the classpath resource `opensearch/<useCaseName>_mappings.json`
 * and the documents from `opensearch/<useCaseName>.json`, one document per line. Both are
 * passed to `index` together with `oneNodeSetting`.
 *
 * @param useCaseName
 *   base name of the fixture resources under `opensearch/`
 * @param indexName
 *   name of the index to create and populate
 */
def openSearchDashboardsIndex(useCaseName: String, indexName: String): Unit = {
  // Read a classpath resource and always release its stream, even if reading fails.
  def readResource[T](path: String)(read: Source => T): T = {
    val source = Source.fromResource(path)
    try read(source)
    finally source.close()
  }

  val mappings = readResource(s"opensearch/${useCaseName}_mappings.json")(_.mkString)
  // toList reads every line eagerly; a lazy sequence would be evaluated after close().
  val docs: Seq[String] =
    readResource(s"opensearch/${useCaseName}.json")(_.getLines().toList)
  index(indexName, oneNodeSetting, mappings, docs)
}
137+
126138
def indexWithAlias(indexName: String): Unit = {
127139
val mappings = """{
128140
| "properties": {

0 commit comments

Comments
 (0)