diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index 19c1931b420eb..1e6f904d35453 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -259,12 +259,21 @@ void PrestoExchangeSource::processDataResponse( std::unique_ptr response, bool isGetDataSizeRequest) { if (isGetDataSizeRequest) { + double waitTimeMs = 0.0; + auto waitTimeMsString = response->headers()->getHeaders().getSingleOrEmpty( + protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER); + if (!waitTimeMsString.empty()) { + waitTimeMs = std::stod(waitTimeMsString); + } + getDataSizeMetric_.addValue( + (dataRequestRetryState_.durationMs() - waitTimeMs) * 1'000'000); RECORD_HISTOGRAM_METRIC_VALUE( kCounterExchangeGetDataSizeDuration, - dataRequestRetryState_.durationMs()); + dataRequestRetryState_.durationMs() - waitTimeMs); RECORD_HISTOGRAM_METRIC_VALUE( kCounterExchangeGetDataSizeNumTries, dataRequestRetryState_.numTries()); } else { + getDataMetric_.addValue(dataRequestRetryState_.durationMs() * 1'000'000); RECORD_HISTOGRAM_METRIC_VALUE( kCounterExchangeRequestDuration, dataRequestRetryState_.durationMs()); RECORD_HISTOGRAM_METRIC_VALUE( @@ -346,6 +355,7 @@ void PrestoExchangeSource::processDataResponse( // Record page size counter when not a get-data-size request if (!isGetDataSizeRequest) { + pageSizeMetric_.addValue(totalBytes); RECORD_HISTOGRAM_METRIC_VALUE( kCounterExchangeRequestPageSize, totalBytes); } diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index b98959097d1ab..3241359c9fa66 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -137,12 +137,23 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { folly::F14FastMap metrics() const override { - return { + folly::F14FastMap result = { {"prestoExchangeSource.numPages", velox::RuntimeMetric(numPages_)}, {"prestoExchangeSource.totalBytes", velox::RuntimeMetric( totalBytes_, velox::RuntimeCounter::Unit::kBytes)}, }; + if (getDataMetric_.count > 0) { + result["prestoExchangeSource.getDataMs"] = getDataMetric_; + } + if (getDataSizeMetric_.count > 0) { + result["prestoExchangeSource.getDataSizeMs"] = getDataSizeMetric_; + } + if (pageSizeMetric_.count > 0) { + result["prestoExchangeSource.pageSizeBytes"] = pageSizeMetric_; + } + + return result; } folly::dynamic toJson() override { @@ -289,6 +300,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { velox::VeloxPromise promise_{ velox::VeloxPromise::makeEmpty()}; + velox::RuntimeMetric getDataMetric_{velox::RuntimeCounter::Unit::kNanos}; + velox::RuntimeMetric getDataSizeMetric_{velox::RuntimeCounter::Unit::kNanos}; + velox::RuntimeMetric pageSizeMetric_{velox::RuntimeCounter::Unit::kBytes}; + friend class test::PrestoExchangeSourceTestHelper; }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.h b/presto-native-execution/presto_cpp/main/PrestoTask.h index b41666fa90a93..42a0b2306d488 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.h +++ b/presto-native-execution/presto_cpp/main/PrestoTask.h @@ -71,6 +71,7 @@ struct Result { std::unique_ptr data; bool complete; std::vector remainingBytes; + double waitTimeMs; }; struct ResultRequest { diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 257841d115dae..192a3060341d1 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -182,8 +182,12 @@ void getData( } } - VLOG(1) << "Task " << taskId << ", buffer " << bufferId << ", sequence " - << sequence << " Results size: " << bytes + int64_t callbackStartMs = getCurrentTimeMs(); + int64_t waitTimeMs = callbackStartMs - startMs; + VLOG(1) << "Task " << taskId << " waited " << waitTimeMs + << "ms for data: " + << "buffer " << bufferId << ", sequence " << sequence + << " Results size: " << bytes << ", page count: " << pages.size() << ", remaining: " << folly::join(',', remainingBytes) << ", complete: " << std::boolalpha << complete; @@ -194,6 +198,7 @@ void getData( result->complete = complete; result->data = std::move(iobuf); result->remainingBytes = std::move(remainingBytes); + result->waitTimeMs = waitTimeMs; promiseHolder->promise.setValue(std::move(result)); diff --git a/presto-native-execution/presto_cpp/main/TaskResource.cpp b/presto-native-execution/presto_cpp/main/TaskResource.cpp index 3fc4a2e42b4ff..574618f306d38 100644 --- a/presto-native-execution/presto_cpp/main/TaskResource.cpp +++ b/presto-native-execution/presto_cpp/main/TaskResource.cpp @@ -533,6 +533,11 @@ proxygen::RequestHandler* TaskResource::getResults( protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER, folly::join(',', result->remainingBytes)); } + if (result->waitTimeMs > 0) { + builder.header( + protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER, + std::to_string(result->waitTimeMs)); + } builder.body(std::move(result->data)).sendWithEOM(); }) .thenError( diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index d48d3fb4fb626..7b685062e9c6d 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -640,9 +640,11 @@ TEST_P(PrestoExchangeSourceTest, basic) { EXPECT_EQ(pool_->usedBytes(), 0); const auto stats = exchangeSource->metrics(); - ASSERT_EQ(stats.size(), 2); + ASSERT_EQ(stats.size(), 4); ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size()); ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages)); + ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0); + ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0); } TEST_P(PrestoExchangeSourceTest, getDataSize) { @@ -912,9 +914,11 @@ TEST_P(PrestoExchangeSourceTest, slowProducer) { EXPECT_EQ(pool_->usedBytes(), 0); const auto stats = exchangeSource->metrics(); - ASSERT_EQ(stats.size(), 2); + ASSERT_EQ(stats.size(), 4); ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size()); ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages)); + ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0); + ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0); } DEBUG_ONLY_TEST_P( diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-cpp.mustache b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-cpp.mustache index 9af6aae623b4a..4f2e1b478c56e 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-cpp.mustache +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-cpp.mustache @@ -87,6 +87,7 @@ const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER = "X-Presto-Page-End-Sequence-Id const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete"; const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size"; const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER = "X-Presto-Buffer-Remaining-Bytes"; +const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER = "X-Presto-Buffer-Wait-Time-Ms"; const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER = "X-Presto-Buffer-Remaining-From-Spill"; const char* const PRESTO_MAX_WAIT_DEFAULT = "2s"; diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache index 886735f96963c..e2a2966d4fe33 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache @@ -62,6 +62,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER; extern const char* const PRESTO_BUFFER_COMPLETE_HEADER; extern const char* const PRESTO_GET_DATA_SIZE_HEADER; extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER; +extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER; extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER; extern const char* const PRESTO_MAX_WAIT_DEFAULT; diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp index 874e2275577c1..5610efc8bbdd1 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp @@ -87,6 +87,8 @@ const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete"; const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size"; const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER = "X-Presto-Buffer-Remaining-Bytes"; +const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER = + "X-Presto-Buffer-Wait-Time-Ms"; const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER = "X-Presto-Buffer-Remaining-From-Spill"; diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h index 2b1e4eb66c14e..c905bef8d1dea 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h @@ -59,6 +59,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER; extern const char* const PRESTO_BUFFER_COMPLETE_HEADER; extern const char* const PRESTO_GET_DATA_SIZE_HEADER; extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER; +extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER; extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER; extern const char* const PRESTO_MAX_WAIT_DEFAULT;