Skip to content

Commit 7dfecd1

Browse files
shangm2 authored and facebook-github-bot committed
add http2 data compression for worker
Summary: 1. add support to decompress thrift request with zstd and compress thrift response for http server Differential Revision: D85150330
1 parent 199ea1e commit 7dfecd1

File tree

4 files changed

+107
-13
lines changed

4 files changed

+107
-13
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -216,23 +216,37 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
216216
bool summarize = message->hasQueryParam("summarize");
217217

218218
const auto& headers = message->getHeaders();
219-
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
219+
const auto& acceptHeader =
220+
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
220221
const auto sendThrift =
221222
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
222-
const auto& contentHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
223+
const auto& contentHeader =
224+
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
223225
const auto receiveThrift =
224226
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
227+
const auto contentEncoding = headers.getSingleOrEmpty("Content-Encoding");
228+
const auto isCompressed =
229+
!contentEncoding.empty() && contentEncoding != "identity";
225230

226231
return new http::CallbackRequestHandler(
227-
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
232+
[this,
233+
taskId,
234+
summarize,
235+
createOrUpdateFunc,
236+
sendThrift,
237+
receiveThrift,
238+
contentEncoding,
239+
isCompressed](
228240
proxygen::HTTPMessage* /*message*/,
229241
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
230242
proxygen::ResponseHandler* downstream,
231243
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
232244
folly::via(
233245
httpSrvCpuExecutor_,
234246
[this,
235-
requestBody = util::extractMessageBody(body),
247+
requestBody = isCompressed
248+
? util::decompressMessageBody(body, contentEncoding)
249+
: util::extractMessageBody(body),
236250
taskId,
237251
summarize,
238252
createOrUpdateFunc,
@@ -242,7 +256,11 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
242256
std::unique_ptr<protocol::TaskInfo> taskInfo;
243257
try {
244258
taskInfo = createOrUpdateFunc(
245-
taskId, requestBody, summarize, startProcessCpuTimeNs, receiveThrift);
259+
taskId,
260+
requestBody,
261+
summarize,
262+
startProcessCpuTimeNs,
263+
receiveThrift);
246264
} catch (const velox::VeloxException& e) {
247265
// Creating an empty task, putting errors inside so that next
248266
// status fetch from coordinator will catch the error and well
@@ -355,7 +373,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
355373
bool receiveThrift) {
356374
protocol::TaskUpdateRequest updateRequest;
357375
if (receiveThrift) {
358-
auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
376+
auto thriftTaskUpdateRequest =
377+
std::make_shared<thrift::TaskUpdateRequest>();
359378
thriftRead(requestBody, thriftTaskUpdateRequest);
360379
fromThrift(*thriftTaskUpdateRequest, updateRequest);
361380
} else {
@@ -364,7 +383,10 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
364383
velox::core::PlanFragment planFragment;
365384
std::shared_ptr<velox::core::QueryCtx> queryCtx;
366385
if (updateRequest.fragment) {
367-
protocol::PlanFragment prestoPlan = json::parse(receiveThrift ? *updateRequest.fragment : velox::encoding::Base64::decode(*updateRequest.fragment));
386+
protocol::PlanFragment prestoPlan = json::parse(
387+
receiveThrift
388+
? *updateRequest.fragment
389+
: velox::encoding::Base64::decode(*updateRequest.fragment));
368390

369391
queryCtx =
370392
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
@@ -397,7 +419,8 @@ proxygen::RequestHandler* TaskResource::deleteTask(
397419
}
398420
bool summarize = message->hasQueryParam("summarize");
399421
const auto& headers = message->getHeaders();
400-
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
422+
const auto& acceptHeader =
423+
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
401424
const auto sendThrift =
402425
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
403426

@@ -415,7 +438,8 @@ proxygen::RequestHandler* TaskResource::deleteTask(
415438
return std::move(taskInfo);
416439
})
417440
.via(folly::EventBaseManager::get()->getEventBase())
418-
.thenValue([taskId, downstream, handlerState, sendThrift](auto&& taskInfo) {
441+
.thenValue([taskId, downstream, handlerState, sendThrift](
442+
auto&& taskInfo) {
419443
if (!handlerState->requestExpired()) {
420444
if (taskInfo == nullptr) {
421445
sendTaskNotFound(downstream, taskId);
@@ -550,7 +574,8 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
550574
auto maxWait = getMaxWait(message);
551575

552576
const auto& headers = message->getHeaders();
553-
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
577+
const auto& acceptHeader =
578+
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
554579
const auto sendThrift =
555580
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
556581

@@ -621,7 +646,8 @@ proxygen::RequestHandler* TaskResource::getTaskInfo(
621646
bool summarize = message->hasQueryParam("summarize");
622647

623648
const auto& headers = message->getHeaders();
624-
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
649+
const auto& acceptHeader =
650+
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
625651
const auto sendThrift =
626652
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
627653

presto-native-execution/presto_cpp/main/common/Utils.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
#include "presto_cpp/main/common/Utils.h"
1616
#include <fmt/format.h>
17+
#include <folly/compression/Compression.h>
1718
#include <folly/io/Cursor.h>
19+
#include <folly/io/IOBuf.h>
1820
#include <sys/resource.h>
21+
#include <iomanip>
1922
#include "velox/common/process/ThreadDebugInfo.h"
2023

2124
namespace facebook::presto::util {
@@ -84,4 +87,49 @@ std::string extractMessageBody(
8487
}
8588
return ret;
8689
}
90+
91+
std::string decompressMessageBody(
92+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
93+
const std::string& contentEncoding) {
94+
try {
95+
// Combine all IOBufs into a single chain
96+
std::unique_ptr<folly::IOBuf> combined;
97+
for (const auto& buf : body) {
98+
if (!combined) {
99+
combined = buf->clone();
100+
} else {
101+
combined->prependChain(buf->clone());
102+
}
103+
}
104+
105+
// Determine the compression codec; only zstd is supported for now
106+
folly::compression::CodecType codecType;
107+
if (contentEncoding == "zstd") {
108+
codecType = folly::compression::CodecType::ZSTD;
109+
} else {
110+
LOG(WARNING) << "Unsupported Content-Encoding: " << contentEncoding
111+
<< ", treating as uncompressed";
112+
return extractMessageBody(body);
113+
}
114+
115+
// Decompress the data
116+
auto codec = folly::compression::getCodec(codecType);
117+
auto decompressed = codec->uncompress(combined.get());
118+
119+
size_t decompressedSize = decompressed->computeChainDataLength();
120+
121+
// Convert decompressed IOBuf to string
122+
std::string ret;
123+
ret.resize(decompressedSize);
124+
folly::io::Cursor cursor(decompressed.get());
125+
cursor.pull(ret.data(), decompressedSize);
126+
127+
return ret;
128+
} catch (const std::exception& e) {
129+
LOG(ERROR) << "Failed to decompress request body with " << contentEncoding
130+
<< ": " << e.what() << ". Treating as uncompressed.";
131+
// Fall back to treating it as uncompressed
132+
return extractMessageBody(body);
133+
}
134+
}
87135
} // namespace facebook::presto::util

presto-native-execution/presto_cpp/main/common/Utils.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
* limitations under the License.
1313
*/
1414
#pragma once
15+
#include <folly/io/IOBuf.h>
1516
#include <folly/io/async/SSLContext.h>
1617
#include <glog/logging.h>
17-
#include <folly/io/IOBuf.h>
1818

1919
namespace facebook::presto::util {
2020

@@ -48,6 +48,12 @@ void installSignalHandler();
4848
std::string extractMessageBody(
4949
const std::vector<std::unique_ptr<folly::IOBuf>>& body);
5050

51+
// Decompress message body based on Content-Encoding
52+
// Throws exception if decompression fails
53+
std::string decompressMessageBody(
54+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
55+
const std::string& contentEncoding);
56+
5157
inline std::string addDefaultNamespacePrefix(
5258
const std::string& prestoDefaultNamespacePrefix,
5359
const std::string& functionName) {

presto-native-execution/presto_cpp/main/http/HttpServer.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ proxygen::HTTPServer::IPConfig HttpsConfig::ipConfig() const {
132132
sslCfg.sslCiphers = supportedCiphers_;
133133
if (!clientCaFile_.empty()) {
134134
sslCfg.clientCAFiles = {clientCaFile_};
135-
sslCfg.clientVerification =
135+
sslCfg.clientVerification =
136136
folly::SSLContext::VerifyClientCertificate::ALWAYS;
137137
}
138138

@@ -294,6 +294,20 @@ void HttpServer::start(
294294
options.receiveSessionWindowSize = 10 * (1 << 20);
295295
options.h2cEnabled = true;
296296

297+
// Enable HTTP/2 content compression for better performance
298+
// Supports gzip compression for responses
299+
options.enableContentCompression = true;
300+
options.contentCompressionLevel = 6; // Compression level (1-9, 6 is balanced)
301+
options.contentCompressionMinimumSize = 1024; // Only compress >= 1KB responses
302+
303+
// CRITICAL: Add Thrift content-types for Presto task updates
304+
// By default, proxygen only compresses text/* and some application/* types
305+
// We need to explicitly add all Thrift variants used by Presto
306+
options.contentCompressionTypes.insert(
307+
"application/x-thrift"); // Standard Thrift
308+
options.contentCompressionTypes.insert(
309+
"application/x-thrift+binary"); // Thrift binary protocol
310+
297311
server_ = std::make_unique<proxygen::HTTPServer>(std::move(options));
298312

299313
std::vector<proxygen::HTTPServer::IPConfig> ipConfigs;

0 commit comments

Comments
 (0)