Create Utilities related to sort #3923

@carloea2

Background

Several operators implement per-type operations with explicit type checks (pattern matching on AttributeType) and duplicated logic. This includes comparisons, arithmetic (addition), and default values such as min and max for the different data types. Examples:

  • AggregationOperation: Implements comparison, addition, zero, min, and max for each numeric type via pattern matching:

    attributeType match {
      case AttributeType.INTEGER => a.asInstanceOf[Integer].compareTo(b.asInstanceOf[Integer])
      case AttributeType.DOUBLE =>
        a.asInstanceOf[java.lang.Double].compareTo(b.asInstanceOf[java.lang.Double])
      case AttributeType.LONG =>
        a.asInstanceOf[java.lang.Long].compareTo(b.asInstanceOf[java.lang.Long])
      case AttributeType.TIMESTAMP =>
        a.asInstanceOf[Timestamp].getTime.compareTo(b.asInstanceOf[Timestamp].getTime)
      case _ =>

    private def maxValue(attributeType: AttributeType): Object =
      attributeType match {
        case AttributeType.INTEGER => Integer.MAX_VALUE.asInstanceOf[Object]
        case AttributeType.DOUBLE => java.lang.Double.MAX_VALUE.asInstanceOf[Object]
        case AttributeType.LONG => java.lang.Long.MAX_VALUE.asInstanceOf[Object]
        case AttributeType.TIMESTAMP => new Timestamp(java.lang.Long.MAX_VALUE)
        case _ =>
          throw new UnsupportedOperationException(
            "Unsupported attribute type for max value: " + attributeType

  • StableMergeSortOpExec: Defines compareTypedNonNullValues with a large match on AttributeType for ordering tuples:

    case AttributeType.INTEGER =>
      java.lang.Integer.compare(
        leftValue.asInstanceOf[Number].intValue(),
        rightValue.asInstanceOf[Number].intValue()
      )
    case AttributeType.LONG =>
      java.lang.Long.compare(
        leftValue.asInstanceOf[Number].longValue(),
        rightValue.asInstanceOf[Number].longValue()

  • SortPartitionsOpExec: Also uses a match on AttributeType to perform a less-than comparison for sorting each partition:

    case AttributeType.LONG =>
      t1.getField[Long](attributeIndex) < t2.getField[Long](attributeIndex)
    case AttributeType.INTEGER =>
      t1.getField[Int](attributeIndex) < t2.getField[Int](attributeIndex)
    case AttributeType.DOUBLE =>
      t1.getField[Double](attributeIndex) < t2.getField[Double](attributeIndex)
    case _ =>
      true // unsupported type

  • IntervalJoinOpExec: Uses a chain of if (dataType == X) conditions to handle interval comparisons for Long, Double, Integer, and Timestamp types. This operator's logic is more complex and may be difficult to refactor, but it is included here for context:

    if (dataType == AttributeType.LONG) {
      val pointValue: Long = point.asInstanceOf[Long]
      val leftBoundValue: Long = leftBound.asInstanceOf[Long]
      val constantValue: Long = desc.constant
      val rightBoundValue: Long = leftBoundValue + constantValue
      result = processNumValue[Long](
        pointValue,
        leftBoundValue,
        rightBoundValue

What is available:

  • AttributeTypeUtils: Provides parsing and type-casting utilities (e.g., parsing strings to numeric types, inferring schema types), but it does not provide general arithmetic or comparison operations per type.

  • ComparisonType (used in FilterPredicate) defines comparison operators like EQUAL, LESS_THAN, etc.

  • TupleUtils

Proposed elements:

  • Add a utility object AttributeTypeOperations (or add the methods directly to AttributeTypeUtils) that provides methods like:
    compare(a: Any, b: Any, t: AttributeType): Int
    add(a: Any, b: Any, t: AttributeType): Any
    zeroValue(t: AttributeType): Any
    minValue/maxValue(t: AttributeType): Any
    Usage example: instead of each operator writing its own attributeType match { case AttributeType.INTEGER => ... }, operators would call AttributeTypeUtils.compare(a, b, attributeType).

  • Add convenience builders for Schema and Tuple in their respective classes. Example implementations:

    /** Build a Schema from (name, type) pairs, in order. */
    private def schemaOf(attributes: (String, AttributeType)*): Schema = {
      attributes.foldLeft(Schema()) {
        case (acc, (name, attrType)) => acc.add(new Attribute(name, attrType))
      }
    }

    /** Build a Tuple for the given schema from (name, value) pairs; values are added in schema order. */
    private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
      val valueMap = values.toMap
      val builder = Tuple.builder(schema)
      schema.getAttributeNames.asJava.forEach { name =>
        // valueMap(name) throws NoSuchElementException if a schema attribute has no value
        builder.add(schema.getAttribute(name), valueMap(name))
      }
      builder.build()
    }

    Alternatively, to avoid cluttering the core API, these helpers could be added to TupleUtils.
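
To make the first proposed element more concrete, here is a minimal sketch of what such a utility might look like. It is not taken from the codebase: the AttributeType below is a local stand-in covering only the four types quoted in the Background section, the object name AttributeTypeOperations follows the name suggested above, and treating Timestamp addition as a sum of epoch milliseconds is an assumption. minValue is left out of the sketch and would presumably mirror maxValue.

```scala
import java.sql.Timestamp

// Stand-in for the project's AttributeType (assumption): only the four
// types that appear in the excerpts quoted in this issue.
sealed trait AttributeType
object AttributeType {
  case object INTEGER extends AttributeType
  case object LONG extends AttributeType
  case object DOUBLE extends AttributeType
  case object TIMESTAMP extends AttributeType
}

// Hypothetical utility following the signatures proposed in this issue.
object AttributeTypeOperations {
  private def num(v: Any): Number = v.asInstanceOf[Number]
  private def ts(v: Any): Timestamp = v.asInstanceOf[Timestamp]

  /** Negative, zero, or positive, like java.util.Comparator. Inputs must be non-null. */
  def compare(a: Any, b: Any, t: AttributeType): Int = t match {
    case AttributeType.INTEGER =>
      java.lang.Integer.compare(num(a).intValue(), num(b).intValue())
    case AttributeType.LONG =>
      java.lang.Long.compare(num(a).longValue(), num(b).longValue())
    case AttributeType.DOUBLE =>
      java.lang.Double.compare(num(a).doubleValue(), num(b).doubleValue())
    case AttributeType.TIMESTAMP =>
      ts(a).compareTo(ts(b))
  }

  /** Sum of two values of the same attribute type. */
  def add(a: Any, b: Any, t: AttributeType): Any = t match {
    case AttributeType.INTEGER => num(a).intValue() + num(b).intValue()
    case AttributeType.LONG => num(a).longValue() + num(b).longValue()
    case AttributeType.DOUBLE => num(a).doubleValue() + num(b).doubleValue()
    // Assumption: timestamps are added as epoch milliseconds.
    case AttributeType.TIMESTAMP => new Timestamp(ts(a).getTime + ts(b).getTime)
  }

  /** Additive identity for each type. */
  def zeroValue(t: AttributeType): Any = t match {
    case AttributeType.INTEGER => 0
    case AttributeType.LONG => 0L
    case AttributeType.DOUBLE => 0.0
    case AttributeType.TIMESTAMP => new Timestamp(0L)
  }

  /** Largest value per type, mirroring the maxValue excerpt quoted above. */
  def maxValue(t: AttributeType): Any = t match {
    case AttributeType.INTEGER => Int.MaxValue
    case AttributeType.LONG => Long.MaxValue
    case AttributeType.DOUBLE => Double.MaxValue
    case AttributeType.TIMESTAMP => new Timestamp(Long.MaxValue)
  }
}
```

With something like this in place, a less-than check such as the one in SortPartitionsOpExec could be written as AttributeTypeOperations.compare(x, y, attributeType) < 0, and the real implementation would still need to decide how to handle types outside these four (for example, by throwing UnsupportedOperationException as the existing maxValue does).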

@Yicong-Huang proposed the original idea in PR #3774.

Priority

P3 – Low

Task Type

  • Code Implementation
  • Documentation
  • Refactor / Cleanup
  • Testing / QA
  • DevOps / Deployment
