Skip to content

Conversation

@duxiao1212
Copy link
Contributor

@duxiao1212 duxiao1212 commented Nov 13, 2025

Summary:
Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221

== NO RELEASE NOTE ==

@duxiao1212 duxiao1212 requested review from a team as code owners November 13, 2025 23:47
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Nov 13, 2025
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 13, 2025

Reviewer's Guide

Implements sorted shuffle support in LocalShuffleReader by adding a buffered k-way merge via TreeOfLosers and SortedShuffleFileStreamReader, updates compareKeys to three‐way ordering, adds size sanity checks, and extends tests with full end‐to‐end sorted shuffle validation.

Sequence diagram for k-way merge in LocalShuffleReader::nextSorted

sequenceDiagram
    participant LocalShuffleReader
    participant TreeOfLosers
    participant SortedShuffleFileStreamReader
    participant FileSystem
    LocalShuffleReader->>FileSystem: getReadPartitionFiles()
    LocalShuffleReader->>SortedShuffleFileStreamReader: create reader for each file
    SortedShuffleFileStreamReader->>FileSystem: openFileForRead(filePath)
    LocalShuffleReader->>TreeOfLosers: initialize with readers
    loop while TreeOfLosers has next stream
        LocalShuffleReader->>TreeOfLosers: next()
        TreeOfLosers->>SortedShuffleFileStreamReader: next()
        SortedShuffleFileStreamReader->>SortedShuffleFileStreamReader: nextRow()
        SortedShuffleFileStreamReader-->>LocalShuffleReader: currentData()
        LocalShuffleReader->>LocalShuffleReader: add row to batch
    end
Loading

ER diagram for sorted shuffle file streaming and merging

erDiagram
    SHUFFLE_FILE ||--o{ SORTED_SHUFFLE_FILE_STREAM_READER : streams
    SORTED_SHUFFLE_FILE_STREAM_READER ||--|{ TREE_OF_LOSERS : merged_by
    TREE_OF_LOSERS ||--o{ READ_BATCH : produces
    SHUFFLE_FILE {
        string filePath
        uint64_t fileSize
    }
    SORTED_SHUFFLE_FILE_STREAM_READER {
        string filePath
        uint16_t streamIdx
        string currentKey
        string currentData
    }
    TREE_OF_LOSERS {
        MergeStream[] streams
    }
    READ_BATCH {
        string_view[] rows
        BufferPtr buffer
    }
Loading

File-Level Changes

Change Details Files
Add SortedShuffleFileStreamReader and k-way merge logic for sorted shuffles
  • Define SortedShuffleFileStreamReader implementing MergeStream with buffered I/O
  • Implement buffer refill and nextRow parsing for (key,data) entries
  • Add nextSorted method to initialize TreeOfLosers with readers and emit sorted ReadBatches
  • Modify next() to dispatch between nextSorted and nextUnsorted based on sortedShuffle flag
presto_cpp/main/operators/LocalShuffle.cpp
presto_cpp/main/operators/LocalShuffle.h
Revise compareKeys to use std::strong_ordering and update related call sites
  • Change compareKeys signature to return std::strong_ordering using lexicographical_compare_three_way
  • Update calls in extractAndSortRowMetadata and ShuffleTest comparator to compare against std::strong_ordering::less
presto_cpp/main/operators/LocalShuffle.cpp
presto_cpp/main/operators/LocalShuffle.h
presto_cpp/main/operators/tests/ShuffleTest.cpp
Enforce buffer and file size limits to avoid unreadable or oversized rows/files
  • Add VELOX_CHECK_LE in LocalShuffleWriter::collect to constrain rowSize
  • Add VELOX_CHECK_LE in nextSorted to ensure single row <= maxBytes
  • Add VELOX_CHECK_LE in nextUnsorted to ensure each file size <= maxBytes
presto_cpp/main/operators/LocalShuffle.cpp
Add end-to-end sorted shuffle test coverage
  • Introduce persistentShuffleSortedEndToEnd test generating random keys and data
  • Validate that reader output matches sorted order by key using embedded index markers
presto_cpp/main/operators/tests/ShuffleTest.cpp

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:399-400` </location>
<code_context>
+  }
+
+  const char* pos = buffer_->as<char>() + bufferPos_;
+  const TRowSize keySize =
+      folly::Endian::big(*reinterpret_cast<const TRowSize*>(pos));
+  const TRowSize dataSize = folly::Endian::big(
+      *reinterpret_cast<const TRowSize*>(pos + sizeof(TRowSize)));
</code_context>

<issue_to_address>
**issue (bug_risk):** No validation for keySize and dataSize values read from file.

Without bounds checks, corrupted files could cause keySize or dataSize to be unreasonably large, risking buffer overflows or excessive memory allocation. Please validate these values before use.
</issue_to_address>

### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:282-290` </location>
<code_context>
       key);
   const auto rowSize = this->rowSize(key.size(), data.size());
+
+  VELOX_CHECK_LE(
+      rowSize,
+      kDefaultStreamReaderBufferSize,
+      "Row size {} bytes (keySize={}, dataSize={}) exceeds maximum reader buffer size {} bytes. "
+      "This row would be unreadable.",
+      rowSize,
+      key.size(),
+      data.size(),
+      kDefaultStreamReaderBufferSize);
+
   auto& buffer = inProgressPartitions_[partition];
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Row size check may not prevent all unreadable rows if buffer size changes.

If bufferSize is set lower than the default elsewhere, this check may not catch all unreadable rows. Please validate against the actual buffer size used during reading.

Suggested implementation:

```cpp
  const size_t actualBufferSize = getActualReaderBufferSize(); // Replace with actual method or member variable
  VELOX_CHECK_LE(
      rowSize,
      actualBufferSize,
      "Row size {} bytes (keySize={}, dataSize={}) exceeds maximum reader buffer size {} bytes. "
      "This row would be unreadable.",
      rowSize,
      key.size(),
      data.size(),
      actualBufferSize);

```

- You need to implement or use an existing method/member to get the actual buffer size used for reading, e.g., `getActualReaderBufferSize()` or `bufferSize_`.
- If the buffer size is a member variable (e.g., `bufferSize_`), use that directly.
- If not present, you may need to pass the buffer size to this context or retrieve it from configuration.
</issue_to_address>

### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:513-516` </location>
<code_context>
+      return batches;
+    }
+
+    // Write row: [dataSize][data]
+    char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
+    *reinterpret_cast<TRowSize*>(writePos) =
+        folly::Endian::big(static_cast<TRowSize>(data.size()));
+
</code_context>

<issue_to_address>
**suggestion:** Direct use of reinterpret_cast for writing data may be non-portable.

This approach may fail on platforms with strict alignment. Use memcpy or a portable serialization method instead.

```suggestion
    // Write row: [dataSize][data]
    char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
    TRowSize rowSizeBigEndian = folly::Endian::big(static_cast<TRowSize>(data.size()));
    std::memcpy(writePos, &rowSizeBigEndian, sizeof(TRowSize));
```
</issue_to_address>

### Comment 4
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1839-1843` </location>
<code_context>
+      data.append(fmt::format("_idx{:04d}", i));
+      dataValues.push_back(std::move(data));
+    }
+    for (size_t i = 0; i < randomKeys.size(); ++i) {
+      int32_t keyBigEndian = folly::Endian::big(randomKeys[i]);
+      std::string_view keyBytes(
+          reinterpret_cast<const char*>(&keyBigEndian), kUint32Size);
+      writer->collect(partition, keyBytes, dataValues[i]);
+    }
+    writer->noMoreData(true);
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test for duplicate keys in sorted shuffle.

Add a test with duplicate sort keys to ensure the k-way merge and tie-breaking logic produce deterministic, correct output.

Suggested implementation:

```cpp
TEST_F(ShuffleTest, persistentShuffleSortedEndToEnd) {
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;

  struct TestConfig {
    size_t maxBytesPerPartition;
  };

  // Test for duplicate sort keys in sorted shuffle
  TEST_F(ShuffleTest, persistentShuffleSortedEndToEndWithDuplicateKeys) {
    const uint32_t numPartitions = 1;
    const uint32_t partition = 0;
    constexpr size_t kNumRows = 10;
    constexpr size_t kUint32Size = sizeof(uint32_t);

    std::vector<uint32_t> sortKeys;
    std::vector<std::string> dataValues;

    // Create duplicate keys: keys 42 and 99 appear twice
    for (size_t i = 0; i < kNumRows; ++i) {
      uint32_t key = (i < 2) ? 42 : (i < 4) ? 99 : i + 100;
      sortKeys.push_back(key);
      std::string data = fmt::format("row{}_key{}", i, key);
      dataValues.push_back(data);
    }

    // Shuffle the data to simulate random input order
    std::vector<size_t> indices(kNumRows);
    std::iota(indices.begin(), indices.end(), 0);
    std::random_device rd;
    std::mt19937 g(rd());
    std::shuffle(indices.begin(), indices.end(), g);

    std::vector<uint32_t> shuffledKeys;
    std::vector<std::string> shuffledData;
    for (size_t idx : indices) {
      shuffledKeys.push_back(sortKeys[idx]);
      shuffledData.push_back(dataValues[idx]);
    }

    // Write shuffled data to the shuffle writer
    auto writer = createTestShuffleWriter();
    for (size_t i = 0; i < kNumRows; ++i) {
      int32_t keyBigEndian = folly::Endian::big(shuffledKeys[i]);
      std::string_view keyBytes(
          reinterpret_cast<const char*>(&keyBigEndian), kUint32Size);
      writer->collect(partition, keyBytes, shuffledData[i]);
    }
    writer->noMoreData(true);

    // Read and verify output is sorted and tie-breaking is deterministic
    auto output = readShuffleOutput(partition);
    std::vector<std::pair<uint32_t, std::string>> result;
    for (const auto& row : output) {
      // Extract key from row string
      auto keyPos = row.find("key");
      ASSERT_NE(keyPos, std::string::npos);
      uint32_t key = std::stoi(row.substr(keyPos + 3));
      result.emplace_back(key, row);
    }

    // Check sorted order and deterministic tie-breaking
    for (size_t i = 1; i < result.size(); ++i) {
      ASSERT_LE(result[i-1].first, result[i].first);
      if (result[i-1].first == result[i].first) {
        // Tie-breaking: check lexicographical order of data string
        ASSERT_LT(result[i-1].second, result[i].second);
      }
    }
  }

```

- You may need to adjust the helper functions `createTestShuffleWriter()` and `readShuffleOutput()` to match your test harness.
- If your shuffle implementation uses a different tie-breaking logic, update the assertion accordingly.
- Ensure the new test is registered and run with your test suite.
</issue_to_address>

### Comment 5
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1830-1837` </location>
<code_context>
+    for (size_t i = 0; i < testData.numRows; ++i) {
+      randomKeys.push_back(static_cast<int32_t>(folly::Random::rand32(rng)));
+
+      const size_t sizeRange = testData.maxDataSize - testData.minDataSize;
+      const size_t dataSize = testData.minDataSize +
+          (sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0);
+
+      // Create data with index marker at the end for verification
+      std::string data(dataSize, static_cast<char>('a' + (i % 26)));
+      data.append(fmt::format("_idx{:04d}", i));
+      dataValues.push_back(std::move(data));
+    }
+    for (size_t i = 0; i < randomKeys.size(); ++i) {
</code_context>

<issue_to_address>
**suggestion (testing):** No test for empty data values in sorted shuffle.

Please add a test case with zero-length data values to verify correct handling and prevent potential errors or corruption.

Suggested implementation:

```cpp
    // Add a test case for zero-length data value
    {
      int32_t zeroLengthKey = static_cast<int32_t>(folly::Random::rand32(rng));
      randomKeys.push_back(zeroLengthKey);
      std::string zeroLengthData; // empty string
      zeroLengthData.append(fmt::format("_idx{:04d}", static_cast<int>(randomKeys.size() - 1)));
      dataValues.push_back(std::move(zeroLengthData));
    }

    for (size_t i = 0; i < testData.numRows; ++i) {
      randomKeys.push_back(static_cast<int32_t>(folly::Random::rand32(rng)));

      const size_t sizeRange = testData.maxDataSize - testData.minDataSize;
      const size_t dataSize = testData.minDataSize +
          (sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0);

      // Create data with index marker at the end for verification
      std::string data(dataSize, static_cast<char>('a' + (i % 26)));
      data.append(fmt::format("_idx{:04d}", i));
      dataValues.push_back(std::move(data));
    }

```

If there is a verification step later in the test, ensure that it checks for the presence and correctness of the zero-length data value (with the "_idx" marker). You may need to update any assertions or checks to account for the extra entry in `dataValues` and `randomKeys`.
</issue_to_address>

### Comment 6
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1850-1858` </location>
<code_context>
+    size_t count = 0;
+    std::vector<std::string> readDataValues;
+
+    while (true) {
+      auto batches = reader->next(testData.readMaxBytes)
+                         .via(folly::getGlobalCPUExecutor())
+                         .get();
+      if (batches.empty()) {
+        break;
+      }
</code_context>

<issue_to_address>
**suggestion (testing):** No test for corrupted or incomplete shuffle files.

Add a test that simulates a corrupted or truncated shuffle file to ensure the reader raises the correct error and does not return invalid data.

```suggestion
    auto reader = std::make_shared<LocalShuffleReader>(
        readInfo.rootPath,
        readInfo.queryId,
        readInfo.partitionIds,
        /*sortedShuffle=*/true,
        pool());

    size_t count = 0;
    std::vector<std::string> readDataValues;

// Test for corrupted or incomplete shuffle file.
TEST(ShuffleTest, CorruptedShuffleFile) {
  // Setup: create a valid shuffle file, then truncate/corrupt it.
  const std::string rootPath = "/tmp/shuffle_test_corrupt";
  const std::string queryId = "test_query_corrupt";
  const std::vector<uint32_t> partitionIds = {0};
  const bool sortedShuffle = true;

  // Write a valid shuffle file.
  {
    auto writer = std::make_shared<LocalShuffleWriter>(
        rootPath, queryId, partitionIds, sortedShuffle, pool());
    std::string validData = "valid_data";
    writer->collect(0, std::string_view(validData.data(), validData.size()), nullptr);
    writer->flush();
  }

  // Corrupt the file by truncating it.
  std::string shuffleFilePath = rootPath + "/" + queryId + "/0";
  std::ofstream ofs(shuffleFilePath, std::ios::in | std::ios::out | std::ios::binary);
  ofs.seekp(2); // Truncate after 2 bytes.
  ofs.write("", 0);
  ofs.close();

  // Try to read the corrupted file.
  auto reader = std::make_shared<LocalShuffleReader>(
      rootPath, queryId, partitionIds, sortedShuffle, pool());

  try {
    auto batches = reader->next(1024)
                       .via(folly::getGlobalCPUExecutor())
                       .get();
    // If no exception, fail the test.
    FAIL() << "Expected exception for corrupted shuffle file, but none was thrown.";
  } catch (const std::exception& ex) {
    // Check that the error is related to file corruption/truncation.
    EXPECT_TRUE(std::string(ex.what()).find("corrupt") != std::string::npos ||
                std::string(ex.what()).find("truncat") != std::string::npos ||
                std::string(ex.what()).find("incomplete") != std::string::npos)
        << "Unexpected error message: " << ex.what();
  }
}
```
</issue_to_address>

### Comment 7
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1892-1901` </location>
<code_context>
+    auto sortedOrder = getSortOrder(keys);
+
+    // Verify data appears in sorted key order
+    for (size_t i = 0; i < readDataValues.size(); ++i) {
+      // Extract original index from data value (format: [chars]_idx0000)
+      const std::string& dataValue = readDataValues[i];
+      size_t idxPos = dataValue.find("_idx");
+      ASSERT_NE(idxPos, std::string::npos)
+          << "Data value at position " << i << " missing '_idx' marker: '"
+          << dataValue << "'";
+
+      size_t originalIdx = std::stoul(dataValue.substr(idxPos + 4));
+
+      // The data at position i should correspond to the key at sortedOrder[i]
+      EXPECT_EQ(originalIdx, sortedOrder[i])
+          << "Data at position " << i << " should correspond to key at index "
+          << sortedOrder[i] << " but corresponds to index " << originalIdx;
</code_context>

<issue_to_address>
**suggestion (testing):** Test does not cover the case where no rows are written (empty input).

Add a test where no rows are written to verify the reader handles empty input without errors.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +1839 to +1844
for (size_t i = 0; i < randomKeys.size(); ++i) {
int32_t keyBigEndian = folly::Endian::big(randomKeys[i]);
std::string_view keyBytes(
reinterpret_cast<const char*>(&keyBigEndian), kUint32Size);
writer->collect(partition, keyBytes, dataValues[i]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Missing test for duplicate keys in sorted shuffle.

Add a test with duplicate sort keys to ensure the k-way merge and tie-breaking logic produce deterministic, correct output.

Suggested implementation:

TEST_F(ShuffleTest, persistentShuffleSortedEndToEnd) {
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;

  struct TestConfig {
    size_t maxBytesPerPartition;
  };

  // Test for duplicate sort keys in sorted shuffle
  TEST_F(ShuffleTest, persistentShuffleSortedEndToEndWithDuplicateKeys) {
    const uint32_t numPartitions = 1;
    const uint32_t partition = 0;
    constexpr size_t kNumRows = 10;
    constexpr size_t kUint32Size = sizeof(uint32_t);

    std::vector<uint32_t> sortKeys;
    std::vector<std::string> dataValues;

    // Create duplicate keys: keys 42 and 99 appear twice
    for (size_t i = 0; i < kNumRows; ++i) {
      uint32_t key = (i < 2) ? 42 : (i < 4) ? 99 : i + 100;
      sortKeys.push_back(key);
      std::string data = fmt::format("row{}_key{}", i, key);
      dataValues.push_back(data);
    }

    // Shuffle the data to simulate random input order
    std::vector<size_t> indices(kNumRows);
    std::iota(indices.begin(), indices.end(), 0);
    std::random_device rd;
    std::mt19937 g(rd());
    std::shuffle(indices.begin(), indices.end(), g);

    std::vector<uint32_t> shuffledKeys;
    std::vector<std::string> shuffledData;
    for (size_t idx : indices) {
      shuffledKeys.push_back(sortKeys[idx]);
      shuffledData.push_back(dataValues[idx]);
    }

    // Write shuffled data to the shuffle writer
    auto writer = createTestShuffleWriter();
    for (size_t i = 0; i < kNumRows; ++i) {
      int32_t keyBigEndian = folly::Endian::big(shuffledKeys[i]);
      std::string_view keyBytes(
          reinterpret_cast<const char*>(&keyBigEndian), kUint32Size);
      writer->collect(partition, keyBytes, shuffledData[i]);
    }
    writer->noMoreData(true);

    // Read and verify output is sorted and tie-breaking is deterministic
    auto output = readShuffleOutput(partition);
    std::vector<std::pair<uint32_t, std::string>> result;
    for (const auto& row : output) {
      // Extract key from row string
      auto keyPos = row.find("key");
      ASSERT_NE(keyPos, std::string::npos);
      uint32_t key = std::stoi(row.substr(keyPos + 3));
      result.emplace_back(key, row);
    }

    // Check sorted order and deterministic tie-breaking
    for (size_t i = 1; i < result.size(); ++i) {
      ASSERT_LE(result[i-1].first, result[i].first);
      if (result[i-1].first == result[i].first) {
        // Tie-breaking: check lexicographical order of data string
        ASSERT_LT(result[i-1].second, result[i].second);
      }
    }
  }
  • You may need to adjust the helper functions createTestShuffleWriter() and readShuffleOutput() to match your test harness.
  • If your shuffle implementation uses a different tie-breaking logic, update the assertion accordingly.
  • Ensure the new test is registered and run with your test suite.

Comment on lines +1830 to +1838
const size_t sizeRange = testData.maxDataSize - testData.minDataSize;
const size_t dataSize = testData.minDataSize +
(sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0);

// Create data with index marker at the end for verification
std::string data(dataSize, static_cast<char>('a' + (i % 26)));
data.append(fmt::format("_idx{:04d}", i));
dataValues.push_back(std::move(data));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): No test for empty data values in sorted shuffle.

Please add a test case with zero-length data values to verify correct handling and prevent potential errors or corruption.

Suggested implementation:

    // Add a test case for zero-length data value
    {
      int32_t zeroLengthKey = static_cast<int32_t>(folly::Random::rand32(rng));
      randomKeys.push_back(zeroLengthKey);
      std::string zeroLengthData; // empty string
      zeroLengthData.append(fmt::format("_idx{:04d}", static_cast<int>(randomKeys.size() - 1)));
      dataValues.push_back(std::move(zeroLengthData));
    }

    for (size_t i = 0; i < testData.numRows; ++i) {
      randomKeys.push_back(static_cast<int32_t>(folly::Random::rand32(rng)));

      const size_t sizeRange = testData.maxDataSize - testData.minDataSize;
      const size_t dataSize = testData.minDataSize +
          (sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0);

      // Create data with index marker at the end for verification
      std::string data(dataSize, static_cast<char>('a' + (i % 26)));
      data.append(fmt::format("_idx{:04d}", i));
      dataValues.push_back(std::move(data));
    }

If there is a verification step later in the test, ensure that it checks for the presence and correctness of the zero-length data value (with the "_idx" marker). You may need to update any assertions or checks to account for the extra entry in dataValues and randomKeys.

Comment on lines 1850 to 1860
auto reader = std::make_shared<LocalShuffleReader>(
readInfo.rootPath,
readInfo.queryId,
readInfo.partitionIds,
/*sortedShuffle=*/true,
pool());

size_t count = 0;
std::vector<std::string> readDataValues;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): No test for corrupted or incomplete shuffle files.

Add a test that simulates a corrupted or truncated shuffle file to ensure the reader raises the correct error and does not return invalid data.

Suggested change
auto reader = std::make_shared<LocalShuffleReader>(
readInfo.rootPath,
readInfo.queryId,
readInfo.partitionIds,
/*sortedShuffle=*/true,
pool());
size_t count = 0;
std::vector<std::string> readDataValues;
auto reader = std::make_shared<LocalShuffleReader>(
readInfo.rootPath,
readInfo.queryId,
readInfo.partitionIds,
/*sortedShuffle=*/true,
pool());
size_t count = 0;
std::vector<std::string> readDataValues;
// Test for corrupted or incomplete shuffle file.
TEST(ShuffleTest, CorruptedShuffleFile) {
// Setup: create a valid shuffle file, then truncate/corrupt it.
const std::string rootPath = "/tmp/shuffle_test_corrupt";
const std::string queryId = "test_query_corrupt";
const std::vector<uint32_t> partitionIds = {0};
const bool sortedShuffle = true;
// Write a valid shuffle file.
{
auto writer = std::make_shared<LocalShuffleWriter>(
rootPath, queryId, partitionIds, sortedShuffle, pool());
std::string validData = "valid_data";
writer->collect(0, std::string_view(validData.data(), validData.size()), nullptr);
writer->flush();
}
// Corrupt the file by truncating it.
std::string shuffleFilePath = rootPath + "/" + queryId + "/0";
std::ofstream ofs(shuffleFilePath, std::ios::in | std::ios::out | std::ios::binary);
ofs.seekp(2); // Truncate after 2 bytes.
ofs.write("", 0);
ofs.close();
// Try to read the corrupted file.
auto reader = std::make_shared<LocalShuffleReader>(
rootPath, queryId, partitionIds, sortedShuffle, pool());
try {
auto batches = reader->next(1024)
.via(folly::getGlobalCPUExecutor())
.get();
// If no exception, fail the test.
FAIL() << "Expected exception for corrupted shuffle file, but none was thrown.";
} catch (const std::exception& ex) {
// Check that the error is related to file corruption/truncation.
EXPECT_TRUE(std::string(ex.what()).find("corrupt") != std::string::npos ||
std::string(ex.what()).find("truncat") != std::string::npos ||
std::string(ex.what()).find("incomplete") != std::string::npos)
<< "Unexpected error message: " << ex.what();
}
}

Comment on lines +1892 to +1900
for (size_t i = 0; i < readDataValues.size(); ++i) {
// Extract original index from data value (format: [chars]_idx0000)
const std::string& dataValue = readDataValues[i];
size_t idxPos = dataValue.find("_idx");
ASSERT_NE(idxPos, std::string::npos)
<< "Data value at position " << i << " missing '_idx' marker: '"
<< dataValue << "'";

size_t originalIdx = std::stoul(dataValue.substr(idxPos + 4));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Test does not cover the case where no rows are written (empty input).

Add a test where no rows are written to verify the reader handles empty input without errors.

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 14, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 14, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 14, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
@duxiao1212 duxiao1212 changed the title feat: impl sort key for LocalShuffleReader feat: Impl sort key for LocalShuffleReader Nov 14, 2025
Comment on lines 448 to 439
if (!mergeInitialized_) {
if (readPartitionFiles_.empty()) {
mergeInitialized_ = true;
return batches;
}

std::vector<std::unique_ptr<velox::MergeStream>> streams;
streams.reserve(readPartitionFiles_.size());
uint16_t streamIndex = 0;
for (const auto& filename : readPartitionFiles_) {
VELOX_CHECK(
!filename.empty(),
"Invalid empty shuffle file path for query {}, partitions: [{}]",
queryId_,
folly::join(", ", partitionIds_));
auto reader = std::make_unique<SortedShuffleFileStreamReader>(
filename, streamIndex, pool_);
if (reader->hasData()) {
streams.push_back(std::move(reader));
++streamIndex;
}
}
if (!streams.empty()) {
merge_ =
std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t>>(
std::move(streams));
}

mergeInitialized_ = true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then shall we have a separate initialize method to wrap these initialization code and have some checks in these next/nextSorted methods, to enforce callers to call initialize before using the reader?

Copy link
Contributor

@tanjialiang tanjialiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to completely replace TestShuffleWriter/Reader with local shuffle. Please refactor the test file completely.

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 16, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 16, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 16, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 16, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Differential Revision: D86888221
tanjialiang
tanjialiang previously approved these changes Nov 17, 2025
Copy link
Contributor

@tanjialiang tanjialiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks LGTM

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 17, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 17, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 17, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 17, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
@tanjialiang
Copy link
Contributor

The goal is to completely replace TestShuffleWriter/Reader with local shuffle. Please refactor the test file completely.

Synced offline. the refactor is in another PR

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 19, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 19, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duxiao1212 thanks for the change % minors.

reinterpret_cast<const unsigned char*>(key1.data() + key1.size()),
reinterpret_cast<const unsigned char*>(key2.data()),
reinterpret_cast<const unsigned char*>(key2.data() + key2.size()));
const auto minSize = std::min(key1.size(), key2.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simply use strcmp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to support non-null-terminated strings

const TRowSize keySize = folly::Endian::big(read<TRowSize>());
const TRowSize dataSize = folly::Endian::big(read<TRowSize>());

currentKey_ = nextStringView(keySize, keyStorage_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we just use std::string for currentKey_ and currentData_ and currentKey() and currentData() can return a view on top of that? Also change data to value like

currentKey_
currentValue_

xiaoxmeng
xiaoxmeng previously approved these changes Nov 19, 2025
Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duxiao1212 please address comments before land. Thanks!


while (auto* stream = merge_->next()) {
auto* reader = dynamic_cast<SortedFileInputStream*>(stream);
const auto data = reader->currentData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's merge currentValue and next into one? Consider to call it nextValue() comment say this also advance to the next value? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @xiaoxmeng. i think separating the read and advance next() operations is a fundamental requirement for correctness when early returns are possible. Otherwise, the code becomes more complex and harder to maintain.

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 19, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang, xiaoxmeng

Differential Revision: D86888221
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 19, 2025
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang, xiaoxmeng

Differential Revision: D86888221
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang, xiaoxmeng

Differential Revision: D86888221
Copy link
Contributor

@tanjialiang tanjialiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the improvement!

@NikhilCollooru NikhilCollooru changed the title feat: Impl sort key for LocalShuffleReader feat: Implement sort key for LocalShuffleReader Nov 20, 2025
@duxiao1212 duxiao1212 merged commit 1db6b85 into prestodb:master Nov 20, 2025
85 of 87 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants