Spark 4.1: Pass FileIO on Spark's read path #15448
nastra wants to merge 3 commits into apache:main
Conversation
Force-pushed from 000673a to dc8a4f7
  CatalogProperties.CLIENT_POOL_SIZE,
- "1"));
+ "1",
+ "include-credentials",
We need the server to return some dummy credentials so that we can properly test in TestRESTScanPlanning that the FileIO with the plan ID and storage credentials is propagated.
open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTServerCatalogAdapter.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SerializableFileIOWithSize
This makes sure that the FileIO instance is only closed on the driver and not on executor nodes; it is similar to SerializableTableWithSize.
Is this needed? What is the advantage? I don't recall why we had to do it with table, so you may want to check with @aokolnychyi.
I would highly prefer not adding an extra class unless it is definitely necessary.
SerializableTableWithSize was added by @bryanck back then. I was initially testing the code changes without this class, but it is definitely needed to avoid closing FileIO on executor nodes.
#7263 is the PR that moved away from serializing the FileIO. The problem is that Spark closed the FileIO as part of broadcast cleanup, which closed the shared S3 client.
I agree with adding this class. I think we ultimately do need to preserve the size-estimation property to prevent any regressions, and we definitely need to keep ensuring that the driver's FileIO is not closed during broadcast cleanup. If we agree that we need to preserve these properties, and there are at least two places where we now want to broadcast the FileIO, I think there's a good argument for a wrapper class. If we manage to shrink down what we need to send over the wire, maybe there's a simpler structure, but whatever we broadcast probably does need to implement FileIO, which brings along all the methods needed to satisfy that interface.
My only question is whether the class needs to be public. It looks like it's only used in this module, so I think it could be package-private.
+1 to package-private. I'm not as clear on the benefit of handling this size estimate, but the closing behavior seems important to match. It feels like Spark should have an easier method for this, like broadcast(var, estimatedSize), to avoid us having to implement an interface.
As a thought, what's stopping us from doing something like SerializableTableWithSize.copyOf(table, scan.io()), where we use the table object and just attach the scan's FileIO to it?
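The close-on-driver-only behavior discussed in this thread can be sketched with a transient serialization marker, the same trick SerializableTableWithSize is described as using. Everything below is a stand-in: CloseableIO, SerializableIOSketch, and roundTrip are hypothetical names, and the guard follows the behavior stated above (close the delegate only on the original driver-side instance), not the actual PR code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Iceberg's FileIO; only close() matters for this sketch.
interface CloseableIO extends Serializable, AutoCloseable {
  @Override
  void close();
}

// Sketch: the transient marker survives only on the instance that was constructed
// directly (the driver). Deserialized copies on executors see a null marker, so
// close() is a no-op there and shared clients are not torn down during cleanup.
class SerializableIOSketch implements CloseableIO {
  private final CloseableIO delegate;
  private final transient Object serializationMarker;

  SerializableIOSketch(CloseableIO delegate) {
    this.delegate = delegate;
    this.serializationMarker = new Object();
  }

  @Override
  public void close() {
    if (serializationMarker != null) { // original (driver-side) instance only
      delegate.close();
    }
  }

  // Mimics a broadcast by round-tripping through Java serialization.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

If both broadcast sites on the read path need this guard, a shared wrapper like this avoids duplicating it.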
Force-pushed from 45a44aa to 122ccc8
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@SuppressWarnings("unchecked")
private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
  if (response instanceof PlanTableScanResponse resp
      && PlanStatus.COMPLETED == resp.planStatus()) {
I'm looking into when we can return a FileIO from the scan, and I was surprised to see that storage-credentials is returned for any CompletedPlanningResult rather than CompletedPlanningWithIDResult. The result with an ID is used for completed responses from the plan endpoint, while the generic result is returned from both the plan and fetch endpoints.
Why can we return storage credentials when fetching tasks? Wouldn't it make more sense to return credentials once per planning operation? That would simplify knowing when we have credentials.
It wouldn't solve the problem of needing to wait until after planFiles is called to get the FileIO, but it would at least simplify the protocol so we don't need to try to create a new FileIO after each call to fetch more tasks. (@danielcweeks, any thoughts on this?)
I guess if we want to change this, we would at the very least need to update the spec that we added in #14563.
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
SparkScan(
    SparkSession spark,
    Table table,
    Supplier<FileIO> fileIO,
This is a supplier because it will be a broadcast?
This is a supplier because planFiles() will be called later in the read path, so the actual FileIO with the right credentials will only be available later.
I actually considered using Supplier<FileIO> for the API from the scan. Now that it's clear we will create a supplier anyway, should we just update the scan interface so that the caller doesn't need to wrap it?
The nice thing about that is that we don't rely on docs or runtime exceptions (unless you call get too early). Returning a supplier signals to the caller that they should find out when the FileIO is accessible.
Yeah, I think that makes sense. I've created #15646 for that.
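The deferred-FileIO idea from this thread can be sketched with a supplier that fails fast before planning has run. PlannedIO, ScanSketch, and the error message below are hypothetical stand-ins, not the Iceberg API; the point is only that handing out Supplier<FileIO> makes the "not available until after planFiles()" contract explicit to the caller.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for a credential-bearing FileIO produced by remote planning.
class PlannedIO {
  private final String name;

  PlannedIO(String name) {
    this.name = name;
  }

  String name() {
    return name;
  }
}

// Sketch: the scan hands out a Supplier instead of a FileIO, because the FileIO
// configured with the plan ID and storage credentials only exists after
// planFiles() has run. Calling get() too early fails fast instead of silently
// using the table's default FileIO.
class ScanSketch {
  private final AtomicReference<PlannedIO> plannedIO = new AtomicReference<>();

  Supplier<PlannedIO> io() {
    return () -> {
      PlannedIO io = plannedIO.get();
      if (io == null) {
        throw new IllegalStateException("FileIO is only available after planFiles()");
      }
      return io;
    };
  }

  void planFiles() {
    // Hypothetical: remote planning yields a FileIO carrying plan-scoped credentials.
    plannedIO.set(new PlannedIO("io-with-plan-credentials"));
  }
}
```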
Force-pushed from 122ccc8 to 4a69352
Force-pushed from 4a69352 to f9e1bde
Force-pushed from c13cd0a to b5367d7
Force-pushed from 3e3ec04 to 55d2b57
Force-pushed from 55d2b57 to d8d6cb7
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
Force-pushed from d8d6cb7 to 456b1d5
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
Force-pushed from 456b1d5 to bce38ca
Force-pushed from bce38ca to 1d5a7bb
@Override
public long estimatedSize() {
  return SIZE_ESTIMATE;
Size estimate of what? The FileIO's serialized representation?
This is just a hardcoded size so that sizes don't have to be re-calculated (similar to how we have it in SerializableTableWithSize).
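The hardcoded-estimate idea can be sketched as follows. KnownSizeEstimationSketch is a stand-in for Spark's org.apache.spark.util.KnownSizeEstimation, and the 32 KB constant is purely illustrative: reporting a fixed size lets Spark's SizeEstimator short-circuit instead of reflectively walking the broadcast object graph.

```java
// Stand-in for Spark's KnownSizeEstimation (assumption: a single estimatedSize() method).
interface KnownSizeEstimationSketch {
  long estimatedSize();
}

class FileIOSizeSketch implements KnownSizeEstimationSketch {
  // Illustrative fixed estimate; a real value would mirror the constant used by
  // SerializableTableWithSize so sizes never have to be re-calculated.
  static final long SIZE_ESTIMATE = 32L * 1024L;

  @Override
  public long estimatedSize() {
    return SIZE_ESTIMATE;
  }
}
```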
Force-pushed from 1d5a7bb to 33733d7
When accessing/reading data files, the codebase uses the Table's FileIO instance through table.io() on Spark's read path. With remote scan planning, the FileIO instance is configured with a plan ID and custom storage credentials inside RESTTableScan, but that instance is never propagated to the place(s) that actually perform the read, thus leading to errors.

This PR passes the FileIO obtained during remote/distributed scan planning alongside the Table instance on Spark's read path.

This is an alternative to #15368 and requires SerializableFileIOWithSize, which makes sure that the FileIO instance is only closed on the driver and not on executor nodes (similar to SerializableTableWithSize).