
Commit 4647fc3

Author: Chong Gao
Iceberg: extract version-divergent scan APIs behind a shim
Refactors iceberg/common so the {SparkScan, SparkBatchQueryScan, SparkCopyOnWriteScan, SparkBatch, DataWriteResult} APIs that diverge between Iceberg 1.10.x and 1.11.x are hidden behind a small interface, with per-version implementations in iceberg-1-6-x / iceberg-1-9-x / iceberg-1-10-x. No behavior change for the existing Iceberg versions this PR ships; sets the stage for a follow-up that adds iceberg-1-11-x.

Common:
- GpuSparkCopyOnWriteScan -> renamed to GpuSparkCopyOnWriteScanBase (abstract); per-version concrete subclass mixes in the right runtime-filter trait (SupportsRuntimeFiltering vs SupportsRuntimeV2Filtering) and the matching filter() signature.
- GpuSparkScan: rewrite hasNestedType via Spark's readSchema() + Spark types so it no longer depends on the Iceberg 1.10-only cpuScan.expectedSchema(); dispatch SparkCopyOnWriteScan construction through ShimUtils.newCopyOnWriteScan.
- GpuSparkBatchQueryScan: toString uses cpuScan.description() (public, available in both Iceberg 1.10 and 1.11) instead of branch / expectedSchema / filterExpressions which 1.11 removed. runtimeFilterExpressions field read tolerates both 1.10 name (runtimeFilterExpressions) and 1.11 name (runtimeFilters); this is a tactical fallback to be replaced with proper per-version shim methods.
- GpuSparkBatch: same tolerance for expectedSchema (1.10) vs projection (1.11).
- GpuSparkWrite: type-annotate `new Array[DataFile](0)` so Scala 2.13 doesn't infer Array[Nothing] under 1.11's wildcarded DataWriteResult.dataFiles().
- IcebergShimUtils / ShimUtils: add newCopyOnWriteScan(Scan, ...) factory whose parameter is Spark's public Scan because Iceberg's SparkCopyOnWriteScan is package-private, so cross-package callers cannot reference it directly.

Per-Iceberg-version module:
- New GpuSparkCopyOnWriteScan in org.apache.iceberg.spark.source (so it can reference the package-private SparkCopyOnWriteScan). Companion object exposes create(Scan, ...): GpuScan for cross-package callers. 1.6/1.9/1.10 mix in SupportsRuntimeFiltering + filter(Filter[]).
- ShimUtilsImpl.java: implement newCopyOnWriteScan via GpuSparkCopyOnWriteScan.create.

Signed-off-by: Chong Gao <res_life@163.com>
1 parent c7f3d89 commit 4647fc3

13 files changed

Lines changed: 257 additions & 49 deletions

iceberg/common/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergShimUtils.java

Lines changed: 22 additions & 0 deletions
@@ -17,7 +17,9 @@
 package com.nvidia.spark.rapids.iceberg;
 
 import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.GpuScan;
 import com.nvidia.spark.rapids.NoopMetric$;
+import com.nvidia.spark.rapids.RapidsConf;
 import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
@@ -28,6 +30,7 @@
 import org.apache.iceberg.parquet.GpuParquetIO;
 import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
 import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.spark.sql.connector.read.Scan;
 import scala.Option;
 
 import java.io.IOException;
@@ -98,4 +101,23 @@ default ParquetFileReader openParquetReader(
     missCounter.$plus$eq(1L);
     return ParquetFileReader.open(GpuParquetIO.file(inputFile.getDelegate()), options);
   }
+
+  /**
+   * Constructs the version-appropriate {@code GpuSparkCopyOnWriteScan} subclass.
+   *
+   * <p>Iceberg 1.6.x, 1.9.x, and 1.10.x have {@code SparkCopyOnWriteScan} implementing
+   * {@code SupportsRuntimeFiltering} with {@code filter(Filter[])}; Iceberg 1.11.x
+   * switched to {@code SupportsRuntimeV2Filtering} with {@code filter(Predicate[])}.
+   * The concrete class therefore differs per Iceberg version and is constructed
+   * here rather than directly in common code.
+   *
+   * <p>The parameter is declared as the public {@code Scan} interface because
+   * Iceberg's {@code SparkCopyOnWriteScan} is package-private; callers outside
+   * {@code org.apache.iceberg.spark.source} cannot reference it directly. Each
+   * impl downcasts inside a helper that lives in the right package.
+   */
+  GpuScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile);
 }

iceberg/common/src/main/java/com/nvidia/spark/rapids/iceberg/ShimUtils.java

Lines changed: 10 additions & 0 deletions
@@ -17,6 +17,8 @@
 package com.nvidia.spark.rapids.iceberg;
 
 import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.GpuScan;
+import com.nvidia.spark.rapids.RapidsConf;
 import com.nvidia.spark.rapids.ShimLoader;
 import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
 
@@ -28,6 +30,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
 import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.spark.sql.connector.read.Scan;
 
 import java.io.IOException;
 import java.util.Map;
@@ -71,4 +74,11 @@ public static ParquetFileReader openParquetReader(
       scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
     return IMPL.openParquetReader(inputFile, filePath, options, metrics);
   }
+
+  public static GpuScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return IMPL.newCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile);
+  }
 }
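
The static facade above forwards to an `IMPL` chosen per Iceberg version. How `IMPL` is actually loaded is not visible in this diff, so the following is only a minimal sketch of the general pattern under stated assumptions: `ShimFacadeSketch`, `Shim`, `Shim110`, `Shim111`, the registry, and the string return values are all hypothetical stand-ins, not the project's classes or its `ShimLoader` mechanism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only: common code asks a facade, a per-version implementation does the work. */
final class ShimFacadeSketch {

    /** Hypothetical stand-in for the shim interface; strings replace the real scan types. */
    interface Shim {
        String newCopyOnWriteScan(String cpuScan, boolean queryUsesInputFile);
    }

    /** Hypothetical 1.10.x implementation: would build the Filter[]-based scan. */
    static final class Shim110 implements Shim {
        @Override
        public String newCopyOnWriteScan(String cpuScan, boolean queryUsesInputFile) {
            return "V1Filtering(" + cpuScan + ", inputFile=" + queryUsesInputFile + ")";
        }
    }

    /** Hypothetical 1.11.x implementation: would build the Predicate[]-based scan. */
    static final class Shim111 implements Shim {
        @Override
        public String newCopyOnWriteScan(String cpuScan, boolean queryUsesInputFile) {
            return "V2Filtering(" + cpuScan + ", inputFile=" + queryUsesInputFile + ")";
        }
    }

    // Assumed registry keyed by "major.minor"; the real loader may work differently.
    private static final Map<String, Supplier<Shim>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("1.10", Shim110::new);
        REGISTRY.put("1.11", Shim111::new);
    }

    /** Picks the implementation matching the "major.minor" prefix of a version string. */
    static Shim load(String icebergVersion) {
        String[] parts = icebergVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed version: " + icebergVersion);
        }
        Supplier<Shim> supplier = REGISTRY.get(parts[0] + "." + parts[1]);
        if (supplier == null) {
            throw new IllegalArgumentException("Unsupported Iceberg version: " + icebergVersion);
        }
        return supplier.get();
    }
}
```

Common code only ever sees `Shim`, so adding a version means registering one more implementation rather than touching every call site.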

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkBatch.scala

Lines changed: 8 additions & 3 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2025, NVIDIA CORPORATION.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,8 +37,13 @@ class GpuSparkBatch(
   }
 
   override def planInputPartitions(): Array[InputPartition] = {
-    val expectedSchema = FieldUtils.readField(cpuBatch, "expectedSchema", true)
-      .asInstanceOf[Schema]
+    // Iceberg 1.10.x: SparkBatch.expectedSchema. Iceberg 1.11.x: SparkBatch.projection.
+    val expectedSchema = (try {
+      FieldUtils.readField(cpuBatch, "projection", true)
+    } catch {
+      case _: IllegalArgumentException =>
+        FieldUtils.readField(cpuBatch, "expectedSchema", true)
+    }).asInstanceOf[Schema]
     val expectedSchemaString = SchemaParser.toJson(expectedSchema)
 
     val sparkContext = SparkSession.getActiveSession.get.sparkContext
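
The hunk above reads a private field whose name differs between Iceberg versions by trying the newer name first and falling back to the older one. As a rough illustration of that idea, here is a self-contained sketch that uses plain `java.lang.reflect` instead of commons-lang3 `FieldUtils`; `FieldFallbackSketch`, `readFirstField`, `OldBatch`, and `NewBatch` are hypothetical names introduced only for this example.

```java
import java.lang.reflect.Field;

/** Sketch only: read the first field that exists among several candidate names. */
final class FieldFallbackSketch {

    /** Returns the value of the first candidate field found on target's class hierarchy. */
    static Object readFirstField(Object target, String... candidateNames) {
        for (String name : candidateNames) {
            Field field = findField(target.getClass(), name);
            if (field != null) {
                try {
                    field.setAccessible(true); // the fields of interest are private
                    return field.get(target);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read field " + name, e);
                }
            }
        }
        throw new IllegalArgumentException(
            "None of [" + String.join(", ", candidateNames) + "] found on "
                + target.getClass().getName());
    }

    /** Walks up the superclass chain, since a field may have moved to a parent class. */
    private static Field findField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // not declared here; keep looking in the superclass
            }
        }
        return null;
    }

    /** Hypothetical stand-in for the older layout (field named expectedSchema). */
    static final class OldBatch {
        private final String expectedSchema = "schema-1.10";
    }

    /** Hypothetical stand-in for the newer layout (field named projection). */
    static final class NewBatch {
        private final String projection = "schema-1.11";
    }
}
```

Calling `readFirstField(batch, "projection", "expectedSchema")` works against either layout. The commit message itself describes this kind of lookup as a tactical fallback, since a renamed field is only detected at run time.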

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkBatchQueryScan.scala

Lines changed: 14 additions & 12 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2025, NVIDIA CORPORATION.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -36,11 +36,17 @@ class GpuSparkBatchQueryScan(
   GpuSparkPartitioningAwareScan[PartitionScanTask](cpuScan, rapidsConf, queryUsesInputFile)
   with SupportsRuntimeV2Filtering {
 
-  private val runtimeFilterExpressions: List[Expression] = FieldUtils.readField(
-    cpuScan, "runtimeFilterExpressions", true)
-    .asInstanceOf[java.util.List[Expression]]
-    .asScala
-    .toList
+  // Iceberg 1.10.x: SparkBatchQueryScan.runtimeFilterExpressions.
+  // Iceberg 1.11.x: SparkRuntimeFilterableScan.runtimeFilters (renamed + moved to parent).
+  private val runtimeFilterExpressions: List[Expression] = {
+    val raw = try {
+      FieldUtils.readField(cpuScan, "runtimeFilters", true)
+    } catch {
+      case _: IllegalArgumentException =>
+        FieldUtils.readField(cpuScan, "runtimeFilterExpressions", true)
+    }
+    raw.asInstanceOf[java.util.List[Expression]].asScala.toList
+  }
 
   override def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()
 
@@ -62,13 +68,9 @@ class GpuSparkBatchQueryScan(
   }
 
   override def toString: String = {
-    s"GpuSparkBatchQueryScan(table=${cpuScan.table()}, )" +
-      s"branch=${cpuScan.branch()}, " +
-      s"type=${cpuScan.expectedSchema().asStruct()}, " +
-      s"filters=${cpuScan.filterExpressions()}, " +
+    s"GpuSparkBatchQueryScan(${cpuScan.description()}, " +
       s"runtimeFilters=$runtimeFilterExpressions, " +
-      s"caseSensitive=${cpuScan.caseSensitive()}, " +
-      s"queryUseInputFile=$queryUsesInputFile"
+      s"queryUseInputFile=$queryUsesInputFile)"
   }
 
   /** Create a version of this scan with input file name support */

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala renamed to iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScanBase.scala

Lines changed: 19 additions & 23 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2025, NVIDIA CORPORATION.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,27 +18,34 @@ package org.apache.iceberg.spark.source
 
 import java.util.Objects
 
-import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
+import com.nvidia.spark.rapids.RapidsConf
 import org.apache.iceberg.FileScanTask
 
 import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.read.{Statistics, SupportsRuntimeFiltering}
-import org.apache.spark.sql.sources.Filter
-
-class GpuSparkCopyOnWriteScan(
+import org.apache.spark.sql.connector.read.Statistics
+
+/**
+ * Version-agnostic base for the GPU copy-on-write scan. Iceberg 1.10.x has
+ * {@code SparkCopyOnWriteScan implements SupportsRuntimeFiltering} with
+ * {@code filter(Filter[])}; Iceberg 1.11.x switched to
+ * {@code SupportsRuntimeV2Filtering} with {@code filter(Predicate[])}. The
+ * per-version concrete subclass lives in {@code iceberg-1-1N-x} and mixes
+ * in the matching Spark runtime-filter trait + delegates {@code filter}
+ * to the matching Iceberg API.
+ */
+abstract class GpuSparkCopyOnWriteScanBase(
     override val cpuScan: SparkCopyOnWriteScan,
     override val rapidsConf: RapidsConf,
     override val queryUsesInputFile: Boolean) extends
-  GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile)
-  with SupportsRuntimeFiltering {
+  GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile) {
 
-  override def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()
+  def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()
 
   override def estimateStatistics(): Statistics = cpuScan.estimateStatistics()
 
   override def equals(obj: Any): Boolean = {
     obj match {
-      case that: GpuSparkCopyOnWriteScan =>
+      case that: GpuSparkCopyOnWriteScanBase =>
         this.cpuScan == that.cpuScan &&
           this.queryUsesInputFile == that.queryUsesInputFile
       case _ => false
@@ -50,18 +57,7 @@ class GpuSparkCopyOnWriteScan(
   }
 
   override def toString: String = {
-    s"GpuSparkCopyOnWriteScan(table=${cpuScan.table()}, " +
-      s"branch=${cpuScan.branch()}, " +
-      s"type=${cpuScan.expectedSchema().asStruct()}, " +
-      s"filters=${cpuScan.filterExpressions()}, " +
-      s"caseSensitive=${cpuScan.caseSensitive()}, " +
+    s"GpuSparkCopyOnWriteScan(${cpuScan.description()}, " +
       s"queryUseInputFile=$queryUsesInputFile)"
   }
-
-  /** Create a version of this scan with input file name support */
-  override def withInputFile(): GpuScan = {
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
-  }
-
-  override def filter(filters: Array[Filter]): Unit = cpuScan.filter(filters)
-}
+}

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala

Lines changed: 10 additions & 9 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2025, NVIDIA CORPORATION.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,10 +16,10 @@
 
 package org.apache.iceberg.spark.source
 
-import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.iceberg.ShimUtils
 import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.FieldUtils
 import org.apache.iceberg.{BaseMetadataTable, ScanTaskGroup}
 import org.apache.iceberg.spark.{GpuSparkReadConf, SparkReadConf}
@@ -28,7 +28,7 @@ import org.apache.iceberg.types.Types
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
 
 
 abstract class GpuSparkScan(val cpuScan: SparkScan,
@@ -65,11 +65,12 @@ abstract class GpuSparkScan(val cpuScan: SparkScan,
   protected def taskGroups(): Seq[_ <: ScanTaskGroup[_]]
 
   def hasNestedType: Boolean = {
-    cpuScan.expectedSchema()
-      .asStruct()
-      .fields()
-      .asScala
-      .exists { field => field.`type`().isNestedType }
+    cpuScan.readSchema().fields.exists { field =>
+      field.dataType match {
+        case _: StructType | _: ArrayType | _: MapType => true
+        case _ => false
+      }
+    }
   }
 }
 
@@ -85,7 +86,7 @@ object GpuSparkScan {
       case icebergScan: SparkBatchQueryScan =>
         new GpuSparkBatchQueryScan(icebergScan, rapidsConf, false)
       case s: SparkCopyOnWriteScan =>
-        new GpuSparkCopyOnWriteScan(s, rapidsConf, false)
+        ShimUtils.newCopyOnWriteScan(s, rapidsConf, false).asInstanceOf[GpuSparkScan]
       case _ =>
         throw new IllegalArgumentException(
           s"Currently iceberg support only supports batch query scan and copy-on-write scan, " +

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala

Lines changed: 2 additions & 2 deletions
@@ -391,7 +391,7 @@ class GpuUnpartitionedDataWriter(
     close()
 
     val result = delegate.result()
-    val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array(0)))
+    val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array[DataFile](0)))
     taskCommit.reportOutputMetrics()
     taskCommit
   }
@@ -441,7 +441,7 @@ class GpuPartitionedDataWriter(
     close()
 
     val result = delegate.result()
-    val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array(0)))
+    val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array[DataFile](0)))
     taskCommit.reportOutputMetrics()
     taskCommit
   }
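
The change above spells out the array element type instead of leaving it to inference. The Java counterpart below is only an analogy, since Java always requires the element type at the call site; it assumes, per the commit message, that the list being converted is wildcard-typed. `TypedToArraySketch`, `DataFile`, and `SimpleDataFile` here are hypothetical stand-ins, not Iceberg's types.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: converting a wildcard-typed list into an explicitly typed array. */
final class TypedToArraySketch {

    /** Hypothetical stand-in for a data file handle. */
    interface DataFile {
        String path();
    }

    static final class SimpleDataFile implements DataFile {
        private final String path;

        SimpleDataFile(String path) {
            this.path = path;
        }

        @Override
        public String path() {
            return path;
        }
    }

    /** The element type is named explicitly, so the result is a DataFile[] for any subtype list. */
    static DataFile[] toDataFileArray(List<? extends DataFile> files) {
        return files.toArray(new DataFile[0]);
    }

    /** A wildcard-typed list, mimicking the assumed shape of the newer return type. */
    static List<? extends DataFile> sampleFiles() {
        return Arrays.asList(new SimpleDataFile("a.parquet"), new SimpleDataFile("b.parquet"));
    }
}
```

Naming the element type keeps the resulting array type independent of how the list's type parameter is declared.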

iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java

Lines changed: 12 additions & 0 deletions
@@ -17,6 +17,8 @@
 package com.nvidia.spark.rapids.iceberg.iceberg110x;
 
 import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.GpuScan;
+import com.nvidia.spark.rapids.RapidsConf;
 import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
 import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
 import org.apache.hadoop.fs.Path;
@@ -27,7 +29,9 @@
 import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
 import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.connector.read.Scan;
 import org.apache.iceberg.util.PartitionUtil;
 
 import java.io.IOException;
@@ -74,4 +78,12 @@ public ParquetFileReader openParquetReader(
       scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
     return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
   }
+
+  @Override
+  public GpuScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+  }
 }
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark.source
+
+import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
+
+import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
+import org.apache.spark.sql.sources.Filter
+
+/** Iceberg 1.10.x copy-on-write scan: SupportsRuntimeFiltering with Array[Filter]. */
+class GpuSparkCopyOnWriteScan(
+    cpuScanArg: SparkCopyOnWriteScan,
+    rapidsConfArg: RapidsConf,
+    queryUsesInputFileArg: Boolean)
+  extends GpuSparkCopyOnWriteScanBase(cpuScanArg, rapidsConfArg, queryUsesInputFileArg)
+  with SupportsRuntimeFiltering {
+
+  override def filter(filters: Array[Filter]): Unit = cpuScan.filter(filters)
+
+  override def withInputFile(): GpuScan =
+    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
+}
+
+object GpuSparkCopyOnWriteScan {
+  def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean): GpuScan =
+    new GpuSparkCopyOnWriteScan(
+      cpuScan.asInstanceOf[SparkCopyOnWriteScan], rapidsConf, queryUsesInputFile)
+}
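
The new file above combines two ideas: a version-agnostic abstract base with a per-version subclass that adds the runtime-filter method, and a factory that accepts the public `Scan` type and downcasts to a type callers cannot name. The Java sketch below approximates both under stated assumptions. Every type in it is a hypothetical stand-in, `String[]` replaces Spark's `Filter[]`, and a `private` nested class plays the role that package-private visibility plays in the real code.

```java
/** Sketch only: abstract base + per-version subclass + factory over a hidden type. */
final class CopyOnWriteShimSketch {

    /** Stand-in for the public scan interface that callers are allowed to reference. */
    interface Scan {}

    /** Stand-in for the public GPU-side scan type returned to callers. */
    interface GpuScan {
        boolean usesInputFile();

        GpuScan withInputFile();
    }

    /** Stand-in for a version-specific runtime-filter interface. */
    interface RuntimeFiltering {
        void filter(String[] filters);
    }

    /** Hidden CPU scan: private here, package-private in the code this imitates. */
    private static final class HiddenCpuScan implements Scan {
        void filter(String[] filters) {
            // a real scan would prune files here; the sketch does nothing
        }
    }

    /** Version-agnostic base holding the shared state and behavior. */
    private abstract static class GpuScanBase implements GpuScan {
        final HiddenCpuScan cpuScan;
        final boolean queryUsesInputFile;

        GpuScanBase(HiddenCpuScan cpuScan, boolean queryUsesInputFile) {
            this.cpuScan = cpuScan;
            this.queryUsesInputFile = queryUsesInputFile;
        }

        @Override
        public boolean usesInputFile() {
            return queryUsesInputFile;
        }
    }

    /** Per-version subclass: adds the filter signature this version supports. */
    private static final class GpuScanV1 extends GpuScanBase implements RuntimeFiltering {
        GpuScanV1(HiddenCpuScan cpuScan, boolean queryUsesInputFile) {
            super(cpuScan, queryUsesInputFile);
        }

        @Override
        public void filter(String[] filters) {
            cpuScan.filter(filters);
        }

        @Override
        public GpuScan withInputFile() {
            // the copy must be the same concrete subclass, so it lives here, not in the base
            return new GpuScanV1(cpuScan, true);
        }
    }

    /** Produces a hidden scan typed as the public interface, as a caller would receive it. */
    static Scan newCpuScan() {
        return new HiddenCpuScan();
    }

    /** Factory: takes the public type and downcasts where the hidden type is visible. */
    static GpuScan create(Scan cpuScan, boolean queryUsesInputFile) {
        return new GpuScanV1((HiddenCpuScan) cpuScan, queryUsesInputFile);
    }
}
```

The downcast in `create` fails with a `ClassCastException` if a caller passes some other `Scan`, so the sketch, like the `asInstanceOf` in the diff, relies on the caller having already matched the concrete type.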

iceberg/iceberg-1-6-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg16x/ShimUtilsImpl.java

Lines changed: 12 additions & 0 deletions
@@ -16,11 +16,15 @@
 
 package com.nvidia.spark.rapids.iceberg.iceberg16x;
 
+import com.nvidia.spark.rapids.GpuScan;
+import com.nvidia.spark.rapids.RapidsConf;
 import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
 import org.apache.iceberg.*;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.source.GpuBaseReader;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.connector.read.Scan;
 import org.apache.iceberg.util.PartitionUtil;
 
 import java.util.Collections;
@@ -54,4 +58,12 @@ public Map<String, Map<String, String>> storageCredentialOverlays(FileIO fileIO)
   // openParquetReader: inherits the no-cache default from IcebergShimUtils. The shaded
   // ParquetFileReader in 1.6.x has no public API to inject pre-parsed footer metadata,
   // so file-cache routing is not possible here.
+
+  @Override
+  public GpuScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+  }
 }