[FEA] Pre-split regex in Project to keep complex patterns on GPU instead of CPU fallback#14888
[FEA] Pre-split regex in Project to keep complex patterns on GPU instead of CPU fallback#14888wjxiz1992 wants to merge 5 commits into
Conversation
) The estimator's numRows previously defaulted to gpuTargetBatchSizeBytes / StringType.defaultSize (~53M with the 1 GiB-batch / 20-byte defaults), which over-rejected regex on small inputs even when no real memory risk existed. Read stats.rowCount / sizeInBytes from the enclosing SparkPlanMeta's logicalLink and pass it as an optional rowsHint. The estimator always takes min(perBatchRows, rowsHint), so the hint can only tighten — never inflate — the memory estimate. For the 70k-row reproduction in issue NVIDIA#14887 with the (quick brown|brown fox|...) pattern, catalyst reports the projected Statistics(sizeInBytes=5.5 KiB) after column pruning, so bySize ≈ 281 and the per-batch ceiling drops from 6.4 GiB to ~35 KB. RLike now stays on GPU (further rewritten to GpuContainsAny). Local validation: - mvn package -pl tests -am -Dbuildver=330 \ -DwildcardSuites=com.nvidia.spark.rapids.RegexComplexityEstimatorSuite Tests: succeeded 5, failed 0 - spark-submit end-to-end with the user's regextestapp.py: Statistics(sizeInBytes=5.5 KiB) at the optimized Project; physical plan uses GpuRLike → GpuContainsAny; 70k-row query completes on GPU. ### Review notes nt-code-review reported 0 must-fix, 6 should-fix, 8 suggestions. Applied: NonFatal in the stats catch, @tailrec short-form, hoisted LongMaxBigInt, added space after `if`, treat sizeInBytes==0 as a real empty input (0L) rather than the unknown sentinel. Deferred for follow-up: caching the hint per SparkPlanMeta (NVIDIA#14887 follow-up if profiling shows the stats walk is hot), extracting a unit-testable helper, and an integration test that exercises the meta-tree parent walk end-to-end. One reviewer finding (drop asInstanceOf[SparkPlan]) was incorrect — the existential wildcard SparkPlanMeta[_] erases the INPUT <: SparkPlan bound, so the upcast is required for the return type; reverted with a comment. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Allen Xu <allxu@nvidia.com>
Follow-up commit addressing the test-coverage and refactor findings from
the prior review that were deferred to keep the initial diff focused.
Changes:
- Extract `RegexComplexityEstimator.deriveRowsHint(rowCount, sizeInBytes)`
as a pure helper, so the algorithm is unit-testable directly without
standing up a real SparkPlan. `GpuRegExpUtils.estimateRowsHintFromStats`
is now a thin wrapper that does the meta-tree walk and delegates the
math. The previous "negative sizeInBytes treated as unknown" sentinel
is preserved; an out-of-range sizeInBytes now returns None rather than
Some(Long.MaxValue) so the estimator distinguishes "no hint" from
"hint says unbounded" cleanly.
- Hoist `STRING_DEFAULT_ROW_BYTES` to the estimator object and reuse it
from the helper. This removes the divergence between
RegexComplexityEstimator.scala and stringFunctions.scala in how they
read DataTypes.StringType.defaultSize.
- Add 9 unit tests for `deriveRowsHint` covering: no stats, both
fields present (min), rowCount-only, sizeInBytes-only,
sizeInBytes == 0 (empty), rowCount == 0 (empty after agg), negative
sizeInBytes (unknown sentinel), rowCount overflow, sizeInBytes
overflow.
Not changed:
- The reviewer's caching suggestion ("cache the hint per SparkPlanMeta
to avoid O(N) full-subtree stats computations") is a misdiagnosis.
`LogicalPlan.stats` is a `lazy val` per plan node, so repeated access
across N regex expressions on the same Project is O(1) after the
first call. The remaining per-expression cost is the parent-walk
(parent-pointer chase, bounded by expression tree depth, typically
1–3 levels) plus a small Option/BigInt allocation, neither of which
is a meaningful planning-time hot spot. A cache would add a long-lived
reference to SparkPlanMeta objects with no demonstrated benefit;
documented inline.
- The proposed integration test (calibrated maxStateMemoryBytes that
fails without stats and succeeds with) is hard to land reliably
because the calibration depends on table size and Spark's stat
estimation rules. The unit tests on `deriveRowsHint` cover the math;
end-to-end coverage of the parent-walk is provided by the existing
pyspark validation captured on PR #TBD.
Local validation:
- mvn package -pl tests -am -Dbuildver=330 \
-DwildcardSuites=com.nvidia.spark.rapids.RegexComplexityEstimatorSuite
Tests: succeeded 14, failed 0
- Rebuilt dist jar; re-ran the original 70k-row pyspark repro on GPU.
Statistics(sizeInBytes=5.5 KiB), RLike runs on GPU (rewritten to
GpuContainsAny) — same as before this commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Greptile SummaryThis PR fixes spurious CPU fallbacks for regex patterns on small inputs by feeding Catalyst plan statistics (
Confidence Score: 4/5Safe to merge; the The change is well-structured and safe: the parent-chain walk is guarded with
Important Files Changed
Sequence DiagramsequenceDiagram
participant EM as ExprMeta (e.g. RLike)
participant GRU as GpuRegExpUtils
participant RPM as SparkPlanMeta
participant LP as LogicalPlan
participant RCE as RegexComplexityEstimator
EM->>GRU: validateRegExpComplexity(meta, regex)
GRU->>GRU: findEnclosingPlan(meta) walk parent chain
GRU->>RPM: SparkPlanMeta.wrapped
RPM->>LP: logicalLink (Option[LogicalPlan])
LP-->>GRU: stats.rowCount, stats.sizeInBytes
GRU->>RCE: deriveRowsHint(rowCount, sizeInBytes)
RCE-->>GRU: Option[Long] rowsHint
GRU->>RCE: isValid(conf, regex, rowsHint)
RCE->>RCE: min(perBatchRows, rowsHint)
RCE->>RCE: saturatedMultiply(numStates, numRows, 2)
RCE-->>GRU: Boolean
GRU-->>EM: willNotWorkOnGpu (if false)
Reviews (1): Last reviewed commit: "Address nt-code-review deferred findings..." | Re-trigger Greptile |
| // Pure helper that derives a per-batch row hint from catalyst Statistics. Exposed so the | ||
| // meta-tree plumbing in `org.apache.spark.sql.rapids.GpuRegExpUtils` can call it AND the | ||
| // unit-test suite can exercise the edge cases (zero rows, overflow, missing rowCount, etc.) | ||
| // directly without standing up a real SparkPlan / LogicalPlan. | ||
| // | ||
| // Returns None when sizeInBytes carries no useful information (Spark's "no stats" sentinel | ||
| // is `defaultSizeInBytes = Long.MaxValue`; any negative value is treated the same). Returns | ||
| // Some(0L) when the relation is genuinely empty (rowCount == 0 or sizeInBytes == 0), in | ||
| // which case the estimator short-circuits to 0 memory. | ||
| def deriveRowsHint( | ||
| rowCount: Option[BigInt], | ||
| sizeInBytes: BigInt): Option[Long] = { | ||
| val byCount: Option[Long] = rowCount.map { c => | ||
| if (c <= 0) 0L | ||
| else if (c > LongMaxBigInt) Long.MaxValue | ||
| else c.toLong | ||
| } | ||
| val bySize: Option[Long] = | ||
| if (sizeInBytes < 0) None | ||
| else if (sizeInBytes > LongMaxBigInt) None | ||
| else Some((sizeInBytes / STRING_DEFAULT_ROW_BYTES).toLong) |
There was a problem hiding this comment.
Long.MaxValue sentinel is not filtered out, but the comment says it is
The header comment states "Returns None when sizeInBytes carries no useful information (Spark's 'no stats' sentinel is defaultSizeInBytes = Long.MaxValue; any negative value is treated the same)". However, the check sizeInBytes > LongMaxBigInt is false when sizeInBytes == BigInt(Long.MaxValue) because LongMaxBigInt == BigInt(Long.MaxValue). So the sentinel is silently converted to Some(Long.MaxValue / STRING_DEFAULT_ROW_BYTES) (~4.6e17) instead of None.
This is functionally safe — min(perBatchRows ≈ 53M, 4.6e17) still returns perBatchRows, so the old ceiling is preserved — but the comment is factually wrong, and the test suite does not include a case for sizeInBytes = BigInt(Long.MaxValue) (the test named "unknown sentinel" uses BigInt(-1) instead). Either the guard should be changed to sizeInBytes >= LongMaxBigInt to match the documented intent, or the comment should be corrected to reflect the actual > boundary.
There was a problem hiding this comment.
Stale — already addressed in 66d2734c1 (the previous follow-up commit). After the latest 511860139, the > Long.MaxValue sentinel logic and the negative-sentinel comment are gone entirely, since deriveRowsHint no longer takes sizeInBytes. Only rowCount is used now, with negative values defensively treated as 0L.
There was a problem hiding this comment.
Pull request overview
This PR tightens the regex GPU complexity check by incorporating Catalyst plan statistics (rowCount/sizeInBytes) as an optional rowsHint, reducing unnecessary CPU fallback for complex-but-safe regex patterns on small inputs.
Changes:
- Adds
rowsHint: Option[Long]toRegexComplexityEstimator.isValidand introducesderiveRowsHintas a pure helper for turning CatalystStatisticsinto a hint. - Extracts stats from the enclosing
SparkPlan’slogicalLinkduring expression tagging and passes the derived hint into the estimator. - Expands
RegexComplexityEstimatorSuitewith new tests coveringrowsHintbehavior andderiveRowsHintedge cases.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/src/test/scala/com/nvidia/spark/rapids/RegexComplexityEstimatorSuite.scala | Adds unit tests for rowsHint and deriveRowsHint edge cases. |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala | Computes a rowsHint from enclosing plan stats and feeds it into regex complexity validation. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexComplexityEstimator.scala | Extends estimator API to accept rowsHint and adds deriveRowsHint helper + constant. |
| // Walks up `meta.parent` to the enclosing SparkPlanMeta, reads catalyst stats from its | ||
| // linked LogicalPlan, and delegates to RegexComplexityEstimator.deriveRowsHint for the | ||
| // actual math. Returns None when no logical link is available (e.g. synthetic plans) or | ||
| // when stats access throws. The estimator always takes min(perBatchRows, rowsHint), so a | ||
| // loose hint is safe — it can only tighten, never inflate, the memory estimate. | ||
| // | ||
| // LogicalPlan.stats is a lazy val per plan node, so repeated access across multiple regex | ||
| // expressions on the same Project is O(1) after the first read. | ||
| private def estimateRowsHintFromStats(meta: ExprMeta[_]): Option[Long] = { | ||
| findEnclosingPlan(meta).flatMap(_.logicalLink).flatMap { logical => | ||
| val stats = try { | ||
| Some(logical.stats) | ||
| } catch { | ||
| case NonFatal(_) => None | ||
| } | ||
| stats.flatMap(s => RegexComplexityEstimator.deriveRowsHint(s.rowCount, s.sizeInBytes)) |
There was a problem hiding this comment.
Addressed in 511860139. You are right — when stats come from a Project, sizeInBytes reflects the OUTPUT byte width (regex result + a small key for example) rather than the regex INPUT column. Deriving rows from that ratio could under-estimate and let the per-batch min clamp accept patterns that should be rejected.
deriveRowsHint now takes only rowCount: Option[BigInt] and returns None when unavailable. The sizeInBytes parameter, the sizeInBytes / STRING_DEFAULT_ROW_BYTES fallback, the STRING_DEFAULT_ROW_BYTES constant, and the four sizeInBytes-specific test cases are all gone.
Verified: Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0 (com.nvidia.spark.rapids.RegexComplexityEstimatorSuite).
| // when stats access throws. The estimator always takes min(perBatchRows, rowsHint), so a | ||
| // loose hint is safe — it can only tighten, never inflate, the memory estimate. |
There was a problem hiding this comment.
Addressed in 511860139. After the Thread #2 fix (drop sizeInBytes fallback), the rowsHint is now an exact rowCount from catalyst — not a sizeInBytes heuristic — so it is guaranteed to be an upper bound on the rows the regex will see in any single batch. The safety claim in the doc comment is now factually correct as written (updated wording: "the hint, when present, is an exact rowCount from catalyst, so the estimators min(perBatchRows, rowsHint) clamp can only tighten the memory estimate, never inflate").
| // The existential wildcard erases the INPUT <: SparkPlan bound at the pattern site, so the | ||
| // upcast is needed to satisfy the return type. See nt-code-review finding rebuttal in the | ||
| // PR description. |
There was a problem hiding this comment.
Addressed in 511860139. Replaced the external nt-code-review reference with a self-contained comment that explains the Scala compiler limitation directly:
// The existential wildcard `SparkPlanMeta[_]` erases the INPUT <: SparkPlan bound at the
// pattern site, so the upcast is needed to satisfy the return type.
case sp: SparkPlanMeta[_] => Some(sp.wrapped.asInstanceOf[SparkPlan])The PR description is no longer load-bearing for understanding the upcast.
| else c.toLong | ||
| } | ||
| val bySize: Option[Long] = | ||
| if (sizeInBytes < 0) None |
There was a problem hiding this comment.
Stale — the sizeInBytes sentinel branch is gone in 511860139. deriveRowsHint now only takes rowCount; the > LongMaxBigInt and < 0 sentinel arms for sizeInBytes were removed along with the parameter itself. Negative rowCount is defensively treated as 0L; sentinel handling for rowCount (which catalyst returns as None when unavailable) is unchanged from the original.
| test("deriveRowsHint returns None when no stats are available") { | ||
| assert(RegexComplexityEstimator.deriveRowsHint(None, BigInt(-1)).isEmpty) |
There was a problem hiding this comment.
Stale — the sizeInBytes-sentinel test rewrite is moot in 511860139. deriveRowsHint no longer accepts sizeInBytes, so the four sizeInBytes-specific test cases (fallback, min(byCount, bySize), unknown-sentinel, overflow) were removed. The remaining test set covers rowCount-only edge cases: missing, present, zero, negative (defensive 0L), and overflow to Long.MaxValue.
revans2
left a comment
There was a problem hiding this comment.
I am quite concerned about the accuracy of this. The metrics in spark that you are using are notoriously inaccurate, even when CBO is enabled, which no one does. For 70k rows is the GPU even the right choice generally? What is the performance difference?
|
@revans2 Thanks for pointing this out! I did see the problem of performance and The issue is raised by QA team, their tests failed due to the regex expression fallback to CPU so they were challenging like "why even a 70k rows table forces the regex operation fallback to CPU"? Then I started the whole story. For short workaround I've suggested they can modify the For production case, I also think current configs/algo is fine, but not 100% sure it's ok to convince QA. |
…VIDIA#14887) Copilot review surfaced an under-estimation risk: when `rowCount` was missing, the helper fell back to `sizeInBytes / STRING_DEFAULT_ROW_BYTES` on the enclosing Project's stats. That sizeInBytes reflects the Project's OUTPUT byte width — regex result + a small key, for example — not the regex INPUT column's width, so the derived row count could be far below actual rows and the per-batch `min` clamp would silently let oversized patterns through to the GPU. Changes: - `deriveRowsHint` now takes only `rowCount: Option[BigInt]` and returns None when it is unavailable. The unused `sizeInBytes` parameter and the `STRING_DEFAULT_ROW_BYTES` constant are removed. - `stringFunctions.estimateRowsHintFromStats` updates the caller and its doc comment to reflect that the hint is now an exact rowCount, so the estimator's `min(perBatchRows, rowsHint)` clamp can only tighten the memory estimate, never inflate (the safety claim Copilot flagged). - `findEnclosingPlan` comment no longer references the external nt-code-review thread — explains the Scala existential-wildcard upcast inline. - `RegexComplexityEstimatorSuite` drops the four sizeInBytes-specific test cases (fallback, min(byCount, bySize), unknown-sentinel, overflow) and keeps the rowCount-only edge cases. Mvn summary: Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0 (com.nvidia.spark.rapids.RegexComplexityEstimatorSuite, buildver=330). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Allen Xu <allxu@nvidia.com>
|
@revans2 — on your stats-accuracy concern from the top-level review: agreed that Spark stats are unreliable in general, which is now reflected in the implementation. The rowsHint is intentionally conservative on three axes:
So the failure mode is: stats CLAIM 70k rows when actual is, say, 5M. In that case the estimator under-estimates memory; the per-batch ceiling still bounds at ≈53M rows / batch, which is the original pre-PR behavior; and if the regex actually overflows at runtime the cuDF kernel still throws — we have not removed any safety net. The hint can only move the rejection threshold tighter for cases where stats are accurate enough to make a small-input regex regress to CPU. Tightening is opt-in via existing stats infrastructure; if you want a stronger guard I can add a CI is green (49/49). Ready for re-review when you have a moment. |
|
@revans2 — ran the GPU-vs-CPU comparison you asked for, and while doing it I verified what the current code actually does on the repro. Both are below, and I think the second part changes the plan. PerformanceSingle RTX 5880 Ada,
So "is GPU the right choice at 70k?" — marginally no in isolation, but the rejection produces the worst of the three outcomes, not the CPU one, and the same blind rule blacklists the large-N cases where GPU wins big. …but the current implementation is inert for the reproYou were more right about stats than the PR assumed. I rebuilt at HEAD (
A path-based file read reports no Where I'd like your design inputGiven that, I don't want to ship the hint as-is. The options I currently see:
I'm leaning (b) as the actual fix, with the perf data above as the justification for bothering at all. But you have the most context on the regex memory model and why the per-batch ceiling is shaped the way it is — is there a more sensible design than either of these? Happy to rework toward whatever you think is right (including "the ceiling is fine, just close this"). |
Instead of rejecting a regex whose worst-case NFA scratch (numStates * rowsPerBatch * 2) exceeds spark.rapids.sql.regexp.maxStateMemoryBytes, GpuProjectExec now pre-splits input batches to a per-batch row budget so the scratch fits, keeping the pattern on the GPU instead of falling back to CPU. - RegexComplexityEstimator: expose numStates; add maxRowsForStateBudget, estimatedSplits, isValidWithPreSplit, and the GpuRegexNumStates meta trait. - New conf spark.rapids.sql.regexp.maxPreSplits (default 32; 1 disables pre-split) and the regenerated advanced_configs.md entry. - The RLike / RegExpReplace / RegExpExtract / RegExpExtractAll metas mix GpuRegexNumStates; GpuProjectExec walks the child-expr meta tree for the max state count and pre-splits to the budget via PreProjectSplitIterator. - Gate relaxed only for these regex metas inside a Project and only within the maxPreSplits cap; Filter/Join/Aggregate and split-with-regex stay as before. - GpuProjectExec gains a 4th (defaulted) field; update the 3-arg GpuProjectExec extractor patterns in the delta-lake providers to the new arity. - Tests: RegexComplexityEstimatorSuite covers the estimator math and the cap; regexp_test.py covers in-Project pre-split GPU==CPU, the maxPreSplits=1 kill switch, and outside-Project (filter) CPU fallback. ### Review notes (nt-code-review; informational, deferred) - estimatedSplits / isValid / numStates each call countStates on the same AST at planning time (pure; left for clarity). - The per-meta numStates field + override is repeated in 4 metas; could be hoisted into GpuRegexNumStates as a concrete field. - ceilDiv is duplicated in PreProjectSplitIterator and estimatedSplits. - countStates' SimpleQuantifier inner match has no default case (unchanged code; the parser only ever produces * + ?, so it is unreachable today). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Allen Xu <allxu@nvidia.com>
Supersedes the catalyst rowsHint approach (verified inert for path-based file reads) with the GpuProjectExec pre-split implementation. The net diff versus main is the pre-split change only; the earlier rowsHint commits remain in history but their content is replaced by this merge's tree. Signed-off-by: Allen Xu <allxu@nvidia.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
@wjxiz1992 I did a bit more digging and I think the right fix is to remove the complexity estimation entirely. It was added in as a part of #4061 to avoid the need for reserve stack space (Effectively it spilled from the kernel to GPU memory and that would cause problems with RMM using all of the memory). But rapidsai/cudf#10600 changed it so that the extra memory is now allocated through RMM. So the entire point of having the complexity stopped mattering 4 years ago and we have been wasting our time on it ever since. I think that the right fix now is to delete the complexity gate entirely and see what happens. |
Closes #14887
Summary
This supersedes the original
rowsHintapproach on this PR. Local verification showed the catalyst-rowCounthint is inert for the common case: a path-basedspark.read.orc/parquet(path)reportsrowCount = Noneeven with CBO, so the hint was absent and the regex still fell back to CPU (see the findings comment and the design discussion).Instead of rejecting a regex whose worst-case NFA scratch (
numStates * rowsPerBatch * 2) exceedsspark.rapids.sql.regexp.maxStateMemoryBytes,GpuProjectExecnow pre-splits the input batches to a per-batch row budget so the scratch fits, keeping the pattern on the GPU. This is stats-independent (works for path reads) and implements @thirtiseven'sPreProjectSplitIteratorsuggestion from the issue.How it works
RegexComplexityEstimatorexposesnumStatesand addsmaxRowsForStateBudget(=maxStateMemoryBytes / (numStates * 2)),estimatedSplits, andisValidWithPreSplit, plus a meta-sideGpuRegexNumStatestrait.RLike/RegExpReplace/RegExpExtract/RegExpExtractAllmetas carry the transpiled-pattern state count.GpuProjectExecwalks its child-expression meta tree for the maximum state count and pre-splits input batches to the budget viaPreProjectSplitIterator.ProjectExec, and only within a new capspark.rapids.sql.regexp.maxPreSplits(default 32).Filter/Join/Aggregateand split-with-regex keep the existing reject-to-CPU behavior. SettingmaxPreSplits=1disables pre-split entirely (kill switch).Performance
Single RTX 5880 Ada,
local[8], Spark 3.3.0, genuine non-rewritable 35-state pattern in aselectExprProject;sum(...)over the result; median ms; default 2 GiB budget (the common case — no actual splitting needed at these sizes).cpu= vanilla Spark;fallback= today's reject-to-CPU island in an otherwise-GPU plan;presplit= this PR.The CPU-fallback island is the slowest option at every size (GPU scan →
GpuColumnarToRow→ CPU regex →GpuRowToColumnar); staying on GPU is 2× faster at 1M and 5–7× at 10M, and GPU scaling is near-flat. Split-overhead curve (10M rows, shrinking the budget to force splits): 0/8/96/992 splits → 208/205/336/1520 ms, i.e. ~1.3 ms/split, reaching break-even with CPU around ~1000 splits — which themaxPreSplitscap keeps you out of.Validation
RegexComplexityEstimatorSuite(buildver 330): unit coverage fornumStates,maxRowsForStateBudget,estimatedSplits(ceiling rounding), and themaxPreSplitscap / kill switch.integration_tests/.../regexp_test.py: in-Project pre-split produces GPU==CPU results;maxPreSplits=1falls back to CPU; an oversized regex in afilter(outside a Project) falls back to CPU.Documentation
Testing
Performance