Skip to content

Commit 84017a9

Browse files
committed
feat(native): Add exchange client runtime stats
1 parent a2c5b8f commit 84017a9

File tree

10 files changed

+51
-6
lines changed

10 files changed

+51
-6
lines changed

presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,21 @@ void PrestoExchangeSource::processDataResponse(
259259
std::unique_ptr<http::HttpResponse> response,
260260
bool isGetDataSizeRequest) {
261261
if (isGetDataSizeRequest) {
262+
double waitTimeMs = 0.0;
263+
auto waitTimeMsString = response->headers()->getHeaders().getSingleOrEmpty(
264+
protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER);
265+
if (!waitTimeMsString.empty()) {
266+
waitTimeMs = std::stod(waitTimeMsString);
267+
}
268+
getDataSizeMetric_.addValue(
269+
(dataRequestRetryState_.durationMs() - waitTimeMs) * 1'000'000);
262270
RECORD_HISTOGRAM_METRIC_VALUE(
263271
kCounterExchangeGetDataSizeDuration,
264-
dataRequestRetryState_.durationMs());
272+
dataRequestRetryState_.durationMs() - waitTimeMs);
265273
RECORD_HISTOGRAM_METRIC_VALUE(
266274
kCounterExchangeGetDataSizeNumTries, dataRequestRetryState_.numTries());
267275
} else {
276+
getDataMetric_.addValue(dataRequestRetryState_.durationMs() * 1'000'000);
268277
RECORD_HISTOGRAM_METRIC_VALUE(
269278
kCounterExchangeRequestDuration, dataRequestRetryState_.durationMs());
270279
RECORD_HISTOGRAM_METRIC_VALUE(
@@ -346,6 +355,7 @@ void PrestoExchangeSource::processDataResponse(
346355

347356
// Record page size counter when not a get-data-size request
348357
if (!isGetDataSizeRequest) {
358+
pageSizeMetric_.addValue(totalBytes);
349359
RECORD_HISTOGRAM_METRIC_VALUE(
350360
kCounterExchangeRequestPageSize, totalBytes);
351361
}

presto-native-execution/presto_cpp/main/PrestoExchangeSource.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,23 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
137137

138138
folly::F14FastMap<std::string, velox::RuntimeMetric> metrics()
139139
const override {
140-
return {
140+
folly::F14FastMap<std::string, velox::RuntimeMetric> result = {
141141
{"prestoExchangeSource.numPages", velox::RuntimeMetric(numPages_)},
142142
{"prestoExchangeSource.totalBytes",
143143
velox::RuntimeMetric(
144144
totalBytes_, velox::RuntimeCounter::Unit::kBytes)},
145145
};
146+
if (getDataMetric_.count > 0) {
147+
result["prestoExchangeSource.getDataMs"] = getDataMetric_;
148+
}
149+
if (getDataSizeMetric_.count > 0) {
150+
result["prestoExchangeSource.getDataSizeMs"] = getDataSizeMetric_;
151+
}
152+
if (pageSizeMetric_.count > 0) {
153+
result["prestoExchangeSource.pageSizeBytes"] = pageSizeMetric_;
154+
}
155+
156+
return result;
146157
}
147158

148159
folly::dynamic toJson() override {
@@ -289,6 +300,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
289300
velox::VeloxPromise<Response> promise_{
290301
velox::VeloxPromise<Response>::makeEmpty()};
291302

303+
velox::RuntimeMetric getDataMetric_{velox::RuntimeCounter::Unit::kNanos};
304+
velox::RuntimeMetric getDataSizeMetric_{velox::RuntimeCounter::Unit::kNanos};
305+
velox::RuntimeMetric pageSizeMetric_{velox::RuntimeCounter::Unit::kBytes};
306+
292307
friend class test::PrestoExchangeSourceTestHelper;
293308
};
294309
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/PrestoTask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ struct Result {
7171
std::unique_ptr<folly::IOBuf> data;
7272
bool complete;
7373
std::vector<int64_t> remainingBytes;
74+
double waitTimeMs;
7475
};
7576

7677
struct ResultRequest {

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,12 @@ void getData(
182182
}
183183
}
184184

185-
VLOG(1) << "Task " << taskId << ", buffer " << bufferId << ", sequence "
186-
<< sequence << " Results size: " << bytes
185+
int64_t callbackStartMs = getCurrentTimeMs();
186+
int64_t waitTimeMs = callbackStartMs - startMs;
187+
VLOG(1) << "Task " << taskId << " waited " << waitTimeMs
188+
<< "ms for data: "
189+
<< "buffer " << bufferId << ", sequence " << sequence
190+
<< " Results size: " << bytes
187191
<< ", page count: " << pages.size()
188192
<< ", remaining: " << folly::join(',', remainingBytes)
189193
<< ", complete: " << std::boolalpha << complete;
@@ -194,6 +198,7 @@ void getData(
194198
result->complete = complete;
195199
result->data = std::move(iobuf);
196200
result->remainingBytes = std::move(remainingBytes);
201+
result->waitTimeMs = waitTimeMs;
197202

198203
promiseHolder->promise.setValue(std::move(result));
199204

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,11 @@ proxygen::RequestHandler* TaskResource::getResults(
533533
protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER,
534534
folly::join(',', result->remainingBytes));
535535
}
536+
if (result->waitTimeMs > 0) {
537+
builder.header(
538+
protocol::PRESTO_BUFFER_WAIT_TIME_MS_HEADER,
539+
std::to_string(result->waitTimeMs));
540+
}
536541
builder.body(std::move(result->data)).sendWithEOM();
537542
})
538543
.thenError(

presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,9 +640,11 @@ TEST_P(PrestoExchangeSourceTest, basic) {
640640
EXPECT_EQ(pool_->usedBytes(), 0);
641641

642642
const auto stats = exchangeSource->metrics();
643-
ASSERT_EQ(stats.size(), 2);
643+
ASSERT_EQ(stats.size(), 4);
644644
ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size());
645645
ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages));
646+
ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0);
647+
ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0);
646648
}
647649

648650
TEST_P(PrestoExchangeSourceTest, getDataSize) {
@@ -912,9 +914,11 @@ TEST_P(PrestoExchangeSourceTest, slowProducer) {
912914
EXPECT_EQ(pool_->usedBytes(), 0);
913915

914916
const auto stats = exchangeSource->metrics();
915-
ASSERT_EQ(stats.size(), 2);
917+
ASSERT_EQ(stats.size(), 4);
916918
ASSERT_EQ(stats.at("prestoExchangeSource.numPages").sum, pages.size());
917919
ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages));
920+
ASSERT_GT(stats.at("prestoExchangeSource.getDataMs").count, 0);
921+
ASSERT_GT(stats.at("prestoExchangeSource.pageSizeBytes").count, 0);
918922
}
919923

920924
DEBUG_ONLY_TEST_P(

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-cpp.mustache

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER = "X-Presto-Page-End-Sequence-Id
8787
const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete";
8888
const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size";
8989
const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER = "X-Presto-Buffer-Remaining-Bytes";
90+
const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER = "X-Presto-Buffer-Wait-Time-Ms";
9091
const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER = "X-Presto-Buffer-Remaining-From-Spill";
9192
9293
const char* const PRESTO_MAX_WAIT_DEFAULT = "2s";

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER;
6262
extern const char* const PRESTO_BUFFER_COMPLETE_HEADER;
6363
extern const char* const PRESTO_GET_DATA_SIZE_HEADER;
6464
extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER;
65+
extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER;
6566
extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER;
6667
6768
extern const char* const PRESTO_MAX_WAIT_DEFAULT;

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ const char* const PRESTO_BUFFER_COMPLETE_HEADER = "X-Presto-Buffer-Complete";
8787
const char* const PRESTO_GET_DATA_SIZE_HEADER = "X-Presto-Get-Data-Size";
8888
const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER =
8989
"X-Presto-Buffer-Remaining-Bytes";
90+
const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER =
91+
"X-Presto-Buffer-Wait-Time-Ms";
9092
const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER =
9193
"X-Presto-Buffer-Remaining-From-Spill";
9294

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ extern const char* const PRESTO_PAGE_NEXT_TOKEN_HEADER;
5959
extern const char* const PRESTO_BUFFER_COMPLETE_HEADER;
6060
extern const char* const PRESTO_GET_DATA_SIZE_HEADER;
6161
extern const char* const PRESTO_BUFFER_REMAINING_BYTES_HEADER;
62+
extern const char* const PRESTO_BUFFER_WAIT_TIME_MS_HEADER;
6263
extern const char* const PRESTO_BUFFER_REMAINING_FROM_SPILL_HEADER;
6364

6465
extern const char* const PRESTO_MAX_WAIT_DEFAULT;

0 commit comments

Comments
 (0)