
Commit 71d67a0

Implement FlintJob Logic for EMR-S (#52)

* Implement FlintJob Logic for EMR-S

  This commit introduces FlintJob logic for EMR-S, mirroring the existing SQLJob implementation for EMR clusters. The key differences in FlintJob are:

  1. It reads OpenSearch host information from Spark command parameters.
  2. It ensures the existence of a result index with the correct mapping in OpenSearch, creating it if necessary. This process occurs in parallel to SQL query execution.
  3. It reports an error if the result index mapping is incorrect.
  4. It saves a failure status if the SQL execution fails.

  Testing:

  1. Manual testing was conducted using the EMR-S CLI.
  2. New unit tests were added to verify the functionality.

* address comments

Signed-off-by: Kaituo Li <[email protected]>
1 parent 6d87b81 commit 71d67a0
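
The flow the commit message describes — checking or creating the result index in parallel with query execution, and persisting a failure status when either step goes wrong — can be sketched roughly as follows. This is a minimal illustration, not the committed code: `ensureResultIndex`, `executeQuery`, and `writeResult` are hypothetical placeholders.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Minimal sketch of the FlintJob flow described in the commit message.
// All helper names here are hypothetical placeholders, not the commit's API.
object FlintJobFlowSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def run(query: String, resultIndex: String): Unit = {
    // Kick off the result-index check in parallel with query execution.
    val indexReady: Future[Unit]    = Future(ensureResultIndex(resultIndex))
    val queryResult: Future[String] = Future(executeQuery(query))

    val outcome = for {
      _    <- indexReady  // fails fast if the index mapping is incorrect
      rows <- queryResult // fails if SQL execution throws
    } yield rows

    Try(Await.result(outcome, 10.minutes)) match {
      case Success(rows) => writeResult(resultIndex, rows, "SUCCESS", "")
      case Failure(e)    => writeResult(resultIndex, "", "FAILED", e.getMessage)
    }
  }

  // Placeholders; a real job would talk to Spark and OpenSearch here.
  private def ensureResultIndex(index: String): Unit = ()
  private def executeQuery(sql: String): String = "[]"
  private def writeResult(index: String, rows: String, status: String, error: String): Unit = ()
}
```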

File tree

5 files changed: +547 -7 lines changed

.scalafmt.conf (+1)

@@ -0,0 +1 @@
+version = 2.7.5

build.sbt (+27 -2)

@@ -165,12 +165,37 @@ lazy val standaloneCosmetic = project
     Compile / packageBin := (flintSparkIntegration / assembly).value)
 
 lazy val sparkSqlApplication = (project in file("spark-sql-application"))
+  // dependency will be provided at runtime, so it doesn't need to be included in the assembled JAR
+  .dependsOn(flintSparkIntegration % "provided")
   .settings(
     commonSettings,
     name := "sql-job",
     scalaVersion := scala212,
-    libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"),
-    libraryDependencies ++= deps(sparkVersion))
+    libraryDependencies ++= Seq(
+      "org.scalatest" %% "scalatest" % "3.2.15" % "test"),
+    libraryDependencies ++= deps(sparkVersion),
+    libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
+    // Assembly settings
+    // the sbt assembly plugin found multiple copies of the module-info.class file with
+    // different contents in the jars that it was merging flintCore dependencies.
+    // This can happen if you have multiple dependencies that include the same library,
+    // but with different versions.
+    assemblyPackageScala / assembleArtifact := false,
+    assembly / assemblyOption ~= {
+      _.withIncludeScala(false)
+    },
+    assembly / assemblyMergeStrategy := {
+      case PathList(ps @ _*) if ps.last endsWith ("module-info.class") =>
+        MergeStrategy.discard
+      case PathList("module-info.class") => MergeStrategy.discard
+      case PathList("META-INF", "versions", xs @ _, "module-info.class") =>
+        MergeStrategy.discard
+      case x =>
+        val oldStrategy = (assembly / assemblyMergeStrategy).value
+        oldStrategy(x)
+    },
+    assembly / test := (Test / test).value
+  )
 
 lazy val sparkSqlApplicationCosmetic = project
   .settings(

spark-sql-application/README.md (+81 -5)

@@ -1,13 +1,25 @@
 # Spark SQL Application
 
-This application execute sql query and store the result in OpenSearch index in following format
+We have two applications: SQLJob and FlintJob.
+
+SQLJob is designed for EMR Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:
 ```
 "stepId":"<emr-step-id>",
-"applicationId":"<spark-application-id>"
+"applicationId":"<spark-application-id>",
 "schema": "json blob",
 "result": "json blob"
 ```
 
+FlintJob is designed for EMR Serverless Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:
+
+```
+"jobRunId":"<emrs-job-id>",
+"applicationId":"<spark-application-id>",
+"schema": "json blob",
+"result": "json blob",
+"dataSourceName":"<opensearch-data-source-name>"
+```
+
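As a rough model of the FlintJob result document above (and of the worked example in the Result Specifications section of this README), a play-json case class might look like the sketch below. play-json 2.9.2 is added as a dependency in build.sbt in this commit; the case class itself is illustrative and not part of the diff.

```scala
import play.api.libs.json.{Json, OFormat}

// Illustrative only: the committed FlintJob code may build this JSON
// differently. Field names follow the FlintJob format above; schema and
// result are string arrays, as in the example document further down.
case class FlintResult(
    jobRunId: String,
    applicationId: String,
    schema: Seq[String],
    result: Seq[String],
    dataSourceName: String,
    status: String,
    error: String)

object FlintResult {
  // play-json derives the JSON codec from the case class.
  implicit val format: OFormat[FlintResult] = Json.format[FlintResult]
}
```

`Json.toJson(...)` on an instance would then produce a document shaped like the FlintJob example under Result Specifications.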
 ## Prerequisites
 
 + Spark 3.3.1
@@ -16,8 +28,9 @@ This application execute sql query and store the result in OpenSearch index in f
 
 ## Usage
 
-To use this application, you can run Spark with Flint extension:
+To use these applications, you can run Spark with Flint extension:
 
+SQLJob
 ```
 ./bin/spark-submit \
   --class org.opensearch.sql.SQLJob \
@@ -32,11 +45,41 @@ To use this application, you can run Spark with Flint extension:
   <opensearch-region> \
 ```

+FlintJob
+```
+aws emr-serverless start-job-run \
+    --region <region-name> \
+    --application-id <application-id> \
+    --execution-role-arn <execution-role> \
+    --job-driver '{"sparkSubmit": {"entryPoint": "<flint-job-s3-path>", \
+    "entryPointArguments":["'<sql-query>'", "<result-index>", "<data-source-name>"], \
+    "sparkSubmitParameters":"--class org.opensearch.sql.FlintJob \
+        --conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
+        --conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
+        --conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
+        --conf spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory \
+        --conf spark.hive.metastore.glue.role.arn=<role-to-access-s3-and-opensearch> \
+        --conf spark.jars=<path-to-AWSGlueDataCatalogHiveMetaStoreAuth-jar> \
+        --conf spark.jars.packages=<flint-spark-integration-jar-name> \
+        --conf spark.jars.repositories=<path-to-download_spark-integration-jar> \
+        --conf spark.emr-serverless.driverEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
+        --conf spark.executorEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
+        --conf spark.datasource.flint.host=<opensearch-url> \
+        --conf spark.datasource.flint.port=<opensearch-port> \
+        --conf spark.datasource.flint.scheme=<http-or-https> \
+        --conf spark.datasource.flint.auth=<auth-type> \
+        --conf spark.datasource.flint.region=<region-name> \
+        --conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
+        --conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions \
+        --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory "}}'
+    <data-source-name>
+```
+
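Note that the three `entryPointArguments` above are positional: the SQL query, the result index, and the data source name. A hypothetical entry point that unpacks them might look like this sketch (not the committed FlintJob code):

```scala
// Sketch: unpack the three positional entryPointArguments.
// The committed FlintJob's argument handling may differ.
object FlintJobArgsSketch {
  def main(args: Array[String]): Unit = {
    require(
      args.length == 3,
      "usage: FlintJob '<sql-query>' <result-index> <data-source-name>")
    val Array(query, resultIndex, dataSourceName) = args
    println(s"query=$query, resultIndex=$resultIndex, dataSource=$dataSourceName")
  }
}
```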
 ## Result Specifications
 
 Following example shows how the result is written to OpenSearch index after query execution.
 
-Let's assume sql query result is
+Let's assume SQL query result is
 ```
 +------+------+
 |Letter|Number|
@@ -46,7 +89,7 @@ Let's assume sql query result is
 |C     |3     |
 +------+------+
 ```
-OpenSearch index document will look like
+For SQLJob, OpenSearch index document will look like
 ```json
 {
   "_index" : ".query_execution_result",
@@ -68,6 +111,31 @@ OpenSearch index document will look like
 }
 ```
 
+For FlintJob, OpenSearch index document will look like
+```json
+{
+  "_index" : ".query_execution_result",
+  "_id" : "A2WOsYgBMUoqCqlDJHrn",
+  "_score" : 1.0,
+  "_source" : {
+    "result" : [
+      "{'Letter':'A','Number':1}",
+      "{'Letter':'B','Number':2}",
+      "{'Letter':'C','Number':3}"
+    ],
+    "schema" : [
+      "{'column_name':'Letter','data_type':'string'}",
+      "{'column_name':'Number','data_type':'integer'}"
+    ],
+    "jobRunId" : "s-JZSB1139WIVU",
+    "applicationId" : "application_1687726870985_0003",
+    "dataSourceName": "myS3Glue",
+    "status": "SUCCESS",
+    "error": ""
+  }
+}
+```
+
 ## Build
 
 To build and run this application with Spark, you can run:
@@ -76,6 +144,8 @@ To build and run this application with Spark, you can run:
 sbt clean sparkSqlApplicationCosmetic/publishM2
 ```
 
+The jar file is located in the `spark-sql-application/target/scala-2.12` folder.
+
 ## Test
 
 To run tests, you can use:
@@ -92,6 +162,12 @@ To check code with scalastyle, you can run:
 sbt scalastyle
 ```
 
+To check test code with scalastyle, you can run:
+
+```
+sbt testScalastyle
+```
+
 ## Code of Conduct
 
 This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).
