1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18- #include < folly/lang/Bits.h >
18+ #include " velox/common/file/FileInputStream.h "
1919
20- using namespace facebook ::velox::exec;
21- using namespace facebook ::velox;
20+ #include < boost/range/algorithm/sort.hpp>
2221
2322namespace facebook ::presto::operators {
2423
2524using json = nlohmann::json;
2625
2726namespace {
2827
28+ using TStreamIdx = uint16_t ;
29+
30+ // / SortedFileInputStream reads sorted (key, data) pairs from a single
31+ // / shuffle file with buffered I/O. It extends FileInputStream for efficient
32+ // / buffered I/O and implements MergeStream interface for k-way merge.
33+ class SortedFileInputStream final : public velox::common::FileInputStream,
34+ public velox::MergeStream {
35+ public:
36+ SortedFileInputStream (
37+ const std::string& filePath,
38+ TStreamIdx streamIdx,
39+ velox::memory::MemoryPool* pool,
40+ size_t bufferSize = kDefaultInputStreamBufferSize )
41+ : velox::common::FileInputStream(
42+ velox::filesystems::getFileSystem (filePath, nullptr )
43+ ->openFileForRead(filePath),
44+ bufferSize,
45+ pool),
46+ streamIdx_(streamIdx) {
47+ next ();
48+ }
49+
50+ ~SortedFileInputStream () override = default ;
51+
52+ bool next () {
53+ if (atEnd ()) {
54+ currentKey_ = {};
55+ currentData_ = {};
56+ keyStorage_.clear ();
57+ dataStorage_.clear ();
58+ return false ;
59+ }
60+ const TRowSize keySize = folly::Endian::big (read<TRowSize>());
61+ const TRowSize dataSize = folly::Endian::big (read<TRowSize>());
62+
63+ currentKey_ = nextStringView (keySize, keyStorage_);
64+ currentData_ = nextStringView (dataSize, dataStorage_);
65+ return true ;
66+ }
67+
68+ std::string_view currentKey () const {
69+ return currentKey_;
70+ }
71+
72+ std::string_view currentData () const {
73+ return currentData_;
74+ }
75+
76+ bool hasData () const override {
77+ return !currentData_.empty () || !atEnd ();
78+ }
79+
80+ bool operator <(const velox::MergeStream& other) const override {
81+ const auto * otherReader = static_cast <const SortedFileInputStream*>(&other);
82+ if (currentKey_ != otherReader->currentKey_ ) {
83+ return compareKeys (currentKey_, otherReader->currentKey_ );
84+ }
85+ return streamIdx_ < otherReader->streamIdx_ ;
86+ }
87+
88+ private:
89+ // Returns string_view using zero-copy when data fits in buffer,
90+ // otherwise copies to storage when crossing buffer boundaries.
91+ std::string_view nextStringView (TRowSize size, std::string& storage) {
92+ if (size == 0 ) {
93+ return {};
94+ }
95+ auto view = nextView (size);
96+ if (view.size () == size) {
97+ return view;
98+ }
99+ storage.resize (size);
100+ std::memcpy (storage.data (), view.data (), view.size ());
101+ readBytes (
102+ reinterpret_cast <uint8_t *>(storage.data ()) + view.size (),
103+ size - view.size ());
104+ return std::string_view (storage);
105+ }
106+
107+ const TStreamIdx streamIdx_;
108+ std::string_view currentKey_;
109+ std::string_view currentData_;
110+ std::string keyStorage_;
111+ std::string dataStorage_;
112+ };
113+
29114std::vector<RowMetadata>
30115extractRowMetadata (const char * buffer, size_t bufferSize, bool sortedShuffle) {
31116 std::vector<RowMetadata> rows;
@@ -91,13 +176,9 @@ extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
91176
92177inline std::string_view
93178extractRowData (const RowMetadata& row, const char * buffer, bool sortedShuffle) {
94- if (sortedShuffle) {
95- const size_t dataOffset = row.rowStart + (kUint32Size * 2 ) + row.keySize ;
96- return {buffer + dataOffset, row.dataSize };
97- } else {
98- const size_t dataOffset = row.rowStart + kUint32Size ;
99- return {buffer + dataOffset, row.dataSize };
100- }
179+ const auto dataOffset = row.rowStart +
180+ (sortedShuffle ? (kUint32Size * 2 ) + row.keySize : kUint32Size );
181+ return {buffer + dataOffset, row.dataSize };
101182}
102183
103184std::vector<RowMetadata> extractAndSortRowMetadata (
@@ -106,15 +187,13 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
106187 bool sortedShuffle) {
107188 auto rows = extractRowMetadata (buffer, bufferSize, sortedShuffle);
108189 if (!rows.empty () && sortedShuffle) {
109- std::sort (
110- rows.begin (),
111- rows.end (),
112- [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
190+ boost::range::sort (
191+ rows, [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
113192 const char * lhsKey = buffer + lhs.rowStart + (kUint32Size * 2 );
114193 const char * rhsKey = buffer + rhs.rowStart + (kUint32Size * 2 );
115194 return compareKeys (
116- std::string_view (lhsKey, lhs.keySize ),
117- std::string_view (rhsKey, rhs.keySize ));
195+ std::string_view (lhsKey, lhs.keySize ),
196+ std::string_view (rhsKey, rhs.keySize ));
118197 });
119198 }
120199 return rows;
@@ -276,10 +355,11 @@ void LocalShuffleWriter::collect(
276355 sortedShuffle_ || key.empty (),
277356 " key '{}' must be empty for non-sorted shuffle" ,
278357 key);
358+
279359 const auto rowSize = this ->rowSize (key.size (), data.size ());
280360 auto & buffer = inProgressPartitions_[partition];
281361 if (buffer == nullptr ) {
282- buffer = AlignedBuffer::allocate<char >(
362+ buffer = velox:: AlignedBuffer::allocate<char >(
283363 std::max (static_cast <uint64_t >(rowSize), maxBytesPerPartition_),
284364 pool_,
285365 0 );
@@ -319,31 +399,104 @@ LocalShuffleReader::LocalShuffleReader(
319399 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
320400}
321401
322- folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
323- LocalShuffleReader::next (uint64_t maxBytes) {
324- if (readPartitionFiles_.empty ()) {
325- readPartitionFiles_ = getReadPartitionFiles ();
402+ void LocalShuffleReader::initialize () {
403+ VELOX_CHECK (!initialized_, " LocalShuffleReader already initialized" );
404+ readPartitionFiles_ = getReadPartitionFiles ();
405+
406+ if (sortedShuffle_ && !readPartitionFiles_.empty ()) {
407+ std::vector<std::unique_ptr<velox::MergeStream>> streams;
408+ streams.reserve (readPartitionFiles_.size ());
409+ TStreamIdx streamIdx = 0 ;
410+ for (const auto & filename : readPartitionFiles_) {
411+ VELOX_CHECK (
412+ !filename.empty (),
413+ " Invalid empty shuffle file path for query {}, partitions: [{}]" ,
414+ queryId_,
415+ folly::join (" , " , partitionIds_));
416+ auto reader =
417+ std::make_unique<SortedFileInputStream>(filename, streamIdx, pool_);
418+ if (reader->hasData ()) {
419+ streams.push_back (std::move (reader));
420+ ++streamIdx;
421+ }
422+ }
423+ if (!streams.empty ()) {
424+ merge_ =
425+ std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t >>(
426+ std::move (streams));
427+ }
428+ }
429+
430+ initialized_ = true ;
431+ }
432+
433+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted (
434+ uint64_t maxBytes) {
435+ std::vector<std::unique_ptr<ReadBatch>> batches;
436+
437+ if (merge_ == nullptr ) {
438+ return batches;
439+ }
440+
441+ auto batchBuffer = velox::AlignedBuffer::allocate<char >(maxBytes, pool_, 0 );
442+ std::vector<std::string_view> rows;
443+ uint64_t bufferUsed = 0 ;
444+
445+ while (auto * stream = merge_->next ()) {
446+ auto * reader = dynamic_cast <SortedFileInputStream*>(stream);
447+ const auto data = reader->currentData ();
448+
449+ if (bufferUsed + data.size () > maxBytes) {
450+ if (bufferUsed > 0 ) {
451+ batches.push_back (
452+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
453+ return batches;
454+ }
455+ // Single row exceeds buffer - allocate larger buffer
456+ batchBuffer = velox::AlignedBuffer::allocate<char >(data.size (), pool_, 0 );
457+ bufferUsed = 0 ;
458+ }
459+
460+ char * writePos = batchBuffer->asMutable <char >() + bufferUsed;
461+ if (!data.empty ()) {
462+ memcpy (writePos, data.data (), data.size ());
463+ }
464+
465+ rows.emplace_back (batchBuffer->as <char >() + bufferUsed, data.size ());
466+ bufferUsed += data.size ();
467+ reader->next ();
468+ }
469+
470+ if (!rows.empty ()) {
471+ batches.push_back (
472+ std::make_unique<ReadBatch>(std::move (rows), std::move (batchBuffer)));
326473 }
327474
475+ return batches;
476+ }
477+
478+ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted (
479+ uint64_t maxBytes) {
328480 std::vector<std::unique_ptr<ReadBatch>> batches;
329481 uint64_t totalBytes{0 };
330- // Read files until we reach maxBytes limit or run out of files.
482+
331483 while (readPartitionFileIndex_ < readPartitionFiles_.size ()) {
332484 const auto filename = readPartitionFiles_[readPartitionFileIndex_];
333485 auto file = fileSystem_->openFileForRead (filename);
334486 const auto fileSize = file->size ();
335487
336- // Stop if adding this file would exceed maxBytes (unless we haven't read
337- // any files yet)
488+ // TODO: Refactor to use streaming I/O with bounded buffer size instead of
489+ // loading entire files into memory at once. A streaming approach would
490+ // reduce peak memory consumption and enable processing arbitrarily large
491+ // shuffle files while maintaining constant memory usage.
338492 if (!batches.empty () && totalBytes + fileSize > maxBytes) {
339493 break ;
340494 }
341495
342- auto buffer = AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
496+ auto buffer = velox:: AlignedBuffer::allocate<char >(fileSize, pool_, 0 );
343497 file->pread (0 , fileSize, buffer->asMutable <void >());
344498 ++readPartitionFileIndex_;
345499
346- // Parse the buffer to extract individual rows
347500 const char * data = buffer->as <char >();
348501 const auto parsedRows = extractRowMetadata (data, fileSize, sortedShuffle_);
349502 std::vector<std::string_view> rows;
@@ -357,7 +510,17 @@ LocalShuffleReader::next(uint64_t maxBytes) {
357510 std::make_unique<ReadBatch>(std::move (rows), std::move (buffer)));
358511 }
359512
360- return folly::makeSemiFuture (std::move (batches));
513+ return batches;
514+ }
515+
516+ folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
517+ LocalShuffleReader::next (uint64_t maxBytes) {
518+ VELOX_CHECK (
519+ initialized_,
520+ " LocalShuffleReader::initialize() must be called before next()" );
521+
522+ return folly::makeSemiFuture (
523+ sortedShuffle_ ? nextSorted (maxBytes) : nextUnsorted (maxBytes));
361524}
362525
363526void LocalShuffleReader::noMoreData (bool success) {
@@ -403,12 +566,26 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
403566 velox::memory::MemoryPool* pool) {
404567 const operators::LocalShuffleReadInfo readInfo =
405568 operators::LocalShuffleReadInfo::deserialize (serializedStr);
406- return std::make_shared<LocalShuffleReader>(
569+ // Check if sortedShuffle field is present in the JSON
570+ bool sortedShuffle = false ;
571+ try {
572+ const auto jsonReadInfo = json::parse (serializedStr);
573+ if (jsonReadInfo.contains (" sortedShuffle" )) {
574+ jsonReadInfo.at (" sortedShuffle" ).get_to (sortedShuffle);
575+ }
576+ } catch (const std::exception& /* e*/ ) {
577+ // If parsing fails or field doesn't exist, default to false
578+ sortedShuffle = false ;
579+ }
580+
581+ auto reader = std::make_shared<LocalShuffleReader>(
407582 readInfo.rootPath ,
408583 readInfo.queryId ,
409584 readInfo.partitionIds ,
410- /* sortShuffle= */ false , // default to false for now
585+ sortedShuffle,
411586 pool);
587+ reader->initialize ();
588+ return reader;
412589}
413590
414591std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter (
@@ -418,13 +595,25 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
418595 SystemConfig::instance ()->localShuffleMaxPartitionBytes ();
419596 const operators::LocalShuffleWriteInfo writeInfo =
420597 operators::LocalShuffleWriteInfo::deserialize (serializedStr);
598+ // Check if sortedShuffle field is present in the JSON
599+ bool sortedShuffle = false ;
600+ try {
601+ const auto jsonWriteInfo = json::parse (serializedStr);
602+ if (jsonWriteInfo.contains (" sortedShuffle" )) {
603+ jsonWriteInfo.at (" sortedShuffle" ).get_to (sortedShuffle);
604+ }
605+ } catch (const std::exception& /* e*/ ) {
606+ // If parsing fails or field doesn't exist, default to false
607+ sortedShuffle = false ;
608+ }
609+
421610 return std::make_shared<LocalShuffleWriter>(
422611 writeInfo.rootPath ,
423612 writeInfo.queryId ,
424613 writeInfo.shuffleId ,
425614 writeInfo.numPartitions ,
426615 maxBytesPerPartition,
427- /* sortedShuffle= */ false , // default to false for now
616+ sortedShuffle,
428617 pool);
429618}
430619
@@ -436,5 +625,4 @@ std::vector<RowMetadata> testingExtractRowMetadata(
436625 bool sortedShuffle) {
437626 return extractRowMetadata (buffer, bufferSize, sortedShuffle);
438627}
439-
440628} // namespace facebook::presto::operators
0 commit comments