Use merge and set validity in cast struct to struct #14846

Open: rishic3 wants to merge 1 commit into NVIDIA:main from rishic3:opt-cast-struct-to-struct

Conversation

rishic3 (Collaborator) commented May 20, 2026

Contributes to #14588.

Description

The struct-to-struct path in castStructToStruct previously restored parent struct nulls with isNull / ifElse after building the casted struct view. This change instead calls mergeAndSetValidity to set the casted struct's validity directly from the input struct, preserving the same output values and parent null mask while avoiding the extra null mask and conditional copy.
This is covered by existing cast tests, e.g. test_cast_day_time_interval_to_integral_no_overflow and test_cast_integral_to_day_time_interval_no_overflow.
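
As a rough illustration of what a BITWISE_AND validity merge does at the top level, here is a minimal sketch in plain Scala. It assumes a toy layout of one validity bit per row packed into 64-bit words, and it assumes the casted view starts with every row valid. The names `ValidityMergeSketch`, `packValidity`, `mergeAnd`, and `isValid` are hypothetical helpers for this sketch, not cuDF APIs.

```scala
object ValidityMergeSketch {
  // Toy model (assumption): one bit per row, 1 = valid, packed into 64-bit words.
  def packValidity(valid: Seq[Boolean]): Array[Long] = {
    val words = new Array[Long]((valid.length + 63) / 64)
    valid.zipWithIndex.foreach { case (v, i) =>
      if (v) words(i / 64) |= (1L << (i % 64))
    }
    words
  }

  // BITWISE_AND merge: a row stays valid only if it is valid in both masks.
  def mergeAnd(a: Array[Long], b: Array[Long]): Array[Long] = {
    require(a.length == b.length, "masks must cover the same number of rows")
    a.zip(b).map { case (x, y) => x & y }
  }

  def isValid(mask: Array[Long], row: Int): Boolean =
    (mask(row / 64) & (1L << (row % 64))) != 0L

  def main(args: Array[String]): Unit = {
    val castedValid = Seq.fill(8)(true)               // assumed: casted view has no nulls yet
    val inputValid = Seq.tabulate(8)(i => i % 4 != 0) // input struct: rows 0 and 4 are null
    val merged = mergeAnd(packValidity(castedValid), packValidity(inputValid))
    println((0 until 8).map(isValid(merged, _)).mkString(", "))
  }
}
```

The merge touches only mask words, which is why it can be cheaper than building a null scalar and running a row-wise conditional copy. The sketch says nothing about child columns.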

Benchmark

Nanobenchmark

Isolating the rewritten block in a nanobenchmark shows a 3.3x speedup.

struct-to-struct null restore nanobenchmark
import java.util.Arrays

import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, Cuda, DType, Rmm, RmmAllocationMode,
  Scalar}
import ai.rapids.cudf.HostColumnVector.{BasicType, StructData, StructType}

object GpuCastStructMergeValidityBench {
  val NumRows = 100000
  val Warmup = 5
  val Measured = 20

  def main(args: Array[String]): Unit = {
    Rmm.initialize(RmmAllocationMode.POOL, null, Cuda.memGetInfo().free / 2 & ~255L)

    withResource(buildStructCol(NumRows)) { input =>
      for (_ <- 0 until Warmup) {
        structBase(input).close()
        structNew(input).close()
      }
      val timesBase = (0 until Measured).map { _ =>
        val t = System.nanoTime()
        structBase(input).close()
        System.nanoTime() - t
      }
      val timesNew = (0 until Measured).map { _ =>
        val t = System.nanoTime()
        structNew(input).close()
        System.nanoTime() - t
      }
      println(f"BASE median: ${timesBase.sorted.apply(Measured / 2) / 1e6}%.3f ms")
      println(f"NEW  median: ${timesNew.sorted.apply(Measured / 2) / 1e6}%.3f ms")
    }
  }

  // Pre-change castStructToStruct pipeline: build casted children into a struct view,
  // then restore input parent nulls with isNull + ifElse.
  def structBase(input: ColumnVector): ColumnVector = {
    withResource(input.getChildColumnView(0)) { intChild =>
      val castInt = intChild.castTo(DType.INT64)
      withResource(castInt) { _ =>
        withResource(input.getChildColumnView(1)) { strChild =>
          withResource(strChild.copyToColumnVector()) { castStr =>
            withResource(ColumnView.makeStructView(castInt, castStr)) { casted =>
              withResource(input.isNull) { isNull =>
                withResource(Scalar.structFromNull(
                  new BasicType(true, DType.INT64),
                  new BasicType(true, DType.STRING))) { nullStruct =>
                  isNull.ifElse(nullStruct, casted)
                }
              }
            }
          }
        }
      }
    }
  }

  // mergeAndSetValidity replaces the trailing isNull + ifElse + null-scalar pipeline.
  def structNew(input: ColumnVector): ColumnVector = {
    withResource(input.getChildColumnView(0)) { intChild =>
      val castInt = intChild.castTo(DType.INT64)
      withResource(castInt) { _ =>
        withResource(input.getChildColumnView(1)) { strChild =>
          withResource(strChild.copyToColumnVector()) { castStr =>
            withResource(ColumnView.makeStructView(castInt, castStr)) { casted =>
              casted.mergeAndSetValidity(BinaryOp.BITWISE_AND, input)
            }
          }
        }
      }
    }
  }

  // ~25% null parent structs, with nullable children to exercise nested null sanitization.
  def buildStructCol(n: Int): ColumnVector = {
    val structType = new StructType(true,
      new BasicType(true, DType.INT32),
      new BasicType(true, DType.STRING))
    val rows = Arrays.asList(Array.tabulate(n) { i =>
      if (i % 4 == 0) {
        null
      } else {
        new StructData(
          Integer.valueOf(i),
          if (i % 7 == 0) null else f"s$i%05d")
      }
    }: _*)
    ColumnVector.fromStructs(structType, rows)
  }

  def withResource[T <: AutoCloseable, R](r: T)(f: T => R): R =
    try f(r) finally if (r != null) r.close()
}

BASE median: 0.464 ms
NEW  median: 0.139 ms

Spark benchmark

An end-to-end Spark benchmark shows a 1.14-1.15x speedup.

spark-shell \
  --master local[*] \
  --conf spark.driver.memory=16g \
  --conf spark.sql.shuffle.partitions=8 <<'EOF'
import org.apache.spark.sql.functions._

val rows = sys.env.getOrElse("BENCH_ROWS", "5000000").toLong
val structs = sys.env.getOrElse("BENCH_STRUCTS", "16").toInt
val out = "/tmp/spark-rapids-struct-bench/struct_data"
val cols = (0 until structs).map { i =>
  val value = struct(((col("id") + lit(i)) % Int.MaxValue).cast("int").alias("a"))
  when(((col("id") + lit(i)) % 4) === 0, lit(null).cast("struct<a:int>"))
    .otherwise(value)
    .alias(s"s$i")
}
spark.range(rows).select(cols: _*).write.mode("overwrite").parquet(out)
println(s"generated rows=$rows structs=$structs path=$out")
:quit
EOF
cat <<'EOF' | spark-shell \
  --master local[*] \
  --jars "$JAR" \
  --conf spark.driver.memory=16g \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.explain=NONE \
  --conf spark.sql.files.maxPartitionBytes=256MB \
  --conf spark.sql.shuffle.partitions=8 \
  --conf spark.ui.showConsoleProgress=false
spark.sparkContext.setLogLevel("ERROR")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val path = "/tmp/spark-rapids-struct-bench/struct_data"
val structs = sys.env.getOrElse("BENCH_STRUCTS", "16").toInt
val warmup = 3
val measured = 7
val targetType = StructType(Seq(StructField("a", LongType, true)))
val df = spark.read.parquet(path)
val casted = df.select((0 until structs).map { i =>
  col(s"s$i").cast(targetType).alias(s"s$i")
}: _*)

def runOnce(): Double = {
  val t = System.nanoTime()
  casted.write.format("noop").mode("overwrite").save()
  (System.nanoTime() - t) / 1e6
}

for (i <- 1 to warmup) {
  println(f"warmup$i: ${runOnce()}%.1f ms")
}
val times = (1 to measured).map { i =>
  val ms = runOnce()
  println(f"run$i: $ms%.1f ms")
  ms
}
println(f"median: ${times.sorted.apply(measured / 2)}%.1f ms")
:quit
EOF

| Build | Runs (ms) | Median |
| --- | --- | --- |
| Baseline | 460.2, 417.6, 437.7, 424.3, 415.0, 422.9, 408.5 | 422.9 ms |
| Optimized | 424.8, 369.8, 392.7, 350.2, 349.4, 367.4, 346.1 | 367.4 ms |
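
The reported medians and speedups can be rechecked from the numbers above. The following is a small sketch that reuses the same median convention as the benchmark scripts (`sorted.apply(n / 2)`); `BenchSummary` and its members are hypothetical names introduced only for this check.

```scala
object BenchSummary {
  // Same convention as the benchmark scripts: element at index n / 2 of the sorted runs.
  def median(xs: Seq[Double]): Double = xs.sorted.apply(xs.length / 2)

  // Run times in ms, copied from the Spark benchmark results above.
  val baseline: Seq[Double] = Seq(460.2, 417.6, 437.7, 424.3, 415.0, 422.9, 408.5)
  val optimized: Seq[Double] = Seq(424.8, 369.8, 392.7, 350.2, 349.4, 367.4, 346.1)

  // End-to-end speedup from the two medians.
  val sparkSpeedup: Double = median(baseline) / median(optimized)

  // Nanobenchmark speedup from the reported BASE and NEW medians (ms).
  val nanoSpeedup: Double = 0.464 / 0.139

  def main(args: Array[String]): Unit = {
    println(f"baseline median:  ${median(baseline)}%.1f ms")
    println(f"optimized median: ${median(optimized)}%.1f ms")
    println(f"spark speedup: $sparkSpeedup%.2fx")
    println(f"nano speedup:  $nanoSpeedup%.2fx")
  }
}
```

With seven runs, index 3 of the sorted list is the true middle element, so the convention gives the exact median here.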

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
    (Please provide the names of the existing tests in the PR description.)
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

Signed-off-by: Rishi Chandra <rishic@nvidia.com>
greptile-apps (Bot, Contributor) commented May 20, 2026

Greptile Summary

This PR replaces the null-restoration logic in castStructToStruct (GpuCast.scala): the previous isNull + ifElse(nullStruct, casted) pipeline is replaced with a single casted.mergeAndSetValidity(BinaryOp.BITWISE_AND, input) call, trading a null-scalar creation and conditional copy for a cheaper bitwise mask merge.

  • Performance gain: the nanobenchmark shows 3.3× speedup at the isolated operation level; end-to-end Spark benchmark shows ~1.14–1.15× improvement.
  • Semantic concern: mergeAndSetValidity only patches the top-level struct validity. At null-parent rows, the children keep their cast values and validity instead of being forced to null, which diverges from the previous ifElse behaviour that produced fully-null structs. All existing mergeAndSetValidity calls in this file target flat STRING results; this is the first application to a nested (STRUCT) column, the exact pattern flagged in issue #7485 ("[BUG] stop using mergeAndSetValidity for any nested type").

Confidence Score: 3/5

The optimization is straightforward but changes null-propagation semantics for nested struct columns in a way that may silently break Parquet/ORC serialization or any operation that accesses child columns without checking the parent validity mask.

The replacement of isNull+ifElse with mergeAndSetValidity on a struct column leaves children with non-null values at rows where the parent struct is null. Spark-rapids issue #7485 was filed specifically about this pattern. All pre-existing mergeAndSetValidity calls in the same file target flat STRING results; this is the first use on a nested type. Existing tests exercise the cast path but do not directly assert child-column null masks, so a subtle regression in serialization or downstream nested-access could go undetected.

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala — specifically the castStructToStruct method and how its output is consumed by Parquet/ORC writers and any nested struct projection paths.

Important Files Changed

| Filename | Overview |
| --- | --- |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala | Replaces isNull+ifElse with mergeAndSetValidity in castStructToStruct; the new call only patches the top-level struct validity and leaves children's null masks unchanged for null parent rows, which is the problematic pattern called out in issue #7485 for nested types. |

Sequence Diagram

sequenceDiagram
    participant Caller
    participant castStructToStruct
    participant doCast
    participant cuDF

    Caller->>castStructToStruct: input StructType with nulls
    loop for each child field
        castStructToStruct->>doCast: getChildColumnView(i)
        doCast-->>castStructToStruct: castChild[i]
    end
    castStructToStruct->>cuDF: makeStructView(castChildren)
    cuDF-->>castStructToStruct: castedView all rows valid
    alt "input.getNullCount == 0"
        castStructToStruct->>cuDF: castedView.copyToColumnVector()
    else has nulls NEW path
        castStructToStruct->>cuDF: castedView.mergeAndSetValidity BITWISE_AND input
        Note over cuDF: Top-level struct mask patched only
        Note over cuDF: Children keep cast values at null rows
    end
    castStructToStruct-->>Caller: ColumnVector

Reviews (1): Last reviewed commit: "use merge and set validity in cast struc..."

isNull.ifElse(nullVal, casted)
}
}
casted.mergeAndSetValidity(BinaryOp.BITWISE_AND, input)

Contributor

P1 mergeAndSetValidity on nested struct type only updates top-level validity

mergeAndSetValidity replaces the top-level null mask of the struct, but the child columns' validity bitmaps are not updated. For any row where input was null, the casted children will retain their values (the results of casting whatever was in the child slots under the null parent) rather than being null. This is the same class of issue tracked in #7485, which specifically calls out mergeAndSetValidity on nested types.

Every other existing call to mergeAndSetValidity in this file is applied to the result of ColumnVector.stringConcatenate — a flat STRING column — not to a struct. The original isNull + ifElse(nullStruct, casted) path was correct because it materialized a fully-null struct (children included) at each null parent row. With the new path, a null parent struct can carry non-null child values, which may produce incorrect results if any downstream operation (e.g. Parquet/ORC writing, getChildColumnView(...).getNullCount(), or deeply-nested struct projections) accesses children without first consulting the parent validity mask.
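
The difference described in this comment can be sketched with a toy struct column in plain Scala. This is an illustration under stated assumptions, not the cuDF column layout: `ToyStruct`, `restoreWithIfElse`, `restoreWithMaskMerge`, and `childNullCount` are hypothetical names, and the two restore functions only model the behaviour as the review describes it.

```scala
object NullParentChildSketch {
  // Toy struct column (assumption): a parent validity vector plus one nullable INT64 child.
  final case class ToyStruct(parentValid: Vector[Boolean], child: Vector[Option[Long]])

  // Old path as described in the review: null-parent rows are replaced by a
  // fully-null struct, so the child is null there too.
  def restoreWithIfElse(casted: ToyStruct, inputValid: Vector[Boolean]): ToyStruct =
    ToyStruct(
      inputValid,
      casted.child.zip(inputValid).map { case (v, ok) => if (ok) v else None })

  // New path as described in the review: only the parent mask is merged.
  def restoreWithMaskMerge(casted: ToyStruct, inputValid: Vector[Boolean]): ToyStruct =
    casted.copy(parentValid = casted.parentValid.zip(inputValid).map { case (a, b) => a && b })

  // What a consumer sees if it counts child nulls without consulting the parent mask.
  def childNullCount(s: ToyStruct): Int = s.child.count(_.isEmpty)

  def main(args: Array[String]): Unit = {
    val casted = ToyStruct(Vector.fill(4)(true), Vector(Some(0L), Some(1L), None, Some(3L)))
    val inputValid = Vector(false, true, true, true) // row 0 has a null parent
    val a = restoreWithIfElse(casted, inputValid)
    val b = restoreWithMaskMerge(casted, inputValid)
    println(s"same parent mask: ${a.parentValid == b.parentValid}")
    println(s"child nulls (ifElse): ${childNullCount(a)}, (mask merge): ${childNullCount(b)}")
  }
}
```

Both paths agree on the parent mask. They disagree only on what the child reports at the null-parent row, which is the case a consumer hits when it reads the child directly.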

rishic3 (Collaborator, Author) commented May 21, 2026

I looked at #7485 and #7698, and I'm going to dump my understanding here. rapidsai/cudf#13335 made this API safe only for offset-based nested types, i.e. STRING or LIST: it added an identity gather to get fresh offsets, dropping any non-zero offsets where a parent was null. But if the child itself has a null mask, i.e. STRUCT or anything containing a STRUCT, nothing lines up the child null masks. In general, cuDF makes no guarantee that a child's null mask is consistent with the parent's null mask (the contract being that the parent's mask is authoritative). However, Spark RAPIDS relies on child null masks directly in a number of places, and presumably that isn't going to change anytime soon.

TL;DR: I will close this change. @revans2, I know that back in #7698 you preferred getting rid of this everywhere, but since I'm starting to introduce it in more places, should we now update this API to throw on anything with a STRUCT?
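
To make the offset-based case concrete, here is a toy sketch in plain Scala of why rebuilding offsets is enough for LIST or STRING. It assumes a simplified LIST<INT> layout and rebuilds rows one at a time; it is not cuDF's gather implementation, and `ToyList` and `sanitize` are hypothetical names for this illustration.

```scala
object OffsetSanitizeSketch {
  // Toy LIST<INT> column (assumption): row i spans data(offsets(i)) until data(offsets(i + 1)).
  final case class ToyList(offsets: Vector[Int], data: Vector[Int], valid: Vector[Boolean])

  // Rebuild every row, copying data only for valid rows, so that null rows
  // end up with zero length in the fresh offsets.
  def sanitize(col: ToyList): ToyList = {
    val rows = col.valid.indices.map { i =>
      if (col.valid(i)) col.data.slice(col.offsets(i), col.offsets(i + 1))
      else Vector.empty[Int]
    }
    val newOffsets = rows.scanLeft(0)(_ + _.length).toVector
    ToyList(newOffsets, rows.flatten.toVector, col.valid)
  }

  def main(args: Array[String]): Unit = {
    // Row 1 was marked null by a mask merge but still spans one element (12).
    val dirty = ToyList(Vector(0, 2, 3, 5), Vector(10, 11, 12, 13, 14), Vector(true, false, true))
    println(sanitize(dirty))
  }
}
```

A leaf under a LIST has no per-parent-row mask of its own to fix, so fresh offsets are sufficient there. A STRUCT child does carry its own mask, which this kind of rebuild does not touch.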

Collaborator

Let's fix structs properly instead. The issue is that nulls do not get pushed down properly, but cudf has APIs to do this: cudf::structs::detail::superimpose_and_sanitize_nulls(...) and cudf::structs::detail::push_down_nulls(...). We should be able to make it work with a small change and very little overhead.
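
The idea of pushing nulls down can be sketched with a toy recursive column model in plain Scala. Only the function names in the comment above come from the thread; the sketch below does not reproduce their signatures or implementation, and `ToyCol`, `Leaf`, `Struct`, `pushDown`, and `sanitize` are hypothetical names.

```scala
object PushDownNullsSketch {
  // Toy nested column (assumption): every column carries its own validity vector.
  sealed trait ToyCol { def valid: Vector[Boolean] }
  final case class Leaf(valid: Vector[Boolean]) extends ToyCol
  final case class Struct(valid: Vector[Boolean], children: Vector[ToyCol]) extends ToyCol

  // AND the ancestor mask into this column, then recurse with the merged mask,
  // so every descendant is null wherever any ancestor is null.
  def pushDown(col: ToyCol, parentValid: Vector[Boolean]): ToyCol = {
    val merged = col.valid.zip(parentValid).map { case (a, b) => a && b }
    col match {
      case Leaf(_) => Leaf(merged)
      case Struct(_, kids) => Struct(merged, kids.map(pushDown(_, merged)))
    }
  }

  // Entry point: the root has no parent, so start from an all-valid mask.
  def sanitize(root: ToyCol): ToyCol =
    pushDown(root, Vector.fill(root.valid.length)(true))

  def main(args: Array[String]): Unit = {
    val root = Struct(
      Vector(false, true),
      Vector(Leaf(Vector(true, true)), Struct(Vector(true, false), Vector(Leaf(Vector(true, true))))))
    println(sanitize(root))
  }
}
```

In this model the extra cost is one mask AND per descendant column, which is consistent with the expectation of little overhead, though the sketch makes no claim about the real cost in cuDF.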

Collaborator Author

Gotcha. Filed rapidsai/cudf#22631; this PR will depend on its closure. I can take a crack at it.

nvauto (Collaborator) commented May 25, 2026

NOTE: release/26.06 has been created from main. Please retarget your PR to release/26.06 if it should be included in the release.


3 participants