From 9e46a80773c7caeaf9ed6b37f890aaa4b4c3b3d8 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Tue, 11 Feb 2025 11:32:43 -0800 Subject: [PATCH] Support OpenSearch ip field type Signed-off-by: Tomoyuki Morita --- docs/opensearch-table.md | 1 + .../sql/flint/datatype/FlintDataType.scala | 5 ++++ .../table/OpenSearchTableQueryITSuite.scala | 23 ++++++++++++++++ .../opensearch/flint/OpenSearchSuite.scala | 27 +++++++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/docs/opensearch-table.md b/docs/opensearch-table.md index 86d4612b1..3c8552aae 100644 --- a/docs/opensearch-table.md +++ b/docs/opensearch-table.md @@ -69,6 +69,7 @@ The following table defines the data type mapping between OpenSearch index field | date(Date) | DateType | | keyword | StringType, VarcharType, CharType | | text | StringType(meta(osType)=text) | +| ip | StringType(meta(osType)=ip) | | object | StructType | | alias | Inherits referenced field type | diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index d1dab9cc5..543a928d6 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -115,6 +115,11 @@ object FlintDataType { // binary types case JString("binary") => BinaryType + // ip type + case JString("ip") => + metadataBuilder.putString("osType", "ip") + StringType + // not supported case unknown => throw new IllegalStateException(s"unsupported data type: $unknown") } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala index 2cd71c2f7..aeed73f26 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchTableQueryITSuite.scala @@ -121,4 +121,27 @@ class OpenSearchTableQueryITSuite checkKeywordsExistsInExplain(df, expectedPlanFragment: _*) } } + + test("Query index with ip data type") { + val index1 = "t0001" + withIndexName(index1) { + indexWithIp(index1) + + var df = spark.sql(s"""SELECT client, server FROM ${catalogName}.default.$index1""") + val ip0 = "192.168.0.10" + val ip1 = "192.168.0.11" + val ip2 = "100.10.12.123" + val ip3 = "2001:db8:3333:4444:5555:6666:7777:8888" + val ip4 = "2001:db8::1234:5678" + checkAnswer(df, Seq(Row(ip0, ip2), Row(ip1, ip2), Row(ip3, ip4))) + + df = spark.sql( + s"""SELECT client, server FROM ${catalogName}.default.$index1 WHERE client = '192.168.0.10'""") + checkAnswer(df, Seq(Row(ip0, ip2))) + + df = spark.sql( + s"""SELECT client, server FROM ${catalogName}.default.$index1 WHERE server = '2001:db8::1234:5678'""") + checkAnswer(df, Seq(Row(ip3, ip4))) + } + } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala index c803d9bc8..12400cffc 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -181,6 +181,33 @@ trait OpenSearchSuite extends BeforeAndAfterAll { index(indexName, oneNodeSetting, mappings, docs) } + def indexWithIp(indexName: String): Unit = { + val mappings = """{ + | "properties": { + | "client": { + | "type": "ip" + | }, + | "server": { + | "type": "ip" + | } + | } + |}""".stripMargin + val docs = Seq( + """{ + | "client": "192.168.0.10", + | "server": "100.10.12.123" + |}""".stripMargin, + """{ + | "client": "192.168.0.11", + | "server": "100.10.12.123" + |}""".stripMargin, + """{ + | "client": "2001:db8:3333:4444:5555:6666:7777:8888", + | "server": "2001:db8::1234:5678" + |}""".stripMargin) + index(indexName, oneNodeSetting, mappings, docs) + } + def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = { openSearchClient.indices.create( new CreateIndexRequest(index)