
Commit 9424a79

Chen Dai authored
Improve pre-validation for Flint index refresh options (#297)

* Add index refresh validation Signed-off-by: Chen Dai <[email protected]>
* Add Java doc Signed-off-by: Chen Dai <[email protected]>
* Implement index refresh options validation Signed-off-by: Chen Dai <[email protected]>
* Move validate to index builder and add separate suite for Hive test Signed-off-by: Chen Dai <[email protected]>
* Use in-memory Derby as Hive metastore Signed-off-by: Chen Dai <[email protected]>
* Fix broken IT Signed-off-by: Chen Dai <[email protected]>
* Add more IT Signed-off-by: Chen Dai <[email protected]>
* Rename Hive test base suite Signed-off-by: Chen Dai <[email protected]>
* Polish Javadoc and comments Signed-off-by: Chen Dai <[email protected]>
* Address PR comments Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
1 parent b69f38b commit 9424a79

15 files changed (+508 −106 lines)
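
With this change, invalid refresh-option combinations are reported when the index is created or updated instead of when the refresh job starts. A hedged, IT-style sketch of the user-visible effect (the suite, index, and table names below are placeholders and not part of this commit; it assumes the existing FlintSparkSuite harness and that the source table already exists):

class FlintSparkRefreshOptionValidationSketch extends FlintSparkSuite {

  test("conflicting refresh options are rejected at index creation time") {
    // Enabling both options together now fails fast during pre-validation
    // with IllegalArgumentException, before any refresh job is started.
    the[IllegalArgumentException] thrownBy {
      sql(s"""
           | CREATE INDEX test_ci ON spark_catalog.default.test_table
           | (name, age)
           | WITH (
           |   auto_refresh = true,
           |   incremental_refresh = true
           | )
           |""".stripMargin)
    }
  }
}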

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala

−3 lines

@@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       case (true, false) => AUTO
       case (false, false) => FULL
       case (false, true) => INCREMENTAL
-      case (true, true) =>
-        throw new IllegalArgumentException(
-          "auto_refresh and incremental_refresh options cannot both be true")
     }

     // validate allowed options depending on refresh mode

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala

+20 −2 lines

@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
 import scala.collection.JavaConverters.mapAsJavaMapConverter

 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh

 import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -59,7 +60,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
    *   ignore existing index
    */
   def create(ignoreIfExists: Boolean = false): Unit =
-    flint.createIndex(buildIndex(), ignoreIfExists)
+    flint.createIndex(validateIndex(buildIndex()), ignoreIfExists)

   /**
    * Copy Flint index with updated options.
@@ -80,7 +81,24 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
     val updatedMetadata = index
       .metadata()
       .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
-    FlintSparkIndexFactory.create(updatedMetadata).get
+    validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get)
+  }
+
+  /**
+   * Pre-validate index to ensure its validity. By default, this method validates index options by
+   * delegating to specific index refresh (index options are mostly serving index refresh).
+   * Subclasses can extend this method to include additional validation logic.
+   *
+   * @param index
+   *   Flint index to be validated
+   * @return
+   *   the index or exception occurred if validation failed
+   */
+  protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
+    FlintSparkIndexRefresh
+      .create(index.name(), index) // TODO: remove first argument?
+      .validate(flint.spark)
+    index
   }

   /**
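
The new validateIndex hook above is also the extension point the Javadoc mentions: subclasses can layer extra checks before delegating to the default refresh-option validation. A rough sketch under that assumption (the builder class, the extra rule, and the buildIndex stub are illustrative only, not code from this commit):

class MyIndexBuilder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {

  override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
    // Hypothetical extra rule layered on top of the default validation
    require(index.name().nonEmpty, "index name must not be empty")
    super.validateIndex(index) // delegate to refresh-option validation
  }

  // Assumes buildIndex() is the abstract build hook invoked by create() above
  override protected def buildIndex(): FlintSparkIndex =
    throw new UnsupportedOperationException("omitted in this sketch")
}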
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala (new file)

+86 lines

@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}
+
+/**
+ * Flint Spark validation helper.
+ */
+trait FlintSparkValidationHelper extends Logging {
+
+  /**
+   * Determines whether the source table(s) for a given Flint index are supported.
+   *
+   * @param spark
+   *   Spark session
+   * @param index
+   *   Flint index
+   * @return
+   *   true if all non Hive, otherwise false
+   */
+  def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = {
+    // Extract source table name (possibly more than one for MV query)
+    val tableNames = index match {
+      case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName)
+      case covering: FlintSparkCoveringIndex => Seq(covering.tableName)
+      case mv: FlintSparkMaterializedView =>
+        spark.sessionState.sqlParser
+          .parsePlan(mv.query)
+          .collect { case relation: UnresolvedRelation =>
+            qualifyTableName(spark, relation.tableName)
+          }
+    }
+
+    // Validate if any source table is not supported (currently Hive only)
+    tableNames.exists { tableName =>
+      val (catalog, ident) = parseTableName(spark, tableName)
+      val table = loadTable(catalog, ident).get
+
+      // TODO: add allowed table provider list
+      DDLUtils.isHiveTable(Option(table.properties().get("provider")))
+    }
+  }
+
+  /**
+   * Checks whether a specified checkpoint location is accessible. Accessibility, in this context,
+   * means that the folder exists and the current Spark session has the necessary permissions to
+   * access it.
+   *
+   * @param spark
+   *   Spark session
+   * @param checkpointLocation
+   *   checkpoint location
+   * @return
+   *   true if accessible, otherwise false
+   */
+  def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
+    try {
+      val checkpointManager =
+        CheckpointFileManager.create(
+          new Path(checkpointLocation),
+          spark.sessionState.newHadoopConf())
+
+      checkpointManager.exists(new Path(checkpointLocation))
+    } catch {
+      case e: IOException =>
+        logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e)
+        false
+    }
+  }
+}

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala

+35 −2 lines

@@ -5,7 +5,9 @@

 package org.opensearch.flint.spark.refresh

-import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
+import java.util.Collections
+
+import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}

@@ -23,10 +25,41 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
  * @param index
  *   Flint index
  */
-class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh {
+class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
+    extends FlintSparkIndexRefresh
+    with FlintSparkValidationHelper {

   override def refreshMode: RefreshMode = AUTO

+  override def validate(spark: SparkSession): Unit = {
+    // Incremental refresh cannot enabled at the same time
+    val options = index.options
+    require(
+      !options.incrementalRefresh(),
+      "Incremental refresh cannot be enabled if auto refresh is enabled")
+
+    // Hive table doesn't support auto refresh
+    require(
+      !isTableProviderSupported(spark, index),
+      "Index auto refresh doesn't support Hive table")
+
+    // Checkpoint location is required if mandatory option set
+    val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
+    val checkpointLocation = options.checkpointLocation()
+    if (flintSparkConf.isCheckpointMandatory) {
+      require(
+        checkpointLocation.isDefined,
+        s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
+    }
+
+    // Checkpoint location must be accessible
+    if (checkpointLocation.isDefined) {
+      require(
+        isCheckpointLocationAccessible(spark, checkpointLocation.get),
+        s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
+    }
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     val options = index.options
     val tableName = index.metadata().source

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala

+14 lines

@@ -24,6 +24,20 @@ trait FlintSparkIndexRefresh extends Logging {
    */
   def refreshMode: RefreshMode

+  /**
+   * Validates the current index refresh settings before the actual execution begins. This method
+   * checks for the integrity of the index refresh configurations and ensures that all options set
+   * for the current refresh mode are valid. This preemptive validation helps in identifying
+   * configuration issues before the refresh operation is initiated, minimizing runtime errors and
+   * potential inconsistencies.
+   *
+   * @param spark
+   *   Spark session
+   * @throws IllegalArgumentException
+   *   if any invalid or inapplicable config identified
+   */
+  def validate(spark: SparkSession): Unit
+
   /**
    * Start refreshing the index.
    *
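
Read together with AutoIndexRefresh above, the intended contract is that callers obtain a refresh strategy from FlintSparkIndexRefresh.create and call validate before start. A minimal sketch of that call sequence, assuming index, spark, and flintSparkConf are already in scope in the surrounding Flint code:

val refresh = FlintSparkIndexRefresh.create(index.name(), index)
refresh.validate(spark) // throws IllegalArgumentException on invalid or inapplicable options
val jobId: Option[String] = refresh.start(spark, flintSparkConf)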

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala

+5 lines

@@ -31,6 +31,11 @@ class FullIndexRefresh(

   override def refreshMode: RefreshMode = FULL

+  override def validate(spark: SparkSession): Unit = {
+    // Full refresh validates nothing for now, including Hive table validation.
+    // This allows users to continue using their existing Hive table with full refresh only.
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     logInfo(s"Start refreshing index $indexName in full mode")
     index

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala

+20 −7 lines

@@ -5,7 +5,7 @@

 package org.opensearch.flint.spark.refresh

-import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode}

 import org.apache.spark.sql.SparkSession
@@ -20,18 +20,31 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
  *   Flint index
  */
 class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
-    extends FlintSparkIndexRefresh {
+    extends FlintSparkIndexRefresh
+    with FlintSparkValidationHelper {

   override def refreshMode: RefreshMode = INCREMENTAL

+  override def validate(spark: SparkSession): Unit = {
+    // Non-Hive table is required for incremental refresh
+    require(
+      !isTableProviderSupported(spark, index),
+      "Index incremental refresh doesn't support Hive table")
+
+    // Checkpoint location is required regardless of mandatory option
+    val options = index.options
+    val checkpointLocation = options.checkpointLocation()
+    require(
+      options.checkpointLocation().nonEmpty,
+      "Checkpoint location is required by incremental refresh")
+    require(
+      isCheckpointLocationAccessible(spark, checkpointLocation.get),
+      s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     logInfo(s"Start refreshing index $indexName in incremental mode")

-    // TODO: move this to validation method together in future
-    if (index.options.checkpointLocation().isEmpty) {
-      throw new IllegalStateException("Checkpoint location is required by incremental refresh")
-    }
-
     // Reuse auto refresh which uses AvailableNow trigger and will stop once complete
     val jobId =
       new AutoIndexRefresh(indexName, index)
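
One user-visible effect of the change above: a missing checkpoint_location for incremental refresh is now rejected during pre-validation with IllegalArgumentException, instead of surfacing as an IllegalStateException once the refresh starts. A hedged sketch of such a statement inside an IT-style test (index and table names are placeholders):

the[IllegalArgumentException] thrownBy {
  sql(s"""
       | CREATE INDEX test_ci ON spark_catalog.default.test_table (name, age)
       | WITH (incremental_refresh = true)
       |""".stripMargin)
}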
SparkHiveSupportSuite.scala (new file)

+44 lines

@@ -0,0 +1,44 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.hive.HiveSessionStateBuilder
+import org.apache.spark.sql.internal.{SessionState, StaticSQLConf}
+import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
+
+/**
+ * Flint Spark base suite with Hive support enabled. Because enabling Hive support in Spark
+ * configuration alone is not adequate, as [[TestSparkSession]] disregards it and consistently
+ * creates its own instance of [[org.apache.spark.sql.test.TestSQLSessionStateBuilder]]. We need
+ * to override its session state with that of Hive in the meanwhile.
+ *
+ * Note that we need to extend [[SharedSparkSession]] to call super.sparkConf() method.
+ */
+trait SparkHiveSupportSuite extends SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      // Enable Hive support
+      .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive")
+      // Use in-memory Derby as Hive metastore so no need to clean up metastore_db folder after test
+      .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:metastore_db;create=true")
+      .set("hive.metastore.uris", "")
+  }
+
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new FlintTestSparkSession(sparkConf)
+  }
+
+  class FlintTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sparkConf) { self =>
+
+    override lazy val sessionState: SessionState = {
+      // Override to replace [[TestSQLSessionStateBuilder]] with Hive session state
+      new HiveSessionStateBuilder(spark, None).build()
+    }
+  }
+}
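
A hedged sketch of how an integration suite might mix in this trait to exercise the new Hive checks; the suite name, table DDL, and the combination with FlintSparkSuite are illustrative and may differ from the actual Hive IT suites added in this commit:

class FlintSparkHiveValidationSketchSuite extends FlintSparkSuite with SparkHiveSupportSuite {

  test("auto refresh over a Hive table is rejected during pre-validation") {
    sql("CREATE TABLE hive_tbl (name STRING) USING HIVE") // Hive provider table
    the[IllegalArgumentException] thrownBy {
      sql("CREATE INDEX hive_ci ON hive_tbl (name) WITH (auto_refresh = true)")
    }
  }
}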

integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala

+1 −1 lines

@@ -125,7 +125,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
   test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
     setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
     try {
-      the[IllegalStateException] thrownBy {
+      the[IllegalArgumentException] thrownBy {
         sql(s"""
              | CREATE INDEX $testIndex ON $testTable
              | (name, age)
