Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 194 additions & 29 deletions presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,96 @@
#include "presto_cpp/external/json/nlohmann/json.hpp"
#include "presto_cpp/main/common/Configs.h"

#include <folly/lang/Bits.h>
#include "velox/common/Casts.h"
#include "velox/common/file/FileInputStream.h"

using namespace facebook::velox::exec;
using namespace facebook::velox;
#include <boost/range/algorithm/sort.hpp>

namespace facebook::presto::operators {

using json = nlohmann::json;

namespace {

using TStreamIdx = uint16_t;

// Default buffer size for SortedFileInputStream
// This buffer is used for streaming reads from shuffle files during k-way
// merge.
constexpr uint64_t kDefaultInputStreamBufferSize = 8 * 1024 * 1024; // 8MB

/// SortedFileInputStream reads sorted (key, data) pairs from a single
/// shuffle file with buffered I/O. It extends FileInputStream for efficient
/// buffered I/O and implements MergeStream interface for k-way merge.
class SortedFileInputStream final : public velox::common::FileInputStream,
public velox::MergeStream {
public:
SortedFileInputStream(
const std::string& filePath,
TStreamIdx streamIdx,
velox::memory::MemoryPool* pool,
size_t bufferSize = kDefaultInputStreamBufferSize)
: velox::common::FileInputStream(
velox::filesystems::getFileSystem(filePath, nullptr)
->openFileForRead(filePath),
bufferSize,
pool),
streamIdx_(streamIdx) {
next();
}

~SortedFileInputStream() override = default;

bool next() {
if (atEnd()) {
currentKey_.clear();
currentValue_.clear();
return false;
}
const TRowSize keySize = folly::Endian::big(read<TRowSize>());
const TRowSize valueSize = folly::Endian::big(read<TRowSize>());

// TODO: Optimize with zero-copy approach when data is contiguous in buffer.
readString(currentKey_, keySize);
readString(currentValue_, valueSize);
return true;
}

std::string_view currentKey() const {
return currentKey_;
}

std::string_view currentValue() const {
return currentValue_;
}

bool hasData() const override {
return !currentValue_.empty() || !atEnd();
}

bool operator<(const velox::MergeStream& other) const override {
const auto* otherReader = static_cast<const SortedFileInputStream*>(&other);
if (currentKey_ != otherReader->currentKey_) {
return compareKeys(currentKey_, otherReader->currentKey_);
}
return streamIdx_ < otherReader->streamIdx_;
}

private:
void readString(std::string& target, TRowSize size) {
if (size > 0) {
target.resize(size);
readBytes(reinterpret_cast<uint8_t*>(target.data()), size);
} else {
target.clear();
}
}

const TStreamIdx streamIdx_;
std::string currentKey_;
std::string currentValue_;
};

std::vector<RowMetadata>
extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
std::vector<RowMetadata> rows;
Expand Down Expand Up @@ -91,13 +170,9 @@ extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {

inline std::string_view
extractRowData(const RowMetadata& row, const char* buffer, bool sortedShuffle) {
if (sortedShuffle) {
const size_t dataOffset = row.rowStart + (kUint32Size * 2) + row.keySize;
return {buffer + dataOffset, row.dataSize};
} else {
const size_t dataOffset = row.rowStart + kUint32Size;
return {buffer + dataOffset, row.dataSize};
}
const auto dataOffset = row.rowStart +
(sortedShuffle ? (kUint32Size * 2) + row.keySize : kUint32Size);
return {buffer + dataOffset, row.dataSize};
}

std::vector<RowMetadata> extractAndSortRowMetadata(
Expand All @@ -106,10 +181,8 @@ std::vector<RowMetadata> extractAndSortRowMetadata(
bool sortedShuffle) {
auto rows = extractRowMetadata(buffer, bufferSize, sortedShuffle);
if (!rows.empty() && sortedShuffle) {
std::sort(
rows.begin(),
rows.end(),
[buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
boost::range::sort(
rows, [buffer](const RowMetadata& lhs, const RowMetadata& rhs) {
const char* lhsKey = buffer + lhs.rowStart + (kUint32Size * 2);
const char* rhsKey = buffer + rhs.rowStart + (kUint32Size * 2);
return compareKeys(
Expand Down Expand Up @@ -147,6 +220,7 @@ LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
jsonReadInfo.at("queryId").get_to(shuffleInfo.queryId);
jsonReadInfo.at("shuffleId").get_to(shuffleInfo.shuffleId);
jsonReadInfo.at("numPartitions").get_to(shuffleInfo.numPartitions);
shuffleInfo.sortedShuffle = jsonReadInfo.value("sortedShuffle", false);
return shuffleInfo;
}

Expand All @@ -157,6 +231,7 @@ LocalShuffleReadInfo LocalShuffleReadInfo::deserialize(
jsonReadInfo.at("rootPath").get_to(shuffleInfo.rootPath);
jsonReadInfo.at("queryId").get_to(shuffleInfo.queryId);
jsonReadInfo.at("partitionIds").get_to(shuffleInfo.partitionIds);
shuffleInfo.sortedShuffle = jsonReadInfo.value("sortedShuffle", false);
return shuffleInfo;
}

Expand Down Expand Up @@ -276,10 +351,11 @@ void LocalShuffleWriter::collect(
sortedShuffle_ || key.empty(),
"key '{}' must be empty for non-sorted shuffle",
key);

const auto rowSize = this->rowSize(key.size(), data.size());
auto& buffer = inProgressPartitions_[partition];
if (buffer == nullptr) {
buffer = AlignedBuffer::allocate<char>(
buffer = velox::AlignedBuffer::allocate<char>(
std::max(static_cast<uint64_t>(rowSize), maxBytesPerPartition_),
pool_,
0);
Expand Down Expand Up @@ -319,31 +395,107 @@ LocalShuffleReader::LocalShuffleReader(
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
}

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
LocalShuffleReader::next(uint64_t maxBytes) {
if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
void LocalShuffleReader::initialize() {
VELOX_CHECK(!initialized_, "LocalShuffleReader already initialized");
readPartitionFiles_ = getReadPartitionFiles();
if (sortedShuffle_ && !readPartitionFiles_.empty()) {
initSortedShuffleRead();
}

initialized_ = true;
}

void LocalShuffleReader::initSortedShuffleRead() {
std::vector<std::unique_ptr<velox::MergeStream>> streams;
streams.reserve(readPartitionFiles_.size());
TStreamIdx streamIdx = 0;
for (const auto& filename : readPartitionFiles_) {
VELOX_CHECK(
!filename.empty(),
"Invalid empty shuffle file path for query {}, partitions: [{}]",
queryId_,
folly::join(", ", partitionIds_));
auto reader =
std::make_unique<SortedFileInputStream>(filename, streamIdx, pool_);
if (reader->hasData()) {
streams.push_back(std::move(reader));
++streamIdx;
}
}
if (!streams.empty()) {
merge_ =
std::make_unique<velox::TreeOfLosers<velox::MergeStream, uint16_t>>(
std::move(streams));
}
}

std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
uint64_t maxBytes) {
std::vector<std::unique_ptr<ReadBatch>> batches;

if (merge_ == nullptr) {
return batches;
}

auto batchBuffer = velox::AlignedBuffer::allocate<char>(maxBytes, pool_, 0);
std::vector<std::string_view> rows;
uint64_t bufferUsed = 0;

while (auto* stream = merge_->next()) {
auto* reader = velox::checked_pointer_cast<SortedFileInputStream>(stream);
const auto data = reader->currentValue();

if (bufferUsed + data.size() > maxBytes) {
if (bufferUsed > 0) {
batches.push_back(
std::make_unique<ReadBatch>(
std::move(rows), std::move(batchBuffer)));
return batches;
}
// Single row exceeds buffer - allocate larger buffer
batchBuffer = velox::AlignedBuffer::allocate<char>(data.size(), pool_, 0);
}

char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
if (!data.empty()) {
memcpy(writePos, data.data(), data.size());
}

rows.emplace_back(batchBuffer->as<char>() + bufferUsed, data.size());
bufferUsed += data.size();
reader->next();
}

if (!rows.empty()) {
batches.push_back(
std::make_unique<ReadBatch>(std::move(rows), std::move(batchBuffer)));
}

return batches;
}

std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted(
uint64_t maxBytes) {
std::vector<std::unique_ptr<ReadBatch>> batches;
uint64_t totalBytes{0};
// Read files until we reach maxBytes limit or run out of files.

while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
const auto fileSize = file->size();

// Stop if adding this file would exceed maxBytes (unless we haven't read
// any files yet)
// TODO: Refactor to use streaming I/O with bounded buffer size instead of
// loading entire files into memory at once. A streaming approach would
// reduce peak memory consumption and enable processing arbitrarily large
// shuffle files while maintaining constant memory usage.
if (!batches.empty() && totalBytes + fileSize > maxBytes) {
break;
}

auto buffer = AlignedBuffer::allocate<char>(fileSize, pool_, 0);
auto buffer = velox::AlignedBuffer::allocate<char>(fileSize, pool_, 0);
file->pread(0, fileSize, buffer->asMutable<void>());
++readPartitionFileIndex_;

// Parse the buffer to extract individual rows
const char* data = buffer->as<char>();
const auto parsedRows = extractRowMetadata(data, fileSize, sortedShuffle_);
std::vector<std::string_view> rows;
Expand All @@ -357,7 +509,17 @@ LocalShuffleReader::next(uint64_t maxBytes) {
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
}

return folly::makeSemiFuture(std::move(batches));
return batches;
}

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
LocalShuffleReader::next(uint64_t maxBytes) {
VELOX_CHECK(
initialized_,
"LocalShuffleReader::initialize() must be called before next()");

return folly::makeSemiFuture(
sortedShuffle_ ? nextSorted(maxBytes) : nextUnsorted(maxBytes));
}

void LocalShuffleReader::noMoreData(bool success) {
Expand Down Expand Up @@ -403,12 +565,15 @@ std::shared_ptr<ShuffleReader> LocalPersistentShuffleFactory::createReader(
velox::memory::MemoryPool* pool) {
const operators::LocalShuffleReadInfo readInfo =
operators::LocalShuffleReadInfo::deserialize(serializedStr);
return std::make_shared<LocalShuffleReader>(

auto reader = std::make_shared<LocalShuffleReader>(
readInfo.rootPath,
readInfo.queryId,
readInfo.partitionIds,
/*sortShuffle=*/false, // default to false for now
readInfo.sortedShuffle,
pool);
reader->initialize();
return reader;
}

std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
Expand All @@ -418,13 +583,14 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
SystemConfig::instance()->localShuffleMaxPartitionBytes();
const operators::LocalShuffleWriteInfo writeInfo =
operators::LocalShuffleWriteInfo::deserialize(serializedStr);

return std::make_shared<LocalShuffleWriter>(
writeInfo.rootPath,
writeInfo.queryId,
writeInfo.shuffleId,
writeInfo.numPartitions,
maxBytesPerPartition,
/*sortedShuffle=*/false, // default to false for now
writeInfo.sortedShuffle,
pool);
}

Expand All @@ -436,5 +602,4 @@ std::vector<RowMetadata> testingExtractRowMetadata(
bool sortedShuffle) {
return extractRowMetadata(buffer, bufferSize, sortedShuffle);
}

} // namespace facebook::presto::operators
Loading
Loading