1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18+ #include " velox/common/file/FileInputStream.h"
19+
1820#include < folly/lang/Bits.h>
21+ #include < boost/range/algorithm/sort.hpp>
1922
20- using namespace facebook ::velox::exec;
21- using namespace facebook ::velox;
2223
2324namespace facebook ::presto::operators {
2425
2526using json = nlohmann::json;
2627
2728namespace {
2829
30+ // / SortedFileInputStream reads sorted (key, data) pairs from a single
31+ // / shuffle file with buffered I/O. It extends FileInputStream for efficient
32+ // / buffered I/O and implements MergeStream interface for k-way merge.
33+ class SortedFileInputStream final : public velox::common::FileInputStream,
34+ public velox::MergeStream {
35+ public:
36+ SortedFileInputStream (
37+ const std::string& filePath,
38+ uint16_t streamIdx,
39+ velox::memory::MemoryPool* pool,
40+ size_t bufferSize = kDefaultInputStreamBufferSize );
41+
42+ ~SortedFileInputStream () override = default ;
43+
44+ // / Advances to next entry. Returns false if EOF reached.
45+ bool next ();
46+
47+ std::string_view currentKey () const {
48+ return currentKey_;
49+ }
50+
51+ std::string_view currentData () const {
52+ return currentData_;
53+ }
54+
55+ bool hasData () const override {
56+ return hasCurrentRow_;
57+ }
58+
59+ bool operator <(const velox::MergeStream& other) const override {
60+ const auto * otherReader = static_cast <const SortedFileInputStream*>(&other);
61+ const auto cmp = compareKeys (currentKey_, otherReader->currentKey_ );
62+ if (cmp != std::strong_ordering::equal) {
63+ return cmp == std::strong_ordering::less;
64+ }
65+ // Tie-break using stream index for deterministic ordering
66+ return streamIdx_ < otherReader->streamIdx_ ;
67+ }
68+
69+ const std::string& filePath () const {
70+ return filePath_;
71+ }
72+
73+ private:
74+ bool nextRow ();
75+
76+ const std::string filePath_;
77+ const uint16_t streamIdx_;
78+ std::string currentKey_;
79+ std::string currentData_;
80+ bool hasCurrentRow_{false };
81+ };
82+
2983std::vector<RowMetadata>
3084extractRowMetadata (const char * buffer, size_t bufferSize, bool sortedShuffle) {
3185 std::vector<RowMetadata> rows;
@@ -91,13 +145,8 @@ extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
91145
92146inline std::string_view
93147extractRowData (const RowMetadata& row, const char * buffer, bool sortedShuffle) {
94- if (sortedShuffle) {
95- const size_t dataOffset = row.rowStart + (kUint32Size * 2 ) + row.keySize ;
96- return {buffer + dataOffset, row.dataSize };
97- } else {
98- const size_t dataOffset = row.rowStart + kUint32Size ;
99- return {buffer + dataOffset, row.dataSize };
100- }
148+ const auto dataOffset = row.rowStart + (sortedShuffle ? (kUint32Size * 2 ) + row.keySize : kUint32Size );
149+ return {buffer + dataOffset, row.dataSize };
101150}
102151
103152std::vector<RowMetadata> extractAndSortRowMetadata (
@@ -106,15 +155,15 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
106155 bool sortedShuffle) {
107156 auto rows = extractRowMetadata (buffer, bufferSize, sortedShuffle);
108157 if (!rows.empty () && sortedShuffle) {
109- std::sort (
110- rows.begin (),
111- rows.end (),
158+ boost::range::sort (
159+ rows,
112160 [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
113161 const char * lhsKey = buffer + lhs.rowStart + (kUint32Size * 2 );
114162 const char * rhsKey = buffer + rhs.rowStart + (kUint32Size * 2 );
115163 return compareKeys (
116- std::string_view (lhsKey, lhs.keySize ),
117- std::string_view (rhsKey, rhs.keySize ));
164+ std::string_view (lhsKey, lhs.keySize ),
165+ std::string_view (rhsKey, rhs.keySize )) ==
166+ std::strong_ordering::less;
118167 });
119168 }
120169 return rows;
@@ -277,9 +326,10 @@ void LocalShuffleWriter::collect(
277326 " key '{}' must be empty for non-sorted shuffle" ,
278327 key);
279328 const auto rowSize = this ->rowSize (key.size (), data.size ());
329+
280330 auto & buffer = inProgressPartitions_[partition];
281331 if (buffer == nullptr ) {
282- buffer = AlignedBuffer::allocate<char >(
332+ buffer = velox:: AlignedBuffer::allocate<char >(
283333 std::max (static_cast <uint64_t >(rowSize), maxBytesPerPartition_),
284334 pool_,
285335 0 );
@@ -305,6 +355,49 @@ void LocalShuffleWriter::noMoreData(bool success) {
305355 }
306356}
307357
358+ SortedFileInputStream::SortedFileInputStream (
359+ const std::string& filePath,
360+ uint16_t streamIdx,
361+ velox::memory::MemoryPool* pool,
362+ size_t bufferSize)
363+ : velox::common::FileInputStream(
364+ velox::filesystems::getFileSystem (filePath, nullptr )
365+ ->openFileForRead(filePath),
366+ bufferSize,
367+ pool),
368+ filePath_(filePath),
369+ streamIdx_(streamIdx) {
370+ // Initialize by reading first row.
371+ nextRow ();
372+ }
373+
374+ bool SortedFileInputStream::next () {
375+ return nextRow ();
376+ }
377+
378+ bool SortedFileInputStream::nextRow () {
379+ if (atEnd ()) {
380+ currentKey_.clear ();
381+ currentData_.clear ();
382+ hasCurrentRow_ = false ;
383+ return false ;
384+ }
385+ const auto keySize = folly::Endian::big (read<TRowSize>());
386+ const auto dataSize = folly::Endian::big (read<TRowSize>());
387+
388+ currentKey_.resize (keySize);
389+ if (keySize > 0 ) {
390+ readBytes (reinterpret_cast <uint8_t *>(currentKey_.data ()), keySize);
391+ }
392+ currentData_.resize (dataSize);
393+ if (dataSize > 0 ) {
394+ readBytes (reinterpret_cast <uint8_t *>(currentData_.data ()), dataSize);
395+ }
396+
397+ hasCurrentRow_ = true ;
398+ return true ;
399+ }
400+
308401LocalShuffleReader::LocalShuffleReader (
309402 const std::string& rootPath,
310403 const std::string& queryId,
@@ -319,31 +412,113 @@ LocalShuffleReader::LocalShuffleReader(
319412 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
320413}
321414
322- folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
323- LocalShuffleReader::next (uint64_t maxBytes) {
324- if (readPartitionFiles_.empty ()) {
325- readPartitionFiles_ = getReadPartitionFiles ();
415+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted (
416+ uint64_t maxBytes) {
417+ std::vector<std::unique_ptr<ReadBatch>> batches;
418+
419+ if (!mergeInitialized_) {
420+ if (readPartitionFiles_.empty ()) {
421+ mergeInitialized_ = true ;
422+ return batches;
423+ }
424+
425+ std::vector<std::unique_ptr<velox::MergeStream>> streams;
426+ streams.reserve (readPartitionFiles_.size ());
427+ uint16_t streamIdx = 0 ;
428+ for (const auto & filename : readPartitionFiles_) {
429+ VELOX_CHECK (
430+ !filename.empty (),
431+ " Invalid empty shuffle file path for query {}, partitions: [{}]" ,
432+ queryId_,
433+ folly::join (" , " , partitionIds_));
434+ auto reader = std::make_unique<SortedFileInputStream>(
435+ filename, streamIdx, pool_);
436+ if (reader->hasData ()) {
437+ streams.push_back (std::move (reader));
438+ ++streamIdx;
439+ }
440+ }
441+ if (!streams.empty ()) {
442+ merge_ =
443+ std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t >>(
444+ std::move (streams));
445+ }
446+
447+ mergeInitialized_ = true ;
326448 }
327449
450+ if (merge_ == nullptr ) {
451+ return batches;
452+ }
453+
454+ auto batchBuffer = velox::AlignedBuffer::allocate<char >(maxBytes, pool_, 0 );
455+ std::vector<std::string_view> rows;
456+ uint64_t bufferUsed = 0 ;
457+
458+ while (auto * stream = merge_->next ()) {
459+ auto * reader = dynamic_cast <SortedFileInputStream*>(stream);
460+ const auto data = reader->currentData ();
461+ const auto rowSize = kUint32Size + data.size ();
462+
463+ // With the current row the bufferUsed byte will exceed the maxBytes
464+ if (bufferUsed + rowSize > maxBytes) {
465+ if (bufferUsed > 0 ) {
466+ // We have some rows already, return them to release the memory
467+ batches.push_back (
468+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
469+ return batches;
470+ }
471+ // Single row exceeds buffer - allocate larger buffer for this row
472+ batchBuffer = velox::AlignedBuffer::allocate<char >(rowSize, pool_, 0 );
473+ bufferUsed = 0 ;
474+ }
475+
476+ // Write row: [dataSize][data]
477+ char * writePos = batchBuffer->asMutable <char >() + bufferUsed;
478+ *reinterpret_cast <TRowSize*>(writePos) =
479+ folly::Endian::big (static_cast <TRowSize>(data.size ()));
480+
481+ if (!data.empty ()) {
482+ memcpy (writePos + sizeof (TRowSize), data.data (), data.size ());
483+ }
484+
485+ rows.emplace_back (batchBuffer->as <char >() + bufferUsed, rowSize);
486+ bufferUsed += rowSize;
487+
488+ reader->next ();
489+ }
490+
491+ if (!rows.empty ()) {
492+ batches.push_back (
493+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
494+ }
495+
496+ return batches;
497+ }
498+
499+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted (
500+ uint64_t maxBytes) {
328501 std::vector<std::unique_ptr<ReadBatch>> batches;
329502 uint64_t totalBytes{0 };
330- // Read files until we reach maxBytes limit or run out of files.
503+
331504 while (readPartitionFileIndex_ < readPartitionFiles_.size ()) {
332505 const auto filename = readPartitionFiles_[readPartitionFileIndex_];
333506 auto file = fileSystem_->openFileForRead (filename);
334507 const auto fileSize = file->size ();
335508
336- // Stop if adding this file would exceed maxBytes (unless we haven't read
337- // any files yet)
509+
510+ // TODO: Refactor to use streaming I/O with bounded buffer size instead of
511+ // loading entire files into memory at once. A streaming approach would
512+ // reduce peak memory consumption and enable processing arbitrarily large
513+ // shuffle files while maintaining constant memory usage.
338514 if (!batches.empty () && totalBytes + fileSize > maxBytes) {
339515 break ;
340516 }
341517
342- auto buffer = AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
518+ auto buffer = velox:: AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
343519 file->pread (0 , fileSize, buffer->asMutable <void >());
344520 ++readPartitionFileIndex_;
345521
346- // Parse the buffer to extract individual rows
347522 const char * data = buffer->as <char >();
348523 const auto parsedRows = extractRowMetadata (data, fileSize, sortedShuffle_);
349524 std::vector<std::string_view> rows;
@@ -357,7 +532,16 @@ LocalShuffleReader::next(uint64_t maxBytes) {
357532 std::make_unique<ReadBatch>(std::move (rows), std::move (buffer)));
358533 }
359534
360- return folly::makeSemiFuture (std::move (batches));
535+ return batches;
536+ }
537+
538+ folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
539+ LocalShuffleReader::next (uint64_t maxBytes) {
540+ if (readPartitionFiles_.empty ()) {
541+ readPartitionFiles_ = getReadPartitionFiles ();
542+ }
543+ return folly::makeSemiFuture (
544+ sortedShuffle_ ? nextSorted (maxBytes) : nextUnsorted (maxBytes));
361545}
362546
363547void LocalShuffleReader::noMoreData (bool success) {
0 commit comments