Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import org.apache.iceberg.io.FileIO;

/** API for configuring a batch scan. */
public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
/**
Expand Down Expand Up @@ -68,4 +70,10 @@ public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanT
* @return the Snapshot this scan will use
*/
Snapshot snapshot();

/** Returns the {@link FileIO} instance to use when reading data files for this scan. */
@Override
default FileIO io() {
return table().io();
}
}
6 changes: 6 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;

/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
Expand All @@ -38,6 +39,11 @@ public Table table() {
return scan.table();
}

@Override
public FileIO io() {
return scan.io();
}

@Override
public BatchScan useSnapshot(long snapshotId) {
return new BatchScanAdapter(scan.useSnapshot(snapshotId));
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

Expand Down Expand Up @@ -208,4 +209,9 @@ default ThisT minRowsRequested(long numRows) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement minRowsRequested");
}

/** Returns the {@link FileIO} instance to use when reading data files for this scan. */
default FileIO io() {
throw new UnsupportedOperationException("io() is not implemented: added in 1.11.0");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the version belongs here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly for unexpected implementations, so I was thinking that a version number would be helpful like a deprecation message that has one. I'm fine removing it, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong feeling here; are you thinking of this as an implementer-facing message? I assume they won't really care when it was added — it's not like they can upgrade their way out of it. So I'll take it either way

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking it could address urgent questions if you have deployed a new version of Iceberg and hit this, like "when was this added so I can roll back?" or "how long has this been broken?"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who would that message be for though? I would think it's only for library integrators who would hopefully break immediately when they bump their dependency. I really don't mind though. We can keep it

}
}
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public Table table() {
return table;
}

protected FileIO io() {
@Override
public FileIO io() {
return table.io();
}

Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public TableScan newScan() {
tableIdentifier,
resourcePaths,
supportedEndpoints,
io(),
catalogProperties,
hadoopConf);
}
Expand Down
46 changes: 21 additions & 25 deletions core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,14 @@ class RESTTableScan extends DataTableScan {
private final RESTClient client;
private final Map<String, String> headers;
private final TableOperations operations;
private final Table table;
private final ResourcePaths resourcePaths;
private final TableIdentifier tableIdentifier;
private final Set<Endpoint> supportedEndpoints;
private final ParserContext parserContext;
private final Map<String, String> catalogProperties;
private final Object hadoopConf;
private final FileIO tableIO;
private String planId = null;
private FileIO fileIOForPlanId = null;
private FileIO scanFileIO = null;
Copy link
Contributor

@rdblue rdblue Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The table's FileIO is not needed. It is passed in only once as table.io(), and table is also passed in. And now that I'm looking at it, table is not needed as a field either, because a parent (BaseScan) exposes table(). Both fields should be removed, along with the constructor argument for the table's IO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at the information passed around in this class more based on having a couple of unnecessary fields and I have a few more questions:

Why does this use Map<String, String> headers rather than Supplier<Map<String, String>> headers that the table has? Doesn't this mean that the headers from the auth session are static? How does credential refresh work if the authentication header is stale?

Was the TableOperations field originally used? It isn't used now, other than to pass it to refined scans. I think it should be removed from both the constructor and the fields.

This passes ResourcePaths and TableIdentifier to build 2 paths that could instead be passed in as a single path using the plan ID. I think this should be reconsidered. The plan and fetch endpoints can be passed in as simple strings, and it would simplify this to use a Function&lt;String, String&gt; to construct the plan path with ID. Another option is to create an object similar to ResourcePaths that is specific to a table (hides TableIdentifier) and use that. Either one would be cleaner.

This also passes supportedEndpoints, which spreads out the logic for how to handle endpoints that aren't supported. These endpoints are also checked when needed, so this code will create a plan request and could then fail to fetch tasks if the fetch endpoint isn't supported. It also throws error messages that are not helpful, like "Server does not support endpoint: GET /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}" instead of "Invalid status: submitted (service does not support async planning)" that would be more helpful.

A cleaner way to handle endpoints is to verify required endpoints before creating this scan. Both the plan and fetch endpoints should be required. Then the optional endpoints should be represented as booleans, like supportsAsync and supportsCancel.

Last, this leaks catalog properties and the Hadoop conf so that CatalogUtil.loadFileIO can be called with List<Credential>. Why not pass a function to create the FileIO so that the properties and conf are contained in the REST table operations?

I think this works right now and these aren't blockers (other than the potential auth session issue), but I would really like to see this class simplified by reducing the number of things that have to be passed to it and remove some of the things that are done here, like handling endpoint checks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @singhpk234 for opening #15595 to address those things


RESTTableScan(
Table table,
Expand All @@ -97,11 +95,9 @@ class RESTTableScan extends DataTableScan {
TableIdentifier tableIdentifier,
ResourcePaths resourcePaths,
Set<Endpoint> supportedEndpoints,
FileIO tableIO,
Map<String, String> catalogProperties,
Object hadoopConf) {
super(table, schema, context);
this.table = table;
this.client = client;
this.headers = headers;
this.operations = operations;
Expand All @@ -113,7 +109,6 @@ class RESTTableScan extends DataTableScan {
.add("specsById", table.specs())
.add("caseSensitive", context().caseSensitive())
.build();
this.tableIO = tableIO;
this.catalogProperties = catalogProperties;
this.hadoopConf = hadoopConf;
}
Expand All @@ -131,14 +126,15 @@ protected TableScan newRefinedScan(
tableIdentifier,
resourcePaths,
supportedEndpoints,
io(),
catalogProperties,
hadoopConf);
}

@Override
protected FileIO io() {
return null != fileIOForPlanId ? fileIOForPlanId : tableIO;
public FileIO io() {
Preconditions.checkState(
null != scanFileIO, "FileIO is not available: planFiles() must be called first");
return scanFileIO;
}

@Override
Expand Down Expand Up @@ -170,7 +166,7 @@ public CloseableIterable<FileScanTask> planFiles() {
.withEndSnapshotId(endSnapshotId)
.withUseSnapshotSchema(true);
} else if (snapshotId != null) {
boolean useSnapShotSchema = snapshotId != table.currentSnapshot().snapshotId();
boolean useSnapShotSchema = snapshotId != table().currentSnapshot().snapshotId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a blocker, but it doesn't look correct. Additional nit: "snapshot" is one word and should be capitalized "Snapshot".

@singhpk234, The snapshot schema should be used when time traveling to a tag or a specific snapshot ID, but not when reading from a branch. That context comes from how the ref or snapshot was configured. Choosing a specific snapshot should generally send useSnapshotSchema=true, but just reading from a branch should not. Here is the description from the REST spec:

Whether to use the schema at the time the snapshot was written. When time travelling, the snapshot schema should be used (true). When scanning a branch, the table schema should be used (false).

This comparison isn't going to be sufficient because the distinction is whether the snapshot was selected via branch name vs directly by ID or by tag name. This needs to know whether useRef was called and whether the ref was a branch.

We also can't rely on comparing the result schema because DataTableScan (and its children) always override the schema to the branch/tag/snapshot schema and when useSnapshot or useRef are called. That's because the fields passed to select are applied lazily in planFiles. The snapshot/ref selection changes the base schema and columns are selected by name, or a specific projection is used directly.

To fix this, I think you need to override some of the API methods to detect how the scan is configured.

And while we're looking at schema projection, I think the projection code is also incorrect:

    List<String> selectedColumns =
        schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());

The problem here is that it will select top-level fields only because NestedField#name returns the local field's name and not any children. To get the full field names, you'd need to call TypeUtil.getProjectedIds and then pass each ID through schema().findColumnName(id) like you do for stats fields. This is what SnapshotScan does:

    List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
    List<String> projectedFieldNames =
        projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());

builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapShotSchema);
}

Expand All @@ -190,9 +186,8 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT

this.planId = response.planId();
PlanStatus planStatus = response.planStatus();
if (null != planId && !response.credentials().isEmpty()) {
this.fileIOForPlanId = fileIOForPlanId(response.credentials());
}
this.scanFileIO =
!response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();

switch (planStatus) {
case COMPLETED:
Expand All @@ -212,14 +207,18 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
}
}

private FileIO fileIOForPlanId(List<Credential> storageCredentials) {
private FileIO scanFileIO(List<Credential> storageCredentials) {
ImmutableMap.Builder<String, String> builder =
ImmutableMap.<String, String>builder().putAll(catalogProperties);
if (null != planId) {
builder.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes no sense to me that the ID that we use to internally track the plan ID is a public field where we keep properties to configure the REST catalog. Is this something we can change or has it been released?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this hasn't been released and we can still change it, but this should most likely be done in a separate PR because that property is being used in a few other places as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing in a separate PR is fine, but we don't want to replace one blocker with another endlessly as we find these issues.

We should also consider whether there are any alternatives to passing this mixed into catalog properties. Passing state like this in a property map along with config mixes concepts and causes weird API additions like this constant in RESTCatalogProperties.

}

Map<String, String> properties = builder.buildKeepingLast();
FileIO ioForScan =
CatalogUtil.loadFileIO(
catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
ImmutableMap.<String, String>builder()
.putAll(catalogProperties)
.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
.buildKeepingLast(),
properties,
hadoopConf,
storageCredentials.stream()
.map(c -> StorageCredential.create(c.prefix(), c.config()))
Expand Down Expand Up @@ -275,9 +274,8 @@ private CloseableIterable<FileScanTask> fetchPlanningResult() {
response.planStatus(),
planId);

if (!response.credentials().isEmpty()) {
this.fileIOForPlanId = fileIOForPlanId(response.credentials());
}
this.scanFileIO =
!response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();

return scanTasksIterable(response.planTasks(), response.fileScanTasks());
} catch (FailsafeException e) {
Expand Down Expand Up @@ -319,10 +317,8 @@ private CloseableIterable<FileScanTask> scanTasksIterable(
/** Cancels the plan on the server (if supported) and closes the plan-scoped FileIO */
private void cleanupPlanResources() {
cancelPlan();
if (null != fileIOForPlanId) {
FILEIO_TRACKER.invalidate(this);
this.fileIOForPlanId = null;
}
FILEIO_TRACKER.invalidate(this);
this.scanFileIO = null;
}

@VisibleForTesting
Expand Down
111 changes: 111 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -953,4 +957,111 @@ public void serverSupportsPlanningButNotCancellation() throws IOException {
// Verify no exception was thrown - cancelPlan returns false when endpoint not supported
assertThat(cancelled).isFalse();
}

@ParameterizedTest
@EnumSource(PlanningMode.class)
void fileIOForRemotePlanningIsPropagated(
Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
RESTCatalogAdapter adapter =
Mockito.spy(
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(
HTTPRequest request,
Class<T> responseType,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders,
ParserContext parserContext) {
T response =
super.execute(
request, responseType, errorHandler, responseHeaders, parserContext);
return maybeAddStorageCredential(response);
}
});

adapter.setPlanningBehavior(planMode.apply(TestPlanningBehavior.builder()).build());

RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
"true"));

Table table = restTableFor(catalog, "file_io_propagation");

assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);

TableScan tableScan = table.newScan();
assertThatThrownBy(tableScan::io)
.isInstanceOf(IllegalStateException.class)
.hasMessage("FileIO is not available: planFiles() must be called first");

// make sure remote scan planning is called and FileIO gets the planId
assertThat(tableScan.planFiles()).hasSize(1);
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noted that mixing the plan ID into properties is not a great solution above. And I want to point out that this assertion is necessary because of it. We have to make sure we're not modifying the wrong property map and worry about cases where user-driven config passes in a hard-coded plan ID (see buildKeepingLast()). I don't know that there's an easy fix, but this is not a pattern we want in the codebase.

assertThat(tableScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
String planId = tableScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID);

TableScan newScan = table.newScan();
assertThatThrownBy(newScan::io)
.isInstanceOf(IllegalStateException.class)
.hasMessage("FileIO is not available: planFiles() must be called first");

// make sure remote scan planning is called and FileIO gets the planId
assertThat(newScan.planFiles()).hasSize(1);
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);

// make sure planIds are different for each scan
assertThat(newScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
assertThat(newScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID))
.isNotEqualTo(planId);
}

@SuppressWarnings("unchecked")
private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
if (response instanceof PlanTableScanResponse resp
&& PlanStatus.COMPLETED == resp.planStatus()) {
return (T)
PlanTableScanResponse.builder()
.withPlanStatus(resp.planStatus())
.withPlanId(resp.planId())
.withPlanTasks(resp.planTasks())
.withFileScanTasks(resp.fileScanTasks())
.withCredentials(
ImmutableList.<Credential>builder()
.addAll(resp.credentials())
.add(
ImmutableCredential.builder()
.prefix("dummy")
.putConfig("dummyKey", "dummyVal")
.build())
.build())
.withSpecsById(resp.specsById())
.build();
} else if (response instanceof FetchPlanningResultResponse resp
&& PlanStatus.COMPLETED == resp.planStatus()) {
return (T)
FetchPlanningResultResponse.builder()
.withPlanStatus(resp.planStatus())
.withFileScanTasks(resp.fileScanTasks())
.withPlanTasks(resp.planTasks())
.withSpecsById(resp.specsById())
.withCredentials(
ImmutableList.<Credential>builder()
.addAll(resp.credentials())
.add(
ImmutableCredential.builder()
.prefix("dummy")
.putConfig("dummyKey", "dummyVal")
.build())
.build())
.build();
}

return response;
}
}