Skip to content

Commit 4279c5a

Browse files
committed
API, Core, Spark: Pass Table with FileIO from Scan on Spark's read path
1 parent 8709876 commit 4279c5a

File tree

9 files changed: 344 additions and 29 deletions.

api/src/main/java/org/apache/iceberg/BatchScan.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import org.apache.iceberg.io.FileIO;
22+
2123
/** API for configuring a batch scan. */
2224
public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
2325
/**
@@ -68,4 +70,14 @@ public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanT
6870
* @return the Snapshot this scan will use
6971
*/
7072
Snapshot snapshot();
73+
74+
/**
75+
* The {@link FileIO} instance to use for the scan.
76+
*
77+
* @return The {@link FileIO} instance to use for the scan.
78+
*/
79+
@Override
80+
default FileIO io() {
81+
return table().io();
82+
}
7183
}

api/src/main/java/org/apache/iceberg/BatchScanAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import org.apache.iceberg.expressions.Expression;
2424
import org.apache.iceberg.io.CloseableIterable;
25+
import org.apache.iceberg.io.FileIO;
2526
import org.apache.iceberg.metrics.MetricsReporter;
2627

2728
/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
@@ -156,4 +157,9 @@ public BatchScan metricsReporter(MetricsReporter reporter) {
156157
public BatchScan minRowsRequested(long numRows) {
157158
return new BatchScanAdapter(scan.minRowsRequested(numRows));
158159
}
160+
161+
@Override
162+
public FileIO io() {
163+
return scan.io();
164+
}
159165
}

api/src/main/java/org/apache/iceberg/Scan.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import org.apache.iceberg.expressions.Expression;
2424
import org.apache.iceberg.io.CloseableIterable;
25+
import org.apache.iceberg.io.FileIO;
2526
import org.apache.iceberg.metrics.MetricsReporter;
2627
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2728

@@ -208,4 +209,13 @@ default ThisT minRowsRequested(long numRows) {
208209
throw new UnsupportedOperationException(
209210
this.getClass().getName() + " doesn't implement minRowsRequested");
210211
}
212+
213+
/**
214+
* The {@link FileIO} instance to use for the scan.
215+
*
216+
* @return The {@link FileIO} instance to use for the scan.
217+
*/
218+
default FileIO io() {
219+
throw new UnsupportedOperationException(this.getClass().getName() + " doesn't implement io");
220+
}
211221
}

core/src/main/java/org/apache/iceberg/BaseScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ public Table table() {
102102
return table;
103103
}
104104

105-
protected FileIO io() {
105+
@Override
106+
public FileIO io() {
106107
return table.io();
107108
}
108109

core/src/main/java/org/apache/iceberg/rest/RESTTable.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222
import java.util.Set;
23-
import java.util.concurrent.atomic.AtomicReference;
2423
import java.util.function.Supplier;
2524
import org.apache.iceberg.BaseTable;
2625
import org.apache.iceberg.BatchScan;
@@ -30,7 +29,6 @@
3029
import org.apache.iceberg.TableOperations;
3130
import org.apache.iceberg.TableScan;
3231
import org.apache.iceberg.catalog.TableIdentifier;
33-
import org.apache.iceberg.io.FileIO;
3432
import org.apache.iceberg.metrics.MetricsReporter;
3533

3634
class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
@@ -42,7 +40,6 @@ class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
4240
private final Set<Endpoint> supportedEndpoints;
4341
private final Map<String, String> catalogProperties;
4442
private final Object hadoopConf;
45-
private final AtomicReference<FileIO> ioReference;
4643

4744
RESTTable(
4845
TableOperations ops,
@@ -64,7 +61,6 @@ class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
6461
this.supportedEndpoints = supportedEndpoints;
6562
this.catalogProperties = catalogProperties;
6663
this.hadoopConf = hadoopConf;
67-
this.ioReference = new AtomicReference<>(ops.io());
6864
}
6965

7066
@Override
@@ -79,20 +75,11 @@ public TableScan newScan() {
7975
tableIdentifier,
8076
resourcePaths,
8177
supportedEndpoints,
82-
ioReference,
78+
io(),
8379
catalogProperties,
8480
hadoopConf);
8581
}
8682

87-
@Override
88-
public FileIO io() {
89-
if (null != ioReference.get()) {
90-
return ioReference.get();
91-
}
92-
93-
return super.io();
94-
}
95-
9683
@Override
9784
public BatchScan newBatchScan() {
9885
return new BatchScanAdapter(newScan());

core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.Set;
28-
import java.util.concurrent.atomic.AtomicReference;
2928
import java.util.stream.Collectors;
3029
import org.apache.iceberg.CatalogProperties;
3130
import org.apache.iceberg.CatalogUtil;
@@ -70,8 +69,9 @@ class RESTTableScan extends DataTableScan {
7069
private final ParserContext parserContext;
7170
private final Map<String, String> catalogProperties;
7271
private final Object hadoopConf;
73-
private final AtomicReference<FileIO> ioReference;
72+
private final FileIO tableIo;
7473
private String planId = null;
74+
private FileIO fileIOForPlanId = null;
7575

7676
RESTTableScan(
7777
Table table,
@@ -83,7 +83,7 @@ class RESTTableScan extends DataTableScan {
8383
TableIdentifier tableIdentifier,
8484
ResourcePaths resourcePaths,
8585
Set<Endpoint> supportedEndpoints,
86-
AtomicReference<FileIO> ioReference,
86+
FileIO tableIo,
8787
Map<String, String> catalogProperties,
8888
Object hadoopConf) {
8989
super(table, schema, context);
@@ -99,7 +99,7 @@ class RESTTableScan extends DataTableScan {
9999
.add("specsById", table.specs())
100100
.add("caseSensitive", context().caseSensitive())
101101
.build();
102-
this.ioReference = ioReference;
102+
this.tableIo = tableIo;
103103
this.catalogProperties = catalogProperties;
104104
this.hadoopConf = hadoopConf;
105105
}
@@ -117,14 +117,14 @@ protected TableScan newRefinedScan(
117117
tableIdentifier,
118118
resourcePaths,
119119
supportedEndpoints,
120-
ioReference,
120+
io(),
121121
catalogProperties,
122122
hadoopConf);
123123
}
124124

125125
@Override
126-
protected FileIO io() {
127-
return ioReference.get();
126+
public FileIO io() {
127+
return null != fileIOForPlanId ? fileIOForPlanId : tableIo;
128128
}
129129

130130
@Override
@@ -177,8 +177,7 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
177177
this.planId = response.planId();
178178
PlanStatus planStatus = response.planStatus();
179179
if (null != planId && !response.credentials().isEmpty()) {
180-
// update FileIO for RESTTable
181-
ioReference.set(fileIOForPlanId(response.credentials()));
180+
this.fileIOForPlanId = fileIOForPlanId(response.credentials());
182181
}
183182

184183
switch (planStatus) {
@@ -260,8 +259,7 @@ private CloseableIterable<FileScanTask> fetchPlanningResult() {
260259
planId);
261260

262261
if (!response.credentials().isEmpty()) {
263-
// update FileIO for RESTTable
264-
ioReference.set(fileIOForPlanId(response.credentials()));
262+
this.fileIOForPlanId = fileIOForPlanId(response.credentials());
265263
}
266264

267265
return scanTasksIterable(response.planTasks(), response.fileScanTasks());

core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -996,8 +996,13 @@ public <T extends RESTResponse> T execute(
996996

997997
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
998998
// make sure remote scan planning is called and FileIO gets the planId
999-
assertThat(table.newScan().planFiles()).hasSize(1);
1000-
assertThat(table.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
999+
TableScan tableScan = table.newScan();
1000+
assertThat(tableScan.io().properties())
1001+
.isSameAs(table.io().properties())
1002+
.doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1003+
assertThat(tableScan.planFiles()).hasSize(1);
1004+
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1005+
assertThat(tableScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
10011006
}
10021007

10031008
@SuppressWarnings("unchecked")

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import java.util.function.Supplier;
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
30+
import org.apache.iceberg.BaseTable;
3031
import org.apache.iceberg.PartitionField;
3132
import org.apache.iceberg.PartitionScanTask;
3233
import org.apache.iceberg.PartitionSpec;
3334
import org.apache.iceberg.Scan;
3435
import org.apache.iceberg.ScanTask;
3536
import org.apache.iceberg.ScanTaskGroup;
3637
import org.apache.iceberg.Schema;
38+
import org.apache.iceberg.SparkDistributedDataScan;
3739
import org.apache.iceberg.Table;
3840
import org.apache.iceberg.exceptions.ValidationException;
3941
import org.apache.iceberg.expressions.Expression;
@@ -79,7 +81,15 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
7981
Schema projection,
8082
List<Expression> filters,
8183
Supplier<ScanReport> scanReportSupplier) {
82-
super(spark, table, readConf, projection, filters, scanReportSupplier);
84+
super(
85+
spark,
86+
table instanceof BaseTable && null != scan && !(scan instanceof SparkDistributedDataScan)
87+
? new TableWithIO(table, scan::io)
88+
: table,
89+
readConf,
90+
projection,
91+
filters,
92+
scanReportSupplier);
8393

8494
this.scan = scan;
8595
this.preserveDataGrouping = readConf.preserveDataGrouping();

0 commit comments

Comments
 (0)