Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support OpenSearch ip field type #1044

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/opensearch-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The following table defines the data type mapping between OpenSearch index field
| date(Date) | DateType |
| keyword | StringType, VarcharType, CharType |
| text | StringType(meta(osType)=text) |
| ip | StringType(meta(osType)=ip) |
| object | StructType |
| alias | Inherits referenced field type |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ object FlintDataType {
// binary types
case JString("binary") => BinaryType

// ip type
case JString("ip") =>
metadataBuilder.putString("osType", "ip")
StringType

// not supported
case unknown => throw new IllegalStateException(s"unsupported data type: $unknown")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,27 @@ class OpenSearchTableQueryITSuite
checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
}
}

test("Query index with ip data type") {
val index1 = "t0001"
withIndexName(index1) {
indexWithIp(index1)

var df = spark.sql(s"""SELECT client, server FROM ${catalogName}.default.$index1""")
val ip0 = "192.168.0.10"
val ip1 = "192.168.0.11"
val ip2 = "100.10.12.123"
val ip3 = "2001:db8:3333:4444:5555:6666:7777:8888"
val ip4 = "2001:db8::1234:5678"
checkAnswer(df, Seq(Row(ip0, ip2), Row(ip1, ip2), Row(ip3, ip4)))

df = spark.sql(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does = push down to DSL?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is pushed down.

s"""SELECT client, server FROM ${catalogName}.default.$index1 WHERE client = '192.168.0.10'""")
checkAnswer(df, Seq(Row(ip0, ip2)))

df = spark.sql(
s"""SELECT client, server FROM ${catalogName}.default.$index1 WHERE server = '2001:db8::1234:5678'""")
checkAnswer(df, Seq(Row(ip3, ip4)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,33 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
index(indexName, oneNodeSetting, mappings, docs)
}

def indexWithIp(indexName: String): Unit = {
val mappings = """{
| "properties": {
| "client": {
| "type": "ip"
| },
| "server": {
| "type": "ip"
| }
| }
|}""".stripMargin
val docs = Seq(
"""{
| "client": "192.168.0.10",
| "server": "100.10.12.123"
|}""".stripMargin,
"""{
| "client": "192.168.0.11",
| "server": "100.10.12.123"
|}""".stripMargin,
"""{
| "client": "2001:db8:3333:4444:5555:6666:7777:8888",
| "server": "2001:db8::1234:5678"
|}""".stripMargin)
index(indexName, oneNodeSetting, mappings, docs)
}

def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = {
openSearchClient.indices.create(
new CreateIndexRequest(index)
Expand Down
Loading