Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,21 @@ void PrestoExchangeSource::processDataResponse(
std::unique_ptr<http::HttpResponse> response,
bool isGetDataSizeRequest) {
if (isGetDataSizeRequest) {
double waitTimeMs = 0.0;
auto waitTimeMsString = response->headers()->getHeaders().getSingleOrEmpty(
protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER);
if (!waitTimeMsString.empty()) {
waitTimeMs = std::stod(waitTimeMsString);
}
getDataSizeMetric_.addValue(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The max will always be 10s (equal to the long pool duration). We need to substract the time request is waiting for long pool to make it meaningful.

(dataRequestRetryState_.durationMs() - waitTimeMs) * 1'000'000);
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterExchangeGetDataSizeDuration,
dataRequestRetryState_.durationMs());
dataRequestRetryState_.durationMs() - waitTimeMs);
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterExchangeGetDataSizeNumTries, dataRequestRetryState_.numTries());
} else {
getDataMetric_.addValue(dataRequestRetryState_.durationMs() * 1'000'000);
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterExchangeRequestDuration, dataRequestRetryState_.durationMs());
RECORD_HISTOGRAM_METRIC_VALUE(
Expand Down Expand Up @@ -346,6 +355,7 @@ void PrestoExchangeSource::processDataResponse(

// Record page size counter when not a get-data-size request
if (!isGetDataSizeRequest) {
pageSizeMetric_.addValue(totalBytes);
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterExchangeRequestPageSize, totalBytes);
}
Expand Down
17 changes: 16 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,23 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {

folly::F14FastMap<std::string, velox::RuntimeMetric> metrics()
const override {
return {
folly::F14FastMap<std::string, velox::RuntimeMetric> result = {
{"prestoExchangeSource.numPages", velox::RuntimeMetric(numPages_)},
{"prestoExchangeSource.totalBytes",
velox::RuntimeMetric(
totalBytes_, velox::RuntimeCounter::Unit::kBytes)},
};
if (getDataMetric_.count > 0) {
result["prestoExchangeSource.getDataMs"] = getDataMetric_;
}
if (getDataSizeMetric_.count > 0) {
result["prestoExchangeSource.getDataSizeMs"] = getDataSizeMetric_;
}
if (pageSizeMetric_.count > 0) {
result["prestoExchangeSource.pageSizeBytes"] = pageSizeMetric_;
}

return result;
}

folly::dynamic toJson() override {
Expand Down Expand Up @@ -289,6 +300,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
velox::VeloxPromise<Response> promise_{
velox::VeloxPromise<Response>::makeEmpty()};

velox::RuntimeMetric getDataMetric_{velox::RuntimeCounter::Unit::kNanos};
velox::RuntimeMetric getDataSizeMetric_{velox::RuntimeCounter::Unit::kNanos};
velox::RuntimeMetric pageSizeMetric_{velox::RuntimeCounter::Unit::kBytes};

friend class test::PrestoExchangeSourceTestHelper;
};
} // namespace facebook::presto
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct Result {
std::unique_ptr<folly::IOBuf> data;
bool complete;
std::vector<int64_t> remainingBytes;
double waitTimeMs;
};

struct ResultRequest {
Expand Down
9 changes: 7 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,12 @@ void getData(
}
}

VLOG(1) << "Task " << taskId << ", buffer " << bufferId << ", sequence "
<< sequence << " Results size: " << bytes
int64_t callbackStartMs = getCurrentTimeMs();
int64_t waitTimeMs = callbackStartMs - startMs;
VLOG(1) << "Task " << taskId << " waited " << waitTimeMs
<< "ms for data: "
<< "buffer " << bufferId << ", sequence " << sequence
<< " Results size: " << bytes
<< ", page count: " << pages.size()
<< ", remaining: " << folly::join(',', remainingBytes)
<< ", complete: " << std::boolalpha << complete;
Expand All @@ -194,6 +198,7 @@ void getData(
result->complete = complete;
result->data = std::move(iobuf);
result->remainingBytes = std::move(remainingBytes);
result->waitTimeMs = waitTimeMs;

promiseHolder->promise.setValue(std::move(result));

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ proxygen::RequestHandler* TaskResource::getResults(
protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER,
folly::join(',', result->remainingBytes));
}
if (result->waitTimeMs > 0) {
builder.header(
protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER,
std::to_string(result->waitTimeMs));
}
builder.body(std::move(result->data)).sendWithEOM();
})
.thenError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,11 @@ TEST_P(PrestoExchangeSourceTest, basic) {
EXPECT_EQ(pool_->usedBytes(), 0);

const auto stats = exchangeSource->metrics();
ASSERT_EQ(stats.size(), 2);
ASSERT_EQ(stats.size(), 4);
ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size());
ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages));
ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0);
ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0);
}

TEST_P(PrestoExchangeSourceTest, getDataSize) {
Expand Down Expand Up @@ -912,9 +914,11 @@ TEST_P(PrestoExchangeSourceTest, slowProducer) {
EXPECT_EQ(pool_->usedBytes(), 0);

const auto stats = exchangeSource->metrics();
ASSERT_EQ(stats.size(), 2);
ASSERT_EQ(stats.size(), 4);
ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size());
ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages));
ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0);
ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0);
}

DEBUG_ONLY_TEST_P(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER = "X-Presto-Page-End-Sequence-Id
const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete";
const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size";
const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER = "X-Presto-Buffer-Remaining-Bytes";
const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER = "X-Presto-Buffer-Wait-Time-Ms";
const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER = "X-Presto-Buffer-Remaining-From-Spill";

const char* const PRESTO_MAX_WAIT_DEFAULT = "2s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER;
extern const char* const PRESTO_BUFFER_COMPLETE_HEADER;
extern const char* const PRESTO_GET_DATA_SIZE_HEADER;
extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER;
extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER;
extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER;

extern const char* const PRESTO_MAX_WAIT_DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete";
const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size";
const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER =
"X-Presto-Buffer-Remaining-Bytes";
const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER =
"X-Presto-Buffer-Wait-Time-Ms";
const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER =
"X-Presto-Buffer-Remaining-From-Spill";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER;
extern const char* const PRESTO_BUFFER_COMPLETE_HEADER;
extern const char* const PRESTO_GET_DATA_SIZE_HEADER;
extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER;
extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER;
extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER;

extern const char* const PRESTO_MAX_WAIT_DEFAULT;
Expand Down
Loading