diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
similarity index 53%
rename from spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
rename to spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
index 9c31eb970b56..0152c8e0e6f0 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
@@ -16,15 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.spark.extensions;
+package org.apache.iceberg.spark.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTCatalogProperties;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.sql.TestSelect;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ protected static Object[][] parameters() {
           ImmutableMap.<String, String>builder()
               .putAll(SparkCatalogConfig.REST.properties())
               .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
-              // this flag is typically only set by the server, but we set it from the client for
-              // testing
               .put(
                   RESTCatalogProperties.SCAN_PLANNING_MODE,
                   RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ protected static Object[][] parameters() {
       }
     };
   }
+
+  @TestTemplate
+  public void fileIOIsPropagated() {
+    RESTCatalog catalog = new RESTCatalog();
+    catalog.setConf(new Configuration());
+    catalog.initialize(
+        "test",
+        ImmutableMap.<String, String>builder()
+            .putAll(restCatalog.properties())
+            .put(
+                RESTCatalogProperties.SCAN_PLANNING_MODE,
+                RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
+            .build());
+    Table table = catalog.loadTable(tableIdent);
+
+    SparkScanBuilder builder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    verifyFileIOHasPlanId(builder.build().toBatch(), table);
+    verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
+  }
+
+  private void verifyFileIOHasPlanId(Batch batch, Table table) {
+    FileIO fileIOForScan =
+        (FileIO)
+            assertThat(batch)
+                .extracting("fileIO")
+                .isInstanceOf(Supplier.class)
+                .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
+                .actual()
+                .get();
+    assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+  }
 }
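
Note on the assertion style above: verifyFileIOHasPlanId reaches into the batch's private fileIO field with AssertJ's extracting/asInstanceOf chain. A minimal, self-contained sketch of the same idiom, assuming an AssertJ version that exposes actual() (the Holder class is illustrative, not part of the patch):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.function.Supplier;
    import org.assertj.core.api.InstanceOfAssertFactories;

    class ExtractionIdiomSketch {
      // Hypothetical holder with a private Supplier-typed field, standing in for SparkBatch.
      static class Holder {
        private final Supplier<String> fileIO = () -> "io-instance";
      }

      public static void main(String[] args) {
        // extracting(..) reads the private field by name; asInstanceOf(..) narrows the
        // assertion so actual() returns a typed Supplier rather than Object.
        Supplier<?> supplier =
            assertThat(new Holder())
                .extracting("fileIO")
                .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
                .actual();
        assertThat(supplier.get()).isEqualTo("io-instance");
      }
    }
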
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 0acd8bc24476..3f6d1ff8e63d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -51,13 +52,14 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   BaseBatchReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       ParquetBatchReadConf parquetConf,
       OrcBatchReadConf orcConf,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
     this.parquetConf = parquetConf;
     this.orcConf = orcConf;
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 8adc38f2ae9a..a4d9766ae713 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
@@ -68,6 +69,7 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
 
   private final Table table;
+  private final EncryptingFileIO fileIO;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
   private final NameMapping nameMapping;
@@ -83,11 +85,13 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
   BaseReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<TaskT> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     this.table = table;
+    this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
     this.taskGroup = taskGroup;
     this.tasks = taskGroup.tasks().iterator();
     this.currentIterator = CloseableIterator.empty();
@@ -179,9 +183,8 @@ protected InputFile getInputFile(String location) {
   private Map<String, InputFile> inputFiles() {
     if (lazyInputFiles == null) {
       this.lazyInputFiles =
-          EncryptingFileIO.combine(table().io(), table().encryption())
-              .bulkDecrypt(
-                  () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
+          fileIO.bulkDecrypt(
+              () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
     }
 
     return lazyInputFiles;
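
Note: the BaseReader change above builds the combined encrypting IO once per reader instead of rebuilding it on every inputFiles() call. A small sketch of the same two calls in isolation, assuming the EncryptingFileIO.combine and bulkDecrypt usage shown in the hunk (method and parameter names here are illustrative):

    import java.util.Map;
    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.encryption.EncryptingFileIO;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;

    class BulkDecryptSketch {
      // Combine the scan-scoped FileIO with the table's encryption manager once,
      // then resolve every referenced content file in a single bulk call.
      static Map<String, InputFile> openAll(
          FileIO scanIO, Table table, Iterable<ContentFile<?>> files) {
        EncryptingFileIO encryptingIO = EncryptingFileIO.combine(scanIO, table.encryption());
        return encryptingIO.bulkDecrypt(files);
      }
    }
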
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 14febb212aaf..cbc40db5333c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -28,17 +28,19 @@ import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
   BaseRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
   }
 
   protected CloseableIterable<InternalRow> newIterable(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 9a4ab30fec15..237dfd5c6919 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -52,6 +53,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       OrcBatchReadConf orcBatchReadConf) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -62,6 +64,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
 
   BatchDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
@@ -70,6 +73,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
        taskGroup,
         expectedSchema,
         caseSensitive,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 417440d4b48a..eb8e5e63f430 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -50,6 +51,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
   ChangelogRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -58,12 +60,14 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
   ChangelogRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<ChangelogScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
         taskGroup,
         ChangelogUtil.dropChangelogMetadata(expectedSchema),
         caseSensitive,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index 5c4c21df34a2..aae399c5f2c4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -32,10 +33,11 @@ public class EqualityDeleteRowReader extends RowDataReader {
   public EqualityDeleteRowReader(
       CombinedScanTask task,
       Table table,
+      FileIO fileIO,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, task, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+    super(table, fileIO, task, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a45facba6c6..b14970722ec6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -48,6 +49,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
   PositionDeletesRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -56,12 +58,12 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
   PositionDeletesRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<PositionDeletesScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-
-    super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
 
     int numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 0b53e72d9908..dbfb0b7614e4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -47,6 +48,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionReader<InternalRow> {
   RowDataReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -55,12 +57,13 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionReader<InternalRow> {
   RowDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
 
     numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name());
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
new file mode 100644
index 000000000000..49189d9d57e8
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.spark.util.KnownSizeEstimation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a serializable {@link FileIO} with a known size estimate. Spark calls its
+ * {@link org.apache.spark.util.SizeEstimator} class when broadcasting variables and this can be an
+ * expensive operation, so providing a known size estimate allows that operation to be skipped.
+ *
+ * <p>This class also implements {@link AutoCloseable} to avoid leaking resources upon broadcasting.
+ * Broadcast variables are destroyed and cleaned up on the driver and executors once they are
+ * garbage collected on the driver. The implementation ensures only resources used by copies of the
+ * main {@link FileIO} are released.
+ */
+class SerializableFileIOWithSize
+    implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SerializableFileIOWithSize.class);
+  private static final long SIZE_ESTIMATE = 32_768L;
+
+  private final transient Object serializationMarker;
+  private final FileIO fileIO;
+
+  private SerializableFileIOWithSize(FileIO fileIO) {
+    this.fileIO = fileIO;
+    this.serializationMarker = new Object();
+  }
+
+  @Override
+  public long estimatedSize() {
+    return SIZE_ESTIMATE;
+  }
+
+  public static FileIO wrap(FileIO fileIO) {
+    return new SerializableFileIOWithSize(fileIO);
+  }
+
+  @Override
+  public void close() {
+    if (null == serializationMarker) {
+      LOG.debug("Closing FileIO");
+      fileIO.close();
+    }
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return fileIO.newInputFile(path);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return fileIO.newOutputFile(path);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    fileIO.deleteFile(path);
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    fileIO.initialize(properties);
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return fileIO.properties();
+  }
+
+  @Override
+  public void serializeConfWith(
+      Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.serializeConfWith(confSerializer);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.setConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    if (fileIO instanceof HadoopConfigurable hadoopConfigurable) {
+      return hadoopConfigurable.getConf();
+    }
+
+    return null;
+  }
+}
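
Note: the close() guard above hinges on serializationMarker being transient. It is non-null on the driver-side original but left null by Java deserialization, so only executor-side copies release the wrapped FileIO. A standalone sketch of the idiom (class name illustrative, not from the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    class MarkerIdiomSketch implements Serializable, AutoCloseable {
      // Initialized on the original instance; transient fields are left null by
      // deserialization, so deserialized copies observe null here.
      private final transient Object serializationMarker = new Object();

      @Override
      public void close() {
        if (serializationMarker == null) {
          System.out.println("deserialized copy: releasing resources");
        } else {
          System.out.println("original instance: keeping resources open");
        }
      }

      public static void main(String[] args) throws Exception {
        MarkerIdiomSketch original = new MarkerIdiomSketch();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(original);
        }
        MarkerIdiomSketch copy;
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          copy = (MarkerIdiomSketch) in.readObject();
        }
        original.close(); // keeps resources
        copy.close();     // releases resources
      }
    }
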
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index a4d143fe9321..ee65b033aeff 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
 import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -47,6 +49,7 @@ class SparkBatch implements Batch {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final SparkReadConf readConf;
   private final Types.StructType groupingKeyType;
   private final List<ScanTaskGroup<?>> taskGroups;
@@ -60,6 +63,7 @@ class SparkBatch implements Batch {
   SparkBatch(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Types.StructType groupingKeyType,
       List<ScanTaskGroup<?>> taskGroups,
@@ -67,6 +71,7 @@ class SparkBatch implements Batch {
       int scanHashCode) {
     this.sparkContext = sparkContext;
     this.table = table;
+    this.fileIO = fileIO;
     this.readConf = readConf;
     this.groupingKeyType = groupingKeyType;
     this.taskGroups = taskGroups;
@@ -83,6 +88,8 @@ public InputPartition[] planInputPartitions() {
     // broadcast the table metadata as input partitions will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    Broadcast<FileIO> fileIOBroadcast =
+        sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
 
     String projectionString = SchemaParser.toJson(projection);
     String[][] locations = computePreferredLocations();
@@ -94,6 +101,7 @@ public InputPartition[] planInputPartitions() {
               groupingKeyType,
               taskGroups.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               projectionString,
               caseSensitive,
               locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -105,7 +113,7 @@ public InputPartition[] planInputPartitions() {
 
   private String[][] computePreferredLocations() {
     if (localityEnabled) {
-      return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups);
+      return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
 
     } else if (executorCacheLocalityEnabled) {
       List<String> executorLocations = SparkUtil.executorLocations();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 4425c4936a86..57ccf92b9651 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -105,6 +105,7 @@ public Batch toBatch() {
     return new SparkBatch(
         sparkContext,
         table,
+        null != scan ? scan.fileIO() : table::io,
         readConf,
         EMPTY_GROUPING_KEY_TYPE,
         taskGroups(),
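
Note: planInputPartitions() now broadcasts the wrapped FileIO once alongside the table, so every SparkInputPartition shares one broadcast handle instead of serializing a FileIO per partition. A minimal sketch of the broadcast-once pattern, with a String payload standing in for SerializableFileIOWithSize.wrap(..):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    class BroadcastOnceSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-once");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          // Broadcast the (serializable) value once on the driver; each task reads
          // the same handle via value() instead of shipping a fresh copy per partition.
          Broadcast<String> shared = sc.broadcast("stand-in for a wrapped FileIO");
          long tasks =
              sc.parallelize(Arrays.asList(1, 2, 3), 3).map(i -> shared.value().length()).count();
          System.out.println(tasks); // 3
        }
      }
    }
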
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 98a0061b3a33..a3d78b43a919 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializable {
   private final Types.StructType groupingKeyType;
   private final ScanTaskGroup<?> taskGroup;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final String projectionString;
   private final boolean caseSensitive;
   private final transient String[] preferredLocations;
@@ -45,6 +47,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializable {
       Types.StructType groupingKeyType,
       ScanTaskGroup<?> taskGroup,
       Broadcast<Table> tableBroadcast,
+      Broadcast<FileIO> fileIOBroadcast,
       String projectionString,
       boolean caseSensitive,
       String[] preferredLocations,
@@ -52,6 +55,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializable {
     this.groupingKeyType = groupingKeyType;
     this.taskGroup = taskGroup;
     this.tableBroadcast = tableBroadcast;
+    this.fileIOBroadcast = fileIOBroadcast;
     this.projectionString = projectionString;
     this.caseSensitive = caseSensitive;
     this.preferredLocations = preferredLocations;
@@ -81,6 +85,10 @@ public Table table() {
     return tableBroadcast.value();
   }
 
+  public FileIO io() {
+    return fileIOBroadcast.value();
+  }
+
   public boolean isCaseSensitive() {
     return caseSensitive;
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index c9a0f2566b88..7adf3c633cd0 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
@@ -61,10 +62,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
 
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final String projection;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final long splitSize;
   private final int splitLookback;
   private final long splitOpenFileCost;
@@ -80,15 +83,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema projection,
       String checkpointLocation) {
     this.table = table;
+    this.fileIO = fileIO;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
     this.projection = SchemaParser.toJson(projection);
     this.localityPreferred = readConf.localityEnabled();
     this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    this.fileIOBroadcast = sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
@@ -158,6 +164,7 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
               EMPTY_GROUPING_KEY_TYPE,
               combinedScanTasks.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               projection,
               caseSensitive,
               locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -168,7 +175,9 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
   }
 
   private String[][] computePreferredLocations(List<CombinedScanTask> taskGroups) {
-    return localityPreferred ? SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null;
+    return localityPreferred
+        ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups)
+        : null;
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index dff844eb45c9..fe5eeee8fb10 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -79,7 +79,15 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends SparkScan {
       Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, schema, readConf, projection, filters, scanReportSupplier);
+    super(
+        spark,
+        table,
+        null != scan ? scan.fileIO() : table::io,
+        schema,
+        readConf,
+        projection,
+        filters,
+        scanReportSupplier);
     this.scan = scan;
 
     this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 2e5f50ea88fa..6b80199a255c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,6 +102,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final Schema schema;
   private final SparkSession spark;
   private final SparkReadConf readConf;
@@ -115,6 +117,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   SparkScan(
       SparkSession spark,
       Table table,
+      Supplier<FileIO> fileIO,
       Schema schema,
       SparkReadConf readConf,
       Schema projection,
@@ -123,6 +126,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.spark = spark;
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
+    this.fileIO = fileIO;
     this.schema = schema;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
@@ -169,12 +173,20 @@ protected Types.StructType groupingKeyType() {
   @Override
   public Batch toBatch() {
     return new SparkBatch(
-        sparkContext, table, readConf, groupingKeyType(), taskGroups(), projection, hashCode());
+        sparkContext,
+        table,
+        fileIO,
+        readConf,
+        groupingKeyType(),
+        taskGroups(),
+        projection,
+        hashCode());
   }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(sparkContext, table, readConf, projection, checkpointLocation);
+    return new SparkMicroBatchStream(
+        sparkContext, table, fileIO, readConf, projection, checkpointLocation);
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index e4412f5cba59..47481ec51cad 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -47,7 +47,7 @@ class SparkStagedScan extends SparkScan {
       Schema projection,
       String taskSetId,
       SparkReadConf readConf) {
-    super(spark, table, schema, readConf, projection, ImmutableList.of(), null);
+    super(spark, table, table::io, schema, readConf, projection, ImmutableList.of(), null);
     this.taskSetId = taskSetId;
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
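
Note: a recurring detail in the scan constructors above is the `null != scan ? scan.fileIO() : table::io` argument: FileIO resolution is deferred behind a Supplier, and the table's IO is only a fallback when no scan-scoped IO exists. A toy sketch of that deferral (the Io interface is illustrative, not from the patch):

    import java.util.function.Supplier;

    class LazyIoSketch {
      interface Io {
        String name();
      }

      // Prefer the scan-scoped supplier when present; otherwise fall back to the
      // table's IO. Nothing is resolved until get() is called, which matters when
      // constructing the scan is cheap but materializing its FileIO is not.
      static Supplier<Io> ioFor(Supplier<Io> scanIo, Supplier<Io> tableIo) {
        return null != scanIo ? scanIo : tableIo;
      }

      public static void main(String[] args) {
        Supplier<Io> io = ioFor(null, () -> () -> "table-io");
        System.out.println(io.get().name()); // prints table-io
      }
    }
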
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 7df9c75fb3dd..1760143d2cb1 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase {
           // status even belonging to the same catalog. Reference:
           // https://www.sqlite.org/inmemorydb.html
           CatalogProperties.CLIENT_POOL_SIZE,
-          "1"));
+          "1",
+          "include-credentials",
+          "true",
+          "gcs.oauth2.token",
+          "dummyToken"));
 
   protected static RESTCatalog restCatalog;
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 5922723096aa..0eb9bbe52ffa 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -90,7 +90,7 @@ private static class ClosureTrackingReader extends BaseReader<String, FileScanTask> {
     private final Map<String, CloseableIterator<String>> tracker = Maps.newHashMap();
 
     ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
-      super(table, new BaseCombinedScanTask(tasks), null, false, true);
+      super(table, table.io(), new BaseCombinedScanTask(tasks), null, false, true);
     }
 
     @Override
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
index c31c10f97cac..b12fdd443ff4 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -105,7 +105,7 @@ public void testInsert() throws IOException {
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -136,7 +136,7 @@ public void testDelete() throws IOException {
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -170,7 +170,7 @@ public void testDataFileRewrite() throws IOException {
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -197,7 +197,7 @@ public void testMixDeleteAndInsert() throws IOException {
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index f4ace848afcb..681ab1fd76a9 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -180,6 +180,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             projectedSchema,
             false,
@@ -219,6 +220,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
             projectedSchema,
             false,
@@ -290,6 +292,7 @@ public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOException {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             projectedSchema,
             false,
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index eea7ac6e5948..62037a021e18 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -333,7 +333,7 @@ public void testReadEqualityDeleteRows() throws IOException {
     for (CombinedScanTask task : tasks) {
       try (EqualityDeleteRowReader reader =
-          new EqualityDeleteRowReader(task, table, table.schema(), false, true)) {
+          new EqualityDeleteRowReader(task, table, table.io(), table.schema(), false, true)) {
         while (reader.next()) {
           actualRowSet.add(
               new InternalRowWrapper(
@@ -682,7 +682,14 @@ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {
     try (BatchDataReader reader =
         new BatchDataReader(
             // expected column is id, while the equality filter column is dt
-            dateTable, task, dateTable.schema().select("id"), false, conf, null, true)) {
+            dateTable,
+            dateTable.io(),
+            task,
+            dateTable.schema().select("id"),
+            false,
+            conf,
+            null,
+            true)) {
       while (reader.next()) {
         ColumnarBatch columnarBatch = reader.get();
         int numOfCols = columnarBatch.numCols();