Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions data/artifact-tests/crc/README.md

This file was deleted.

12 changes: 0 additions & 12 deletions data/artifact-tests/crc/junitLargeJar.txt

This file was deleted.

1 change: 0 additions & 1 deletion data/artifact-tests/crc/smallJar.txt

This file was deleted.

Binary file removed data/artifact-tests/junitLargeJar.jar
Binary file not shown.
Binary file removed data/artifact-tests/smallJar.jar
Binary file not shown.
7 changes: 0 additions & 7 deletions dev/test-classes.txt
Original file line number Diff line number Diff line change
@@ -1,7 +0,0 @@
sql/core/src/test/resources/artifact-tests/Hello.class
sql/core/src/test/resources/artifact-tests/IntSumUdf.class
sql/core/src/test/resources/artifact-tests/smallClassFile.class
sql/connect/common/src/test/resources/artifact-tests/Hello.class
sql/core/src/test/resources/artifact-tests/HelloWithPackage.class
sql/connect/common/src/test/resources/artifact-tests/smallClassFile.class
sql/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class
4 changes: 0 additions & 4 deletions dev/test-jars.txt
Original file line number Diff line number Diff line change
@@ -1,4 +0,0 @@
data/artifact-tests/junitLargeJar.jar
data/artifact-tests/smallJar.jar
sql/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar
sql/connect/common/src/test/resources/artifact-tests/smallJar.jar
136 changes: 74 additions & 62 deletions python/pyspark/sql/tests/connect/client/test_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import tempfile
import unittest
import os
import zipfile
import zlib

from pyspark.util import is_remote_only
from pyspark.sql import SparkSession
from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect
from pyspark.testing.sqlutils import SPARK_HOME
from pyspark.sql.functions import udf, assert_true, lit

if should_test_connect:
Expand Down Expand Up @@ -205,15 +206,37 @@ def root(cls):
def setUpClass(cls):
    """Create a temp directory and generate the jar artifacts the tests use."""
    super().setUpClass()
    cls.artifact_manager: ArtifactManager = cls.spark._client._artifact_manager
    cls.artifact_file_path = tempfile.mkdtemp(prefix="artifact_tests_")

    # The generated files are ZIP archives carrying a .jar extension.  They are
    # not valid JAR files (no MANIFEST.MF), but the tests only verify the
    # byte-level transfer protocol (chunking, CRC), so the .jar extension alone
    # is sufficient to trigger the correct artifact routing (jars/ prefix).

    # A tiny artifact that fits in a single chunk.
    small_jar = os.path.join(cls.artifact_file_path, "smallJar.jar")
    with zipfile.ZipFile(small_jar, "w") as archive:
        archive.writestr("dummy.txt", "hello")

    # An artifact larger than 32KB so the multi-chunk path is exercised.
    large_jar = os.path.join(cls.artifact_file_path, "largeJar.jar")
    with zipfile.ZipFile(large_jar, "w", zipfile.ZIP_STORED) as archive:
        for idx in range(50):
            archive.writestr(f"entry_{idx}.txt", f"data_{idx}" * 1000)

@classmethod
def tearDownClass(cls):
    """Remove the temporary artifact directory created in ``setUpClass``."""
    # NOTE(review): `shutil` is not present in this module's visible import
    # block (only tempfile/unittest/os/zipfile/zlib are imported there), so a
    # module-level reference would raise NameError during teardown.  A local
    # import fixes that and is harmless if the module import exists elsewhere.
    import shutil

    shutil.rmtree(cls.artifact_file_path, ignore_errors=True)
    super().tearDownClass()

@staticmethod
def _compute_crc(data, chunk_size=None):
    """Compute per-chunk CRC32 values matching Java's CRC32.

    Parameters
    ----------
    data : bytes
        Raw artifact payload to checksum.
    chunk_size : int, optional
        Size of each chunk in bytes.  Defaults to
        ``ArtifactManager.CHUNK_SIZE`` so existing callers are unaffected;
        passing it explicitly makes the helper reusable for other chunkings.

    Returns
    -------
    list[int]
        One unsigned 32-bit CRC per chunk, in file order (empty for empty
        input).
    """
    if chunk_size is None:
        chunk_size = ArtifactManager.CHUNK_SIZE
    # zlib.crc32 already returns an unsigned value on Python 3; the mask is
    # kept as an explicit guarantee of an unsigned 32-bit result.
    return [
        zlib.crc32(data[start : start + chunk_size]) & 0xFFFFFFFF
        for start in range(0, len(data), chunk_size)
    ]

@classmethod
def conf(cls):
Expand All @@ -224,8 +247,6 @@ def conf(cls):
def test_basic_requests(self):
file_name = "smallJar"
small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar")
if not os.path.isfile(small_jar_path):
raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.")
response = self.artifact_manager._retrieve_responses(
self.artifact_manager._create_requests(
small_jar_path, pyfile=False, archive=False, file=False
Expand All @@ -236,9 +257,6 @@ def test_basic_requests(self):
def test_single_chunk_artifact(self):
file_name = "smallJar"
small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar")
small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt")
if not os.path.isfile(small_jar_path):
raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.")

requests = list(
self.artifact_manager._create_requests(
Expand All @@ -257,24 +275,21 @@ def test_single_chunk_artifact(self):
self.assertTrue(single_artifact.name.endswith(".jar"))

self.assertEqual(os.path.join("jars", f"{file_name}.jar"), single_artifact.name)
with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2:
self.assertEqual(single_artifact.data.crc, int(f1.readline()))
self.assertEqual(single_artifact.data.data, f2.read())
with open(small_jar_path, "rb") as f:
data = f.read()
expected_crc = self._compute_crc(data)[0]
self.assertEqual(single_artifact.data.crc, expected_crc)
self.assertEqual(single_artifact.data.data, data)

def test_chunked_artifacts(self):
file_name = "junitLargeJar"
file_name = "largeJar"
large_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar")
large_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt")
if not os.path.isfile(large_jar_path):
raise unittest.SkipTest(f"Skipped as {large_jar_path} does not exist.")

requests = list(
self.artifact_manager._create_requests(
large_jar_path, pyfile=False, archive=False, file=False
)
)
# Expected chunks = roundUp( file_size / chunk_size) = 12
# File size of `junitLargeJar.jar` is 384581 bytes.
large_jar_size = os.path.getsize(large_jar_path)
expected_chunks = int(
(large_jar_size + (ArtifactManager.CHUNK_SIZE - 1)) / ArtifactManager.CHUNK_SIZE
Expand All @@ -289,21 +304,22 @@ def test_chunked_artifacts(self):
other_requests = requests[1:]
data_chunks = [begin_chunk.initial_chunk] + [req.chunk for req in other_requests]

with open(large_jar_crc_path) as f1, open(large_jar_path, "rb") as f2:
with open(large_jar_path, "rb") as f:
data = f.read()
expected_cscs = self._compute_crc(data)
cscs = [chunk.crc for chunk in data_chunks]
expected_cscs = [int(line.rstrip()) for line in f1]
self.assertEqual(cscs, expected_cscs)

binaries = [chunk.data for chunk in data_chunks]
expected_binaries = list(iter(lambda: f2.read(ArtifactManager.CHUNK_SIZE), b""))
expected_binaries = [
data[i : i + ArtifactManager.CHUNK_SIZE]
for i in range(0, len(data), ArtifactManager.CHUNK_SIZE)
]
self.assertEqual(binaries, expected_binaries)

def test_batched_artifacts(self):
file_name = "smallJar"
small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar")
small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt")
if not os.path.isfile(small_jar_path):
raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.")

requests = list(
self.artifact_manager._create_requests(
Expand All @@ -325,26 +341,23 @@ def test_batched_artifacts(self):
self.assertTrue(artifact2.name.endswith(".jar"))

self.assertEqual(os.path.join("jars", f"{file_name}.jar"), artifact1.name)
with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2:
crc = int(f1.readline())
data = f2.read()
with open(small_jar_path, "rb") as f:
data = f.read()
crc = self._compute_crc(data)[0]
self.assertEqual(artifact1.data.crc, crc)
self.assertEqual(artifact1.data.data, data)
self.assertEqual(artifact2.data.crc, crc)
self.assertEqual(artifact2.data.data, data)

def test_single_chunked_and_chunked_artifact(self):
file_name1 = "smallJar"
file_name2 = "junitLargeJar"
file_name2 = "largeJar"
small_jar_path = os.path.join(self.artifact_file_path, f"{file_name1}.jar")
small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name1}.txt")
large_jar_path = os.path.join(self.artifact_file_path, f"{file_name2}.jar")
large_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name2}.txt")
large_jar_size = os.path.getsize(large_jar_path)
if not os.path.isfile(small_jar_path):
raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.")
if not os.path.isfile(large_jar_path):
raise unittest.SkipTest(f"Skipped as {large_jar_path} does not exist.")
expected_chunks = int(
(large_jar_size + (ArtifactManager.CHUNK_SIZE - 1)) / ArtifactManager.CHUNK_SIZE
)

requests = list(
self.artifact_manager._create_requests(
Expand All @@ -357,38 +370,40 @@ def test_single_chunked_and_chunked_artifact(self):
file=False,
)
)
# There are a total of 14 requests.
# The 1st request contains a single artifact - smallJar.jar (There are no
# other artifacts batched with it since the next one is large multi-chunk artifact)
# Requests 2-13 (1-indexed) belong to the transfer of junitLargeJar.jar. This includes
# the first "beginning chunk" and the subsequent data chunks.
# The last request (14) contains two smallJar.jar batched
# together.
self.assertEqual(len(requests), 1 + 12 + 1)
# 1st request: batch with smallJar.jar
# Next expected_chunks requests: chunked transfer of largeJar.jar
# Last request: batch with two smallJar.jar
self.assertEqual(len(requests), 1 + expected_chunks + 1)

first_req_batch = requests[0].batch.artifacts
self.assertEqual(len(first_req_batch), 1)
self.assertEqual(first_req_batch[0].name, os.path.join("jars", f"{file_name1}.jar"))
with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2:
self.assertEqual(first_req_batch[0].data.crc, int(f1.readline()))
self.assertEqual(first_req_batch[0].data.data, f2.read())
with open(small_jar_path, "rb") as f:
small_data = f.read()
small_crc = self._compute_crc(small_data)[0]
self.assertEqual(first_req_batch[0].data.crc, small_crc)
self.assertEqual(first_req_batch[0].data.data, small_data)

second_req_batch = requests[1]
self.assertIsNotNone(second_req_batch.begin_chunk)
begin_chunk = second_req_batch.begin_chunk
self.assertEqual(begin_chunk.name, os.path.join("jars", f"{file_name2}.jar"))
self.assertEqual(begin_chunk.total_bytes, large_jar_size)
self.assertEqual(begin_chunk.num_chunks, 12)
self.assertEqual(begin_chunk.num_chunks, expected_chunks)
other_requests = requests[2:-1]
data_chunks = [begin_chunk.initial_chunk] + [req.chunk for req in other_requests]

with open(large_jar_crc_path) as f1, open(large_jar_path, "rb") as f2:
with open(large_jar_path, "rb") as f:
large_data = f.read()
expected_cscs = self._compute_crc(large_data)
cscs = [chunk.crc for chunk in data_chunks]
expected_cscs = [int(line.rstrip()) for line in f1]
self.assertEqual(cscs, expected_cscs)

binaries = [chunk.data for chunk in data_chunks]
expected_binaries = list(iter(lambda: f2.read(ArtifactManager.CHUNK_SIZE), b""))
expected_binaries = [
large_data[i : i + ArtifactManager.CHUNK_SIZE]
for i in range(0, len(large_data), ArtifactManager.CHUNK_SIZE)
]
self.assertEqual(binaries, expected_binaries)

last_request = requests[-1]
Expand All @@ -403,13 +418,10 @@ def test_single_chunked_and_chunked_artifact(self):
self.assertTrue(artifact2.name.endswith(".jar"))

self.assertEqual(os.path.join("jars", f"{file_name1}.jar"), artifact1.name)
with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2:
crc = int(f1.readline())
data = f2.read()
self.assertEqual(artifact1.data.crc, crc)
self.assertEqual(artifact1.data.data, data)
self.assertEqual(artifact2.data.crc, crc)
self.assertEqual(artifact2.data.data, data)
self.assertEqual(artifact1.data.crc, small_crc)
self.assertEqual(artifact1.data.data, small_data)
self.assertEqual(artifact2.data.crc, small_crc)
self.assertEqual(artifact2.data.data, small_data)

def test_copy_from_local_to_fs(self):
with tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs1") as d:
Expand Down
Loading