diff --git a/data/artifact-tests/crc/README.md b/data/artifact-tests/crc/README.md deleted file mode 100644 index df9af41064444..0000000000000 --- a/data/artifact-tests/crc/README.md +++ /dev/null @@ -1,5 +0,0 @@ -The CRCs for a specific file are stored in a text file with the same name (excluding the original extension). - -The CRCs are calculated for data chunks of `32768 bytes` (individual CRCs) and are newline delimited. - -The CRCs were calculated using https://simplycalc.com/crc32-file.php \ No newline at end of file diff --git a/data/artifact-tests/crc/junitLargeJar.txt b/data/artifact-tests/crc/junitLargeJar.txt deleted file mode 100644 index 3e89631dea57c..0000000000000 --- a/data/artifact-tests/crc/junitLargeJar.txt +++ /dev/null @@ -1,12 +0,0 @@ -902183889 -2415704507 -1084811487 -1951510 -1158852476 -2003120166 -3026803842 -3850244775 -3409267044 -652109216 -104029242 -3019434266 \ No newline at end of file diff --git a/data/artifact-tests/crc/smallJar.txt b/data/artifact-tests/crc/smallJar.txt deleted file mode 100644 index df32adcce7ab5..0000000000000 --- a/data/artifact-tests/crc/smallJar.txt +++ /dev/null @@ -1 +0,0 @@ -1631702900 \ No newline at end of file diff --git a/data/artifact-tests/junitLargeJar.jar b/data/artifact-tests/junitLargeJar.jar deleted file mode 100755 index 6da55d8b8520d..0000000000000 Binary files a/data/artifact-tests/junitLargeJar.jar and /dev/null differ diff --git a/data/artifact-tests/smallJar.jar b/data/artifact-tests/smallJar.jar deleted file mode 100755 index 3c4930e8e9549..0000000000000 Binary files a/data/artifact-tests/smallJar.jar and /dev/null differ diff --git a/dev/test-classes.txt b/dev/test-classes.txt index 2dc6b290ad4f9..e69de29bb2d1d 100644 --- a/dev/test-classes.txt +++ b/dev/test-classes.txt @@ -1,7 +0,0 @@ -sql/core/src/test/resources/artifact-tests/Hello.class -sql/core/src/test/resources/artifact-tests/IntSumUdf.class -sql/core/src/test/resources/artifact-tests/smallClassFile.class -sql/connect/common/src/test/resources/artifact-tests/Hello.class -sql/core/src/test/resources/artifact-tests/HelloWithPackage.class -sql/connect/common/src/test/resources/artifact-tests/smallClassFile.class -sql/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class diff --git a/dev/test-jars.txt b/dev/test-jars.txt index 6d4d9732ff14a..e69de29bb2d1d 100644 --- a/dev/test-jars.txt +++ b/dev/test-jars.txt @@ -1,4 +0,0 @@ -data/artifact-tests/junitLargeJar.jar -data/artifact-tests/smallJar.jar -sql/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar -sql/connect/common/src/test/resources/artifact-tests/smallJar.jar diff --git a/python/pyspark/sql/tests/connect/client/test_artifact.py b/python/pyspark/sql/tests/connect/client/test_artifact.py index d5eb81061d33e..8400e4d1562ad 100644 --- a/python/pyspark/sql/tests/connect/client/test_artifact.py +++ b/python/pyspark/sql/tests/connect/client/test_artifact.py @@ -19,11 +19,12 @@ import tempfile import unittest import os +import zipfile +import zlib from pyspark.util import is_remote_only from pyspark.sql import SparkSession from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect -from pyspark.testing.sqlutils import SPARK_HOME from pyspark.sql.functions import udf, assert_true, lit if should_test_connect: @@ -205,15 +206,37 @@ def root(cls): def setUpClass(cls): super().setUpClass() cls.artifact_manager: ArtifactManager = cls.spark._client._artifact_manager - cls.base_resource_dir = os.path.join(SPARK_HOME, "data") - cls.artifact_file_path = os.path.join( - cls.base_resource_dir, - "artifact-tests", - ) - cls.artifact_crc_path = os.path.join( - cls.artifact_file_path, - "crc", - ) + cls.artifact_file_path = tempfile.mkdtemp(prefix="artifact_tests_") + + # Generate test artifacts as ZIP files with .jar extension. + # These are not valid JAR files (no MANIFEST.MF), but the tests only verify + # byte-level transfer protocol (chunking, CRC), so the .jar extension alone + # is sufficient to trigger the correct artifact routing (jars/ prefix). + + # Generate smallJar.jar + small_jar_path = os.path.join(cls.artifact_file_path, "smallJar.jar") + with zipfile.ZipFile(small_jar_path, "w") as zf: + zf.writestr("dummy.txt", "hello") + + # Generate largeJar.jar (>32KB for multi-chunk tests) + large_jar_path = os.path.join(cls.artifact_file_path, "largeJar.jar") + with zipfile.ZipFile(large_jar_path, "w", zipfile.ZIP_STORED) as zf: + for i in range(50): + zf.writestr(f"entry_{i}.txt", f"data_{i}" * 1000) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.artifact_file_path, ignore_errors=True) + super().tearDownClass() + + @staticmethod + def _compute_crc(data): + """Compute per-chunk CRC32 values matching Java's CRC32.""" + chunk_size = ArtifactManager.CHUNK_SIZE + crcs = [] + for i in range(0, len(data), chunk_size): + crcs.append(zlib.crc32(data[i : i + chunk_size]) & 0xFFFFFFFF) + return crcs @classmethod def conf(cls): @@ -224,8 +247,6 @@ def conf(cls): def test_basic_requests(self): file_name = "smallJar" small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar") - if not os.path.isfile(small_jar_path): - raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.") response = self.artifact_manager._retrieve_responses( self.artifact_manager._create_requests( small_jar_path, pyfile=False, archive=False, file=False @@ -236,9 +257,6 @@ def test_basic_requests(self): def test_single_chunk_artifact(self): file_name = "smallJar" small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar") - small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt") - if not os.path.isfile(small_jar_path): - raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.") requests = list( self.artifact_manager._create_requests( @@ -257,24 +275,21 @@ def test_single_chunk_artifact(self): self.assertTrue(single_artifact.name.endswith(".jar")) self.assertEqual(os.path.join("jars", f"{file_name}.jar"), single_artifact.name) - with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2: - self.assertEqual(single_artifact.data.crc, int(f1.readline())) - self.assertEqual(single_artifact.data.data, f2.read()) + with open(small_jar_path, "rb") as f: + data = f.read() + expected_crc = self._compute_crc(data)[0] + self.assertEqual(single_artifact.data.crc, expected_crc) + self.assertEqual(single_artifact.data.data, data) def test_chunked_artifacts(self): - file_name = "junitLargeJar" + file_name = "largeJar" large_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar") - large_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt") - if not os.path.isfile(large_jar_path): - raise unittest.SkipTest(f"Skipped as {large_jar_path} does not exist.") requests = list( self.artifact_manager._create_requests( large_jar_path, pyfile=False, archive=False, file=False ) ) - # Expected chunks = roundUp( file_size / chunk_size) = 12 - # File size of `junitLargeJar.jar` is 384581 bytes. large_jar_size = os.path.getsize(large_jar_path) expected_chunks = int( (large_jar_size + (ArtifactManager.CHUNK_SIZE - 1)) / ArtifactManager.CHUNK_SIZE @@ -289,21 +304,22 @@ def test_chunked_artifacts(self): other_requests = requests[1:] data_chunks = [begin_chunk.initial_chunk] + [req.chunk for req in other_requests] - with open(large_jar_crc_path) as f1, open(large_jar_path, "rb") as f2: + with open(large_jar_path, "rb") as f: + data = f.read() + expected_cscs = self._compute_crc(data) cscs = [chunk.crc for chunk in data_chunks] - expected_cscs = [int(line.rstrip()) for line in f1] self.assertEqual(cscs, expected_cscs) binaries = [chunk.data for chunk in data_chunks] - expected_binaries = list(iter(lambda: f2.read(ArtifactManager.CHUNK_SIZE), b"")) + expected_binaries = [ + data[i : i + ArtifactManager.CHUNK_SIZE] + for i in range(0, len(data), ArtifactManager.CHUNK_SIZE) + ] self.assertEqual(binaries, expected_binaries) def test_batched_artifacts(self): file_name = "smallJar" small_jar_path = os.path.join(self.artifact_file_path, f"{file_name}.jar") - small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name}.txt") - if not os.path.isfile(small_jar_path): - raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.") requests = list( self.artifact_manager._create_requests( @@ -325,9 +341,9 @@ def test_batched_artifacts(self): self.assertTrue(artifact2.name.endswith(".jar")) self.assertEqual(os.path.join("jars", f"{file_name}.jar"), artifact1.name) - with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2: - crc = int(f1.readline()) - data = f2.read() + with open(small_jar_path, "rb") as f: + data = f.read() + crc = self._compute_crc(data)[0] self.assertEqual(artifact1.data.crc, crc) self.assertEqual(artifact1.data.data, data) self.assertEqual(artifact2.data.crc, crc) @@ -335,16 +351,13 @@ def test_batched_artifacts(self): def test_single_chunked_and_chunked_artifact(self): file_name1 = "smallJar" - file_name2 = "junitLargeJar" + file_name2 = "largeJar" small_jar_path = os.path.join(self.artifact_file_path, f"{file_name1}.jar") - small_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name1}.txt") large_jar_path = os.path.join(self.artifact_file_path, f"{file_name2}.jar") - large_jar_crc_path = os.path.join(self.artifact_crc_path, f"{file_name2}.txt") large_jar_size = os.path.getsize(large_jar_path) - if not os.path.isfile(small_jar_path): - raise unittest.SkipTest(f"Skipped as {small_jar_path} does not exist.") - if not os.path.isfile(large_jar_path): - raise unittest.SkipTest(f"Skipped as {large_jar_path} does not exist.") + expected_chunks = int( + (large_jar_size + (ArtifactManager.CHUNK_SIZE - 1)) / ArtifactManager.CHUNK_SIZE + ) requests = list( self.artifact_manager._create_requests( @@ -357,38 +370,40 @@ def test_single_chunked_and_chunked_artifact(self): file=False, ) ) - # There are a total of 14 requests. - # The 1st request contains a single artifact - smallJar.jar (There are no - # other artifacts batched with it since the next one is large multi-chunk artifact) - # Requests 2-13 (1-indexed) belong to the transfer of junitLargeJar.jar. This includes - # the first "beginning chunk" and the subsequent data chunks. - # The last request (14) contains two smallJar.jar batched - # together. - self.assertEqual(len(requests), 1 + 12 + 1) + # 1st request: batch with smallJar.jar + # Next expected_chunks requests: chunked transfer of largeJar.jar + # Last request: batch with two smallJar.jar + self.assertEqual(len(requests), 1 + expected_chunks + 1) first_req_batch = requests[0].batch.artifacts self.assertEqual(len(first_req_batch), 1) self.assertEqual(first_req_batch[0].name, os.path.join("jars", f"{file_name1}.jar")) - with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2: - self.assertEqual(first_req_batch[0].data.crc, int(f1.readline())) - self.assertEqual(first_req_batch[0].data.data, f2.read()) + with open(small_jar_path, "rb") as f: + small_data = f.read() + small_crc = self._compute_crc(small_data)[0] + self.assertEqual(first_req_batch[0].data.crc, small_crc) + self.assertEqual(first_req_batch[0].data.data, small_data) second_req_batch = requests[1] self.assertIsNotNone(second_req_batch.begin_chunk) begin_chunk = second_req_batch.begin_chunk self.assertEqual(begin_chunk.name, os.path.join("jars", f"{file_name2}.jar")) self.assertEqual(begin_chunk.total_bytes, large_jar_size) - self.assertEqual(begin_chunk.num_chunks, 12) + self.assertEqual(begin_chunk.num_chunks, expected_chunks) other_requests = requests[2:-1] data_chunks = [begin_chunk.initial_chunk] + [req.chunk for req in other_requests] - with open(large_jar_crc_path) as f1, open(large_jar_path, "rb") as f2: + with open(large_jar_path, "rb") as f: + large_data = f.read() + expected_cscs = self._compute_crc(large_data) cscs = [chunk.crc for chunk in data_chunks] - expected_cscs = [int(line.rstrip()) for line in f1] self.assertEqual(cscs, expected_cscs) binaries = [chunk.data for chunk in data_chunks] - expected_binaries = list(iter(lambda: f2.read(ArtifactManager.CHUNK_SIZE), b"")) + expected_binaries = [ + large_data[i : i + ArtifactManager.CHUNK_SIZE] + for i in range(0, len(large_data), ArtifactManager.CHUNK_SIZE) + ] self.assertEqual(binaries, expected_binaries) last_request = requests[-1] @@ -403,13 +418,10 @@ def test_single_chunked_and_chunked_artifact(self): self.assertTrue(artifact2.name.endswith(".jar")) self.assertEqual(os.path.join("jars", f"{file_name1}.jar"), artifact1.name) - with open(small_jar_crc_path) as f1, open(small_jar_path, "rb") as f2: - crc = int(f1.readline()) - data = f2.read() - self.assertEqual(artifact1.data.crc, crc) - self.assertEqual(artifact1.data.data, data) - self.assertEqual(artifact2.data.crc, crc) - self.assertEqual(artifact2.data.data, data) + self.assertEqual(artifact1.data.crc, small_crc) + self.assertEqual(artifact1.data.data, small_data) + self.assertEqual(artifact2.data.crc, small_crc) + self.assertEqual(artifact2.data.data, small_data) def test_copy_from_local_to_fs(self): with tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs1") as d: diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala index e17cda3a5ba2f..8961a49c09ce1 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala @@ -16,12 +16,12 @@ */ package org.apache.spark.sql.connect.client -import java.io.InputStream +import java.io.{FileOutputStream, InputStream} import java.net.URI import java.nio.file.{Files, Path, Paths} import java.util.concurrent.TimeUnit - -import scala.jdk.CollectionConverters._ +import java.util.jar.{JarEntry, JarOutputStream} +import java.util.zip.CRC32 import com.google.protobuf.ByteString import io.grpc.{ManagedChannel, Server} @@ -86,16 +86,54 @@ class ArtifactSuite extends ConnectFunSuite { } private val CHUNK_SIZE: Int = 32 * 1024 - protected def artifactFilePath: Path = commonResourcePath.resolve("artifact-tests") - protected def artifactCrcPath: Path = artifactFilePath.resolve("crc") + + protected lazy val artifactFilePath: Path = { + val dir = Files.createTempDirectory("artifact-tests") + dir.toFile.deleteOnExit() + // Generate test artifacts. The .class files and JAR entries are not valid Java + // class files -- they contain arbitrary bytes. The tests only verify byte-level + // transfer protocol (chunking, CRC), so valid class content is not required. + val small = "public class smallClassFile {}".getBytes + Files.write(dir.resolve("smallClassFile.class"), small) + Files.write(dir.resolve("smallClassFileDup.class"), small) + Files.write(dir.resolve("Hello.class"), "Hello".getBytes) + // small JAR + val smallJar = dir.resolve("smallJar.jar") + val jos = new JarOutputStream(new FileOutputStream(smallJar.toFile)) + try { + jos.putNextEntry(new JarEntry("Dummy.class")) + jos.write(small) + } finally { jos.close() } + // large JAR (>32KB) - use STORED to avoid compression + val largeJar = dir.resolve("largeJar.jar") + val jos2 = new JarOutputStream(new FileOutputStream(largeJar.toFile)) + try { + for (i <- 0 until 100) { + val data = new Array[Byte](4096) + java.util.Arrays.fill(data, i.toByte) + val entry = new JarEntry(s"pkg/Class$i.class") + entry.setMethod(java.util.zip.ZipEntry.STORED) + entry.setSize(data.length) + entry.setCompressedSize(data.length) + val crc = new CRC32() + crc.update(data) + entry.setCrc(crc.getValue) + jos2.putNextEntry(entry) + jos2.write(data) + } + } finally { jos2.close() } + dir + } private def getCrcValues(filePath: Path): Seq[Long] = { - val fileName = filePath.getFileName.toString - val crcFileName = fileName.split('.').head + ".txt" - Files - .readAllLines(artifactCrcPath.resolve(crcFileName)) - .asScala - .map(_.toLong) + val data = Files.readAllBytes(filePath) + data + .grouped(CHUNK_SIZE) + .map { chunk => + val c = new CRC32() + c.update(chunk) + c.getValue + } .toSeq } @@ -117,7 +155,6 @@ class ArtifactSuite extends ConnectFunSuite { private def singleChunkArtifactTest(path: String): Unit = { test(s"Single Chunk Artifact - $path") { val artifactPath = artifactFilePath.resolve(path) - assume(artifactPath.toFile.exists) artifactManager.addArtifact(artifactPath.toString) val receivedRequests = service.getAndClearLatestAddArtifactRequests() @@ -177,20 +214,17 @@ class ArtifactSuite extends ConnectFunSuite { } } - test("Chunked Artifact - junitLargeJar.jar") { - val artifactPath = artifactFilePath.resolve("junitLargeJar.jar") - assume(artifactPath.toFile.exists) + test("Chunked Artifact - largeJar.jar") { + val artifactPath = artifactFilePath.resolve("largeJar.jar") artifactManager.addArtifact(artifactPath.toString) - // Expected chunks = roundUp( file_size / chunk_size) = 12 - // File size of `junitLargeJar.jar` is 384581 bytes. - val expectedChunks = (384581 + (CHUNK_SIZE - 1)) / CHUNK_SIZE + val largeJarSize = Files.size(artifactPath) + val expectedChunks = ((largeJarSize + CHUNK_SIZE - 1) / CHUNK_SIZE).toInt val receivedRequests = service.getAndClearLatestAddArtifactRequests() - assert(384581 == Files.size(artifactPath)) assert(receivedRequests.size == expectedChunks) assert(receivedRequests.head.hasBeginChunk) val beginChunkRequest = receivedRequests.head.getBeginChunk - assert(beginChunkRequest.getName == "jars/junitLargeJar.jar") - assert(beginChunkRequest.getTotalBytes == 384581) + assert(beginChunkRequest.getName == "jars/largeJar.jar") + assert(beginChunkRequest.getTotalBytes == largeJarSize) assert(beginChunkRequest.getNumChunks == expectedChunks) val dataChunks = Seq(beginChunkRequest.getInitialChunk) ++ receivedRequests.drop(1).map(_.getChunk) @@ -199,10 +233,8 @@ class ArtifactSuite extends ConnectFunSuite { test("Batched SingleChunkArtifacts") { val path1 = artifactFilePath.resolve("smallClassFile.class") - assume(path1.toFile.exists) val file1 = path1.toUri val path2 = artifactFilePath.resolve("smallJar.jar") - assume(path2.toFile.exists) val file2 = path2.toUri artifactManager.addArtifacts(Seq(file1, file2)) val receivedRequests = service.getAndClearLatestAddArtifactRequests() @@ -225,27 +257,22 @@ class ArtifactSuite extends ConnectFunSuite { test("Mix of SingleChunkArtifact and chunked artifact") { val path1 = artifactFilePath.resolve("smallClassFile.class") - assume(path1.toFile.exists) val file1 = path1.toUri - val path2 = artifactFilePath.resolve("junitLargeJar.jar") - assume(path2.toFile.exists) + val path2 = artifactFilePath.resolve("largeJar.jar") val file2 = path2.toUri val path3 = artifactFilePath.resolve("smallClassFileDup.class") - assume(path3.toFile.exists) val file3 = path3.toUri val path4 = artifactFilePath.resolve("smallJar.jar") - assume(path4.toFile.exists) val file4 = path4.toUri artifactManager.addArtifacts(Seq(file1, file2, file3, file4)) + val largeJarSize = Files.size(path2) + val largeJarChunks = ((largeJarSize + CHUNK_SIZE - 1) / CHUNK_SIZE).toInt val receivedRequests = service.getAndClearLatestAddArtifactRequests() - // There are a total of 14 requests. // The 1st request contains a single artifact - smallClassFile.class (There are no // other artifacts batched with it since the next one is large multi-chunk artifact) - // Requests 2-13 (1-indexed) belong to the transfer of junitLargeJar.jar. This includes - // the first "beginning chunk" and the subsequent data chunks. - // The last request (14) contains both smallClassFileDup.class and smallJar.jar batched - // together. - assert(receivedRequests.size == 1 + 12 + 1) + // Next `largeJarChunks` requests belong to the transfer of largeJar.jar. + // The last request contains both smallClassFileDup.class and smallJar.jar batched together. + assert(receivedRequests.size == 1 + largeJarChunks + 1) val firstReqBatch = receivedRequests.head.getBatch.getArtifactsList assert(firstReqBatch.size() == 1) @@ -255,10 +282,10 @@ class ArtifactSuite extends ConnectFunSuite { val secondReq = receivedRequests(1) assert(secondReq.hasBeginChunk) val beginChunkRequest = secondReq.getBeginChunk - assert(beginChunkRequest.getName == "jars/junitLargeJar.jar") - assert(beginChunkRequest.getTotalBytes == 384581) - assert(beginChunkRequest.getNumChunks == 12) - // Large artifact data chunks are requests number 3 to 13. + assert(beginChunkRequest.getName == "jars/largeJar.jar") + assert(beginChunkRequest.getTotalBytes == largeJarSize) + assert(beginChunkRequest.getNumChunks == largeJarChunks) + // Large artifact data chunks follow the begin chunk request. val dataChunks = Seq(beginChunkRequest.getInitialChunk) ++ receivedRequests.drop(2).dropRight(1).map(_.getChunk) checkChunksDataAndCrc(Paths.get(file2), dataChunks) @@ -303,7 +330,6 @@ class ArtifactSuite extends ConnectFunSuite { test("artifact with custom target") { val artifactPath = artifactFilePath.resolve("smallClassFile.class") - assume(artifactPath.toFile.exists) val target = "sub/package/smallClassFile.class" artifactManager.addArtifact(artifactPath.toString, target) val receivedRequests = service.getAndClearLatestAddArtifactRequests() @@ -324,7 +350,6 @@ class ArtifactSuite extends ConnectFunSuite { test("in-memory artifact with custom target") { val artifactPath = artifactFilePath.resolve("smallClassFile.class") - assume(artifactPath.toFile.exists) val artifactBytes = Files.readAllBytes(artifactPath) val target = "sub/package/smallClassFile.class" artifactManager.addArtifact(artifactBytes, target) @@ -348,7 +373,6 @@ class ArtifactSuite extends ConnectFunSuite { "When both source and target paths are given, extension conditions are checked " + "on target path") { val artifactPath = artifactFilePath.resolve("smallClassFile.class") - assume(artifactPath.toFile.exists) assertThrows[UnsupportedOperationException] { artifactManager.addArtifact(artifactPath.toString, "dummy.extension") } diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ClassFinderSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ClassFinderSuite.scala index 5eef1de0a5437..0f616b460aecf 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ClassFinderSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ClassFinderSuite.scala @@ -16,21 +16,20 @@ */ package org.apache.spark.sql.connect.client -import java.nio.file.Paths +import java.nio.file.{Files, Paths} import org.apache.spark.sql.connect.test.ConnectFunSuite -import org.apache.spark.util.SparkFileUtils class ClassFinderSuite extends ConnectFunSuite { - private val classResourcePath = commonResourcePath.resolve("artifact-tests") + private val dummyContent = Array[Byte](0xca.toByte, 0xfe.toByte, 0xba.toByte, 0xbe.toByte) test("REPLClassDirMonitor functionality test") { val requiredClasses = Seq("Hello.class", "smallClassFile.class", "smallClassFileDup.class") - requiredClasses.foreach(className => - assume(classResourcePath.resolve(className).toFile.exists)) - val copyDir = SparkFileUtils.createTempDir().toPath - SparkFileUtils.copyDirectory(classResourcePath.toFile, copyDir.toFile) + val copyDir = Files.createTempDirectory("classFinderTest") + copyDir.toFile.deleteOnExit() + requiredClasses.foreach(name => Files.write(copyDir.resolve(name), dummyContent)) + val monitor = new REPLClassDirMonitor(copyDir.toAbsolutePath.toString) def checkClasses(monitor: REPLClassDirMonitor, additionalClasses: Seq[String] = Nil): Unit = { @@ -44,12 +43,11 @@ class ClassFinderSuite extends ConnectFunSuite { checkClasses(monitor) - // Add new class file into directory - val subDir = SparkFileUtils.createTempDir(copyDir.toAbsolutePath.toString) - val classToCopy = copyDir.resolve("Hello.class") - val copyLocation = subDir.toPath.resolve("HelloDup.class") - SparkFileUtils.copyFile(classToCopy.toFile, copyLocation.toFile) + // Add new class file into a subdirectory + val subDir = Files.createTempDirectory(copyDir, "sub") + subDir.toFile.deleteOnExit() + Files.write(subDir.resolve("HelloDup.class"), dummyContent) - checkClasses(monitor, Seq(s"${subDir.getName}/HelloDup.class")) + checkClasses(monitor, Seq(s"${subDir.getFileName}/HelloDup.class")) } } diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala index e833657984dcb..22feaff1c77f1 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala @@ -397,10 +397,10 @@ class SparkConnectClientSuite extends ConnectFunSuite { .build() val session = SparkSession.builder().client(client).create() - val artifactFilePath = commonResourcePath.resolve("artifact-tests") - val path = artifactFilePath.resolve("smallClassFile.class") - assume(path.toFile.exists) - session.addArtifact(path.toString) + val tmpFile = java.io.File.createTempFile("smallClassFile", ".class") + tmpFile.deleteOnExit() + java.nio.file.Files.write(tmpFile.toPath, "dummy".getBytes) + session.addArtifact(tmpFile.getAbsolutePath) } private def buildPlan(query: String): proto.Plan = { diff --git a/sql/connect/common/src/test/resources/artifact-tests/Hello.class b/sql/connect/common/src/test/resources/artifact-tests/Hello.class deleted file mode 100644 index 56725764de205..0000000000000 Binary files a/sql/connect/common/src/test/resources/artifact-tests/Hello.class and /dev/null differ diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/Hello.txt b/sql/connect/common/src/test/resources/artifact-tests/crc/Hello.txt deleted file mode 100644 index 799ce78f11b7e..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/Hello.txt +++ /dev/null @@ -1 +0,0 @@ -553633018 \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/README.md b/sql/connect/common/src/test/resources/artifact-tests/crc/README.md deleted file mode 100644 index df9af41064444..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/README.md +++ /dev/null @@ -1,5 +0,0 @@ -The CRCs for a specific file are stored in a text file with the same name (excluding the original extension). - -The CRCs are calculated for data chunks of `32768 bytes` (individual CRCs) and are newline delimited. - -The CRCs were calculated using https://simplycalc.com/crc32-file.php \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/junitLargeJar.txt b/sql/connect/common/src/test/resources/artifact-tests/crc/junitLargeJar.txt deleted file mode 100644 index 3e89631dea57c..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/junitLargeJar.txt +++ /dev/null @@ -1,12 +0,0 @@ -902183889 -2415704507 -1084811487 -1951510 -1158852476 -2003120166 -3026803842 -3850244775 -3409267044 -652109216 -104029242 -3019434266 \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFile.txt b/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFile.txt deleted file mode 100644 index 531f98ce9a225..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFile.txt +++ /dev/null @@ -1 +0,0 @@ -1935693963 \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFileDup.txt b/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFileDup.txt deleted file mode 100644 index 531f98ce9a225..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/smallClassFileDup.txt +++ /dev/null @@ -1 +0,0 @@ -1935693963 \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/crc/smallJar.txt b/sql/connect/common/src/test/resources/artifact-tests/crc/smallJar.txt deleted file mode 100644 index df32adcce7ab5..0000000000000 --- a/sql/connect/common/src/test/resources/artifact-tests/crc/smallJar.txt +++ /dev/null @@ -1 +0,0 @@ -1631702900 \ No newline at end of file diff --git a/sql/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar b/sql/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar deleted file mode 100755 index 6da55d8b8520d..0000000000000 Binary files a/sql/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar and /dev/null differ diff --git a/sql/connect/common/src/test/resources/artifact-tests/smallClassFile.class b/sql/connect/common/src/test/resources/artifact-tests/smallClassFile.class deleted file mode 100755 index e796030e471b0..0000000000000 Binary files a/sql/connect/common/src/test/resources/artifact-tests/smallClassFile.class and /dev/null differ diff --git a/sql/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class b/sql/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class deleted file mode 100755 index e796030e471b0..0000000000000 Binary files a/sql/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class and /dev/null differ diff --git a/sql/connect/common/src/test/resources/artifact-tests/smallJar.jar b/sql/connect/common/src/test/resources/artifact-tests/smallJar.jar deleted file mode 100755 index 3c4930e8e9549..0000000000000 Binary files a/sql/connect/common/src/test/resources/artifact-tests/smallJar.jar and /dev/null differ diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala index 1df8ba46286cb..75d7fdd4d8848 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala @@ -16,9 +16,11 @@ */ package org.apache.spark.sql.connect.service -import java.io.InputStream +import java.io.{FileOutputStream, InputStream} import java.nio.file.{Files, Path} import java.util.UUID +import java.util.jar.{JarEntry, JarOutputStream} +import java.util.zip.CRC32 import scala.collection.mutable import scala.concurrent.Promise @@ -37,7 +39,7 @@ import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.sql.connect.ResourceHelper import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { @@ -85,8 +87,88 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { def forceCleanUp(): Unit = super.cleanUpStagedArtifacts() } - protected val inputFilePath: Path = commonResourcePath.resolve("artifact-tests") - protected val crcPath: Path = inputFilePath.resolve("crc") + // Generate test artifacts and CRC files dynamically. + protected lazy val inputFilePath: Path = { + val dir = Utils.createTempDir().toPath.resolve("artifact-tests") + Files.createDirectories(dir) + val crc = dir.resolve("crc") + Files.createDirectories(crc) + // Generate test artifacts. The .class files and JAR entries are not valid Java + // class files -- they contain arbitrary bytes. The tests only verify byte-level + // transfer protocol (chunking, CRC), so valid class content is not required. + // smallClassFile.class -- minimal Java class + val small = "public class smallClassFile {}".getBytes + createClassFileAndCrc(dir, crc, "smallClassFile", small) + // smallClassFileDup.class -- same content + createClassFileAndCrc(dir, crc, "smallClassFileDup", small) + // smallJar.jar -- minimal JAR containing a dummy class + createSmallJar(dir, crc, "smallJar", small) + // largeJar.jar -- large JAR for multi-chunk transfer test (>32KB) + createLargeJar(dir, crc, "largeJar") + // Hello.class -- small dummy (only used for byte transfer) + createClassFileAndCrc(dir, crc, "Hello", small) + dir + } + protected lazy val crcPath: Path = inputFilePath.resolve("crc") + + private def writeCrc(crcDir: Path, baseName: String, data: Array[Byte]): Unit = { + val chunkSize = CHUNK_SIZE + val crcs = data + .grouped(chunkSize) + .map { chunk => + val c = new CRC32() + c.update(chunk) + c.getValue.toString + } + .toSeq + Files.writeString(crcDir.resolve(s"$baseName.txt"), crcs.mkString("\n")) + } + + private def createClassFileAndCrc( + dir: Path, + crcDir: Path, + name: String, + content: Array[Byte]): Unit = { + val file = dir.resolve(s"$name.class") + Files.write(file, content) + writeCrc(crcDir, name, content) + } + + private def createSmallJar( + dir: Path, + crcDir: Path, + name: String, + classContent: Array[Byte]): Unit = { + val jarFile = dir.resolve(s"$name.jar") + val jos = new JarOutputStream(new FileOutputStream(jarFile.toFile)) + try { + jos.putNextEntry(new JarEntry("HelloWorld/Main.class")) + jos.write(classContent) + } finally { jos.close() } + writeCrc(crcDir, name, Files.readAllBytes(jarFile)) + } + + private def createLargeJar(dir: Path, crcDir: Path, name: String): Unit = { + val jarFile = dir.resolve(s"$name.jar") + val jos = new JarOutputStream(new FileOutputStream(jarFile.toFile)) + try { + // Create enough entries to exceed 32KB (CHUNK_SIZE), using STORED to avoid compression + for (i <- 0 until 100) { + val data = new Array[Byte](4096) + java.util.Arrays.fill(data, i.toByte) + val entry = new JarEntry(s"pkg/Class$i.class") + entry.setMethod(java.util.zip.ZipEntry.STORED) + entry.setSize(data.length) + entry.setCompressedSize(data.length) + val crc = new CRC32() + crc.update(data) + entry.setCrc(crc.getValue) + jos.putNextEntry(entry) + jos.write(data) + } + } finally { jos.close() } + writeCrc(crcDir, name, Files.readAllBytes(jarFile)) + } private def readNextChunk(in: InputStream): ByteString = { val buf = new Array[Byte](CHUNK_SIZE) @@ -209,7 +291,6 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { try { val name = "classes/smallClassFile.class" val artifactPath = inputFilePath.resolve("smallClassFile.class") - assume(artifactPath.toFile.exists) addSingleChunkArtifact(handler, name, artifactPath) handler.onCompleted() val response = ThreadUtils.awaitResult(promise.future, 5.seconds) @@ -232,9 +313,8 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { val promise = Promise[AddArtifactsResponse]() val handler = new TestAddArtifactsHandler(new DummyStreamObserver(promise)) try { - val name = "jars/junitLargeJar.jar" - val artifactPath = inputFilePath.resolve("junitLargeJar.jar") - assume(artifactPath.toFile.exists) + val name = "jars/largeJar.jar" + val artifactPath = inputFilePath.resolve("largeJar.jar") addChunkedArtifact(handler, name, artifactPath) handler.onCompleted() val response = ThreadUtils.awaitResult(promise.future, 5.seconds) @@ -259,16 +339,15 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { try { val names = Seq( "classes/smallClassFile.class", - "jars/junitLargeJar.jar", + "jars/largeJar.jar", "classes/smallClassFileDup.class", "jars/smallJar.jar") val artifactPaths = Seq( inputFilePath.resolve("smallClassFile.class"), - inputFilePath.resolve("junitLargeJar.jar"), + inputFilePath.resolve("largeJar.jar"), inputFilePath.resolve("smallClassFileDup.class"), inputFilePath.resolve("smallJar.jar")) - artifactPaths.foreach(p => assume(p.toFile.exists)) addSingleChunkArtifact(handler, names.head, artifactPaths.head) addChunkedArtifact(handler, names(1), artifactPaths(1)) @@ -300,7 +379,6 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { try { val name = "classes/smallClassFile.class" val artifactPath = inputFilePath.resolve("smallClassFile.class") - assume(artifactPath.toFile.exists) val dataChunks = getDataChunks(artifactPath) assert(dataChunks.size == 1) val bytes = dataChunks.head @@ -473,7 +551,6 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper { val artifactPath1 = inputFilePath.resolve("smallClassFile.class") val artifactPath2 = inputFilePath.resolve("smallJar.jar") - assume(artifactPath1.toFile.exists) addSingleChunkArtifact(handler, sessionKey, name1, artifactPath1) addSingleChunkArtifact(handler, sessionKey, name3, artifactPath1) diff --git a/sql/core/src/test/resources/artifact-tests/Hello.class b/sql/core/src/test/resources/artifact-tests/Hello.class deleted file mode 100644 index 56725764de205..0000000000000 Binary files a/sql/core/src/test/resources/artifact-tests/Hello.class and /dev/null differ diff --git a/sql/core/src/test/resources/artifact-tests/HelloWithPackage.class b/sql/core/src/test/resources/artifact-tests/HelloWithPackage.class deleted file mode 100644 index f0ff0c4f5cf03..0000000000000 Binary files a/sql/core/src/test/resources/artifact-tests/HelloWithPackage.class and /dev/null differ diff --git a/sql/core/src/test/resources/artifact-tests/IntSumUdf.class b/sql/core/src/test/resources/artifact-tests/IntSumUdf.class deleted file mode 100644 index 75a41446cfca1..0000000000000 Binary files a/sql/core/src/test/resources/artifact-tests/IntSumUdf.class and /dev/null differ diff --git a/sql/core/src/test/resources/artifact-tests/smallClassFile.class b/sql/core/src/test/resources/artifact-tests/smallClassFile.class deleted file mode 100755 index e796030e471b0..0000000000000 Binary files a/sql/core/src/test/resources/artifact-tests/smallClassFile.class and /dev/null differ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala index 3a361a90f04f9..a1678335b7755 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.DataTypes import org.apache.spark.storage.CacheId -import org.apache.spark.util.Utils +import org.apache.spark.util.{SparkTestUtils, Utils} class ArtifactManagerSuite extends SharedSparkSession { @@ -40,7 +40,62 @@ class ArtifactManagerSuite extends SharedSparkSession { conf.set(SQLConf.ARTIFACTS_SESSION_ISOLATION_ALWAYS_APPLY_CLASSLOADER, true) } - private val artifactPath = new File("src/test/resources/artifact-tests").toPath + private lazy val artifactPath: Path = { + val dir = Utils.createTempDir().toPath.resolve("artifact-tests") + Files.createDirectories(dir) + val cp = System.getProperty("java.class.path") + .split(File.pathSeparator).map(p => new File(p).toURI.toURL).toSeq + // Hello.class -- Scala class implementing Function1, used by classloader and UDF tests + val helloScala = File.createTempFile("Hello", ".scala") + helloScala.deleteOnExit() + Files.writeString(helloScala.toPath, + """class Hello(val name: String) extends (String => String) with Serializable { + | def this() = this("World") + | def msg(): String = "Hello " + name + "! Nice to meet you!" + | def apply(s: String): String = msg() + |}""".stripMargin) + val helloJar = new File(Utils.createTempDir(), "Hello.jar") + SparkTestUtils.createJarWithScalaSources(Seq(helloScala), helloJar, cp) + extractClassFromJar(helloJar, "Hello.class", dir) + // HelloWithPackage.class -- Java class with package + val helloSrc = new File("src/test/resources/artifact-tests/HelloWithPackage.java") + SparkTestUtils.createCompiledClass("HelloWithPackage", dir.toFile, + new SparkTestUtils.JavaSourceFromString("my.custom.pkg.HelloWithPackage", + new String(Files.readAllBytes(helloSrc.toPath))), Seq.empty) + val pkgDir = dir.resolve("my/custom/pkg") + if (pkgDir.resolve("HelloWithPackage.class").toFile.exists) { + Files.move(pkgDir.resolve("HelloWithPackage.class"), dir.resolve("HelloWithPackage.class")) + } + // smallClassFile.class -- dummy class for transfer tests + SparkTestUtils.createCompiledClass("smallClassFile", dir.toFile, + new SparkTestUtils.JavaSourceFromString("smallClassFile", + "public class smallClassFile {}"), Seq.empty) + // IntSumUdf.class -- Scala UDF + val intSumSrc = new File("src/test/resources/artifact-tests/IntSumUdf.scala") + val intSumJar = new File(Utils.createTempDir(), "IntSumUdf.jar") + SparkTestUtils.createJarWithScalaSources(Seq(intSumSrc), intSumJar, cp) + extractClassFromJar(intSumJar, "IntSumUdf.class", dir) + dir + } + + private def extractClassFromJar(jar: File, className: String, destDir: Path): Unit = { + val jarFile = new java.util.jar.JarFile(jar) + try { + val entry = jarFile.getEntry(className) + val in = jarFile.getInputStream(entry) + try { Files.copy(in, destDir.resolve(className)) } finally { in.close() } + } finally { jarFile.close() } + } + + private lazy val udfNoAJar: Path = { + val srcFile = new File("../connect/client/jvm/src/test/resources/StubClassDummyUdf.scala") + val jarFile = new File(Utils.createTempDir(), "udf_noA.jar") + val cp = System.getProperty("java.class.path") + .split(File.pathSeparator).map(p => new File(p).toURI.toURL).toSeq + SparkTestUtils.createJarWithScalaSources( + Seq(srcFile), jarFile, cp, excludeClassPrefixes = Seq("A")) + jarFile.toPath + } private lazy val artifactManager = spark.artifactManager @@ -64,7 +119,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("Class artifacts are added to the correct directory.") { - assume(artifactPath.resolve("smallClassFile.class").toFile.exists) val copyDir = Utils.createTempDir().toPath Utils.copyDirectory(artifactPath.toFile, copyDir.toFile) @@ -80,7 +134,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("Class file artifacts are added to SC classloader") { - assume(artifactPath.resolve("Hello.class").toFile.exists) val copyDir = Utils.createTempDir().toPath Utils.copyDirectory(artifactPath.toFile, copyDir.toFile) @@ -106,7 +159,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("UDF can reference added class file") { - assume(artifactPath.resolve("Hello.class").toFile.exists) val copyDir = Utils.createTempDir().toPath Utils.copyDirectory(artifactPath.toFile, copyDir.toFile) @@ -187,7 +239,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("SPARK-43790: Forward artifact file to cloud storage path") { - assume(artifactPath.resolve("smallClassFile.class").toFile.exists) val copyDir = Utils.createTempDir().toPath val destFSDir = Utils.createTempDir().toPath @@ -203,7 +254,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("Removal of resources") { - assume(artifactPath.resolve("smallClassFile.class").toFile.exists) withTempPath { path => // Setup cache @@ -249,7 +299,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("Classloaders for spark sessions are isolated") { - assume(artifactPath.resolve("Hello.class").toFile.exists) val session1 = spark.newSession() val session2 = spark.newSession() @@ -307,7 +356,6 @@ class ArtifactManagerSuite extends SharedSparkSession { } test("SPARK-44300: Cleaning up resources only deletes session-specific resources") { - assume(artifactPath.resolve("Hello.class").toFile.exists) val copyDir = Utils.createTempDir().toPath Utils.copyDirectory(artifactPath.toFile, copyDir.toFile) @@ -369,9 +417,7 @@ class ArtifactManagerSuite extends SharedSparkSession { val targetPath2 = Paths.get(artifact2Path) val classPath1 = copyDir.resolve("Hello.class") - val classPath2 = copyDir.resolve("smallClassFile.class") - assume(artifactPath.resolve("Hello.class").toFile.exists) - assume(artifactPath.resolve("smallClassFile.class").toFile.exists) + val classPath2 = udfNoAJar val artifact1 = Artifact.newArtifactFromExtension( targetPath.getFileName.toString, @@ -431,7 +477,6 @@ class ArtifactManagerSuite extends SharedSparkSession { test("Added artifact can be loaded by the current SparkSession") { val path = artifactPath.resolve("IntSumUdf.class") - assume(path.toFile.exists) val buffer = Files.readAllBytes(path) spark.addArtifact(buffer, "IntSumUdf.class") @@ -447,7 +492,6 @@ class ArtifactManagerSuite extends SharedSparkSession { private def testAddArtifactToLocalSession( classFileToUse: String, binaryName: String)(addFunc: Path => String): Unit = { val copyDir = Utils.createTempDir().toPath - assume(artifactPath.resolve(classFileToUse).toFile.exists) Utils.copyDirectory(artifactPath.toFile, copyDir.toFile) val classPath = copyDir.resolve(classFileToUse) @@ -495,15 +539,13 @@ class ArtifactManagerSuite extends SharedSparkSession { // Register multiple kinds of artifacts val clsPath = path.resolve("Hello.class") - assume(clsPath.toFile.exists) artifactManager.addArtifact( // Class Paths.get("classes/Hello.class"), clsPath, None) artifactManager.addArtifact( // Python Paths.get("pyfiles/abc.zip"), randomFilePath, None, deleteStagedFile = false) - val jarPath = Paths.get("jars/smallClassFile.class") - assume(jarPath.toFile.exists) + val jarPath = Paths.get("jars/udf_noA.jar") artifactManager.addArtifact( // JAR - jarPath, path.resolve("smallClassFile.class"), None) + jarPath, udfNoAJar, None) artifactManager.addArtifact( // Cached Paths.get("cache/test"), randomFilePath, None) assert(Utils.listPaths(artifactManager.artifactPath.toFile).size() === 3)