Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dist/maven-antrun/build-parallel-worlds.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@
dir="${project.build.directory}"
resultproperty="build-parallel-worlds.dedupeExitCode"
errorproperty="build-parallel-worlds.dedupeErrorMsg"
failonerror="false"/>
failonerror="false">
<env key="UNSHIMMED_COMMON_FROM_SINGLE_SHIM_TXT"
value="${spark.rapids.source.basedir}/${rapids.module}/unshimmed-common-from-single-shim.txt"/>
</exec>
<fail message="exec binary-dedupe.sh failed, exit code is ${build-parallel-worlds.dedupeExitCode}, error msg is ${build-parallel-worlds.dedupeErrorMsg}">
<condition>
<not>
Expand Down
36 changes: 36 additions & 0 deletions dist/scripts/binary-dedupe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export SPARK_SHARED_TXT="$PWD/spark-shared.txt"
export SPARK_SHARED_COPY_LIST="$PWD/spark-shared-copy-list.txt"
export DELETE_DUPLICATES_TXT="$PWD/delete-duplicates.txt"
export SPARK_SHARED_DIR="$PWD/spark-shared"
export UNSHIMMED_FROM_SPARK_SHARED_COPY_LIST="$PWD/unshimmed-from-spark-shared-copy-list.txt"

# This script de-duplicates .class files at the binary level.
# We could also diff classes using scalap / javap outputs.
Expand Down Expand Up @@ -99,6 +100,38 @@ function retain_single_copy() {
done >> "$DELETE_DUPLICATES_TXT" || exit 255
}

function copy_unshimmed_from_spark_shared() {
set -e
local unshimmed_patterns_txt="${UNSHIMMED_COMMON_FROM_SINGLE_SHIM_TXT:-}"

[[ -n "$unshimmed_patterns_txt" ]] || return 0
[[ -f "$unshimmed_patterns_txt" ]] || {
echo >&2 "Unshimmed common list does not exist: $unshimmed_patterns_txt"
exit 255
}

: > "$UNSHIMMED_FROM_SPARK_SHARED_COPY_LIST"
while read -r shared_path; do
local rel_path="${shared_path#./parallel-world/spark-shared/}"
local pattern
while read -r pattern; do
[[ -n "$pattern" ]] || continue
[[ "$pattern" =~ ^[[:space:]]*# ]] && continue
# shellcheck disable=SC2053
if [[ "$rel_path" == $pattern ]]; then
echo "$rel_path" >> "$UNSHIMMED_FROM_SPARK_SHARED_COPY_LIST"
break
fi
done < "$unshimmed_patterns_txt"
done < <(find ./parallel-world/spark-shared -type f)

if [[ -s "$UNSHIMMED_FROM_SPARK_SHARED_COPY_LIST" ]]; then
echo "Promoting root-layout files from spark-shared via $unshimmed_patterns_txt"
rsync --files-from="$UNSHIMMED_FROM_SPARK_SHARED_COPY_LIST" \
./parallel-world/spark-shared ./parallel-world
fi
}

# this belongs into maven initialize phase, left in here for easier
# standalone debugging
# truncate incremental files
Expand All @@ -124,6 +157,9 @@ done

mv "$SPARK_SHARED_DIR" parallel-world/

echo "$((++STEP))/ promoting allowlisted spark-shared files to root layout"
copy_unshimmed_from_spark_shared

# Verify that all class files in the conventional jar location are bitwise
# identical regardless of the Spark-version-specific jar.
#
Expand Down
40 changes: 40 additions & 0 deletions dist/unshimmed-common-from-single-shim.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,46 @@ com/nvidia/spark/rapids/Optimizer.class
com/nvidia/spark/rapids/optimizer/SQLOptimizerPlugin*
com/nvidia/spark/rapids/ShimLoaderTemp*
com/nvidia/spark/rapids/SparkShims*
com/nvidia/spark/rapids/fileio/iceberg/IcebergInputFile.class
com/nvidia/spark/rapids/fileio/iceberg/IcebergInputStream.class
com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputFile.class
com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.class
com/nvidia/spark/rapids/iceberg/GpuInternalRow.class
com/nvidia/spark/rapids/iceberg/GpuInternalRowBase.class
com/nvidia/spark/rapids/iceberg/data/GpuDeleteFilter2.class
com/nvidia/spark/rapids/iceberg/package.class
com/nvidia/spark/rapids/iceberg/package$.class
com/nvidia/spark/rapids/iceberg/parquet/FileSchemaAccessors.class
com/nvidia/spark/rapids/iceberg/parquet/GpuIcebergParquetReader$.class
com/nvidia/spark/rapids/iceberg/parquet/SingleFile.class
com/nvidia/spark/rapids/iceberg/parquet/SingleFile$.class
com/nvidia/spark/rapids/iceberg/parquet/ThreadConf.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkReadOptions.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkReadOptions$.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkSQLProperties.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkSQLProperties$.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkUtil.class
com/nvidia/spark/rapids/iceberg/spark/GpuSparkUtil$.class
com/nvidia/spark/rapids/iceberg/spark/RapidsSparkCatalog.class
com/nvidia/spark/rapids/iceberg/spark/RapidsSparkSessionCatalog.class
com/nvidia/spark/rapids/iceberg/spark/source/RapidsSparkTable.class
org/apache/iceberg/aws/s3/IcebergS3InputFileAccess.class
org/apache/iceberg/data/GpuFileHelpers.class
org/apache/iceberg/io/GpuClusteredWriterBridge.class
org/apache/iceberg/io/GpuFanoutWriterBridge.class
org/apache/iceberg/io/GpuPositionDeleteFileWriter$.class
org/apache/iceberg/parquet/GpuParquetIOAccess.class
org/apache/iceberg/spark/GpuTypeToSparkType.class
org/apache/iceberg/spark/GpuTypeToSparkType$.class
org/apache/iceberg/spark/GpuSparkReadConf.class
org/apache/iceberg/spark/GpuSparkReadConfAccess.class
org/apache/iceberg/spark/package.class
org/apache/iceberg/spark/package$.class
org/apache/iceberg/spark/source/GpuBaseReader.class
org/apache/iceberg/spark/source/GpuSparkPlanningUtil.class
org/apache/iceberg/spark/source/GpuSparkScanAccess.class
org/apache/iceberg/spark/source/GpuSparkWriteAccess.class
org/apache/iceberg/spark/source/GpuStructInternalRow.class
org/apache/spark/sql/rapids/AdaptiveSparkPlanHelperShim*
org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback*
rapids/*.py
1 change: 1 addition & 0 deletions dist/unshimmed-from-each-spark3xx.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ com/nvidia/spark/rapids/AvroProvider.class
com/nvidia/spark/rapids/HiveProvider.class
com/nvidia/spark/rapids/iceberg/IcebergProvider.class
com/nvidia/spark/rapids/iceberg/IcebergProvider$.class
com/nvidia/spark/rapids/iceberg/IcebergProviderAccess.class
com/nvidia/spark/rapids/iceberg/IcebergProbe.class
com/nvidia/spark/rapids/delta/DeltaProbe.class
com/nvidia/spark/rapids/delta/DeltaProvider.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.OptionalLong;

Expand All @@ -40,8 +39,7 @@
* {@link IcebergS3RangeCopier}. The supplied {@link FileIO} is only used for
* its property map and any per-prefix storage-credential overlays.
*
* <p>This class lives in {@code org.apache.iceberg.aws.s3} for the package-private
* {@link BaseS3File} access used to extract the bucket/key URI.
* <p>The package-private S3 file access is isolated in {@link IcebergS3InputFileAccess}.
*/
public final class IcebergS3InputFile implements RapidsInputFile {
private static final Logger LOG = LoggerFactory.getLogger(IcebergS3InputFile.class);
Expand All @@ -60,21 +58,12 @@ private IcebergS3InputFile(
public static RapidsInputFile maybeCreate(InputFile inputFile, FileIO fileIO) {
// When the gating conf is off (or the file is not an S3 file), return the
// default IcebergInputFile so the standard Iceberg SeekableInputStream path is used.
if (!RapidsInputFiles.isS3PerfEnabled() || !(inputFile instanceof BaseS3File)) {
if (!RapidsInputFiles.isS3PerfEnabled()) {
return new IcebergInputFile(inputFile);
}
BaseS3File s3File = (BaseS3File) inputFile;
S3URI uri = s3File.uri();
// Use the 4-arg URI constructor so the (raw, un-percent-encoded) bucket and
// key are encoded per-component. URI.create / new URI(String) would treat the
// input as an already-encoded URI string and throw on partition values that
// contain spaces, '#', or other reserved characters (e.g. partition=hello world).
URI s3Uri;
try {
s3Uri = new URI("s3", uri.bucket(), "/" + uri.key(), null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Invalid S3 URI for bucket=" + uri.bucket() + " key=" + uri.key(), e);
URI s3Uri = IcebergS3InputFileAccess.s3Uri(inputFile);
if (s3Uri == null) {
return new IcebergInputFile(inputFile);
}
// Iceberg < 1.7 does not have SupportsStorageCredentials; ShimUtils returns
// the per-prefix credential overlays (or an empty map on 1.6).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.aws.s3;

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.iceberg.io.InputFile;

/** Package-local access to Iceberg S3 file internals. */
public final class IcebergS3InputFileAccess {
private IcebergS3InputFileAccess() {
}

public static URI s3Uri(InputFile inputFile) {
if (!(inputFile instanceof BaseS3File)) {
return null;
}
S3URI uri = ((BaseS3File) inputFile).uri();
try {
return new URI("s3", uri.bucket(), "/" + uri.key(), null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Invalid S3 URI for bucket=" + uri.bucket() + " key=" + uri.key(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.io;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

/** Root-layout bridge to Iceberg's package-private {@link ClusteredWriter}. */
public final class GpuClusteredWriterBridge<T, R> extends ClusteredWriter<T, R> {
private final BiFunction<PartitionSpec, StructLike, FileWriter<T, R>> newWriter;
private final Consumer<R> addResult;
private final Supplier<R> aggregatedResult;

public GpuClusteredWriterBridge(
BiFunction<PartitionSpec, StructLike, FileWriter<T, R>> newWriter,
Consumer<R> addResult,
Supplier<R> aggregatedResult) {
this.newWriter = newWriter;
this.addResult = addResult;
this.aggregatedResult = aggregatedResult;
}

@Override
protected FileWriter<T, R> newWriter(PartitionSpec partitionSpec, StructLike partition) {
return newWriter.apply(partitionSpec, partition);
}

@Override
protected void addResult(R result) {
addResult.accept(result);
}

@Override
protected R aggregatedResult() {
return aggregatedResult.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.io;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

/** Root-layout bridge to Iceberg's package-private {@link FanoutWriter}. */
public final class GpuFanoutWriterBridge<T, R> extends FanoutWriter<T, R> {
private final BiFunction<PartitionSpec, StructLike, FileWriter<T, R>> newWriter;
private final Consumer<R> addResult;
private final Supplier<R> aggregatedResult;

public GpuFanoutWriterBridge(
BiFunction<PartitionSpec, StructLike, FileWriter<T, R>> newWriter,
Consumer<R> addResult,
Supplier<R> aggregatedResult) {
this.newWriter = newWriter;
this.addResult = addResult;
this.aggregatedResult = aggregatedResult;
}

@Override
protected FileWriter<T, R> newWriter(PartitionSpec partitionSpec, StructLike partition) {
return newWriter.apply(partitionSpec, partition);
}

@Override
protected void addResult(R result) {
addResult.accept(result);
}

@Override
protected R aggregatedResult() {
return aggregatedResult.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.parquet;

import org.apache.iceberg.io.InputFile;

/** Package-local access to Iceberg Parquet internals. */
public final class GpuParquetIOAccess {
private GpuParquetIOAccess() {
}

public static org.apache.iceberg.shaded.org.apache.parquet.io.InputFile file(InputFile file) {
return ParquetIO.file(file);
}
}
Loading