1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18+ #include " velox/common/file/FileInputStream.h"
19+
1820#include < folly/lang/Bits.h>
21+ #include < boost/range/algorithm/sort.hpp>
1922
20- using namespace facebook ::velox::exec;
21- using namespace facebook ::velox;
2223
2324namespace facebook ::presto::operators {
2425
2526using json = nlohmann::json;
2627
2728namespace {
2829
30+ using TStreamIdx = uint16_t ;
31+
32+ // / SortedFileInputStream reads sorted (key, data) pairs from a single
33+ // / shuffle file with buffered I/O. It extends FileInputStream for efficient
34+ // / buffered I/O and implements MergeStream interface for k-way merge.
35+ class SortedFileInputStream final : public velox::common::FileInputStream,
36+ public velox::MergeStream {
37+ public:
38+ SortedFileInputStream (
39+ const std::string& filePath,
40+ TStreamIdx streamIdx,
41+ velox::memory::MemoryPool* pool,
42+ size_t bufferSize = kDefaultInputStreamBufferSize )
43+ : velox::common::FileInputStream(
44+ velox::filesystems::getFileSystem (filePath, nullptr )
45+ ->openFileForRead(filePath),
46+ bufferSize,
47+ pool),
48+ streamIdx_(streamIdx) {
49+ // Initialize by reading first row.
50+ next ();
51+ }
52+
53+ ~SortedFileInputStream () override = default ;
54+
55+ // / Advances to next entry. Returns false if EOF reached.
56+ bool next () {
57+ if (atEnd ()) {
58+ currentKey_ = std::string_view ();
59+ currentData_ = std::string_view ();
60+ return false ;
61+ }
62+ const TRowSize keySize = folly::Endian::big (read<TRowSize>());
63+ const TRowSize dataSize = folly::Endian::big (read<TRowSize>());
64+
65+ currentKey_ = nextStringView (keySize, keyStorage_);
66+ currentData_ = nextStringView (dataSize, dataStorage_);
67+
68+ return true ;
69+ }
70+
71+ std::string_view currentKey () const {
72+ return currentKey_;
73+ }
74+
75+ std::string_view currentData () const {
76+ return currentData_;
77+ }
78+
79+ bool hasData () const override {
80+ return !currentData_.empty () || !atEnd ();
81+ }
82+
83+ bool operator <(const velox::MergeStream& other) const override {
84+ const auto * otherReader = static_cast <const SortedFileInputStream*>(&other);
85+ const auto cmp = compareKeys (currentKey_, otherReader->currentKey_ );
86+ if (cmp != std::strong_ordering::equal) {
87+ return cmp == std::strong_ordering::less;
88+ }
89+ // Tie-break using stream index for deterministic ordering
90+ return streamIdx_ < otherReader->streamIdx_ ;
91+ }
92+
93+ private:
94+ // / Returns a string view of the next 'size' bytes using nextView
95+ std::string_view nextStringView (TRowSize size, std::string& storage) {
96+ if (size == 0 ) {
97+ return {};
98+ }
99+ const auto view = nextView (size);
100+ if (view.size () == size) {
101+ return view;
102+ }
103+
104+ // Data crosses buffer boundary - must copy
105+ storage.resize (size);
106+ std::memcpy (storage.data (), view.data (), view.size ());
107+ readBytes (
108+ reinterpret_cast <uint8_t *>(storage.data ()) + view.size (),
109+ size - view.size ());
110+ return std::string_view (storage);
111+ }
112+
113+ const TStreamIdx streamIdx_;
114+ // Views into the FileInputStream buffer.
115+ std::string_view currentKey_;
116+ std::string_view currentData_;
117+
118+ // Temporary storage for edge cases when data crosses buffer boundaries
119+ std::string keyStorage_;
120+ std::string dataStorage_;
121+ };
122+
29123std::vector<RowMetadata>
30124extractRowMetadata (const char * buffer, size_t bufferSize, bool sortedShuffle) {
31125 std::vector<RowMetadata> rows;
@@ -91,13 +185,8 @@ extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
91185
92186inline std::string_view
93187extractRowData (const RowMetadata& row, const char * buffer, bool sortedShuffle) {
94- if (sortedShuffle) {
95- const size_t dataOffset = row.rowStart + (kUint32Size * 2 ) + row.keySize ;
96- return {buffer + dataOffset, row.dataSize };
97- } else {
98- const size_t dataOffset = row.rowStart + kUint32Size ;
99- return {buffer + dataOffset, row.dataSize };
100- }
188+ const auto dataOffset = row.rowStart + (sortedShuffle ? (kUint32Size * 2 ) + row.keySize : kUint32Size );
189+ return {buffer + dataOffset, row.dataSize };
101190}
102191
103192std::vector<RowMetadata> extractAndSortRowMetadata (
@@ -106,15 +195,15 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
106195 bool sortedShuffle) {
107196 auto rows = extractRowMetadata (buffer, bufferSize, sortedShuffle);
108197 if (!rows.empty () && sortedShuffle) {
109- std::sort (
110- rows.begin (),
111- rows.end (),
198+ boost::range::sort (
199+ rows,
112200 [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
113201 const char * lhsKey = buffer + lhs.rowStart + (kUint32Size * 2 );
114202 const char * rhsKey = buffer + rhs.rowStart + (kUint32Size * 2 );
115203 return compareKeys (
116- std::string_view (lhsKey, lhs.keySize ),
117- std::string_view (rhsKey, rhs.keySize ));
204+ std::string_view (lhsKey, lhs.keySize ),
205+ std::string_view (rhsKey, rhs.keySize )) ==
206+ std::strong_ordering::less;
118207 });
119208 }
120209 return rows;
@@ -277,9 +366,10 @@ void LocalShuffleWriter::collect(
277366 " key '{}' must be empty for non-sorted shuffle" ,
278367 key);
279368 const auto rowSize = this ->rowSize (key.size (), data.size ());
369+
280370 auto & buffer = inProgressPartitions_[partition];
281371 if (buffer == nullptr ) {
282- buffer = AlignedBuffer::allocate<char >(
372+ buffer = velox:: AlignedBuffer::allocate<char >(
283373 std::max (static_cast <uint64_t >(rowSize), maxBytesPerPartition_),
284374 pool_,
285375 0 );
@@ -319,31 +409,114 @@ LocalShuffleReader::LocalShuffleReader(
319409 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
320410}
321411
322- folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
323- LocalShuffleReader::next (uint64_t maxBytes) {
324- if (readPartitionFiles_.empty ()) {
325- readPartitionFiles_ = getReadPartitionFiles ();
412+ void LocalShuffleReader::initialize () {
413+ VELOX_CHECK (!initialized_, " LocalShuffleReader already initialized" );
414+
415+ readPartitionFiles_ = getReadPartitionFiles ();
416+
417+ if (sortedShuffle_ && !readPartitionFiles_.empty ()) {
418+ std::vector<std::unique_ptr<velox::MergeStream>> streams;
419+ streams.reserve (readPartitionFiles_.size ());
420+ TStreamIdx streamIdx = 0 ;
421+ for (const auto & filename : readPartitionFiles_) {
422+ VELOX_CHECK (
423+ !filename.empty (),
424+ " Invalid empty shuffle file path for query {}, partitions: [{}]" ,
425+ queryId_,
426+ folly::join (" , " , partitionIds_));
427+ auto reader = std::make_unique<SortedFileInputStream>(
428+ filename, streamIdx, pool_);
429+ if (reader->hasData ()) {
430+ streams.push_back (std::move (reader));
431+ ++streamIdx;
432+ }
433+ }
434+ if (!streams.empty ()) {
435+ merge_ =
436+ std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t >>(
437+ std::move (streams));
438+ }
439+ }
440+
441+ initialized_ = true ;
442+ }
443+
444+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted (
445+ uint64_t maxBytes) {
446+ std::vector<std::unique_ptr<ReadBatch>> batches;
447+
448+ if (merge_ == nullptr ) {
449+ return batches;
450+ }
451+
452+ auto batchBuffer = velox::AlignedBuffer::allocate<char >(maxBytes, pool_, 0 );
453+ std::vector<std::string_view> rows;
454+ uint64_t bufferUsed = 0 ;
455+
456+ while (auto * stream = merge_->next ()) {
457+ auto * reader = dynamic_cast <SortedFileInputStream*>(stream);
458+ const auto data = reader->currentData ();
459+ const auto rowSize = kUint32Size + data.size ();
460+
461+ // With the current row the bufferUsed byte will exceed the maxBytes
462+ if (bufferUsed + rowSize > maxBytes) {
463+ if (bufferUsed > 0 ) {
464+ // We have some rows already, return them to release the memory
465+ batches.push_back (
466+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
467+ return batches;
468+ }
469+ // Single row exceeds buffer - allocate larger buffer for this row
470+ batchBuffer = velox::AlignedBuffer::allocate<char >(rowSize, pool_, 0 );
471+ bufferUsed = 0 ;
472+ }
473+
474+ // Write row: [dataSize][data]
475+ char * writePos = batchBuffer->asMutable <char >() + bufferUsed;
476+ *reinterpret_cast <TRowSize*>(writePos) =
477+ folly::Endian::big (static_cast <TRowSize>(data.size ()));
478+
479+ if (!data.empty ()) {
480+ memcpy (writePos + sizeof (TRowSize), data.data (), data.size ());
481+ }
482+
483+ rows.emplace_back (batchBuffer->as <char >() + bufferUsed, rowSize);
484+ bufferUsed += rowSize;
485+
486+ reader->next ();
487+ }
488+
489+ if (!rows.empty ()) {
490+ batches.push_back (
491+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
326492 }
327493
494+ return batches;
495+ }
496+
497+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted (
498+ uint64_t maxBytes) {
328499 std::vector<std::unique_ptr<ReadBatch>> batches;
329500 uint64_t totalBytes{0 };
330- // Read files until we reach maxBytes limit or run out of files.
501+
331502 while (readPartitionFileIndex_ < readPartitionFiles_.size ()) {
332503 const auto filename = readPartitionFiles_[readPartitionFileIndex_];
333504 auto file = fileSystem_->openFileForRead (filename);
334505 const auto fileSize = file->size ();
335506
336- // Stop if adding this file would exceed maxBytes (unless we haven't read
337- // any files yet)
507+
508+ // TODO: Refactor to use streaming I/O with bounded buffer size instead of
509+ // loading entire files into memory at once. A streaming approach would
510+ // reduce peak memory consumption and enable processing arbitrarily large
511+ // shuffle files while maintaining constant memory usage.
338512 if (!batches.empty () && totalBytes + fileSize > maxBytes) {
339513 break ;
340514 }
341515
342- auto buffer = AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
516+ auto buffer = velox:: AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
343517 file->pread (0 , fileSize, buffer->asMutable <void >());
344518 ++readPartitionFileIndex_;
345519
346- // Parse the buffer to extract individual rows
347520 const char * data = buffer->as <char >();
348521 const auto parsedRows = extractRowMetadata (data, fileSize, sortedShuffle_);
349522 std::vector<std::string_view> rows;
@@ -357,7 +530,16 @@ LocalShuffleReader::next(uint64_t maxBytes) {
357530 std::make_unique<ReadBatch>(std::move (rows), std::move (buffer)));
358531 }
359532
360- return folly::makeSemiFuture (std::move (batches));
533+ return batches;
534+ }
535+
536+ folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
537+ LocalShuffleReader::next (uint64_t maxBytes) {
538+ VELOX_CHECK (
539+ initialized_,
540+ " LocalShuffleReader::initialize() must be called before next()" );
541+ return folly::makeSemiFuture (
542+ sortedShuffle_ ? nextSorted (maxBytes) : nextUnsorted (maxBytes));
361543}
362544
363545void LocalShuffleReader::noMoreData (bool success) {
@@ -403,12 +585,14 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
403585 velox::memory::MemoryPool* pool) {
404586 const operators::LocalShuffleReadInfo readInfo =
405587 operators::LocalShuffleReadInfo::deserialize (serializedStr);
406- return std::make_shared<LocalShuffleReader>(
588+ auto reader = std::make_shared<LocalShuffleReader>(
407589 readInfo.rootPath ,
408590 readInfo.queryId ,
409591 readInfo.partitionIds ,
410592 /* sortShuffle=*/ false , // default to false for now
411593 pool);
594+ reader->initialize ();
595+ return reader;
412596}
413597
414598std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter (
0 commit comments