Skip to content

Commit 000673a

Browse files
committed
API, Core, Spark: Pass FileIO on Spark's read path
1 parent 84e307c commit 000673a

31 files changed

Lines changed: 415 additions & 34 deletions

api/src/main/java/org/apache/iceberg/BatchScan.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import org.apache.iceberg.io.FileIO;
22+
2123
/** API for configuring a batch scan. */
2224
public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
2325
/**
@@ -68,4 +70,9 @@ public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanT
6870
* @return the Snapshot this scan will use
6971
*/
7072
Snapshot snapshot();
73+
74+
@Override
75+
default FileIO io() {
76+
return table().io();
77+
}
7178
}

api/src/main/java/org/apache/iceberg/BatchScanAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import org.apache.iceberg.expressions.Expression;
2424
import org.apache.iceberg.io.CloseableIterable;
25+
import org.apache.iceberg.io.FileIO;
2526
import org.apache.iceberg.metrics.MetricsReporter;
2627

2728
/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
@@ -38,6 +39,11 @@ public Table table() {
3839
return scan.table();
3940
}
4041

42+
@Override
43+
public FileIO io() {
44+
return scan.io();
45+
}
46+
4147
@Override
4248
public BatchScan useSnapshot(long snapshotId) {
4349
return new BatchScanAdapter(scan.useSnapshot(snapshotId));

api/src/main/java/org/apache/iceberg/Scan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import org.apache.iceberg.expressions.Expression;
2424
import org.apache.iceberg.io.CloseableIterable;
25+
import org.apache.iceberg.io.FileIO;
2526
import org.apache.iceberg.metrics.MetricsReporter;
2627
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2728

@@ -208,4 +209,8 @@ default ThisT minRowsRequested(long numRows) {
208209
throw new UnsupportedOperationException(
209210
this.getClass().getName() + " doesn't implement minRowsRequested");
210211
}
212+
213+
default FileIO io() {
214+
throw new UnsupportedOperationException(this.getClass().getName() + " doesn't implement io");
215+
}
211216
}

core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.iceberg.events.IncrementalScanEvent;
2222
import org.apache.iceberg.events.Listeners;
2323
import org.apache.iceberg.io.CloseableIterable;
24+
import org.apache.iceberg.io.FileIO;
2425
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2526
import org.apache.iceberg.util.SnapshotUtil;
2627

@@ -185,4 +186,9 @@ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
185186
}
186187
}
187188
}
189+
190+
@Override
191+
public FileIO io() {
192+
return table().io();
193+
}
188194
}

core/src/main/java/org/apache/iceberg/BaseScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ public Table table() {
102102
return table;
103103
}
104104

105-
protected FileIO io() {
105+
@Override
106+
public FileIO io() {
106107
return table.io();
107108
}
108109

core/src/main/java/org/apache/iceberg/SerializableTable.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected SerializableTable(Table table) {
8383
Map<Integer, PartitionSpec> specs = table.specs();
8484
specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
8585
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
86-
this.io = fileIO(table);
86+
this.io = SerializableTable.copyOf(table.io());
8787
this.encryption = table.encryption();
8888
this.locationProviderTry = Try.of(table::locationProvider);
8989
this.refs = SerializableMap.copyOf(table.refs());
@@ -124,12 +124,12 @@ private String metadataFileLocation(Table table) {
124124
}
125125
}
126126

127-
private FileIO fileIO(Table table) {
128-
if (table.io() instanceof HadoopConfigurable) {
129-
((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
127+
public static FileIO copyOf(FileIO fileIO) {
128+
if (fileIO instanceof HadoopConfigurable) {
129+
((HadoopConfigurable) fileIO).serializeConfWith(SerializableConfSupplier::new);
130130
}
131131

132-
return table.io();
132+
return fileIO;
133133
}
134134

135135
private Table lazyTable() {

core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void initialize(String name, Map<String, String> properties) {
9191

9292
String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
9393
this.warehouseLocation = warehouse.replaceAll("/*$", "");
94-
this.io = new InMemoryFileIO();
94+
this.io = CatalogUtil.loadFileIO(InMemoryFileIO.class.getName(), properties, null);
9595
this.closeableGroup = new CloseableGroup();
9696
closeableGroup.addCloseable(metricsReporter());
9797
closeableGroup.setSuppressCloseFailure(true);

core/src/main/java/org/apache/iceberg/inmemory/InMemoryFileIO.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,25 @@
2424
import org.apache.iceberg.io.InputFile;
2525
import org.apache.iceberg.io.OutputFile;
2626
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
27+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
2728
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
29+
import org.apache.iceberg.util.SerializableMap;
2830

2931
public class InMemoryFileIO implements FileIO {
3032

3133
private static final Map<String, byte[]> IN_MEMORY_FILES = Maps.newConcurrentMap();
3234
private boolean closed = false;
35+
private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
36+
37+
@Override
38+
public void initialize(Map<String, String> props) {
39+
this.properties = SerializableMap.copyOf(props);
40+
}
41+
42+
@Override
43+
public Map<String, String> properties() {
44+
return properties.immutableMap();
45+
}
3346

3447
public void addFile(String location, byte[] contents) {
3548
Preconditions.checkState(!closed, "Cannot call addFile after calling close()");

core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ protected TableScan newRefinedScan(
137137
}
138138

139139
@Override
140-
protected FileIO io() {
140+
public FileIO io() {
141141
return null != fileIOForPlanId ? fileIOForPlanId : tableIo;
142142
}
143143

core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,12 @@
6262
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
6363
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
6464
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
65+
import org.apache.iceberg.rest.credentials.Credential;
66+
import org.apache.iceberg.rest.credentials.ImmutableCredential;
6567
import org.apache.iceberg.rest.responses.ConfigResponse;
6668
import org.apache.iceberg.rest.responses.ErrorResponse;
69+
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
70+
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
6771
import org.junit.jupiter.api.Disabled;
6872
import org.junit.jupiter.api.Test;
6973
import org.junit.jupiter.params.ParameterizedTest;
@@ -953,4 +957,110 @@ public void serverSupportsPlanningButNotCancellation() throws IOException {
953957
// Verify no exception was thrown - cancelPlan returns false when endpoint not supported
954958
assertThat(cancelled).isFalse();
955959
}
960+
961+
@ParameterizedTest
962+
@EnumSource(PlanningMode.class)
963+
void fileIOForRemotePlanningIsPropagated(
964+
Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
965+
RESTCatalogAdapter adapter =
966+
Mockito.spy(
967+
new RESTCatalogAdapter(backendCatalog) {
968+
@Override
969+
public <T extends RESTResponse> T execute(
970+
HTTPRequest request,
971+
Class<T> responseType,
972+
Consumer<ErrorResponse> errorHandler,
973+
Consumer<Map<String, String>> responseHeaders,
974+
ParserContext parserContext) {
975+
T response =
976+
super.execute(
977+
request, responseType, errorHandler, responseHeaders, parserContext);
978+
return maybeAddStorageCredential(response);
979+
}
980+
});
981+
982+
adapter.setPlanningBehavior(planMode.apply(TestPlanningBehavior.builder()).build());
983+
984+
RESTCatalog catalog =
985+
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
986+
catalog.initialize(
987+
"test",
988+
ImmutableMap.of(
989+
CatalogProperties.FILE_IO_IMPL,
990+
"org.apache.iceberg.inmemory.InMemoryFileIO",
991+
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
992+
"true"));
993+
994+
Table table = restTableFor(catalog, "file_io_propagation");
995+
setParserContext(table);
996+
997+
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
998+
999+
TableScan tableScan = table.newScan();
1000+
assertThat(tableScan.io().properties())
1001+
.isSameAs(table.io().properties())
1002+
.doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1003+
// make sure remote scan planning is called and FileIO gets the planId
1004+
assertThat(tableScan.planFiles()).hasSize(1);
1005+
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1006+
assertThat(tableScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1007+
String planId = tableScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1008+
1009+
TableScan newScan = table.newScan();
1010+
assertThat(newScan.io().properties())
1011+
.isSameAs(table.io().properties())
1012+
.doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1013+
// make sure remote scan planning is called and FileIO gets the planId
1014+
assertThat(newScan.planFiles()).hasSize(1);
1015+
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1016+
1017+
// make sure planIds are different for each scan
1018+
assertThat(newScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
1019+
assertThat(newScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID))
1020+
.isNotEqualTo(planId);
1021+
}
1022+
1023+
@SuppressWarnings("unchecked")
1024+
private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
1025+
if (response instanceof PlanTableScanResponse resp
1026+
&& PlanStatus.COMPLETED == resp.planStatus()) {
1027+
return (T)
1028+
PlanTableScanResponse.builder()
1029+
.withPlanStatus(resp.planStatus())
1030+
.withPlanId(resp.planId())
1031+
.withPlanTasks(resp.planTasks())
1032+
.withFileScanTasks(resp.fileScanTasks())
1033+
.withCredentials(
1034+
ImmutableList.<Credential>builder()
1035+
.addAll(resp.credentials())
1036+
.add(
1037+
ImmutableCredential.builder()
1038+
.prefix("dummy")
1039+
.putConfig("dummyKey", "dummyVal")
1040+
.build())
1041+
.build())
1042+
.withSpecsById(resp.specsById())
1043+
.build();
1044+
} else if (response instanceof FetchPlanningResultResponse resp
1045+
&& PlanStatus.COMPLETED == resp.planStatus()) {
1046+
return (T)
1047+
FetchPlanningResultResponse.builder()
1048+
.withPlanStatus(resp.planStatus())
1049+
.withFileScanTasks(resp.fileScanTasks())
1050+
.withPlanTasks(resp.planTasks())
1051+
.withSpecsById(resp.specsById())
1052+
.withCredentials(
1053+
ImmutableList.<Credential>builder()
1054+
.addAll(resp.credentials())
1055+
.add(
1056+
ImmutableCredential.builder()
1057+
.prefix("dummy")
1058+
.putConfig("dummyKey", "dummyVal")
1059+
.build())
1060+
.build())
1061+
.build();
1062+
}
1063+
1064+
return response;
1065+
}
9561066
}

0 commit comments

Comments
 (0)