
Commit f301136

Fix Bug
Signed-off-by: Peng Huo <[email protected]>
1 parent a33f7d0 commit f301136

File tree

2 files changed (+122, -56 lines)


flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala

+108-56
@@ -13,7 +13,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And, Predicate}
 import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS
-import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions
 import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions.MetadataExtension
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -34,6 +33,14 @@ case class FlintQueryCompiler(schema: StructType) {
     compile(predicates.reduce(new And(_, _)))
   }
 
+  /**
+   * Compile an expression to a query string. Returns empty string if any part of the expression
+   * is unsupported.
+   */
+  def compile(expr: Expression, quoteString: Boolean = true): String = {
+    compileOpt(expr, quoteString).getOrElse("")
+  }
+
   /**
    * Compile Expression to Flint query string.
    *
@@ -42,13 +49,13 @@ case class FlintQueryCompiler(schema: StructType) {
    * @return
    *   empty if does not support.
    */
-  def compile(expr: Expression, quoteString: Boolean = true): String = {
+  def compileOpt(expr: Expression, quoteString: Boolean = true): Option[String] = {
     expr match {
       case LiteralValue(value, dataType) =>
-        quote(extract, quoteString)(value, dataType)
+        Some(quote(extract, quoteString)(value, dataType))
       case p: Predicate => visitPredicate(p)
-      case f: FieldReference => f.toString()
-      case _ => ""
+      case f: FieldReference => Some(f.toString())
+      case _ => None
     }
   }
 
@@ -77,56 +84,101 @@ case class FlintQueryCompiler(schema: StructType) {
    * 1. currently, we map spark contains to OpenSearch match query. Can we leverage more full
    * text queries for text field. 2. configuration of expensive query.
    */
-  def visitPredicate(p: Predicate): String = {
-    val name = p.name()
-    name match {
-      case "IS_NULL" =>
-        s"""{"bool":{"must_not":{"exists":{"field":"${compile(p.children()(0))}"}}}}"""
-      case "IS_NOT_NULL" =>
-        s"""{"exists":{"field":"${compile(p.children()(0))}"}}"""
-      case "AND" =>
-        s"""{"bool":{"filter":[${compile(p.children()(0))},${compile(p.children()(1))}]}}"""
-      case "OR" =>
-        s"""{"bool":{"should":[{"bool":{"filter":${compile(
-            p.children()(0))}}},{"bool":{"filter":${compile(p.children()(1))}}}]}}"""
-      case "NOT" =>
-        s"""{"bool":{"must_not":${compile(p.children()(0))}}}"""
-      case "=" =>
-        val fieldName = compile(p.children()(0))
-        if (isTextField(fieldName)) {
-          getKeywordSubfield(fieldName) match {
-            case Some(keywordField) =>
-              s"""{"term":{"$keywordField":{"value":${compile(p.children()(1))}}}}"""
-            case None => ""
+  def visitPredicate(p: Predicate): Option[String] = p.name() match {
+    case "IS_NULL" =>
+      compileOpt(p.children()(0)).map { field =>
+        s"""{"bool":{"must_not":{"exists":{"field":"$field"}}}}"""
+      }
+    case "IS_NOT_NULL" =>
+      compileOpt(p.children()(0)).map { field =>
+        s"""{"exists":{"field":"$field"}}"""
+      }
+    case "AND" =>
+      for {
+        left <- compileOpt(p.children()(0))
+        right <- compileOpt(p.children()(1))
+      } yield s"""{"bool":{"filter":[$left,$right]}}"""
+    case "OR" =>
+      for {
+        left <- compileOpt(p.children()(0))
+        right <- compileOpt(p.children()(1))
+      } yield s"""{"bool":{"should":[{"bool":{"filter":$left}},{"bool":{"filter":$right}}]}}"""
+    case "NOT" =>
+      compileOpt(p.children()(0)).map { child =>
+        s"""{"bool":{"must_not":$child}}"""
+      }
+    case "=" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+        result <-
+          if (isTextField(field)) {
+            getKeywordSubfield(field) match {
+              case Some(keywordField) =>
+                Some(s"""{"term":{"$keywordField":{"value":$value}}}""")
+              case None => None // Return None for unsupported text fields
+            }
+          } else {
+            Some(s"""{"term":{"$field":{"value":$value}}}""")
           }
+      } yield result
+    case ">" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield s"""{"range":{"$field":{"gt":$value}}}"""
+    case ">=" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield s"""{"range":{"$field":{"gte":$value}}}"""
+    case "<" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield s"""{"range":{"$field":{"lt":$value}}}"""
+    case "<=" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield s"""{"range":{"$field":{"lte":$value}}}"""
+    case "IN" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        valuesList = p.children().tail.flatMap(expr => compileOpt(expr))
+        // Only proceed if we have values
+        if valuesList.nonEmpty
+      } yield {
+        val values = valuesList.mkString("[", ",", "]")
+        s"""{"terms":{"$field":$values}}"""
+      }
+    case "STARTS_WITH" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield s"""{"prefix":{"$field":{"value":$value}}}"""
+    case "CONTAINS" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        quoteValue <- compileOpt(p.children()(1))
+        unQuoteValue <- compileOpt(p.children()(1), false)
+      } yield {
+        if (isTextField(field)) {
+          s"""{"match":{"$field":{"query":$quoteValue}}}"""
         } else {
-          s"""{"term":{"$fieldName":{"value":${compile(p.children()(1))}}}}"""
+          s"""{"wildcard":{"$field":{"value":"*$unQuoteValue*"}}}"""
         }
-      case ">" =>
-        s"""{"range":{"${compile(p.children()(0))}":{"gt":${compile(p.children()(1))}}}}"""
-      case ">=" =>
-        s"""{"range":{"${compile(p.children()(0))}":{"gte":${compile(p.children()(1))}}}}"""
-      case "<" =>
-        s"""{"range":{"${compile(p.children()(0))}":{"lt":${compile(p.children()(1))}}}}"""
-      case "<=" =>
-        s"""{"range":{"${compile(p.children()(0))}":{"lte":${compile(p.children()(1))}}}}"""
-      case "IN" =>
-        val values = p.children().tail.map(expr => compile(expr)).mkString("[", ",", "]")
-        s"""{"terms":{"${compile(p.children()(0))}":$values}}"""
-      case "STARTS_WITH" =>
-        s"""{"prefix":{"${compile(p.children()(0))}":{"value":${compile(p.children()(1))}}}}"""
-      case "CONTAINS" =>
-        val fieldName = compile(p.children()(0))
-        if (isTextField(fieldName)) {
-          s"""{"match":{"$fieldName":{"query":${compile(p.children()(1))}}}}"""
-        } else {
-          s"""{"wildcard":{"$fieldName":{"value":"*${compile(p.children()(1), false)}*"}}}"""
-        }
-      case "ENDS_WITH" =>
-        s"""{"wildcard":{"${compile(p.children()(0))}":{"value":"*${compile(
-            p.children()(1),
-            false)}"}}}"""
-      case "BLOOM_FILTER_MIGHT_CONTAIN" =>
+      }
+    case "ENDS_WITH" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1), false)
+      } yield s"""{"wildcard":{"$field":{"value":"*$value"}}}"""
+    case "BLOOM_FILTER_MIGHT_CONTAIN" =>
+      for {
+        field <- compileOpt(p.children()(0))
+        value <- compileOpt(p.children()(1))
+      } yield {
         val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ")
         s"""
            |{
@@ -137,17 +189,17 @@ case class FlintQueryCompiler(schema: StructType) {
            |          "lang": "painless",
            |          "source": "$code",
            |          "params": {
-           |            "fieldName": "${compile(p.children()(0))}",
-           |            "value": ${compile(p.children()(1))}
+           |            "fieldName": "$field",
+           |            "value": $value
            |          }
            |        }
            |      }
            |    }
            |  }
            |}
            |""".stripMargin
-      case _ => ""
-    }
+      }
+    case _ => None
   }
 
   /**
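
For readers skimming the diff: the heart of the change is that every sub-expression now compiles to an Option[String], and visitPredicate combines children with map and for-comprehensions, so a single unsupported child turns the whole predicate into None and the public compile falls back to the empty string. Below is a minimal, self-contained Scala sketch of that propagation pattern; the object name and the toy compileOpt/gt helpers are illustrative only and are not part of the commit.

// Minimal sketch of the Option-propagation idea (illustrative, not the commit's code).
object OptionPropagationSketch extends App {

  // Toy stand-in for compileOpt: only the fields "a" and "b" are "supported".
  def compileOpt(expr: String): Option[String] =
    if (expr == "a" || expr == "b") Some("\"" + expr + "\"") else None

  // Mirrors the new range cases: both children must compile, otherwise the result is None.
  def gt(left: String, right: String): Option[String] =
    for {
      field <- compileOpt(left)
      value <- compileOpt(right)
    } yield s"""{"range":{$field:{"gt":$value}}}"""

  // Mirrors the new public compile: an unsupported predicate degrades to "" (no push-down).
  def compile(left: String, right: String): String =
    gt(left, right).getOrElse("")

  println(compile("a", "b"))           // {"range":{"a":{"gt":"b"}}}
  println(compile("a", "unsupported")) // prints an empty line: the predicate is skipped
}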

flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala

+14
@@ -196,6 +196,20 @@ class FlintQueryCompilerSuite extends FlintSuite {
     assertResult("")(query)
   }
 
+  test(
+    "Bug fix, https://github.com/opensearch-project/opensearch-spark/actions/runs/13338348402/job/37258321066?pr=1052 ") {
+    val schema = StructType(
+      Seq(
+        StructField(
+          "aText",
+          StringType,
+          nullable = true,
+          new MetadataBuilder().withTextField.build())))
+
+    val query = FlintQueryCompiler(schema).compile(Not(EqualTo("aText", "text")).toV2)
+    assertResult("")(query)
+  }
+
   protected def schema(): StructType = {
     StructType(
       Seq(