Skip to content

Commit 2c43106

Browse files
committed
Extend metadata and metadatabuilder
Signed-off-by: Peng Huo <[email protected]>
1 parent 162032d commit 2c43106

File tree

7 files changed

+148
-150
lines changed

7 files changed

+148
-150
lines changed

Diff for: flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.json4s.jackson.JsonMethods
1212
import org.json4s.native.Serialization
1313

1414
import org.apache.spark.sql.catalyst.util.DateFormatter
15+
import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions.{MetadataBuilderExtension, MetadataExtension}
1516
import org.apache.spark.sql.types._
1617

1718
/**
@@ -97,14 +98,12 @@ object FlintDataType {
9798

9899
// Text with possible multi-fields
99100
case JString("text") =>
100-
FlintMetadataHelper.addTextFieldMetadata(metadataBuilder)
101+
metadataBuilder.withTextField()
101102
(fieldProperties \ "fields") match {
102103
case fields: JObject =>
103-
FlintMetadataHelper.addMultiFieldMetadata(
104-
metadataBuilder,
105-
fields.obj.map { case (name, props) =>
106-
(s"$fieldName.$name", (props \ "type").extract[String])
107-
}.toMap)
104+
metadataBuilder.withMultiFields(fields.obj.map { case (name, props) =>
105+
(s"$fieldName.$name", (props \ "type").extract[String])
106+
}.toMap)
108107
StringType
109108
case _ => StringType
110109
}
@@ -164,7 +163,7 @@ object FlintDataType {
164163

165164
// string
166165
case StringType | _: VarcharType | _: CharType =>
167-
if (FlintMetadataHelper.isTextField(metadata)) {
166+
if (metadata.isTextField) {
168167
JObject("type" -> JString("text"))
169168
} else {
170169
JObject("type" -> JString("keyword"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.apache.spark.sql.flint.datatype
7+
8+
import org.apache.spark.sql.types.{MetadataBuilder, _}
9+
10+
/**
11+
* Helper class for handling Flint metadata operations
12+
*/
13+
object FlintMetadataExtensions {
14+
// OpenSearch Mappings. https://opensearch.org/docs/latest/field-types/
15+
val OS_TYPE_KEY = "osType"
16+
val FIELDS_NAMES_KEY = "fields"
17+
18+
// OpenSearch field types. https://opensearch.org/docs/latest/field-types/supported-field-types/index/
19+
val TEXT_TYPE = "text"
20+
val KEYWORD_TYPE = "keyword"
21+
22+
implicit class MetadataExtension(val metadata: Metadata) {
23+
24+
/**
25+
* Determines if this metadata represents a text field.
26+
*
27+
* @return
28+
* `true` if the metadata contains an `osType` property set to "text", `false` otherwise
29+
*/
30+
def isTextField: Boolean =
31+
metadata.contains(OS_TYPE_KEY) && metadata.getString(OS_TYPE_KEY) == TEXT_TYPE
32+
33+
/**
34+
* Retrieves the first keyword subfield from multi-field definitions.
35+
*
36+
* @return
37+
* [[Some]] containing the first keyword subfield name if defined, [[None]] if no keyword
38+
* subfields exist
39+
*/
40+
def keywordSubfield: Option[String] = {
41+
if (metadata.contains(FIELDS_NAMES_KEY)) {
42+
val multiFieldMetadata = metadata.getMetadata(FIELDS_NAMES_KEY)
43+
multiFieldMetadata
44+
.getStringArray(KEYWORD_TYPE)
45+
.headOption
46+
} else {
47+
None
48+
}
49+
}
50+
}
51+
52+
implicit class MetadataBuilderExtension(val builder: MetadataBuilder) {
53+
54+
/**
55+
* Marks the field as a text field in OpenSearch metadata.
56+
*
57+
* @return
58+
* the modified [[MetadataBuilder]] instance for method chaining
59+
* @see
60+
* [[https://opensearch.org/docs/latest/field-types/supported-field-types/text/ Text Field Documentation]]
61+
*/
62+
def withTextField(): MetadataBuilder = {
63+
builder.putString(OS_TYPE_KEY, TEXT_TYPE)
64+
builder
65+
}
66+
67+
/**
68+
* Adds multi-field definitions to the metadata.
69+
*
70+
* @param fields
71+
* Map where keys are field names and values are OpenSearch field types
72+
* @return
73+
* the modified [[MetadataBuilder]] instance for method chaining
74+
* @example
75+
* {{{builder.withMultiFields(Map( "raw" -> KEYWORD_TYPE, "analyzed" -> TEXT_TYPE ))}}}
76+
* @note
77+
* Groups fields by type and stores them under the `fields` metadata key
78+
*/
79+
def withMultiFields(fields: Map[String, String]): MetadataBuilder = {
80+
val nestedBuilder = new MetadataBuilder()
81+
fields
82+
.groupBy { case (_, fieldType) => fieldType }
83+
.foreach { case (fieldType, entries) =>
84+
nestedBuilder.putStringArray(fieldType, entries.keys.toArray)
85+
}
86+
builder.putMetadata(FIELDS_NAMES_KEY, nestedBuilder.build())
87+
builder
88+
}
89+
}
90+
}

Diff for: flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintMetadataHelper.scala

-88
This file was deleted.

Diff for: flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
1313
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
1414
import org.apache.spark.sql.connector.expressions.filter.{And, Predicate}
1515
import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS
16-
import org.apache.spark.sql.flint.datatype.FlintMetadataHelper
16+
import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions
17+
import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions.MetadataExtension
1718
import org.apache.spark.sql.internal.SQLConf
1819
import org.apache.spark.sql.types._
1920

@@ -157,7 +158,7 @@ case class FlintQueryCompiler(schema: StructType) {
157158
case Some((_, field)) =>
158159
field.dataType match {
159160
case StringType =>
160-
FlintMetadataHelper.isTextField(field.metadata)
161+
field.metadata.isTextField
161162
case _ => false
162163
}
163164
case None => false
@@ -169,8 +170,7 @@ case class FlintQueryCompiler(schema: StructType) {
169170
*/
170171
protected def getKeywordSubfield(attribute: String): Option[String] = {
171172
schema.apply(attribute) match {
172-
case StructField(_, StringType, _, metadata) =>
173-
FlintMetadataHelper.getKeywordSubfield(metadata)
173+
case StructField(_, StringType, _, metadata) => metadata.keywordSubfield
174174
case _ => None
175175
}
176176
}

Diff for: flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala

+5-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.json4s.jackson.JsonMethods
1010
import org.scalatest.matchers.should.Matchers
1111

1212
import org.apache.spark.FlintSuite
13+
import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions.MetadataBuilderExtension
1314
import org.apache.spark.sql.types._
1415

1516
class FlintDataTypeSuite extends FlintSuite with Matchers {
@@ -221,12 +222,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
221222
| }
222223
| }
223224
|}""".stripMargin
224-
val mb = new MetadataBuilder()
225-
FlintMetadataHelper.addTextFieldMetadata(mb)
226-
FlintMetadataHelper.addMultiFieldMetadata(
227-
mb,
228-
Map("city.raw" -> "keyword", "city.keyword" -> "keyword"))
229-
val metadata = mb.build()
225+
val metadata = new MetadataBuilder().withTextField
226+
.withMultiFields(Map("city.raw" -> "keyword", "city.keyword" -> "keyword"))
227+
.build()
230228
val expectedStructType = StructType(StructField("city", StringType, true, metadata) :: Nil)
231229

232230
FlintDataType.deserialize(flintDataType) should contain theSameElementsAs expectedStructType
@@ -241,9 +239,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
241239
| }
242240
|}""".stripMargin
243241

244-
val mb = new MetadataBuilder()
245-
FlintMetadataHelper.addTextFieldMetadata(mb)
246-
val metadata = mb.build()
242+
val metadata = new MetadataBuilder().withTextField().build()
247243

248244
val expectedStructType =
249245
StructType(StructField("description", StringType, true, metadata) :: Nil)

Diff for: flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintMetadataHelperSuite.scala renamed to flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintMetadataExtensionsSuite.scala

+31-27
Original file line numberDiff line numberDiff line change
@@ -8,79 +8,83 @@ package org.apache.spark.sql.flint.datatype
88
import org.scalatest.matchers.should.Matchers
99

1010
import org.apache.spark.FlintSuite
11+
import org.apache.spark.sql.flint.datatype.FlintMetadataExtensions.{MetadataBuilderExtension, MetadataExtension}
1112
import org.apache.spark.sql.types._
1213

13-
class FlintMetadataHelperSuite extends FlintSuite with Matchers {
14+
class FlintMetadataExtensionsSuite extends FlintSuite with Matchers {
1415

1516
test("isTextField returns true when osType is text") {
1617
val builder = new MetadataBuilder()
17-
.putString(FlintMetadataHelper.OS_TYPE_KEY, FlintMetadataHelper.TEXT_TYPE)
18+
.putString(FlintMetadataExtensions.OS_TYPE_KEY, FlintMetadataExtensions.TEXT_TYPE)
1819
val metadata: Metadata = builder.build()
19-
assert(FlintMetadataHelper.isTextField(metadata))
20+
assert(metadata.isTextField)
2021
}
2122

2223
test("isTextField returns false when osType is not text") {
23-
val builder = new MetadataBuilder().putString(FlintMetadataHelper.OS_TYPE_KEY, "non-text")
24+
val builder = new MetadataBuilder().putString(FlintMetadataExtensions.OS_TYPE_KEY, "non-text")
2425
val metadata: Metadata = builder.build()
25-
assert(!FlintMetadataHelper.isTextField(metadata))
26+
assert(!metadata.isTextField)
2627
}
2728

2829
test("addTextFieldMetadata sets osType to text") {
2930
val builder = new MetadataBuilder()
30-
val updatedBuilder = FlintMetadataHelper.addTextFieldMetadata(builder)
31+
val updatedBuilder = builder.withTextField
3132
val metadata: Metadata = updatedBuilder.build()
32-
assert(metadata.getString(FlintMetadataHelper.OS_TYPE_KEY) == FlintMetadataHelper.TEXT_TYPE)
33+
assert(
34+
metadata.getString(
35+
FlintMetadataExtensions.OS_TYPE_KEY) == FlintMetadataExtensions.TEXT_TYPE)
3336
}
3437

3538
test("addMultiFieldMetadata groups fields by field type") {
3639
val builder = new MetadataBuilder()
3740
val fields = Map(
38-
"field1" -> FlintMetadataHelper.TEXT_TYPE,
39-
"field2" -> FlintMetadataHelper.KEYWORD_TYPE,
40-
"field3" -> FlintMetadataHelper.KEYWORD_TYPE)
41-
val updatedBuilder = FlintMetadataHelper.addMultiFieldMetadata(builder, fields)
41+
"field1" -> FlintMetadataExtensions.TEXT_TYPE,
42+
"field2" -> FlintMetadataExtensions.KEYWORD_TYPE,
43+
"field3" -> FlintMetadataExtensions.KEYWORD_TYPE)
44+
val updatedBuilder = builder.withMultiFields(fields)
4245
val metadata: Metadata = updatedBuilder.build()
4346

4447
// Verify that multi-field metadata is added under FIELDS_NAMES_KEY.
45-
assert(metadata.contains(FlintMetadataHelper.FIELDS_NAMES_KEY))
46-
val multiFieldMetadata: Metadata = metadata.getMetadata(FlintMetadataHelper.FIELDS_NAMES_KEY)
48+
assert(metadata.contains(FlintMetadataExtensions.FIELDS_NAMES_KEY))
49+
val multiFieldMetadata: Metadata =
50+
metadata.getMetadata(FlintMetadataExtensions.FIELDS_NAMES_KEY)
4751

4852
// Verify text type field grouping.
49-
assert(multiFieldMetadata.contains(FlintMetadataHelper.TEXT_TYPE))
50-
val textFields = multiFieldMetadata.getStringArray(FlintMetadataHelper.TEXT_TYPE)
53+
assert(multiFieldMetadata.contains(FlintMetadataExtensions.TEXT_TYPE))
54+
val textFields = multiFieldMetadata.getStringArray(FlintMetadataExtensions.TEXT_TYPE)
5155
assert(textFields.sameElements(Array("field1")))
5256

5357
// Verify keyword type field grouping.
54-
assert(multiFieldMetadata.contains(FlintMetadataHelper.KEYWORD_TYPE))
55-
val keywordFields = multiFieldMetadata.getStringArray(FlintMetadataHelper.KEYWORD_TYPE)
58+
assert(multiFieldMetadata.contains(FlintMetadataExtensions.KEYWORD_TYPE))
59+
val keywordFields = multiFieldMetadata.getStringArray(FlintMetadataExtensions.KEYWORD_TYPE)
5660
// Since the order of grouping may vary, compare sorted arrays.
5761
assert(keywordFields.sorted.sameElements(Array("field2", "field3")))
5862
}
5963

6064
test("getKeywordSubfield returns the first keyword field if available") {
6165
val builder = new MetadataBuilder()
6266
val fields = Map(
63-
"field1" -> FlintMetadataHelper.TEXT_TYPE,
64-
"field2" -> FlintMetadataHelper.KEYWORD_TYPE,
65-
"field3" -> FlintMetadataHelper.KEYWORD_TYPE)
66-
val updatedBuilder = FlintMetadataHelper.addMultiFieldMetadata(builder, fields)
67+
"field1" -> FlintMetadataExtensions.TEXT_TYPE,
68+
"field2" -> FlintMetadataExtensions.KEYWORD_TYPE,
69+
"field3" -> FlintMetadataExtensions.KEYWORD_TYPE)
70+
val updatedBuilder = builder.withMultiFields(fields)
6771
val metadata: Metadata = updatedBuilder.build()
6872

6973
// Retrieve keyword fields from the nested metadata.
70-
val multiFieldMetadata = metadata.getMetadata(FlintMetadataHelper.FIELDS_NAMES_KEY)
71-
val keywordFields = multiFieldMetadata.getStringArray(FlintMetadataHelper.KEYWORD_TYPE)
74+
val multiFieldMetadata = metadata.getMetadata(FlintMetadataExtensions.FIELDS_NAMES_KEY)
75+
val keywordFields = multiFieldMetadata.getStringArray(FlintMetadataExtensions.KEYWORD_TYPE)
7276

7377
// Expect the first keyword field.
74-
assert(FlintMetadataHelper.getKeywordSubfield(metadata) == keywordFields.headOption)
78+
assert(metadata.keywordSubfield == keywordFields.headOption)
7579
}
7680

7781
test("getKeywordSubfield returns None if no keyword field exists") {
7882
val builder = new MetadataBuilder()
79-
val fields = Map("field1" -> FlintMetadataHelper.TEXT_TYPE)
80-
val updatedBuilder = FlintMetadataHelper.addMultiFieldMetadata(builder, fields)
83+
val fields = Map("field1" -> FlintMetadataExtensions.TEXT_TYPE)
84+
val updatedBuilder = builder.withMultiFields(fields)
8185
val metadata: Metadata = updatedBuilder.build()
8286

8387
// Since there is no keyword type, getKeywordSubfield should return None.
84-
assert(FlintMetadataHelper.getKeywordSubfield(metadata).isEmpty)
88+
assert(metadata.keywordSubfield.isEmpty)
8589
}
8690
}

0 commit comments

Comments
 (0)