Skip to content

Commit c3473e4

Browse files
committed
[SPARK-56411][SQL] Register Decimal in KryoSerializer so cached-batch spill works
### What changes were proposed in this pull request? Register `Decimal`, `Decimal[]`, `java.math.BigDecimal`, and `java.math.BigInteger` in `KryoSerializer.loadableSparkClasses` so that Kryo strict registration mode (`spark.kryo.registrationRequired=true`) can serialize these types without throwing `Class is not registered`. ### Why are the changes needed? `DefaultCachedBatchSerializer` writes cached batch stats via `kryo.writeClassAndObject(output, batch.stats)`. The stats row (`GenericInternalRow`) can contain `Decimal` values (e.g. min/max stats for a `DecimalType` column). When Kryo walks the row it encounters: - `org.apache.spark.sql.types.Decimal` -- the value class itself. - `java.math.BigDecimal` / `java.math.BigInteger` -- used internally when the value overflows `Long` precision (> 18 digits). None of these were registered. Under strict mode, any cache spill or eviction of a batch with Decimal stats crashes: ``` com.esotericsoftware.kryo.KryoException: Class is not registered: org.apache.spark.sql.types.Decimal ``` This can be reproduced by caching a table with `DecimalType` columns (e.g. `CACHE TABLE store_sales`) and triggering a memory-pressure spill under Kryo strict mode. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Added new tests `DefaultCachedBatchKryoSerializerSuite` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #55287 from LuciferYang/SPARK-56411. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 2ad52c2 commit c3473e4

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,9 +598,13 @@ private[serializer] object KryoSerializer {
598598
"org.apache.spark.sql.types.StructType",
599599
"[Lorg.apache.spark.sql.types.StructType;",
600600
"org.apache.spark.sql.types.DateType$",
601+
"org.apache.spark.sql.types.Decimal",
602+
"[Lorg.apache.spark.sql.types.Decimal;",
601603
"org.apache.spark.sql.types.DecimalType",
602604
"org.apache.spark.sql.types.Decimal$DecimalAsIfIntegral$",
603605
"org.apache.spark.sql.types.Decimal$DecimalIsFractional$",
606+
"java.math.BigDecimal",
607+
"java.math.BigInteger",
604608
"org.apache.spark.sql.execution.command.PartitionStatistics",
605609
"org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
606610
"org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.columnar
19+
20+
import java.math.{BigInteger => JBigInteger}
21+
22+
import org.apache.spark.{SparkConf, SparkFunSuite}
23+
import org.apache.spark.internal.config.Kryo.KRYO_REGISTRATION_REQUIRED
24+
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
25+
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
26+
import org.apache.spark.sql.types.Decimal
27+
28+
/**
 * Regression tests for Kryo round-trip of types that appear in
 * [[DefaultCachedBatch]] stats rows when `kryo.registrationRequired`
 * is enabled.
 *
 * Each test serializes a value with a strict-mode Kryo instance and
 * asserts that deserialization restores an equal value; under strict
 * mode an unregistered class would fail the serialize step outright.
 */
class DefaultCachedBatchKryoSerializerSuite extends SparkFunSuite {

  /** Fresh Kryo serializer instance with strict class registration turned on. */
  private def strictKryoInstance(): SerializerInstance = {
    val conf = new SparkConf(false)
    conf.set(KRYO_REGISTRATION_REQUIRED, true)
    new KryoSerializer(conf).newInstance()
  }

  test("SPARK-56411: round-trip Decimal in strict Kryo mode") {
    val serde = strictKryoInstance()
    val original = Decimal(123456789L, 10, 2)
    val restored = serde.deserialize[Decimal](serde.serialize(original))
    assert(restored === original)
  }

  test("SPARK-56411: round-trip GenericInternalRow containing Decimal") {
    val serde = strictKryoInstance()
    // Mixed-type row: int, Decimal, double — mirrors a stats row layout.
    val fields: Array[Any] = Array(42, Decimal(987654321L, 18, 6), 3.14D)
    val row = new GenericInternalRow(fields)
    val restored = serde.deserialize[GenericInternalRow](serde.serialize(row))
    assert(restored.numFields === 3)
    assert(restored.getInt(0) === 42)
    assert(restored.getDecimal(1, 18, 6) === Decimal(987654321L, 18, 6))
    assert(restored.getDouble(2) === 3.14D)
  }

  test("SPARK-56411: round-trip DefaultCachedBatch with Decimal stats") {
    val serde = strictKryoInstance()
    // Stats row shaped like min/max Decimal stats plus two Long counters.
    val statsRow = new GenericInternalRow(
      Array[Any](Decimal(0L, 10, 2), Decimal(100000000L, 10, 2), 0L, 0L))
    val original = DefaultCachedBatch(
      numRows = 10,
      buffers = Array(Array[Byte](1, 2, 3)),
      stats = statsRow)
    val restored = serde.deserialize[DefaultCachedBatch](serde.serialize(original))
    assert(restored.numRows === 10)
    assert(restored.buffers.length === 1)
    assert(restored.stats.getDecimal(0, 10, 2) === Decimal(0L, 10, 2))
    assert(restored.stats.getDecimal(1, 10, 2) === Decimal(100000000L, 10, 2))
  }

  test("SPARK-56411: round-trip high-precision Decimal (BigDecimal-backed)") {
    val serde = strictKryoInstance()
    // 30-digit value forces BigDecimal representation (overflows Long).
    val bigBacked =
      Decimal(BigDecimal("123456789012345678901234567890.123"), 33, 3)
    assert(bigBacked.precision > 18,
      "test precondition: precision must exceed Long range")
    val restored = serde.deserialize[Decimal](serde.serialize(bigBacked))
    assert(restored === bigBacked)
  }

  test("SPARK-56411: round-trip java.math.BigInteger in strict Kryo mode") {
    // BigInteger is serialized transitively when Kryo walks
    // java.math.BigDecimal's internal `intVal` field.
    val serde = strictKryoInstance()
    val original = new JBigInteger("123456789012345678901234567890")
    val restored = serde.deserialize[JBigInteger](serde.serialize(original))
    assert(restored === original)
  }

  test("SPARK-56411: round-trip DefaultCachedBatch with BigDecimal-backed stats") {
    val serde = strictKryoInstance()
    // Max-precision (38, 6) values cannot fit in a Long, so Kryo must
    // traverse java.math.BigDecimal / BigInteger during serialization.
    val minStat = Decimal(BigDecimal("1.0"), 38, 6)
    val maxStat =
      Decimal(BigDecimal("99999999999999999999999999999999.999999"), 38, 6)
    val statsRow = new GenericInternalRow(Array[Any](minStat, maxStat, 100L, 0L))
    val original = DefaultCachedBatch(
      numRows = 1000,
      buffers = Array(Array[Byte](10, 20, 30)),
      stats = statsRow)
    val restored = serde.deserialize[DefaultCachedBatch](serde.serialize(original))
    assert(restored.numRows === 1000)
    assert(restored.stats.getDecimal(0, 38, 6) === minStat)
    assert(restored.stats.getDecimal(1, 38, 6) === maxStat)
  }
}

0 commit comments

Comments (0)