Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Unify all PPL functions in BuiltinFunctionName #1062

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package org.opensearch.flint.spark.ppl

import java.util

import org.opensearch.sql.expression.function.BuiltinFunctionName.IP_TO_INT
import org.opensearch.sql.expression.function.BuiltinFunctionName.IS_IPV4
import org.opensearch.sql.expression.function.SerializableUdf.visit
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

Expand Down Expand Up @@ -57,8 +59,8 @@ class FlintSparkPPLGeoipITSuite
ipAddress: UnresolvedAttribute,
left: LogicalPlan,
right: LogicalPlan): LogicalPlan = {
val is_ipv4 = visit("is_ipv4", util.List.of[Expression](ipAddress))
val ip_to_int = visit("ip_to_int", util.List.of[Expression](ipAddress))
val is_ipv4 = visit(IS_IPV4, util.List.of[Expression](ipAddress))
val ip_to_int = visit(IP_TO_INT, util.List.of[Expression](ipAddress))

val t1 = SubqueryAlias("t1", left)
val t2 = SubqueryAlias("t2", right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ package org.opensearch.flint.spark.ppl

import java.util

import org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_APPEND
import org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_DELETE
import org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTEND
import org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_SET
import org.opensearch.sql.expression.function.SerializableUdf.visit

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
Expand Down Expand Up @@ -408,7 +412,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_DELETE, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -429,7 +433,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_DELETE, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -450,7 +454,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"f1\":\"abc\",\"f2\":{\"f3\":\"a\",\"f4\":\"b\"}}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_DELETE, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -475,7 +479,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_DELETE, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -500,7 +504,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_DELETE, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -525,7 +529,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_set", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_SET, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -552,7 +556,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_set", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_SET, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -573,7 +577,7 @@ class FlintSparkPPLJsonFunctionITSuite
val jsonObjExp =
Literal("{\"f1\":\"abc\",\"f2\":{\"f3\":\"a\",\"f4\":\"b\"}}")
val jsonFunc =
Alias(visit("json_set", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_SET, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -598,7 +602,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_set", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_SET, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -625,7 +629,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -653,7 +657,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -681,7 +685,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -709,7 +713,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -737,7 +741,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -765,7 +769,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -797,7 +801,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_APPEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -824,7 +828,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand All @@ -849,7 +853,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -877,7 +881,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -905,7 +909,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -933,7 +937,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -961,7 +965,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down Expand Up @@ -993,7 +997,7 @@ class FlintSparkPPLJsonFunctionITSuite
Literal(
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")
val jsonFunc =
Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
Alias(visit(JSON_EXTEND, util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ public enum BuiltinFunctionName {
JSON_EXTRACT(FunctionName.of("json_extract")),
JSON_KEYS(FunctionName.of("json_keys")),
JSON_VALID(FunctionName.of("json_valid")),
JSON_DELETE(FunctionName.of("json_delete")),
JSON_SET(FunctionName.of("json_set")),
JSON_APPEND(FunctionName.of("json_append")),
JSON_EXTEND(FunctionName.of("json_extend")),
// JSON_ARRAY_ALL_MATCH(FunctionName.of("json_array_all_match")),
// JSON_ARRAY_ANY_MATCH(FunctionName.of("json_array_any_match")),
// JSON_ARRAY_FILTER(FunctionName.of("json_array_filter")),
Expand Down Expand Up @@ -293,7 +297,13 @@ public enum BuiltinFunctionName {
MULTIMATCHQUERY(FunctionName.of("multimatchquery")),
WILDCARDQUERY(FunctionName.of("wildcardquery")),
WILDCARD_QUERY(FunctionName.of("wildcard_query")),
COALESCE(FunctionName.of("coalesce"));
COALESCE(FunctionName.of("coalesce")),

/** IP Relevance Function. */
CIDR(FunctionName.of("cidr")),
IS_IPV4(FunctionName.of("is_ipv4")),
IP_TO_INT(FunctionName.of("ip_to_int")),
;

private FunctionName name;

Expand Down
Loading
Loading