
Support OpenSearch ip field type #1044

Open · wants to merge 1 commit into base: main

Conversation

ykmr1224
Collaborator

Description

  • Support OpenSearch ip field type
  • This PR implements the ip field type, storing it as StringType (see the sketch after the comparison below).
  • The following section summarizes the possible approaches and their pros/cons. Since filtering can be pushed down to OpenSearch, there is little benefit to using a UDT or a binary type.

Comparison of the possible approaches

  1. String Type (StringType)

    • ✅ Simple and fully compatible with Spark
    • ❌ Storage inefficient and slower for filtering
  2. User Defined Type (UDT)

    • ✅ Enables custom IP-specific operations (e.g., subnet calculations)
    • ❌ Requires additional maintenance and potential compatibility issues
    • ❌ Serialization / Deserialization overhead
  3. Binary Type (BinaryType)

    • ✅ More storage-efficient and performant
    • ❌ Less readable and requires explicit type conversion
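
For concreteness, here is a rough sketch of what option 1 looks like on the Spark side (a hedged illustration; the format name and field name are assumptions, not the connector's actual API):

  import org.apache.spark.sql.functions.col

  // The OpenSearch ip field arrives as an ordinary string column ...
  val df = spark.read.format("opensearch").load("logs")  // format/table names assumed
  // ... so plain string predicates apply, and equality can be pushed down.
  df.filter(col("client_ip") === "192.168.0.10")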

Related Issues

#1034

Check List

  • Updated documentation (docs/ppl-lang/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • Newly added source code should include a copyright header
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen (Collaborator) left a comment


Just wondering: is there any benefit to the UDT approach if we build functions on the IP type in the future?

@ykmr1224
Collaborator Author

Just wondering: is there any benefit to the UDT approach if we build functions on the IP type in the future?

If we expect very intensive comparisons on the IP address field, a UDT could perform better, since it can store the IP address data in a more optimized form (such as a byte array) and avoid parsing text on every comparison.
One possible use case is a join between IP addresses and IP address ranges (or CIDRs), but given that Spark has no built-in functions for IP address calculation/comparison, I don't think this is a common use case.
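
For illustration (not from the PR), a sketch of the parse-once byte form a UDT could hold, assuming plain Java parsing:

  import java.net.InetAddress

  // Parse the textual form once into bytes (4 for IPv4, 16 for IPv6) ...
  val stored: Array[Byte] = InetAddress.getByName("192.168.0.10").getAddress
  // ... then compare bytes instead of re-parsing the string for every comparison.
  java.util.Arrays.equals(stored, InetAddress.getByName("192.168.0.10").getAddress)  // true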

  val ip4 = "2001:db8::1234:5678"
  checkAnswer(df, Seq(Row(ip0, ip2), Row(ip1, ip2), Row(ip3, ip4)))

  df = spark.sql(
Collaborator


Does = push down to the DSL?

Collaborator Author


Yes. It is pushed down.
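
For context, a hedged sketch of the shape such a pushed-down equality filter could take in the OpenSearch query DSL, assuming it becomes a term query on the ip field (field name illustrative):

  { "query": { "term": { "ip_field": "192.168.0.10" } } }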

@currantw
Contributor

Further to this conversation, #3145 added support for an IP address data type in OpenSearch SQL (corresponding pull request).

In OpenSearch SQL, the IP address data type:

  • Uses the inet.ipaddr library.
  • Supports equality and comparison operators. Two IP addresses are equal if they represent the same address, even if they have different string representations.
  • Supports conversion to/from strings and comparison between IP addresses and strings.
  • Supports sorting IP addresses.

For consistency between OpenSearch SQL and Spark, it probably makes sense to have the same behaviour in OpenSearch Spark, or at least to work towards it? I'm not particularly familiar with how UDTs work in Spark, but I assume one would be necessary to implement the above functionality?

Note that there is also an existing cidrmatch function in OpenSearch Spark (see issue and pull request). I assume that any new IP data type should work with that existing function, and that the associated documentation/tests/etc. should be updated if needed?
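
For reference, a small sketch of the inet.ipaddr semantics described above (library com.github.seancfoley:ipaddress; behaviour summarized from its docs, so treat this as an assumption):

  import inet.ipaddr.IPAddressString

  val a = new IPAddressString("2001:db8::1").getAddress
  val b = new IPAddressString("2001:0db8:0:0:0:0:0:1").getAddress
  a == b               // true: same address despite different string representations
  a.compareTo(b) == 0  // IPAddress is Comparable, so sorting works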

@currantw
Contributor

Just wondering: is there any benefit to the UDT approach if we build functions on the IP type in the future?

If we expect very intensive comparisons on the IP address field, a UDT could perform better, since it can store the IP address data in a more optimized form (such as a byte array) and avoid parsing text on every comparison. One possible use case is a join between IP addresses and IP address ranges (or CIDRs), but given that Spark has no built-in functions for IP address calculation/comparison, I don't think this is a common use case.

As mentioned in my comment above, there is an existing cidrmatch function in PPL that may be relevant.

@ykmr1224
Collaborator Author

Further to this conversation, #3145 added support for an IP address data type in OpenSearch SQL (corresponding pull request). […]

Note that there is also an existing cidrmatch function in OpenSearch Spark (see issue and pull request). I assume that any new IP data type should work with that existing function, and that the associated documentation/tests/etc. should be updated if needed?

Thank you very much for the extra context.
I didn't notice that IP address support had been added to OpenSearch SQL; keeping consistency with it makes more sense.
It seems cidrmatch works against a STRING field, and we need to account for that if we use a UDT to store the IP field.
Let me explore options to achieve compatibility.

@ykmr1224
Collaborator Author

I did some more investigation into UDTs and tried to see whether we can use one to realize functionality similar to the SQL plugin's.
I am not very familiar with Spark internals, so please let me know if you have further insights or objections to my findings.

Refer to this commit for the implementation.
I identified several gaps:

  • Even if we use a UDT, Spark computes equality on the serialized data. In the implementation above, equality is computed on UTF8String values, which means we cannot realize equality between different notations (see the sketch below). (If we serialize to a normalized IP address, the equality check works, but we cannot recover the original notation.)
  • We cannot automatically cast a string to the UDT. For WHERE ip = '192.168.0.10', we need an explicit conversion such as WHERE ip = text_to_ip('192.168.0.10'). (We might be able to implement a custom analyzer rule to apply implicit type coercion; I haven't tried that yet.)
  • If we need explicit type conversion, we cannot push down the predicate, since the conversion is done in Spark.

I'll check whether implicit type coercion is possible, but let me know your thoughts.
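
For reference, a minimal sketch of the kind of UDT being discussed (names are hypothetical, not the commit's actual code); it shows why the first gap arises, since Spark compares UDT values via their serialized sqlType form, here StringType:

  // NOTE: UserDefinedType is private[spark] in recent Spark versions, so a real
  // implementation must live under the org.apache.spark package or be registered
  // via UDTRegistration.
  import org.apache.spark.sql.types.{DataType, StringType, UserDefinedType}
  import org.apache.spark.unsafe.types.UTF8String

  class IPAddress(val value: String) extends Serializable

  class IPAddressUDT extends UserDefinedType[IPAddress] {
    override def sqlType: DataType = StringType  // equality is generated against this type
    override def serialize(obj: IPAddress): Any = UTF8String.fromString(obj.value)
    override def deserialize(datum: Any): IPAddress = new IPAddress(String.valueOf(datum))
    override def userClass: Class[IPAddress] = classOf[IPAddress]
  }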

@LantaoJin
Member

Refer to this commit for the implementation.

I love this UDT implementation. We will try UDTs in Calcite too.

@ykmr1224
Collaborator Author

ykmr1224 commented Feb 14, 2025

I found the implementation of the equality check, and we cannot override the behavior unless we make changes to the Spark repository.

Equality is checked by generated code executed in the executors. The code generation is defined in CodeGenerator.genEqual:

  def genEqual(dataType: DataType, c1: String, c2: String): String = dataType match {
    case BinaryType => s"java.util.Arrays.equals($c1, $c2)"
    case FloatType =>
      s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2)"
    case DoubleType =>
      s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2)"
    case st: StringType if st.supportsBinaryOrdering => s"$c1.binaryEquals($c2)"
    case st: StringType => s"$c1.semanticEquals($c2, ${st.collationId})"
    case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
    case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)"
    case array: ArrayType => genComp(array, c1, c2) + " == 0"
    case struct: StructType => genComp(struct, c1, c2) + " == 0"
    case udt: UserDefinedType[_] => genEqual(udt.sqlType, c1, c2)
    case CalendarIntervalType => s"$c1.equals($c2)"
    case NullType => "false"
    case _ =>
      throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError(
        "equality", dataType)
  }

If we look at the case udt: ... branch, it falls through to udt.sqlType, which means the equality-checking code is generated based on the underlying type used for serialization, and there is no extension point to override this behavior.
One possible workaround is to normalize the IP address before serializing it, but that would change the notation from the original form and affect the output (even if the original data is ::ffff:192.168.0.120, the output would be 192.168.0.120, and we cannot recover the original form).
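
A quick illustration of that loss, sketched with plain Java parsing (the actual implementation may use a different library):

  import java.net.InetAddress

  // Java collapses an IPv4-mapped IPv6 literal into an Inet4Address, so the
  // original notation cannot be recovered after normalization.
  InetAddress.getByName("::ffff:192.168.0.120").getHostAddress  // "192.168.0.120"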

I think it is better to store it as a string and provide matching functions for IP addresses if users need them.

@currantw @penghuo @dai-chen

Could you give your opinion?

@ykmr1224 added, then removed, the backport 0.x (Backport to 0.x branch (stable branch)) label on Feb 14, 2025
@penghuo
Collaborator

penghuo commented Feb 14, 2025

I found the implementation of the equality check, and we cannot override the behavior unless we make changes to the Spark repository. […] I think it is better to store it as a string and provide matching functions for IP addresses if users need them. […] Could you give your opinion?

  1. Mapping to the Spark SQL data type
    Since Spark SQL lacks a native IP data type but supports implicit conversion, mapping the OpenSearch index ip field to a string in Spark SQL is sensible, because comparing string values works naturally (i.e., stringValue = stringValue). IMO, Spark SQL and OpenSearch SQL can maintain their own dialects. @dai-chen @LantaoJin what are your thoughts?

  2. Mapping to the Spark PPL data type
    PPL supports an IP data type along with implicit conversion. Therefore, mapping the ip field to an IP data type in Spark PPL is appropriate, enabling implicit conversion for expressions such as ipValue = stringValue (see the illustration below). In my opinion, it is acceptable not to support implicit conversion on extended UDTs, and this limitation can be noted in the documentation.
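
As a hedged illustration of how the two dialects would read under this proposal (example queries, not from the PR):

  -- Spark SQL (ip mapped to string): a plain string comparison
  SELECT * FROM logs WHERE client_ip = '192.168.0.10';

  -- PPL (native IP type with implicit conversion): ipValue = stringValue coerces
  source=logs | where client_ip = '192.168.0.10'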

@dai-chen
Collaborator

dai-chen commented Feb 18, 2025

@ykmr1224 I see a UDT example like this one: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala.

Can the custom equals in the example work for us?

@ykmr1224
Collaborator Author

@ykmr1224 I see a UDT example like this one: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala.

Can the custom equals in the example work for us?

I initially referred to that example and thought we could override the equals method, but as I mentioned above, it didn't work, and I tracked down the actual implementation, where the override is not taken into account.

@ykmr1224
Collaborator Author

ykmr1224 commented Feb 21, 2025

Recap of the comparison between storing the ip field as a string vs. as a UDT. I prefer StringType, considering the simpler condition notation and predicate push-down.

  1. String Type (StringType)

    • ✅ Simple and fully compatible with Spark
    • ✅ Predicates can be pushed down
    • ❌ Equality checks behave differently when the predicate is pushed down (a string match in Spark, but an IP address match in OpenSearch; see the illustration below)
    • ❌ Slower for filtering executed in Spark

Query example:

... WHERE ip_field = '192.168.0.10';
... WHERE cidrmatch(ip_field, '192.168.0.0/24'); -- takes (string, string) as params

  2. User Defined Type (UDT)
    (Suppose we store the normalized form in the UDT for consistent equality checks.)

    • ✅ Provides a type-safe implementation for UDFs
    • ❌ Explicit type conversion is needed when writing conditions, which prevents predicate push-down
    • ❌ Output would differ from the original notation (due to normalization)
    • ❌ Serialization / deserialization overhead

Query example:

... WHERE ip_field = text_to_ip('192.168.0.10');
... WHERE cidrmatch(ip_field, '192.168.0.0/24'); -- takes (IPAddress, string) as params
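
To make the equality mismatch concrete, a small sketch (assuming OpenSearch resolves equivalent notations to the same address, as its ip field type does):

  // Spark-side comparison under the StringType mapping is a plain string match:
  "::ffff:192.168.0.10" == "192.168.0.10"  // false
  // OpenSearch compares addresses, so both notations refer to the same value:
  import java.net.InetAddress
  InetAddress.getByName("::ffff:192.168.0.10") == InetAddress.getByName("192.168.0.10")  // true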

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
@penghuo
Collaborator

penghuo commented Feb 25, 2025

❌ Explicit type conversion is needed when writing conditions, which prevents predicate push-down

In WHERE cidrmatch(ip_field, '192.168.0.0/24'), cidrmatch can be pushed down, right?

A couple of questions:

  • What is the expected result if the literal value is not valid?
  • How do we support ip-type data in a CREATE TABLE statement?

@ykmr1224
Collaborator Author

ykmr1224 commented Feb 27, 2025

In WHERE cidrmatch(ip_field, '192.168.0.0/24'), cidrmatch can be pushed down, right?

I suppose it is possible (I haven't been able to realize it yet), but we would need special logic to push this down.

What is the expected result if the literal value is not valid?

The literal value would be handled inside the function, which can either ignore an invalid IP address or raise an error, depending on the requirements.

How do we support ip-type data in a CREATE TABLE statement?

I found that Spark doesn't support UDTs in CREATE TABLE statements.
To create a table with a UDT field, we need to use the DataFrame API (sketched below).
With this approach, we can also create a field with metadata, so it doesn't differentiate between a UDT and a type mapping carried via metadata.
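
A hedged sketch of that DataFrame-API route, reusing the hypothetical IPAddressUDT from the earlier sketch (table and field names illustrative):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{StructField, StructType}

  // CREATE TABLE DDL has no syntax for UDTs, but a schema built in code can carry one.
  val schema = StructType(Seq(StructField("client_ip", new IPAddressUDT, nullable = true)))
  val rows = java.util.Arrays.asList(Row(new IPAddress("192.168.0.10")))
  spark.createDataFrame(rows, schema).write.saveAsTable("ip_logs")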
