Skip to content

GH-26818: [C++][Python] Preserve order when writing dataset multi-threaded #44470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 14, 2025

Conversation

EnricoMi
Copy link
Contributor

@EnricoMi EnricoMi commented Oct 18, 2024

Rationale for this change

The order of rows in a dataset might be important for users and should be preserved when writing to a filesystem. With multi-threaded write, the order is currently not guaranteed,

What changes are included in this PR?

Preserving the dataset order of rows requires the SourceNode to sequence the fragments output (this keeps exec batches in the order of fragments), to provide an ImplicitOrdering (this gives exec batches an index), and the ConsumingSinkNode to sequence exec batches (finally preserve order of batches according to their index).

User-facing changes:

  • Add option preserve_order to FileSystemDatasetWriteOptions (C++) and arrow.dataset.write_dataset (Python).

Default behaviour is current behaviour.

Are these changes tested?

Unit tests have been added,

Are there any user-facing changes?

Users can set FileSystemDatasetWriteOptions.preserve_order = true (C++) / arrow.dataset.write_dataset(..., preserve_order=True) (Python).

@EnricoMi EnricoMi requested a review from westonpace as a code owner October 18, 2024 11:07
Copy link

⚠️ GitHub issue #26818 has been automatically assigned in GitHub to PR creator.

@gitmodimo
Copy link
Contributor

This pull request seems to functionally overlap with this one. Some changes are almost exactly the same. Ordering of data is kept in threaded execution with use of batch index. Can you check whether it fixes your use case also?

@@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor has a default value for ordering and initializes ordering with the value given to the constructor. No point for another default value here, I think.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Oct 31, 2024
acero::ConsumingSinkNodeOptions{
std::move(consumer),
{},
/*sequence_output=*/write_options.preserve_order}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeeNode needs the same treatment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, looks like this requires some refactoring, since TeeNode is not used for writing datasets I'd leave this to a separate PR.

@gitmodimo
Copy link
Contributor

Since you are fixing dataset write ordering I think this check never fires. It should be moved to InsertBatch.
Also probaly AccumulationQueue, SequencingQueue and SerialSequencingQueue should be exported for acero nodes developers.

@EnricoMi
Copy link
Contributor Author

EnricoMi commented Dec 4, 2024

@gitmodimo I think that refactoring should be done in a separate PR keeping this PR focused on fixing the issue.

@EnricoMi
Copy link
Contributor Author

@zanmato1984 this touches related code area as #44616. Hoping you could take a look when you find time.

@zanmato1984
Copy link
Contributor

Hi @EnricoMi , I can take a look. Just first glance but do you think the PR description could be updated accordingly?

Copy link

⚠️ GitHub issue #26818 has been automatically assigned in GitHub to PR creator.

@zanmato1984
Copy link
Contributor

Also, you might want to update the PR description to reflect its latest purpose.

@EnricoMi
Copy link
Contributor Author

EnricoMi commented Mar 7, 2025

Also, you might want to update the PR description to reflect its latest purpose.

I think the PR description is up-to-date, do you see any discrepancy?

@EnricoMi EnricoMi force-pushed the preserve-order-2 branch from f5a18b2 to 3726f87 Compare May 9, 2025 08:44
@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels May 9, 2025
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels May 9, 2025
@EnricoMi EnricoMi force-pushed the preserve-order-2 branch from f9b3549 to dfd958d Compare May 9, 2025 10:17
@@ -4591,6 +4591,22 @@ def file_visitor(written_file):
assert result1.to_table().equals(result2.to_table())


@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_use_threads_preserve_order(tempdir):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test code from issue description. The test fails with preserve_order=False.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels May 9, 2025
Copy link
Contributor

@zanmato1984 zanmato1984 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@zanmato1984
Copy link
Contributor

Let's wait for @rok a while before I can merge this. Thanks.

@zanmato1984
Copy link
Contributor

@github-actions crossbow submit -g cpp -g python

Copy link

Revision: dfd958d

Submitted crossbow builds: ursacomputing/crossbow @ actions-58bcfa66f9

Task Status
example-cpp-minimal-build-static GitHub Actions
example-cpp-minimal-build-static-system-dependency GitHub Actions
example-cpp-tutorial GitHub Actions
example-python-minimal-build-fedora-conda GitHub Actions
example-python-minimal-build-ubuntu-venv GitHub Actions
test-alpine-linux-cpp GitHub Actions
test-build-cpp-fuzz GitHub Actions
test-conda-cpp GitHub Actions
test-conda-cpp-meson GitHub Actions
test-conda-cpp-valgrind GitHub Actions
test-conda-python-3.10 GitHub Actions
test-conda-python-3.10-hdfs-2.9.2 GitHub Actions
test-conda-python-3.10-hdfs-3.2.1 GitHub Actions
test-conda-python-3.10-pandas-latest-numpy-latest GitHub Actions
test-conda-python-3.11 GitHub Actions
test-conda-python-3.11-dask-latest GitHub Actions
test-conda-python-3.11-dask-upstream_devel GitHub Actions
test-conda-python-3.11-hypothesis GitHub Actions
test-conda-python-3.11-pandas-latest-numpy-1.26 GitHub Actions
test-conda-python-3.11-pandas-latest-numpy-latest GitHub Actions
test-conda-python-3.11-pandas-nightly-numpy-nightly GitHub Actions
test-conda-python-3.11-pandas-upstream_devel-numpy-nightly GitHub Actions
test-conda-python-3.11-spark-master GitHub Actions
test-conda-python-3.12 GitHub Actions
test-conda-python-3.12-cpython-debug GitHub Actions
test-conda-python-3.13 GitHub Actions
test-conda-python-3.9 GitHub Actions
test-conda-python-3.9-pandas-1.1.3-numpy-1.19.5 GitHub Actions
test-conda-python-emscripten GitHub Actions
test-cuda-cpp-ubuntu-22.04-cuda-11.7.1 GitHub Actions
test-cuda-python-ubuntu-22.04-cuda-11.7.1 GitHub Actions
test-debian-12-cpp-amd64 GitHub Actions
test-debian-12-cpp-i386 GitHub Actions
test-debian-12-python-3-amd64 GitHub Actions
test-debian-12-python-3-i386 GitHub Actions
test-fedora-39-cpp GitHub Actions
test-fedora-39-python-3 GitHub Actions
test-ubuntu-22.04-cpp GitHub Actions
test-ubuntu-22.04-cpp-20 GitHub Actions
test-ubuntu-22.04-cpp-bundled GitHub Actions
test-ubuntu-22.04-cpp-emscripten GitHub Actions
test-ubuntu-22.04-cpp-no-threading GitHub Actions
test-ubuntu-22.04-python-3 GitHub Actions
test-ubuntu-22.04-python-313-freethreading GitHub Actions
test-ubuntu-24.04-cpp GitHub Actions
test-ubuntu-24.04-cpp-bundled-offline GitHub Actions
test-ubuntu-24.04-cpp-gcc-13-bundled GitHub Actions
test-ubuntu-24.04-cpp-gcc-14 GitHub Actions
test-ubuntu-24.04-cpp-minimal-with-formats GitHub Actions
test-ubuntu-24.04-cpp-thread-sanitizer GitHub Actions
test-ubuntu-24.04-python-3 GitHub Actions

@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting changes Awaiting changes labels May 14, 2025
@rok rok merged commit 021d8ab into apache:main May 14, 2025
44 of 45 checks passed
@rok rok removed the awaiting merge Awaiting merge label May 14, 2025
@rok
Copy link
Member

rok commented May 14, 2025

Thanks for doing this @EnricoMi ! This was a long standing issue.

@EnricoMi
Copy link
Contributor Author

Thanks everyone for the thorough review!

@EnricoMi EnricoMi deleted the preserve-order-2 branch May 14, 2025 07:38
Copy link

After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit 021d8ab.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 6 possible false positives for unstable benchmarks that are known to sometimes produce them.

@rok
Copy link
Member

rok commented May 14, 2025

Perhaps it'd be worth it to add a benchmark for preserve_order == true to ensure there's no future regressions. Or would the nodes this uses already be benchmarked?

@EnricoMi
Copy link
Contributor Author

I'll look into this!

EnricoMi added a commit to EnricoMi/arrow that referenced this pull request May 23, 2025
…ti-threaded (apache#44470)

### Rationale for this change
The order of rows in a dataset might be important for users and should be preserved when writing to a filesystem. With multi-threaded write, the order is currently not guaranteed,

### What changes are included in this PR?
Preserving the dataset order of rows requires the `SourceNode` to sequence the fragments output (this keeps exec batches in the order of fragments), to provide an `ImplicitOrdering` (this gives exec batches an index), and the `ConsumingSinkNode` to sequence exec batches (finally preserve order of batches according to their index).

User-facing changes:
- Add option `preserve_order` to `FileSystemDatasetWriteOptions` (C++) and `arrow.dataset.write_dataset` (Python).

Default behaviour is current behaviour.

### Are these changes tested?
Unit tests have been added,

### Are there any user-facing changes?
Users can set `FileSystemDatasetWriteOptions.preserve_order = true` (C++) / `arrow.dataset.write_dataset(..., preserve_order=True)` (Python).
* GitHub Issue: apache#26818

Lead-authored-by: Enrico Minack <[email protected]>
Co-authored-by: Rok Mihevc <[email protected]>
Signed-off-by: Rok Mihevc <[email protected]>
EnricoMi added a commit to G-Research/arrow that referenced this pull request May 23, 2025
…ti-threaded (apache#44470) (#5)

### Rationale for this change
The order of rows in a dataset might be important for users and should be preserved when writing to a filesystem. With multi-threaded write, the order is currently not guaranteed,

### What changes are included in this PR?
Preserving the dataset order of rows requires the `SourceNode` to sequence the fragments output (this keeps exec batches in the order of fragments), to provide an `ImplicitOrdering` (this gives exec batches an index), and the `ConsumingSinkNode` to sequence exec batches (finally preserve order of batches according to their index).

User-facing changes:
- Add option `preserve_order` to `FileSystemDatasetWriteOptions` (C++) and `arrow.dataset.write_dataset` (Python).

Default behaviour is current behaviour.

### Are these changes tested?
Unit tests have been added,

### Are there any user-facing changes?
Users can set `FileSystemDatasetWriteOptions.preserve_order = true` (C++) / `arrow.dataset.write_dataset(..., preserve_order=True)` (Python).
* GitHub Issue: apache#26818

Lead-authored-by: Enrico Minack <[email protected]>

Signed-off-by: Rok Mihevc <[email protected]>
Co-authored-by: Rok Mihevc <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants