Skip to content

Commit 9d377fc

Browse files
committed
fix index build for full table names containing special characters
Signed-off-by: Sean Kao <[email protected]>
1 parent d9c0ba8 commit 9d377fc

File tree

3 files changed

+18
-21
lines changed

3 files changed

+18
-21
lines changed

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ object FlintSparkIndex {
102102
}
103103

104104
/**
105-
* Add backticks to table name to escape special character
105+
* Add backticks to all parts of full table name to escape special characters
106106
*
107107
* @param fullTableName
108108
* source full table name
@@ -113,7 +113,7 @@ object FlintSparkIndex {
113113
require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
114114

115115
val parts = fullTableName.split('.')
116-
s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
116+
s"`${parts(0)}`.`${parts(1)}`.`${parts.drop(2).mkString(".")}`"
117117
}
118118

119119
/**

flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala

+8-14
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.scalatest.matchers.must.Matchers.contain
99
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
1010

1111
import org.apache.spark.FlintSuite
12+
import org.apache.spark.sql.AnalysisException
1213

1314
class FlintSparkCoveringIndexSuite extends FlintSuite {
1415

@@ -31,22 +32,15 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
3132
}
3233
}
3334

34-
test("can build index building job with unique ID column") {
35-
val index =
36-
new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
37-
38-
val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
39-
val indexDf = index.build(spark, Some(df))
40-
indexDf.schema.fieldNames should contain only ("name")
41-
}
42-
43-
test("can build index on table name with special characters") {
44-
val testTableSpecial = "spark_catalog.default.test/2023/10"
35+
test("can parse identifier name with special characters during index build") {
36+
val testTableSpecial = "spark_catalog.de-fault.test/2023/10"
4537
val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string"))
4638

47-
val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
48-
val indexDf = index.build(spark, Some(df))
49-
indexDf.schema.fieldNames should contain only ("name")
39+
val error = intercept[AnalysisException] {
40+
index.build(spark, None)
41+
}
42+
// Reaching this error proves parsing succeeded: the quoted identifier was accepted,
// and resolution failed only because the table does not exist (UnresolvedRelation)
43+
assert(error.getMessage().contains("UnresolvedRelation"))
5044
}
5145

5246
test("should fail if no indexed column given") {

flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala

+8-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
2020
import org.scalatestplus.mockito.MockitoSugar.mock
2121

2222
import org.apache.spark.FlintSuite
23+
import org.apache.spark.sql.AnalysisException
2324
import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet
2425
import org.apache.spark.sql.functions.col
2526

@@ -71,17 +72,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
7172
indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
7273
}
7374

74-
test("can build index on table name with special characters") {
75-
val testTableSpecial = "spark_catalog.default.test/2023/10"
75+
test("can parse identifier name with special characters during index build") {
76+
val testTableSpecial = "spark_catalog.de-fault.test/2023/10"
7677
val indexCol = mock[FlintSparkSkippingStrategy]
7778
when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
7879
when(indexCol.getAggregators).thenReturn(
7980
Seq(CollectSet(col("name").expr).toAggregateExpression()))
8081
val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol))
8182

82-
val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
83-
val indexDf = index.build(spark, Some(df))
84-
indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
83+
val error = intercept[AnalysisException] {
84+
index.build(spark, None)
85+
}
86+
// Reaching this error proves parsing succeeded: the quoted identifier was accepted,
// and resolution failed only because the table does not exist (UnresolvedRelation)
87+
assert(error.getMessage().contains("UnresolvedRelation"))
8588
}
8689

8790
// Test index build for different column type

0 commit comments

Comments (0)