[data] Add Dataset.write_datasink_lazy to support intermediate outputs. #52094

Open · wants to merge 5 commits into master

Conversation

@basveeling commented on Apr 8, 2025

This PR proposes extending the Dataset API with Dataset.write_datasink_lazy. I'd be happy to discuss, welcome any comments, and can finalize the PR with additional tests and documentation if there's interest.

    def write_datasink_lazy(
        self,
        datasink: Datasink,
        *,
        prefilter_fn: Optional[Callable[[Block], Block]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
    ) -> "Dataset":
        """Writes the dataset to a custom :class:`~ray.data.Datasink` lazily,
        while allowing subsequent data operations."""

Why are these changes needed?

Some Ray Data pipelines benefit from writing intermediate outputs, for example:

  1. During development, one may want to cache output from the middle of a DAG and debug subsequent dataset operations against that cache.
  2. Some data operations generate multiple outputs that need to be written to separate datasinks (see the sketch below).

This was partly inspired by smallpond (https://deepseek-ai.github.io/smallpond/) and its ability to handle multiple outputs via Session.wait (https://deepseek-ai.github.io/smallpond/generated/smallpond.dataframe.Session.wait.html#smallpond.dataframe.Session.wait). This PR takes a different approach by providing a write node that passes data through transparently for further processing.
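
As a sketch of use case 2 (IntermediateSink, FinalSink, preprocess_fn, and aggregate_fn are hypothetical placeholders):

    import ray

    ds = ray.data.read_parquet("s3://bucket/raw").map_batches(preprocess_fn)

    # Persist the intermediate blocks to one sink, keep transforming the
    # same blocks, and write the final result to a second sink. The
    # terminal (eager) write_datasink call triggers execution, so both
    # sinks are written in a single pass over the data.
    (
        ds.write_datasink_lazy(IntermediateSink("/tmp/preprocessed"))
        .map_batches(aggregate_fn)
        .write_datasink(FinalSink("/tmp/final"))
    )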

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • TODO: I've included any doc changes needed for https://docs.ray.io/en/master/.
    • TODO I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • TODO I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: basveeling <[email protected]>
@basveeling requested a review from a team as a code owner · April 8, 2025 14:29
@basveeling (Author)

One point to be discussed is if and how this should handle the Datasink.on_write_complete() hook. We could check whether the provided Datasink implements this method and throw an error, or look for ways to retrieve the WriteResult.
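
A minimal sketch of the error-out option (the guard below is hypothetical; it assumes on_write_complete is defined on the ray.data.Datasink base class and only overridden by sinks that need it):

    from ray.data import Datasink

    def _check_sink_supported(datasink: Datasink) -> None:
        # The lazy write path has no place to aggregate WriteResults yet,
        # so reject sinks that rely on the on_write_complete() hook.
        if type(datasink).on_write_complete is not Datasink.on_write_complete:
            raise ValueError(
                "write_datasink_lazy does not support datasinks that "
                "override on_write_complete()."
            )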

@hainesmichaelc added the community-contribution label · Apr 9, 2025
@jcotant1 added the data label · Apr 10, 2025
@richardliaw (Contributor)

This is quite nice, thanks for the contribution! Will take a quick look.

Review comment (Contributor) on the proposed signature:

    self,
    datasink: Datasink,
    *,
    prefilter_fn: Optional[Callable[[Block], Block]] = None,

Could we remove the prefilter_fn for now to maintain consistency with write_datasink?

Comment on lines +21 to +45
    def generate_lazy_write_fn(
        datasink_or_legacy_datasource: Union[Datasink, Datasource],
        prefilter_fn: Optional[Callable[[Block], Block]] = None,
        **write_args,
    ) -> Callable[[Iterator[Block], TaskContext], Iterator[Block]]:
        def fn(blocks: Iterator[Block], ctx: TaskContext) -> Iterator[Block]:
            """Writes the blocks to the given datasink or legacy datasource.

            Outputs the original blocks to be written."""
            # Create a copy of the iterator, so we can return the original blocks.
            it1, it2 = itertools.tee(blocks, 2)
            if isinstance(datasink_or_legacy_datasource, Datasink):
                # Apply the prefilter function to each block before writing.
                if prefilter_fn is not None:
                    it1 = (prefilter_fn(block) if len(block) else block for block in it1)
                ctx.kwargs["_datasink_write_return"] = datasink_or_legacy_datasource.write(
                    it1, ctx
                )
            else:
                datasink_or_legacy_datasource.write(it1, ctx, **write_args)

            return it2

        return fn

Review comment (Contributor):
This isn't much different from generate_write_fn, right?

Comment on lines +66 to +67
    # TODO: figure out how to handle on_write_complete()
    return MapOperator.create(
Review comment (Contributor):
yeah indeed, we'll want to figure this part out (and on_write_failed)

@richardliaw (Contributor)

@raulchen - any thoughts on how to handle this properly? Main questions:

  1. If we do a lazy write, do we still want to generate stats (like collect_stats_fn = generate_collect_write_stats_fn())?
  2. How do we support things like on_write_complete or on_write_failed?

     ray/python/ray/data/dataset.py, lines 4178 to 4182 (at dd1038c):

             datasink.on_write_complete(write_result)
         except Exception as e:
             datasink.on_write_failed(e)
             raise

@raulchen (Contributor)

Thanks for your contribution. This is a nice feature.

  • Regarding the API, I'd prefer just adding a lazy flag to the existing write_xxx APIs; otherwise we'll have to make an async copy of each write API.
  • The current write_datasink_lazy implementation implies that the input data will be preserved in the outputs of the write. This definitely makes sense. But I'm thinking maybe we can decouple this behavior from sync/async, i.e., have two flags: 1) async, to determine whether execution is triggered immediately; 2) preserve_data, to determine whether data is preserved in the outputs of write_xxx.
  • on_write_complete still needs to be supported, because some data sinks (e.g., Lance) depend on this API to perform a commit operation. We can update the implementation to propagate the WriteResults via BlockMetadata instead of via Blocks. The WriteResults will be accessible to the data sinks, and we can still print the stats by default.
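
A rough sketch of the two-flag API described above (purely illustrative, not part of this PR; note that async is a reserved word in Python, so that flag would likely need a different name, e.g. lazy):

    def write_datasink(
        self,
        datasink: Datasink,
        *,
        lazy: bool = False,            # don't trigger execution immediately
        preserve_data: bool = False,   # pass input blocks through to the output
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
    ) -> Optional["Dataset"]:
        ...  # returns a Dataset when lazy=True, otherwise writes eagerly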

@richardliaw added the @external-author-action-required label · Apr 18, 2025