Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions integration_tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
<includes>
<include>parquet-hadoop*.jar</include>
<include>spark-avro*.jar</include>
<include>spark-protobuf*.jar</include>
<include>protobuf-java-*.jar</include>
</includes>
</filesets>
</filesets>
Expand Down Expand Up @@ -166,6 +168,31 @@
</artifactItems>
</configuration>
</execution>
<!-- spark-protobuf shades `com.google.protobuf` internally, so the
unshaded jar is shipped separately for the tests to use. -->
<execution>
<id>copy-spark-protobuf</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<!-- Driven by the spark.protobuf.skipCopy property so builds that set it to true do not run this copy. -->
<skip>${spark.protobuf.skipCopy}</skip>
<useBaseVersion>true</useBaseVersion>
<artifactItems>
<!-- spark-protobuf built for the Scala binary version and Spark version of this build. -->
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-protobuf_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</artifactItem>
<!-- Unshaded protobuf-java. NOTE(review): the version is pinned inline rather than via a property; confirm it stays compatible with every Spark version this execution runs for. -->
<artifactItem>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.5</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
Expand Down
24 changes: 22 additions & 2 deletions integration_tests/run_pyspark_from_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
# To run all tests, including Avro tests:
# INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh
#
# To run tests WITHOUT Protobuf tests (protobuf is included by default):
# INCLUDE_SPARK_PROTOBUF_JAR=false ./run_pyspark_from_build.sh
#
# To run a specific test:
# TEST=my_test ./run_pyspark_from_build.sh
#
Expand Down Expand Up @@ -100,6 +103,7 @@ else
# support alternate local jars NOT building from the source code
if [ -d "$LOCAL_JAR_PATH" ]; then
AVRO_JARS=$(echo "$LOCAL_JAR_PATH"/spark-avro*.jar)
PROTOBUF_JARS=$(echo "$LOCAL_JAR_PATH"/spark-protobuf*.jar "$LOCAL_JAR_PATH"/protobuf-java-*.jar)
PLUGIN_JAR=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar)
if [ -f $(echo $LOCAL_JAR_PATH/parquet-hadoop*.jar) ]; then
export INCLUDE_PARQUET_HADOOP_TEST_JAR=true
Expand All @@ -116,6 +120,7 @@ else
else
[[ "$SCALA_VERSION" != "2.12" ]] && TARGET_DIR=${TARGET_DIR/integration_tests/scala$SCALA_VERSION\/integration_tests}
AVRO_JARS=$(echo "$TARGET_DIR"/dependency/spark-avro*.jar)
PROTOBUF_JARS=$(echo "$TARGET_DIR"/dependency/spark-protobuf*.jar "$TARGET_DIR"/dependency/protobuf-java-*.jar)
PARQUET_HADOOP_TESTS=$(echo "$TARGET_DIR"/dependency/parquet-hadoop*.jar)
# remove the log4j.properties file so it doesn't conflict with ours, ignore errors
# if it isn't present or already removed
Expand All @@ -141,9 +146,24 @@ else
AVRO_JARS=""
fi

# ALL_JARS includes dist.jar integration-test.jar avro.jar parquet.jar if they exist
# spark-protobuf shades `com.google.protobuf.*` internally and Spark does not bundle the
# unshaded jar, so we must ship both jars to the test classpath.
# Lower-case the caller's setting so the "true"/"false" comparisons below are
# case-insensitive; an unset variable becomes an empty string.
INCLUDE_SPARK_PROTOBUF_JAR_REQUESTED=$(echo "${INCLUDE_SPARK_PROTOBUF_JAR}" | tr '[:upper:]' '[:lower:]')
# Enable the protobuf tests unless explicitly disabled, and only when exactly two
# existing paths resolve from PROTOBUF_JARS (`readlink -e` prints one line per
# path that exists).
# NOTE(review): if either glob matches more than one jar the count is not 2 and
# the tests are disabled as well - confirm that is intended.
if [[ "$INCLUDE_SPARK_PROTOBUF_JAR_REQUESTED" != "false" \
&& $(readlink -e $PROTOBUF_JARS 2>/dev/null | wc -l) -eq 2 ]];
then
export INCLUDE_SPARK_PROTOBUF_JAR=true
else
# Only warn when the caller explicitly asked for the protobuf tests.
# NOTE(review): the message names $TARGET_DIR/dependency, but PROTOBUF_JARS may
# have been populated from LOCAL_JAR_PATH instead - confirm the wording.
if [[ "$INCLUDE_SPARK_PROTOBUF_JAR_REQUESTED" == "true" ]]; then
>&2 echo "WARNING: INCLUDE_SPARK_PROTOBUF_JAR=true was requested but spark-protobuf/protobuf-java jars were not found under $TARGET_DIR/dependency; disabling protobuf tests."
fi
export INCLUDE_SPARK_PROTOBUF_JAR=false
# Clear the list so later expansions of PROTOBUF_JARS contribute no paths.
PROTOBUF_JARS=""
fi

# ALL_JARS includes dist.jar integration-test.jar avro.jar parquet.jar protobuf.jar if they exist
# Remove non-existing paths and canonicalize the paths including get rid of links and `..`
ALL_JARS=$(readlink -e $PLUGIN_JAR $TEST_JARS $AVRO_JARS $PARQUET_HADOOP_TESTS || true)
ALL_JARS=$(readlink -e $PLUGIN_JAR $TEST_JARS $AVRO_JARS $PARQUET_HADOOP_TESTS $PROTOBUF_JARS || true)
# `:` separated jars
ALL_JARS="${ALL_JARS//$'\n'/:}"

Expand Down
130 changes: 130 additions & 0 deletions integration_tests/src/main/python/protobuf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import os

import pytest

from asserts import assert_gpu_fallback_collect
from marks import allow_non_gpu
from spark_session import is_before_spark_340, with_cpu_session
import pyspark.sql.functions as f

# Module-wide marker: skip everything when the runner reports that the protobuf
# jars are disabled, otherwise skip only on Spark versions before 3.4.0.
pytestmark = (
    pytest.mark.skip(reason="INCLUDE_SPARK_PROTOBUF_JAR is disabled")
    if os.environ.get('INCLUDE_SPARK_PROTOBUF_JAR', 'true').lower() == 'false'
    else pytest.mark.skipif(
        is_before_spark_340(), reason="from_protobuf is Spark 3.4.0+")
)


def _try_import_from_protobuf():
try:
from pyspark.sql.protobuf.functions import from_protobuf
return from_protobuf
except Exception:
return None


@pytest.fixture(scope="module")
def from_protobuf_fn():
    """Module-scoped fixture yielding ``from_protobuf``.

    Skips the requesting test when the function cannot be imported.
    """
    imported = _try_import_from_protobuf()
    if imported is not None:
        return imported
    pytest.skip("from_protobuf not available")


def _encode_varint(value):
out = bytearray()
value &= 0xFFFFFFFFFFFFFFFF
while True:
bits = value & 0x7F
value >>= 7
if value:
out.append(bits | 0x80)
else:
out.append(bits)
return bytes(out)


def _encode_simple_message(i32_value, s_value):
    """Serialize one message with an integer field 1 and a string field 2.

    Field 1 is written as a varint (wire type 0) and field 2 as a
    length-delimited UTF-8 payload (wire type 2). Returns the encoded bytes.
    """
    text_payload = s_value.encode("utf-8")
    pieces = (
        _encode_varint((1 << 3) | 0),  # field 1, VARINT
        _encode_varint(i32_value),
        _encode_varint((2 << 3) | 2),  # field 2, LENGTH-DELIMITED
        _encode_varint(len(text_payload)),
        text_payload,
    )
    return b"".join(pieces)


def _build_simple_descriptor_bytes(spark):
    """Build a serialized ``FileDescriptorSet`` describing ``test.Simple``.

    The descriptor is assembled with the ``com.google.protobuf.DescriptorProtos``
    builders reached through ``spark.sparkContext._jvm``. It declares a proto2
    file ``simple.proto`` in package ``test`` with one message ``Simple`` that
    has two optional fields: ``i32`` (number 1, int32) and ``s`` (number 2,
    string).

    Returns the ``toByteArray()`` output of the set as ``bytes``.
    """
    protos = spark.sparkContext._jvm.com.google.protobuf.DescriptorProtos
    field_proto = protos.FieldDescriptorProto

    def optional_field(name, number, proto_type):
        # Every field in this message is optional; only name/number/type vary.
        builder = field_proto.newBuilder().setName(name).setNumber(number)
        builder = builder.setLabel(field_proto.Label.LABEL_OPTIONAL)
        return builder.setType(proto_type).build()

    i32_field = optional_field("i32", 1, field_proto.Type.TYPE_INT32)
    s_field = optional_field("s", 2, field_proto.Type.TYPE_STRING)

    message_builder = protos.DescriptorProto.newBuilder().setName("Simple")
    message = message_builder.addField(i32_field).addField(s_field).build()

    file_builder = protos.FileDescriptorProto.newBuilder()
    file_builder = file_builder.setName("simple.proto").setPackage("test")
    file_builder = file_builder.addMessageType(message).setSyntax("proto2")

    descriptor_set = protos.FileDescriptorSet.newBuilder()
    descriptor_set = descriptor_set.addFile(file_builder.build()).build()
    return bytes(descriptor_set.toByteArray())


@pytest.fixture
def simple_desc(spark_tmp_path):
    """Write the ``Simple`` descriptor set to disk and return it.

    Returns a ``(path, descriptor_bytes)`` tuple where ``path`` is
    ``<spark_tmp_path>/simple.desc`` and ``descriptor_bytes`` is the content
    written there.
    """
    target_path = spark_tmp_path + "/simple.desc"
    descriptor = with_cpu_session(_build_simple_descriptor_bytes)
    # NOTE(review): the built-in open() writes through the local filesystem;
    # confirm spark_tmp_path is locally writable on distributed (e.g. HDFS) runs.
    with open(target_path, "wb") as out_file:
        out_file.write(descriptor)
    return target_path, descriptor


_smoke_rows = [(1, "a"), (-2, "bb"), (0, ""), (12345, "hello")]


def _make_smoke_df(spark):
    """Return a one-column (``bin``) DataFrame of encoded ``_smoke_rows`` messages."""
    rows = []
    for i32_value, s_value in _smoke_rows:
        # Each row is a 1-tuple holding the serialized message bytes.
        rows.append((_encode_simple_message(i32_value, s_value),))
    return spark.createDataFrame(rows, ["bin"])


@allow_non_gpu("ProjectExec", "ProtobufDataToCatalyst")
def test_from_protobuf_smoke_path_api(simple_desc, from_protobuf_fn):
desc_path, _ = simple_desc

def run(spark):
return _make_smoke_df(spark).select(
from_protobuf_fn(f.col("bin"), "test.Simple", desc_path).alias("d"))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please validate that these tests work on a distributed setup, such as one backed by HDFS? According to AI (which could totally be wrong), pyspark reads desc_path as a local file. Because desc_path is created under spark_tmp_path, which points to a distributed filesystem on setups like Dataproc, the test could fail there.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, it did not work on HDFS. Updated to spark_tmp_path now.


assert_gpu_fallback_collect(run, "ProtobufDataToCatalyst")


@allow_non_gpu("ProjectExec", "ProtobufDataToCatalyst")
def test_from_protobuf_smoke_binary_descriptor_api(simple_desc, from_protobuf_fn):
    """Smoke-test ``from_protobuf`` with an in-memory descriptor set.

    Skipped when the available ``from_protobuf`` has no ``binaryDescriptorSet``
    parameter. Otherwise asserts the query falls back for
    ``ProtobufDataToCatalyst``.
    """
    accepted_params = inspect.signature(from_protobuf_fn).parameters
    if "binaryDescriptorSet" not in accepted_params:
        pytest.skip("binaryDescriptorSet kwarg is Spark 3.5+ only")
    desc_bytes = simple_desc[1]

    def run(spark):
        source_df = _make_smoke_df(spark)
        decoded = from_protobuf_fn(
            f.col("bin"), "test.Simple",
            binaryDescriptorSet=bytearray(desc_bytes))
        return source_df.select(decoded.alias("d"))

    assert_gpu_fallback_collect(run, "ProtobufDataToCatalyst")
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -118,6 +119,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -142,6 +144,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -166,6 +169,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -190,6 +194,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand Down Expand Up @@ -806,6 +811,8 @@
<rapids.secondaryCacheDir>${spark.rapids.project.basedir}/target/${spark.version.classifier}/.sbt/1.0/zinc/org.scala-sbt</rapids.secondaryCacheDir>
<allowConventionalDistJar>false</allowConventionalDistJar>
<buildver>330</buildver>
<!-- spark-protobuf is a Spark 3.4.0+ module; release33x profiles override to `true`. -->
<spark.protobuf.skipCopy>false</spark.protobuf.skipCopy>
<maven.compiler.source>1.8</maven.compiler.source>
<java.major.version>8</java.major.version>
<scala.compiler.release>${java.major.version}</scala.compiler.release>
Expand Down
27 changes: 27 additions & 0 deletions scala2.13/integration_tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
<includes>
<include>parquet-hadoop*.jar</include>
<include>spark-avro*.jar</include>
<include>spark-protobuf*.jar</include>
<include>protobuf-java-*.jar</include>
</includes>
</filesets>
</filesets>
Expand Down Expand Up @@ -166,6 +168,31 @@
</artifactItems>
</configuration>
</execution>
<!-- spark-protobuf shades `com.google.protobuf` internally, so the
unshaded jar is shipped separately for the tests to use. -->
<execution>
<id>copy-spark-protobuf</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<!-- Driven by the spark.protobuf.skipCopy property so builds that set it to true do not run this copy. -->
<skip>${spark.protobuf.skipCopy}</skip>
<useBaseVersion>true</useBaseVersion>
<artifactItems>
<!-- spark-protobuf built for the Scala binary version and Spark version of this build. -->
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-protobuf_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</artifactItem>
<!-- Unshaded protobuf-java. NOTE(review): the version is pinned inline rather than via a property; confirm it stays compatible with every Spark version this execution runs for. -->
<artifactItem>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.5</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
Expand Down
7 changes: 7 additions & 0 deletions scala2.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -118,6 +119,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -142,6 +144,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -166,6 +169,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand All @@ -190,6 +194,7 @@
<rapids.delta.artifactId1>rapids-4-spark-delta-21x</rapids.delta.artifactId1>
<rapids.delta.artifactId2>rapids-4-spark-delta-22x</rapids.delta.artifactId2>
<rapids.delta.artifactId3>rapids-4-spark-delta-23x</rapids.delta.artifactId3>
<spark.protobuf.skipCopy>true</spark.protobuf.skipCopy>
</properties>
<modules>
<module>delta-lake/delta-21x</module>
Expand Down Expand Up @@ -806,6 +811,8 @@
<rapids.secondaryCacheDir>${spark.rapids.project.basedir}/target/${spark.version.classifier}/.sbt/1.0/zinc/org.scala-sbt</rapids.secondaryCacheDir>
<allowConventionalDistJar>false</allowConventionalDistJar>
<buildver>330</buildver>
<!-- spark-protobuf is a Spark 3.4.0+ module; release33x profiles override to `true`. -->
<spark.protobuf.skipCopy>false</spark.protobuf.skipCopy>
<maven.compiler.source>1.8</maven.compiler.source>
<java.major.version>8</java.major.version>
<scala.compiler.release>${java.major.version}</scala.compiler.release>
Expand Down