@@ -113,8 +113,9 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
113113 const char * lhsKey = buffer + lhs.rowStart + (kUint32Size * 2 );
114114 const char * rhsKey = buffer + rhs.rowStart + (kUint32Size * 2 );
115115 return compareKeys (
116- std::string_view (lhsKey, lhs.keySize ),
117- std::string_view (rhsKey, rhs.keySize ));
116+ std::string_view (lhsKey, lhs.keySize ),
117+ std::string_view (rhsKey, rhs.keySize )) ==
118+ std::strong_ordering::less;
118119 });
119120 }
120121 return rows;
@@ -277,6 +278,17 @@ void LocalShuffleWriter::collect(
277278 " key '{}' must be empty for non-sorted shuffle" ,
278279 key);
279280 const auto rowSize = this ->rowSize (key.size (), data.size ());
281+
282+ VELOX_CHECK_LE (
283+ rowSize,
284+ kDefaultStreamReaderBufferSize ,
285+ " Row size {} bytes (keySize={}, dataSize={}) exceeds maximum reader buffer size {} bytes. "
286+ " This row would be unreadable." ,
287+ rowSize,
288+ key.size (),
289+ data.size (),
290+ kDefaultStreamReaderBufferSize );
291+
280292 auto & buffer = inProgressPartitions_[partition];
281293 if (buffer == nullptr ) {
282294 buffer = AlignedBuffer::allocate<char >(
@@ -305,6 +317,116 @@ void LocalShuffleWriter::noMoreData(bool success) {
305317 }
306318}
307319
320+ LocalShuffleReader::SortedShuffleFileStreamReader::
321+ SortedShuffleFileStreamReader (
322+ const std::string& filePath,
323+ uint16_t streamIdx,
324+ velox::memory::MemoryPool* pool,
325+ size_t bufferSize)
326+ : pool_(pool),
327+ filePath_(filePath),
328+ streamIdx_(streamIdx) {
329+ auto fs = velox::filesystems::getFileSystem (filePath, nullptr );
330+ file_ = fs->openFileForRead (filePath);
331+ fileSize_ = file_->size ();
332+ // Set bufferSize_ to actual allocated size: min of requested buffer size and file size
333+ bufferSize_ = std::min (bufferSize, fileSize_);
334+ buffer_ = velox::AlignedBuffer::allocate<char >(bufferSize_, pool_);
335+ if (!nextRow ()) {
336+ eof_ = true ;
337+ }
338+ }
339+
340+ bool LocalShuffleReader::SortedShuffleFileStreamReader::next () {
341+ if (eof_) {
342+ return false ;
343+ }
344+ eof_ = !nextRow ();
345+ return !eof_;
346+ }
347+
348+ void LocalShuffleReader::SortedShuffleFileStreamReader::fillBuffer () {
349+ // Compact buffer: move remaining data to the beginning
350+ if (bufferPos_ > 0 && bufferPos_ < bytesInBuffer_) {
351+ const size_t remaining = bytesInBuffer_ - bufferPos_;
352+ std::memmove (
353+ buffer_->asMutable <char >(),
354+ buffer_->as <char >() + bufferPos_,
355+ remaining);
356+ bytesInBuffer_ = remaining;
357+ } else if (bufferPos_ >= bytesInBuffer_) {
358+ bytesInBuffer_ = 0 ;
359+ }
360+
361+ bufferPos_ = 0 ;
362+
363+ // Read more data from file to fill remaining buffer space
364+ if (bytesInBuffer_ < bufferSize_ && filePos_ < fileSize_) {
365+ const size_t toRead =
366+ std::min (bufferSize_ - bytesInBuffer_, fileSize_ - filePos_);
367+
368+ if (toRead > 0 ) {
369+ const std::string_view bytesRead = file_->pread (
370+ filePos_, toRead, buffer_->asMutable <char >() + bytesInBuffer_);
371+ bytesInBuffer_ += bytesRead.size ();
372+ filePos_ += bytesRead.size ();
373+ }
374+ }
375+ }
376+
377+ bool LocalShuffleReader::SortedShuffleFileStreamReader::nextRow () {
378+ constexpr size_t kHeaderSize = sizeof (TRowSize) * 2 ;
379+ if (bufferPos_ + kHeaderSize > bytesInBuffer_) {
380+ fillBuffer ();
381+ if (bytesInBuffer_ < kHeaderSize ) {
382+ // If we have some bytes but not enough for a header, the file is
383+ // corrupted
384+ VELOX_CHECK_EQ (
385+ bytesInBuffer_,
386+ 0 ,
387+ " Corrupted shuffle file {}: incomplete header at file position {}, "
388+ " expected {} bytes but only {} bytes available" ,
389+ filePath_,
390+ filePos_,
391+ kHeaderSize ,
392+ bytesInBuffer_);
393+
394+ return false ;
395+ }
396+ }
397+
398+ const char * pos = buffer_->as <char >() + bufferPos_;
399+ const TRowSize keySize =
400+ folly::Endian::big (*reinterpret_cast <const TRowSize*>(pos));
401+ const TRowSize dataSize = folly::Endian::big (
402+ *reinterpret_cast <const TRowSize*>(pos + sizeof (TRowSize)));
403+
404+ bufferPos_ += kHeaderSize ;
405+ const size_t rowSize = keySize + dataSize;
406+ // Ensure buffer contains complete entry (rowSize must fit in bufferSize)
407+ if (bufferPos_ + rowSize > bytesInBuffer_) {
408+ fillBuffer ();
409+ // If still not enough data after fill, the file is corrupted
410+ VELOX_CHECK_GE (
411+ bytesInBuffer_,
412+ rowSize,
413+ " Corrupted shuffle file {}: incomplete entry at file position {}, "
414+ " expected {} bytes (keySize={}, dataSize={}) but only {} bytes available" ,
415+ filePath_,
416+ filePos_,
417+ rowSize,
418+ keySize,
419+ dataSize,
420+ bytesInBuffer_);
421+ }
422+
423+ pos = buffer_->as <char >() + bufferPos_;
424+ currentKey_.assign (pos, keySize);
425+ currentData_.assign (pos + keySize, dataSize);
426+ bufferPos_ += rowSize;
427+ return true ;
428+ }
429+
308430LocalShuffleReader::LocalShuffleReader (
309431 const std::string& rootPath,
310432 const std::string& queryId,
@@ -319,22 +441,120 @@ LocalShuffleReader::LocalShuffleReader(
319441 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
320442}
321443
322- folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
323- LocalShuffleReader::next (uint64_t maxBytes) {
324- if (readPartitionFiles_.empty ()) {
325- readPartitionFiles_ = getReadPartitionFiles ();
444+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted (
445+ uint64_t maxBytes) {
446+ std::vector<std::unique_ptr<ReadBatch>> batches;
447+
448+ if (!mergeInitialized_) {
449+ if (readPartitionFiles_.empty ()) {
450+ mergeInitialized_ = true ;
451+ return batches;
452+ }
453+
454+ std::vector<std::unique_ptr<velox::MergeStream>> streams;
455+ streams.reserve (readPartitionFiles_.size ());
456+ uint16_t streamIndex = 0 ;
457+ for (const auto & filename : readPartitionFiles_) {
458+ VELOX_CHECK (
459+ !filename.empty (),
460+ " Invalid empty shuffle file path for query {}, partitions: [{}]" ,
461+ queryId_,
462+ folly::join (" , " , partitionIds_));
463+ auto reader = std::make_unique<SortedShuffleFileStreamReader>(
464+ filename, streamIndex, pool_);
465+ if (reader->hasData ()) {
466+ streams.push_back (std::move (reader));
467+ ++streamIndex;
468+ }
469+ }
470+ if (!streams.empty ()) {
471+ merge_ =
472+ std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t >>(
473+ std::move (streams));
474+ }
475+
476+ mergeInitialized_ = true ;
477+ }
478+
479+ auto batchBuffer = AlignedBuffer::allocate<char >(maxBytes, pool_, 0 );
480+ std::vector<std::string_view> rows;
481+ size_t totalBytes = 0 ;
482+ size_t bufferUsed = 0 ;
483+
484+ VELOX_CHECK (
485+ merge_ || readPartitionFiles_.empty (),
486+ " Failed to initialize merge: no valid shuffle files found" );
487+
488+ if (merge_ == nullptr ) {
489+ return batches;
326490 }
327491
492+ while (auto * stream = merge_->next ()) {
493+ auto * reader = dynamic_cast <SortedShuffleFileStreamReader*>(stream);
494+ const auto data = reader->currentData ();
495+ const size_t rowSize = kUint32Size + data.size ();
496+
497+ // Ensure single row doesn't exceed maxBytes limit
498+ VELOX_CHECK_LE (
499+ rowSize,
500+ maxBytes,
501+ " Single row size {} bytes exceeds maxBytes limit {} bytes. "
502+ " This indicates data corruption or misconfiguration." ,
503+ rowSize,
504+ maxBytes);
505+
506+ // Check if adding this row would exceed maxBytes limit
507+ if (totalBytes > 0 && totalBytes + rowSize > maxBytes) {
508+ batches.push_back (
509+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
510+ return batches;
511+ }
512+
513+ // Write row: [dataSize][data]
514+ char * writePos = batchBuffer->asMutable <char >() + bufferUsed;
515+ *reinterpret_cast <TRowSize*>(writePos) =
516+ folly::Endian::big (static_cast <TRowSize>(data.size ()));
517+
518+ if (!data.empty ()) {
519+ memcpy (writePos + sizeof (TRowSize), data.data (), data.size ());
520+ }
521+
522+ rows.emplace_back (batchBuffer->as <char >() + bufferUsed, rowSize);
523+ bufferUsed += rowSize;
524+ totalBytes += rowSize;
525+
526+ reader->next ();
527+ }
528+
529+ if (!rows.empty ()) {
530+ batches.push_back (
531+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
532+ }
533+
534+ return batches;
535+ }
536+
537+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted (
538+ uint64_t maxBytes) {
328539 std::vector<std::unique_ptr<ReadBatch>> batches;
329540 uint64_t totalBytes{0 };
330- // Read files until we reach maxBytes limit or run out of files.
541+
331542 while (readPartitionFileIndex_ < readPartitionFiles_.size ()) {
332543 const auto filename = readPartitionFiles_[readPartitionFileIndex_];
333544 auto file = fileSystem_->openFileForRead (filename);
334545 const auto fileSize = file->size ();
335546
336- // Stop if adding this file would exceed maxBytes (unless we haven't read
337- // any files yet)
547+ // Ensure single file doesn't exceed maxBytes limit
548+ // This guarantees we can always make progress and prevents OOM
549+ VELOX_CHECK_LE (
550+ fileSize,
551+ maxBytes,
552+ " Shuffle file {} size {} bytes exceeds maxBytes limit {} bytes. "
553+ " This indicates data corruption or misconfiguration." ,
554+ filename,
555+ fileSize,
556+ maxBytes);
557+
338558 if (!batches.empty () && totalBytes + fileSize > maxBytes) {
339559 break ;
340560 }
@@ -343,7 +563,6 @@ LocalShuffleReader::next(uint64_t maxBytes) {
343563 file->pread (0 , fileSize, buffer->asMutable <void >());
344564 ++readPartitionFileIndex_;
345565
346- // Parse the buffer to extract individual rows
347566 const char * data = buffer->as <char >();
348567 const auto parsedRows = extractRowMetadata (data, fileSize, sortedShuffle_);
349568 std::vector<std::string_view> rows;
@@ -357,7 +576,16 @@ LocalShuffleReader::next(uint64_t maxBytes) {
357576 std::make_unique<ReadBatch>(std::move (rows), std::move (buffer)));
358577 }
359578
360- return folly::makeSemiFuture (std::move (batches));
579+ return batches;
580+ }
581+
582+ folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
583+ LocalShuffleReader::next (uint64_t maxBytes) {
584+ if (readPartitionFiles_.empty ()) {
585+ readPartitionFiles_ = getReadPartitionFiles ();
586+ }
587+ return folly::makeSemiFuture (
588+ sortedShuffle_ ? nextSorted (maxBytes) : nextUnsorted (maxBytes));
361589}
362590
363591void LocalShuffleReader::noMoreData (bool success) {
@@ -381,7 +609,7 @@ std::vector<std::string> LocalShuffleReader::getReadPartitionFiles() const {
381609 fmt::format (" {}/{}_{}_" , trimmedRootPath, queryId_, partitionId);
382610 auto files = fileSystem_->list (fmt::format (" {}/" , rootPath_));
383611 for (const auto & file : files) {
384- if (file.starts_with (prefix)) {
612+ if (file.find (prefix) == 0 ) {
385613 partitionFiles.push_back (file);
386614 }
387615 }
0 commit comments