1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18- #include < folly/lang/Bits.h >
18+ #include " velox/common/file/FileInputStream.h "
1919
20- using namespace facebook ::velox::exec;
21- using namespace facebook ::velox;
20+ #include < boost/range/algorithm/sort.hpp>
2221
2322namespace facebook ::presto::operators {
2423
2524using json = nlohmann::json;
2625
2726namespace {
2827
28+ using TStreamIdx = uint16_t ;
29+
30+ // / SortedFileInputStream reads sorted (key, data) pairs from a single
31+ // / shuffle file with buffered I/O. It extends FileInputStream for efficient
32+ // / buffered I/O and implements MergeStream interface for k-way merge.
33+ class SortedFileInputStream final
34+ : public velox::common::FileInputStream,
35+ public velox::MergeStream {
36+ public:
37+ SortedFileInputStream (
38+ const std::string& filePath,
39+ TStreamIdx streamIdx,
40+ velox::memory::MemoryPool* pool,
41+ size_t bufferSize = kDefaultInputStreamBufferSize )
42+ : velox::common::FileInputStream(
43+ velox::filesystems::getFileSystem (filePath, nullptr )
44+ ->openFileForRead(filePath),
45+ bufferSize,
46+ pool),
47+ streamIdx_(streamIdx) {
48+ next ();
49+ }
50+
51+ ~SortedFileInputStream () override = default ;
52+
53+ bool next () {
54+ if (atEnd ()) {
55+ currentKey_ = {};
56+ currentData_ = {};
57+ keyStorage_.clear ();
58+ dataStorage_.clear ();
59+ return false ;
60+ }
61+ const TRowSize keySize = folly::Endian::big (read<TRowSize>());
62+ const TRowSize dataSize = folly::Endian::big (read<TRowSize>());
63+
64+ currentKey_ = nextStringView (keySize, keyStorage_);
65+ currentData_ = nextStringView (dataSize, dataStorage_);
66+ return true ;
67+ }
68+
69+ std::string_view currentKey () const {
70+ return currentKey_;
71+ }
72+
73+ std::string_view currentData () const {
74+ return currentData_;
75+ }
76+
77+ bool hasData () const override {
78+ return !currentData_.empty () || !atEnd ();
79+ }
80+
81+ bool operator <(const velox::MergeStream& other) const override {
82+ const auto * otherReader = static_cast <const SortedFileInputStream*>(&other);
83+ const auto cmp = compareKeys (currentKey_, otherReader->currentKey_ );
84+ if (cmp != std::strong_ordering::equal) {
85+ return cmp == std::strong_ordering::less;
86+ }
87+ return streamIdx_ < otherReader->streamIdx_ ;
88+ }
89+
90+ private:
91+ // Returns string_view using zero-copy when data fits in buffer,
92+ // otherwise copies to storage when crossing buffer boundaries.
93+ std::string_view nextStringView (TRowSize size, std::string& storage) {
94+ if (size == 0 ) {
95+ return {};
96+ }
97+ auto view = nextView (size);
98+ if (view.size () == size) {
99+ return view;
100+ }
101+ storage.resize (size);
102+ std::memcpy (storage.data (), view.data (), view.size ());
103+ readBytes (
104+ reinterpret_cast <uint8_t *>(storage.data ()) + view.size (),
105+ size - view.size ());
106+ return std::string_view (storage);
107+ }
108+
109+ const TStreamIdx streamIdx_;
110+ std::string_view currentKey_;
111+ std::string_view currentData_;
112+ std::string keyStorage_;
113+ std::string dataStorage_;
114+ };
115+
29116std::vector<RowMetadata>
30117extractRowMetadata (const char * buffer, size_t bufferSize, bool sortedShuffle) {
31118 std::vector<RowMetadata> rows;
@@ -91,13 +178,8 @@ extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
91178
92179inline std::string_view
93180extractRowData (const RowMetadata& row, const char * buffer, bool sortedShuffle) {
94- if (sortedShuffle) {
95- const size_t dataOffset = row.rowStart + (kUint32Size * 2 ) + row.keySize ;
96- return {buffer + dataOffset, row.dataSize };
97- } else {
98- const size_t dataOffset = row.rowStart + kUint32Size ;
99- return {buffer + dataOffset, row.dataSize };
100- }
181+ const auto dataOffset = row.rowStart + (sortedShuffle ? (kUint32Size * 2 ) + row.keySize : kUint32Size );
182+ return {buffer + dataOffset, row.dataSize };
101183}
102184
103185std::vector<RowMetadata> extractAndSortRowMetadata (
@@ -106,15 +188,15 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
106188 bool sortedShuffle) {
107189 auto rows = extractRowMetadata (buffer, bufferSize, sortedShuffle);
108190 if (!rows.empty () && sortedShuffle) {
109- std::sort (
110- rows.begin (),
111- rows.end (),
191+ boost::range::sort (
192+ rows,
112193 [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
113194 const char * lhsKey = buffer + lhs.rowStart + (kUint32Size * 2 );
114195 const char * rhsKey = buffer + rhs.rowStart + (kUint32Size * 2 );
115196 return compareKeys (
116- std::string_view (lhsKey, lhs.keySize ),
117- std::string_view (rhsKey, rhs.keySize ));
197+ std::string_view (lhsKey, lhs.keySize ),
198+ std::string_view (rhsKey, rhs.keySize )) ==
199+ std::strong_ordering::less;
118200 });
119201 }
120202 return rows;
@@ -277,9 +359,10 @@ void LocalShuffleWriter::collect(
277359 " key '{}' must be empty for non-sorted shuffle" ,
278360 key);
279361 const auto rowSize = this ->rowSize (key.size (), data.size ());
362+
280363 auto & buffer = inProgressPartitions_[partition];
281364 if (buffer == nullptr ) {
282- buffer = AlignedBuffer::allocate<char >(
365+ buffer = velox:: AlignedBuffer::allocate<char >(
283366 std::max (static_cast <uint64_t >(rowSize), maxBytesPerPartition_),
284367 pool_,
285368 0 );
@@ -319,31 +402,114 @@ LocalShuffleReader::LocalShuffleReader(
319402 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
320403}
321404
322- folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
323- LocalShuffleReader::next (uint64_t maxBytes) {
324- if (readPartitionFiles_.empty ()) {
325- readPartitionFiles_ = getReadPartitionFiles ();
405+ void LocalShuffleReader::initialize () {
406+ VELOX_CHECK (!initialized_, " LocalShuffleReader already initialized" );
407+
408+ readPartitionFiles_ = getReadPartitionFiles ();
409+
410+ if (sortedShuffle_ && !readPartitionFiles_.empty ()) {
411+ std::vector<std::unique_ptr<velox::MergeStream>> streams;
412+ streams.reserve (readPartitionFiles_.size ());
413+ TStreamIdx streamIdx = 0 ;
414+ for (const auto & filename : readPartitionFiles_) {
415+ VELOX_CHECK (
416+ !filename.empty (),
417+ " Invalid empty shuffle file path for query {}, partitions: [{}]" ,
418+ queryId_,
419+ folly::join (" , " , partitionIds_));
420+ auto reader = std::make_unique<SortedFileInputStream>(
421+ filename, streamIdx, pool_);
422+ if (reader->hasData ()) {
423+ streams.push_back (std::move (reader));
424+ ++streamIdx;
425+ }
426+ }
427+ if (!streams.empty ()) {
428+ merge_ =
429+ std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t >>(
430+ std::move (streams));
431+ }
432+ }
433+
434+ initialized_ = true ;
435+ }
436+
437+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted (
438+ uint64_t maxBytes) {
439+ std::vector<std::unique_ptr<ReadBatch>> batches;
440+
441+ if (merge_ == nullptr ) {
442+ return batches;
443+ }
444+
445+ auto batchBuffer = velox::AlignedBuffer::allocate<char >(maxBytes, pool_, 0 );
446+ std::vector<std::string_view> rows;
447+ uint64_t bufferUsed = 0 ;
448+
449+ while (auto * stream = merge_->next ()) {
450+ auto * reader = dynamic_cast <SortedFileInputStream*>(stream);
451+ const auto data = reader->currentData ();
452+ const auto rowSize = kUint32Size + data.size ();
453+
454+ // With the current row the bufferUsed byte will exceed the maxBytes
455+ if (bufferUsed + rowSize > maxBytes) {
456+ if (bufferUsed > 0 ) {
457+ // We have some rows already, return them to release the memory
458+ batches.push_back (
459+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
460+ return batches;
461+ }
462+ // Single row exceeds buffer - allocate larger buffer for this row
463+ batchBuffer = velox::AlignedBuffer::allocate<char >(rowSize, pool_, 0 );
464+ bufferUsed = 0 ;
465+ }
466+
467+ // Write row: [dataSize][data]
468+ char * writePos = batchBuffer->asMutable <char >() + bufferUsed;
469+ *reinterpret_cast <TRowSize*>(writePos) =
470+ folly::Endian::big (static_cast <TRowSize>(data.size ()));
471+
472+ if (!data.empty ()) {
473+ memcpy (writePos + sizeof (TRowSize), data.data (), data.size ());
474+ }
475+
476+ rows.emplace_back (batchBuffer->as <char >() + bufferUsed, rowSize);
477+ bufferUsed += rowSize;
478+
479+ reader->next ();
326480 }
327481
482+ if (!rows.empty ()) {
483+ batches.push_back (
484+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
485+ }
486+
487+ return batches;
488+ }
489+
490+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted (
491+ uint64_t maxBytes) {
328492 std::vector<std::unique_ptr<ReadBatch>> batches;
329493 uint64_t totalBytes{0 };
330- // Read files until we reach maxBytes limit or run out of files.
494+
331495 while (readPartitionFileIndex_ < readPartitionFiles_.size ()) {
332496 const auto filename = readPartitionFiles_[readPartitionFileIndex_];
333497 auto file = fileSystem_->openFileForRead (filename);
334498 const auto fileSize = file->size ();
335499
336- // Stop if adding this file would exceed maxBytes (unless we haven't read
337- // any files yet)
500+
501+ // TODO: Refactor to use streaming I/O with bounded buffer size instead of
502+ // loading entire files into memory at once. A streaming approach would
503+ // reduce peak memory consumption and enable processing arbitrarily large
504+ // shuffle files while maintaining constant memory usage.
338505 if (!batches.empty () && totalBytes + fileSize > maxBytes) {
339506 break ;
340507 }
341508
342- auto buffer = AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
509+ auto buffer = velox:: AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
343510 file->pread (0 , fileSize, buffer->asMutable <void >());
344511 ++readPartitionFileIndex_;
345512
346- // Parse the buffer to extract individual rows
347513 const char * data = buffer->as <char >();
348514 const auto parsedRows = extractRowMetadata (data, fileSize, sortedShuffle_);
349515 std::vector<std::string_view> rows;
@@ -357,7 +523,16 @@ LocalShuffleReader::next(uint64_t maxBytes) {
357523 std::make_unique<ReadBatch>(std::move (rows), std::move (buffer)));
358524 }
359525
360- return folly::makeSemiFuture (std::move (batches));
526+ return batches;
527+ }
528+
529+ folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
530+ LocalShuffleReader::next (uint64_t maxBytes) {
531+ VELOX_CHECK (
532+ initialized_,
533+ " LocalShuffleReader::initialize() must be called before next()" );
534+ return folly::makeSemiFuture (
535+ sortedShuffle_ ? nextSorted (maxBytes) : nextUnsorted (maxBytes));
361536}
362537
363538void LocalShuffleReader::noMoreData (bool success) {
@@ -403,12 +578,14 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
403578 velox::memory::MemoryPool* pool) {
404579 const operators::LocalShuffleReadInfo readInfo =
405580 operators::LocalShuffleReadInfo::deserialize (serializedStr);
406- return std::make_shared<LocalShuffleReader>(
581+ auto reader = std::make_shared<LocalShuffleReader>(
407582 readInfo.rootPath ,
408583 readInfo.queryId ,
409584 readInfo.partitionIds ,
410585 /* sortShuffle=*/ false , // default to false for now
411586 pool);
587+ reader->initialize ();
588+ return reader;
412589}
413590
414591std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter (
0 commit comments