Skip to content

Commit 58a20c9

Browse files
authored
feat(native): Add http2 data compression for cpp worker (#26382)
## Description 1. add support to decompress thrift request with zstd and compress thrift response for http server ## Motivation and Context 1. compressed data can be sent over wire faster Differential Revision: D85150330 ``` == RELEASE NOTES == General Changes * Add compression support for http2 protocol on cpp worker ```
1 parent 12cd04c commit 58a20c9

File tree

6 files changed

+176
-15
lines changed

6 files changed

+176
-15
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,17 +224,29 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
224224
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
225225
const auto receiveThrift =
226226
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
227+
const auto contentEncoding = headers.getSingleOrEmpty("Content-Encoding");
228+
const auto isCompressed =
229+
!contentEncoding.empty() && contentEncoding != "identity";
227230

228231
return new http::CallbackRequestHandler(
229-
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
232+
[this,
233+
taskId,
234+
summarize,
235+
createOrUpdateFunc,
236+
sendThrift,
237+
receiveThrift,
238+
contentEncoding,
239+
isCompressed](
230240
proxygen::HTTPMessage* /*message*/,
231241
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
232242
proxygen::ResponseHandler* downstream,
233243
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
234244
folly::via(
235245
httpSrvCpuExecutor_,
236246
[this,
237-
requestBody = util::extractMessageBody(body),
247+
requestBody = isCompressed
248+
? util::decompressMessageBody(body, contentEncoding)
249+
: util::extractMessageBody(body),
238250
taskId,
239251
summarize,
240252
createOrUpdateFunc,

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,17 @@ SystemConfig::SystemConfig() {
150150
NONE_PROP(kHttpServerHttpsPort),
151151
BOOL_PROP(kHttpServerHttpsEnabled, false),
152152
BOOL_PROP(kHttpServerHttp2Enabled, true),
153+
NUM_PROP(kHttpServerIdleTimeoutMs, 60'000),
153154
NUM_PROP(kHttpServerHttp2InitialReceiveWindow, 1 << 20),
154155
NUM_PROP(kHttpServerHttp2ReceiveStreamWindowSize, 1 << 20),
155156
NUM_PROP(kHttpServerHttp2ReceiveSessionWindowSize, 10 * (1 << 20)),
156-
NUM_PROP(kHttpServerIdleTimeoutMs, 60'000),
157+
NUM_PROP(kHttpServerHttp2MaxConcurrentStreams, 100),
158+
NUM_PROP(kHttpServerContentCompressionLevel, 4),
159+
NUM_PROP(kHttpServerContentCompressionMinimumSize, 3584),
160+
BOOL_PROP(kHttpServerEnableContentCompression, false),
161+
BOOL_PROP(kHttpServerEnableZstdCompression, false),
162+
NUM_PROP(kHttpServerZstdContentCompressionLevel, 8),
163+
BOOL_PROP(kHttpServerEnableGzipCompression, false),
157164
STR_PROP(
158165
kHttpsSupportedCiphers,
159166
"ECDHE-ECDSA-AES256-GCM-SHA384,AES256-GCM-SHA384"),
@@ -309,6 +316,10 @@ bool SystemConfig::httpServerHttp2Enabled() const {
309316
return optionalProperty<bool>(kHttpServerHttp2Enabled).value();
310317
}
311318

319+
// Returns the kHttpServerIdleTimeoutMs property as a uint32_t (milliseconds,
// per the property name).
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
uint32_t SystemConfig::httpServerIdleTimeoutMs() const {
320+
return optionalProperty<uint32_t>(kHttpServerIdleTimeoutMs).value();
321+
}
322+
312323
uint32_t SystemConfig::httpServerHttp2InitialReceiveWindow() const {
313324
return optionalProperty<uint32_t>(kHttpServerHttp2InitialReceiveWindow)
314325
.value();
@@ -324,8 +335,35 @@ uint32_t SystemConfig::httpServerHttp2ReceiveSessionWindowSize() const {
324335
.value();
325336
}
326337

327-
uint32_t SystemConfig::httpServerIdleTimeoutMs() const {
328-
return optionalProperty<uint32_t>(kHttpServerIdleTimeoutMs).value();
338+
// Returns the kHttpServerHttp2MaxConcurrentStreams property as a uint32_t.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
uint32_t SystemConfig::httpServerHttp2MaxConcurrentStreams() const {
339+
return optionalProperty<uint32_t>(kHttpServerHttp2MaxConcurrentStreams)
340+
.value();
341+
}
342+
343+
// Returns the kHttpServerContentCompressionLevel property as a uint32_t.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
uint32_t SystemConfig::httpServerContentCompressionLevel() const {
344+
return optionalProperty<uint32_t>(kHttpServerContentCompressionLevel).value();
345+
}
346+
347+
// Returns the kHttpServerContentCompressionMinimumSize property as a uint32_t
// (a size in bytes, per the property's declaration comment).
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
uint32_t SystemConfig::httpServerContentCompressionMinimumSize() const {
348+
return optionalProperty<uint32_t>(kHttpServerContentCompressionMinimumSize)
349+
.value();
350+
}
351+
352+
// Returns the kHttpServerEnableContentCompression property as a bool.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
bool SystemConfig::httpServerEnableContentCompression() const {
353+
return optionalProperty<bool>(kHttpServerEnableContentCompression).value();
354+
}
355+
356+
// Returns the kHttpServerEnableZstdCompression property as a bool.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
bool SystemConfig::httpServerEnableZstdCompression() const {
357+
return optionalProperty<bool>(kHttpServerEnableZstdCompression).value();
358+
}
359+
360+
// Returns the kHttpServerZstdContentCompressionLevel property as a uint32_t.
// Because the value is read as an unsigned type, a negative level cannot be
// returned from this accessor.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
uint32_t SystemConfig::httpServerZstdContentCompressionLevel() const {
361+
return optionalProperty<uint32_t>(kHttpServerZstdContentCompressionLevel)
362+
.value();
363+
}
364+
365+
// Returns the kHttpServerEnableGzipCompression property as a bool.
// NOTE(review): .value() presumes the property always resolves; a default is
// registered in the SystemConfig constructor — confirm behavior when unset.
bool SystemConfig::httpServerEnableGzipCompression() const {
366+
return optionalProperty<bool>(kHttpServerEnableGzipCompression).value();
329367
}
330368

331369
std::string SystemConfig::httpsSupportedCiphers() const {

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ class SystemConfig : public ConfigBase {
209209
"http-server.https.enabled"};
210210
static constexpr std::string_view kHttpServerHttp2Enabled{
211211
"http-server.http2.enabled"};
212+
/// HTTP/2 server idle timeout in milliseconds (default 60000ms).
213+
static constexpr std::string_view kHttpServerIdleTimeoutMs{
214+
"http-server.http2.idle-timeout-ms"};
212215
/// HTTP/2 initial receive window size in bytes (default 1MB).
213216
static constexpr std::string_view kHttpServerHttp2InitialReceiveWindow{
214217
"http-server.http2.initial-receive-window"};
@@ -218,11 +221,27 @@ class SystemConfig : public ConfigBase {
218221
/// HTTP/2 receive session window size in bytes (default 10MB).
219222
static constexpr std::string_view kHttpServerHttp2ReceiveSessionWindowSize{
220223
"http-server.http2.receive-session-window-size"};
221-
222-
/// HTTP server idle timeout in milliseconds
223-
static constexpr std::string_view kHttpServerIdleTimeoutMs{
224-
"http-server.idle-timeout-ms"};
225-
224+
/// HTTP/2 maximum concurrent streams per connection (default 100).
225+
static constexpr std::string_view kHttpServerHttp2MaxConcurrentStreams{
226+
"http-server.http2.max-concurrent-streams"};
227+
/// HTTP/2 content compression level (1-9, default 4 for speed).
228+
static constexpr std::string_view kHttpServerContentCompressionLevel{
229+
"http-server.http2.content-compression-level"};
230+
/// HTTP/2 content compression minimum size in bytes (default 3584).
231+
static constexpr std::string_view kHttpServerContentCompressionMinimumSize{
232+
"http-server.http2.content-compression-minimum-size"};
233+
/// Enable content compression (master switch, default false).
234+
static constexpr std::string_view kHttpServerEnableContentCompression{
235+
"http-server.http2.enable-content-compression"};
236+
/// Enable zstd compression (default false).
237+
static constexpr std::string_view kHttpServerEnableZstdCompression{
238+
"http-server.http2.enable-zstd-compression"};
239+
/// Zstd compression level (default 8). The accessor reads this value as
/// uint32_t, so negative zstd levels cannot be configured.
240+
static constexpr std::string_view kHttpServerZstdContentCompressionLevel{
241+
"http-server.http2.zstd-content-compression-level"};
242+
/// Enable gzip compression (default false).
243+
static constexpr std::string_view kHttpServerEnableGzipCompression{
244+
"http-server.http2.enable-gzip-compression"};
226245
/// List of comma separated ciphers the client can use.
227246
///
228247
/// NOTE: the client needs to have at least one cipher shared with server
@@ -841,13 +860,27 @@ class SystemConfig : public ConfigBase {
841860

842861
bool httpServerHttp2Enabled() const;
843862

863+
uint32_t httpServerIdleTimeoutMs() const;
864+
844865
uint32_t httpServerHttp2InitialReceiveWindow() const;
845866

846867
uint32_t httpServerHttp2ReceiveStreamWindowSize() const;
847868

848869
uint32_t httpServerHttp2ReceiveSessionWindowSize() const;
849870

850-
uint32_t httpServerIdleTimeoutMs() const;
871+
uint32_t httpServerHttp2MaxConcurrentStreams() const;
872+
873+
uint32_t httpServerContentCompressionLevel() const;
874+
875+
uint32_t httpServerContentCompressionMinimumSize() const;
876+
877+
bool httpServerEnableContentCompression() const;
878+
879+
bool httpServerEnableZstdCompression() const;
880+
881+
uint32_t httpServerZstdContentCompressionLevel() const;
882+
883+
bool httpServerEnableGzipCompression() const;
851884

852885
/// A list of ciphers (comma separated) that are supported by
853886
/// server and client. Note Java and folly::SSLContext use different names to

presto-native-execution/presto_cpp/main/common/Utils.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
#include "presto_cpp/main/common/Utils.h"
1616
#include <fmt/format.h>
17+
#include <folly/compression/Compression.h>
1718
#include <folly/io/Cursor.h>
19+
#include <folly/io/IOBuf.h>
1820
#include <sys/resource.h>
21+
#include "velox/common/base/Exceptions.h"
1922
#include "velox/common/process/ThreadDebugInfo.h"
2023

2124
namespace facebook::presto::util {
@@ -89,4 +92,48 @@ std::string extractMessageBody(
8992
}
9093
return ret;
9194
}
95+
96+
std::string decompressMessageBody(
97+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
98+
const std::string& contentEncoding) {
99+
try {
100+
// Combine all IOBufs into a single chain
101+
std::unique_ptr<folly::IOBuf> combined;
102+
for (const auto& buf : body) {
103+
if (!combined) {
104+
combined = buf->clone();
105+
} else {
106+
combined->appendToChain(buf->clone());
107+
}
108+
}
109+
110+
// Determine compression codec type; Support only ZSTD for now
111+
folly::compression::CodecType codecType;
112+
if (contentEncoding == "zstd") {
113+
codecType = folly::compression::CodecType::ZSTD;
114+
} else {
115+
VELOX_USER_FAIL("Unsupported Content-Encoding: {}", contentEncoding);
116+
}
117+
118+
// Decompress the data
119+
auto codec = folly::compression::getCodec(
120+
codecType); // getCodec never return nullptr
121+
auto decompressed = codec->uncompress(combined.get());
122+
123+
size_t decompressedSize = decompressed->computeChainDataLength();
124+
125+
// Convert decompressed IOBuf to string
126+
std::string ret;
127+
ret.resize(decompressedSize);
128+
folly::io::Cursor cursor(decompressed.get());
129+
cursor.pull(ret.data(), decompressedSize);
130+
131+
return ret;
132+
} catch (const std::exception& e) {
133+
VELOX_USER_FAIL(
134+
"Failed to decompress request body with {}: {}",
135+
contentEncoding,
136+
e.what());
137+
}
138+
}
92139
} // namespace facebook::presto::util

presto-native-execution/presto_cpp/main/common/Utils.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ void installSignalHandler();
4949
std::string extractMessageBody(
5050
const std::vector<std::unique_ptr<folly::IOBuf>>& body);
5151

52+
/// Decompresses the message body based on Content-Encoding (only "zstd" is supported).
53+
/// Throws a Velox user error if the encoding is unsupported or decompression fails.
54+
std::string decompressMessageBody(
55+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
56+
const std::string& contentEncoding);
57+
5258
inline std::string addDefaultNamespacePrefix(
5359
const std::string& prestoDefaultNamespacePrefix,
5460
const std::string& functionName) {

presto-native-execution/presto_cpp/main/http/HttpServer.cpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,10 @@ void HttpServer::start(
273273
std::function<void(proxygen::HTTPServer* /*server*/)> onSuccess,
274274
std::function<void(std::exception_ptr)> onError) {
275275
proxygen::HTTPServerOptions options;
276-
options.idleTimeout = std::chrono::milliseconds(
277-
SystemConfig::instance()->httpServerIdleTimeoutMs());
278-
options.enableContentCompression = false;
276+
277+
auto systemConfig = SystemConfig::instance();
278+
options.idleTimeout =
279+
std::chrono::milliseconds(systemConfig->httpServerIdleTimeoutMs());
279280

280281
proxygen::RequestHandlerChain handlerFactories;
281282

@@ -290,15 +291,39 @@ void HttpServer::start(
290291
options.handlerFactories = handlerFactories.build();
291292

292293
// HTTP/2 flow control window sizes (configurable)
293-
auto systemConfig = SystemConfig::instance();
294294
options.initialReceiveWindow =
295295
systemConfig->httpServerHttp2InitialReceiveWindow();
296296
options.receiveStreamWindowSize =
297297
systemConfig->httpServerHttp2ReceiveStreamWindowSize();
298298
options.receiveSessionWindowSize =
299299
systemConfig->httpServerHttp2ReceiveSessionWindowSize();
300+
options.maxConcurrentIncomingStreams =
301+
systemConfig->httpServerHttp2MaxConcurrentStreams();
300302
options.h2cEnabled = true;
301303

304+
// Configure HTTP response compression from SystemConfig (disabled by default).
305+
// Supports both gzip and zstd (zstd preferred when client supports it)
306+
options.enableContentCompression =
307+
systemConfig->httpServerEnableContentCompression();
308+
options.contentCompressionLevel =
309+
systemConfig->httpServerContentCompressionLevel();
310+
options.contentCompressionMinimumSize =
311+
systemConfig->httpServerContentCompressionMinimumSize();
312+
options.enableZstdCompression =
313+
systemConfig->httpServerEnableZstdCompression();
314+
options.zstdContentCompressionLevel =
315+
systemConfig->httpServerZstdContentCompressionLevel();
316+
options.enableGzipCompression =
317+
systemConfig->httpServerEnableGzipCompression();
318+
319+
// CRITICAL: Add Thrift content-types for Presto task updates
320+
// By default, proxygen only compresses text/* and some application/* types
321+
// We need to explicitly add all Thrift variants used by Presto
322+
options.contentCompressionTypes.insert(
323+
"application/x-thrift"); // Standard Thrift
324+
options.contentCompressionTypes.insert(
325+
"application/x-thrift+binary"); // Thrift binary protocol
326+
302327
server_ = std::make_unique<proxygen::HTTPServer>(std::move(options));
303328

304329
std::vector<proxygen::HTTPServer::IPConfig> ipConfigs;

0 commit comments

Comments
 (0)