diff --git a/presto-main/pom.xml b/presto-main/pom.xml index 944ec4e6faf8c..f0387197a54af 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -475,6 +475,11 @@ postgresql test + + + com.github.luben + zstd-jni + diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index f5d2543048a46..76554eb86c60d 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -152,6 +152,7 @@ import com.facebook.presto.resourcemanager.ResourceManagerConfig; import com.facebook.presto.resourcemanager.ResourceManagerInconsistentException; import com.facebook.presto.resourcemanager.ResourceManagerResourceGroupService; +import com.facebook.presto.server.remotetask.DecompressionFilter; import com.facebook.presto.server.remotetask.HttpLocationFactory; import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig; import com.facebook.presto.server.thrift.FixedAddressSelector; @@ -437,6 +438,10 @@ else if (serverConfig.isCoordinator()) { // task execution jaxrsBinder(binder).bind(TaskResource.class); jaxrsBinder(binder).bind(ThriftTaskUpdateRequestBodyReader.class); + install(installModuleIf( + ReactorNettyHttpClientConfig.class, + config -> config.isHttp2CompressionEnabled() && config.isReactorNettyHttpClientEnabled(), + moduleBinder -> jaxrsBinder(moduleBinder).bind(DecompressionFilter.class))); newExporter(binder).export(TaskResource.class).withGeneratedName(); jaxrsBinder(binder).bind(TaskExecutorResource.class); diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/DecompressionFilter.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/DecompressionFilter.java new file mode 100644 index 0000000000000..54ee336358776 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/DecompressionFilter.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.remotetask; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.PrestoException; +import com.github.luben.zstd.ZstdInputStream; +import jakarta.annotation.Priority; +import jakarta.ws.rs.Priorities; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.ext.Provider; + +import java.io.IOException; +import java.io.InputStream; + +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; + +@Provider +@Priority(Priorities.ENTITY_CODER) +public class DecompressionFilter + implements ContainerRequestFilter +{ + private static final Logger log = Logger.get(DecompressionFilter.class); + + @Override + public void filter(ContainerRequestContext containerRequestContext) + throws IOException + { + String contentEncoding = containerRequestContext.getHeaderString("Content-Encoding"); + + if (contentEncoding != null && !contentEncoding.equalsIgnoreCase("identity")) { + InputStream originalStream = containerRequestContext.getEntityStream(); + InputStream decompressedStream; + + if (contentEncoding.equalsIgnoreCase("zstd")) { + decompressedStream = new ZstdInputStream(originalStream); + } + else { + throw new PrestoException(NOT_SUPPORTED, format("Unsupported Content-Encoding: '%s'. Only zstd compression is supported.", contentEncoding)); + } + + containerRequestContext.setEntityStream(decompressedStream); + containerRequestContext.getHeaders().remove("Content-Encoding"); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java index a018bebaaa8be..6d83c9b3863f6 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClient.java @@ -21,12 +21,15 @@ import com.facebook.airlift.http.client.StaticBodyGenerator; import com.facebook.airlift.log.Logger; import com.facebook.airlift.units.Duration; +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; import com.google.common.base.Splitter; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import io.netty.channel.ChannelOption; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.ssl.ApplicationProtocolConfig; @@ -44,6 +47,7 @@ import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -62,6 +66,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.zip.GZIPInputStream; import static com.facebook.airlift.security.pem.PemReader.loadPrivateKey; import static com.facebook.airlift.security.pem.PemReader.readCertificateChain; @@ -84,17 +89,25 @@ public class ReactorNettyHttpClient private static final Logger log = Logger.get(ReactorNettyHttpClient.class); private static final HeaderName CONTENT_TYPE_HEADER_NAME = HeaderName.of("Content-Type"); private static final HeaderName CONTENT_LENGTH_HEADER_NAME = HeaderName.of("Content-Length"); + private static final HeaderName CONTENT_ENCODING_HEADER_NAME = HeaderName.of("Content-Encoding"); + private static final HeaderName ACCEPT_ENCODING_HEADER_NAME = HeaderName.of("Accept-Encoding"); private final Duration requestTimeout; private HttpClient httpClient; private final HttpClientConnectionPoolStats connectionPoolStats; private final HttpClientStats httpClientStats; + private final boolean isHttp2CompressionEnabled; + private final int payloadSizeThreshold; + private final double compressionSavingThreshold; @Inject public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config, HttpClientConnectionPoolStats connectionPoolStats, HttpClientStats httpClientStats) { this.connectionPoolStats = connectionPoolStats; this.httpClientStats = httpClientStats; + this.isHttp2CompressionEnabled = config.isHttp2CompressionEnabled(); + this.payloadSizeThreshold = config.getPayloadSizeThreshold(); + this.compressionSavingThreshold = config.getCompressionSavingThreshold(); SslContext sslContext = null; if (config.isHttpsEnabled()) { try { @@ -114,11 +127,11 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config, HttpClientCon if (os.toLowerCase(Locale.ENGLISH).contains("linux")) { // Make sure Open ssl is available for linux deployments if (!OpenSsl.isAvailable()) { - throw new UnsupportedOperationException(format("OpenSsl is not unavailable. Stacktrace: %s", Arrays.toString(OpenSsl.unavailabilityCause().getStackTrace()).replace(',', '\n'))); + throw new UnsupportedOperationException(format("OpenSsl is not available. Stacktrace: %s", Arrays.toString(OpenSsl.unavailabilityCause().getStackTrace()).replace(',', '\n'))); } // Make sure epoll threads are used for linux deployments if (!Epoll.isAvailable()) { - throw new UnsupportedOperationException(format("Epoll is not unavailable. Stacktrace: %s", Arrays.toString(Epoll.unavailabilityCause().getStackTrace()).replace(',', '\n'))); + throw new UnsupportedOperationException(format("Epoll is not available. Stacktrace: %s", Arrays.toString(Epoll.unavailabilityCause().getStackTrace()).replace(',', '\n'))); } } @@ -166,9 +179,10 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config, HttpClientCon // Create HTTP/2 client SslContext finalSslContext = sslContext; + this.httpClient = HttpClient - // The custom pool is wrapped with a HttpConnectionProvider over here - .create(pool) + .create(pool) // The custom pool is wrapped with a HttpConnectionProvider over here + .compress(false) // we will enable response compression manually .protocol(HttpProtocol.H2, HttpProtocol.HTTP11) .runOn(loopResources, true) .http2Settings(settings -> { @@ -179,6 +193,9 @@ public ReactorNettyHttpClient(ReactorNettyHttpClientConfig config, HttpClientCon .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) config.getConnectTimeout().getValue()) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, config.getTcpBufferSize()) + .option(ChannelOption.SO_RCVBUF, config.getTcpBufferSize()) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(config.getWriteBufferWaterMarkLow(), config.getWriteBufferWaterMarkHigh())) // Track HTTP client metrics .metrics(true, () -> httpClientStats, Function.identity()); @@ -208,6 +225,10 @@ public HttpResponseFuture executeAsync(Request airli for (Map.Entry entry : airliftRequest.getHeaders().entries()) { hdr.set(entry.getKey(), entry.getValue()); } + + if (isHttp2CompressionEnabled) { + hdr.set(ACCEPT_ENCODING_HEADER_NAME.toString(), "zstd, gzip"); + } }); URI uri = airliftRequest.getUri(); @@ -223,9 +244,34 @@ public HttpResponseFuture executeAsync(Request airli break; case "POST": byte[] postBytes = ((StaticBodyGenerator) airliftRequest.getBodyGenerator()).getBody(); - disposable = client.post() + byte[] bodyToSend = postBytes; + HttpClient postClient = client; + // We manually do compression for request, use zstd + if (isHttp2CompressionEnabled && postBytes.length >= payloadSizeThreshold) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(postBytes.length / 2); + try (ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(baos)) { + zstdOutput.write(postBytes); + } + + byte[] compressedBytes = baos.toByteArray(); + double compressionRatio = (double) (postBytes.length - compressedBytes.length) / postBytes.length; + if (compressionRatio >= compressionSavingThreshold) { + bodyToSend = compressedBytes; + postClient = client.headers(h -> h.set(CONTENT_ENCODING_HEADER_NAME.toString(), "zstd")); + } + } + catch (IOException e) { + log.error(e, "Fail to compress POST request body"); + onError(listenableFuture, e); + disposable = () -> {}; + break; + } + } + + disposable = postClient.post() .uri(uri) - .send(ByteBufFlux.fromInbound(Mono.just(postBytes))) + .send(ByteBufFlux.fromInbound(Mono.just(bodyToSend))) .responseSingle((response, bytes) -> bytes.asInputStream().zipWith(Mono.just(response))) // Request timeout .timeout(java.time.Duration.of(requestTimeout.toMillis(), MILLIS)) @@ -303,6 +349,7 @@ public void onSuccess(ResponseHandler responseHandler, InputStream inputStream, } long contentLength = 0; + String contentEncoding = null; // Iterate over the headers for (String name : headers.names()) { if (name.equalsIgnoreCase(CONTENT_LENGTH_HEADER_NAME.toString())) { @@ -313,6 +360,9 @@ public void onSuccess(ResponseHandler responseHandler, InputStream inputStream, else if (name.equalsIgnoreCase(CONTENT_TYPE_HEADER_NAME.toString())) { responseHeaders.put(CONTENT_TYPE_HEADER_NAME, headers.get(name)); } + else if (name.equalsIgnoreCase(CONTENT_ENCODING_HEADER_NAME.toString())) { + contentEncoding = headers.get(name); + } else { responseHeaders.put(HeaderName.of(name), headers.get(name)); } @@ -323,7 +373,21 @@ else if (name.equalsIgnoreCase(CONTENT_TYPE_HEADER_NAME.toString())) { return; } + final InputStream[] streamHolder = new InputStream[1]; + streamHolder[0] = inputStream; try { + if (contentEncoding != null && !contentEncoding.equalsIgnoreCase("identity")) { + if (contentEncoding.equalsIgnoreCase("zstd")) { + streamHolder[0] = new ZstdInputStream(inputStream); + } + else if (contentEncoding.equalsIgnoreCase("gzip")) { + streamHolder[0] = new GZIPInputStream(inputStream); + } + else { + throw new RuntimeException(format("Unsupported Content-Encoding: %s. Supported: zstd, gzip.", contentEncoding)); + } + } + long finalContentLength = contentLength; Object a = responseHandler.handle(null, new Response() { @@ -349,11 +413,11 @@ public long getBytesRead() public InputStream getInputStream() throws IOException { - return inputStream; + return streamHolder[0]; } }); // closing it here to prevent memory leak of bytebuf - inputStream.close(); + streamHolder[0].close(); listenableFuture.set(a); } catch (Exception e) { @@ -361,7 +425,7 @@ public InputStream getInputStream() } finally { try { - inputStream.close(); + streamHolder[0].close(); } catch (IOException e) { log.warn(e, "Failed to close input stream"); diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java index 63ccc174b364b..fe9699d58356b 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/ReactorNettyHttpClientConfig.java @@ -17,10 +17,12 @@ import com.facebook.airlift.configuration.ConfigDescription; import com.facebook.airlift.units.DataSize; import com.facebook.airlift.units.Duration; +import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; import java.util.Optional; +import static com.facebook.airlift.units.DataSize.Unit.KILOBYTE; import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.SECONDS; @@ -45,6 +47,94 @@ public class ReactorNettyHttpClientConfig private String trustStorePath; private Optional cipherSuites = Optional.empty(); + private boolean http2CompressionEnabled; + private DataSize payloadSizeThreshold = new DataSize(50, KILOBYTE); + private double compressionSavingThreshold = 0.1; + private DataSize tcpBufferSize = new DataSize(512, KILOBYTE); + private DataSize writeBufferWaterMarkLow = new DataSize(256, KILOBYTE); + private DataSize writeBufferWaterMarkHigh = new DataSize(512, KILOBYTE); + + @Config("reactor.enable-http2-compression") + public ReactorNettyHttpClientConfig setHttp2CompressionEnabled(boolean http2CompressionEnabled) + { + this.http2CompressionEnabled = http2CompressionEnabled; + return this; + } + + public boolean isHttp2CompressionEnabled() + { + return http2CompressionEnabled; + } + + public double getCompressionSavingThreshold() + { + return compressionSavingThreshold; + } + + @Config("reactor.compression-ratio-threshold") + @ConfigDescription("Use compressed data if the compression ratio is above the threshold") + public ReactorNettyHttpClientConfig setCompressionSavingThreshold(double compressionSavingThreshold) + { + this.compressionSavingThreshold = compressionSavingThreshold; + return this; + } + + @Min(1024) + @Max(1024 * 1024) + public int getTcpBufferSize() + { + return (int) tcpBufferSize.toBytes(); + } + + @Config("reactor.tcp-buffer-size") + public ReactorNettyHttpClientConfig setTcpBufferSize(DataSize tcpBufferSize) + { + this.tcpBufferSize = tcpBufferSize; + return this; + } + + @Min(1024) + @Max(1024 * 1024) + public int getWriteBufferWaterMarkLow() + { + return (int) writeBufferWaterMarkLow.toBytes(); + } + + @Config("reactor.tcp-write-buffer-water-mark-low") + public ReactorNettyHttpClientConfig setWriteBufferWaterMarkLow(DataSize writeBufferWaterMarkLow) + { + this.writeBufferWaterMarkLow = writeBufferWaterMarkLow; + return this; + } + + @Min(1024) + @Max(1024 * 1024) + public int getWriteBufferWaterMarkHigh() + { + return (int) writeBufferWaterMarkHigh.toBytes(); + } + + @Config("reactor.tcp-write-buffer-water-mark-high") + public ReactorNettyHttpClientConfig setWriteBufferWaterMarkHigh(DataSize writeBufferWaterMarkHigh) + { + this.writeBufferWaterMarkHigh = writeBufferWaterMarkHigh; + return this; + } + + @Min(1024) + @Max(512 * 1024) + public int getPayloadSizeThreshold() + { + return (int) payloadSizeThreshold.toBytes(); + } + + @Config("reactor.payload-compression-threshold") + public ReactorNettyHttpClientConfig setPayloadSizeThreshold(DataSize payloadSizeThreshold) + { + this.payloadSizeThreshold = payloadSizeThreshold; + return this; + } + public boolean isReactorNettyHttpClientEnabled() { return reactorNettyHttpClientEnabled; diff --git a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java index 29d913fb3adab..c0958e4d79a04 100644 --- a/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/remotetask/TestReactorNettyHttpClientConfig.java @@ -24,6 +24,7 @@ import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static com.facebook.airlift.units.DataSize.Unit.KILOBYTE; import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.SECONDS; @@ -50,7 +51,13 @@ public void testDefaults() .setKeyStorePath(null) .setKeyStorePassword(null) .setTrustStorePath(null) - .setCipherSuites(null)); + .setCipherSuites(null) + .setHttp2CompressionEnabled(false) + .setPayloadSizeThreshold(new DataSize(50, KILOBYTE)) + .setCompressionSavingThreshold(0.1) + .setTcpBufferSize(new DataSize(512, KILOBYTE)) + .setWriteBufferWaterMarkHigh(new DataSize(512, KILOBYTE)) + .setWriteBufferWaterMarkLow(new DataSize(256, KILOBYTE))); } @Test @@ -75,6 +82,12 @@ public void testExplicitPropertyMappings() .put("reactor.truststore-path", "/var/abc/def/presto.jks") .put("reactor.keystore-password", "password") .put("reactor.cipher-suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256") + .put("reactor.enable-http2-compression", "true") + .put("reactor.payload-compression-threshold", "10kB") + .put("reactor.compression-ratio-threshold", "0.2") + .put("reactor.tcp-buffer-size", "256kB") + .put("reactor.tcp-write-buffer-water-mark-high", "256kB") + .put("reactor.tcp-write-buffer-water-mark-low", "128kB") .build(); ReactorNettyHttpClientConfig expected = new ReactorNettyHttpClientConfig() @@ -95,7 +108,13 @@ public void testExplicitPropertyMappings() .setKeyStorePath("/var/abc/def/presto.jks") .setTrustStorePath("/var/abc/def/presto.jks") .setKeyStorePassword("password") - .setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"); + .setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256") + .setHttp2CompressionEnabled(true) + .setPayloadSizeThreshold(new DataSize(10, KILOBYTE)) + .setCompressionSavingThreshold(0.2) + .setTcpBufferSize(new DataSize(256, KILOBYTE)) + .setWriteBufferWaterMarkHigh(new DataSize(256, KILOBYTE)) + .setWriteBufferWaterMarkLow(new DataSize(128, KILOBYTE)); assertFullMapping(properties, expected); } diff --git a/presto-tests/src/test/java/com/facebook/presto/server/TestCompression.java b/presto-tests/src/test/java/com/facebook/presto/server/TestCompression.java new file mode 100644 index 0000000000000..84b018f105519 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/server/TestCompression.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import com.facebook.presto.spi.QueryId; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.presto.tests.tpch.TpchQueryRunner.createQueryRunner; +import static org.testng.Assert.assertTrue; + +public class TestCompression +{ + @Test + public void testCompressionWithJson() + throws Exception + { + // Enable HTTP/2 and compression with minimal threshold to guarantee compression + Map properties = ImmutableMap.builder() + .put("reactor.netty-http-client-enabled", "true") // Enable HTTP/2 client + .put("reactor.enable-http2-compression", "true") // Enable compression + .put("reactor.payload-compression-threshold", "1kB") // 1kB (minimum allowed by @Min(1024)) + .put("reactor.compression-ratio-threshold", "0.01") // 1% compression savings threshold + .build(); + + try (DistributedQueryRunner queryRunner = createQueryRunner(properties)) { + // Run multiple queries to test different data sizes and patterns + String[] queries = { + "SELECT count(*) FROM tpch.tiny.nation", + "SELECT * FROM tpch.tiny.region", + "SELECT n.name, r.name FROM tpch.tiny.nation n JOIN tpch.tiny.region r ON n.regionkey = r.regionkey", + "SELECT * FROM tpch.tiny.nation ORDER BY nationkey" + }; + + for (String query : queries) { + QueryId queryId = queryRunner.executeWithQueryId(queryRunner.getDefaultSession(), query).getQueryId(); + assertTrue(queryRunner.getQueryInfo(queryId).getState().isDone()); + } + } + } + + @Test + public void testCompressionWithThrift() + throws Exception + { + // Enable HTTP/2 and compression with minimal threshold to guarantee compression + Map properties = ImmutableMap.builder() + .put("reactor.netty-http-client-enabled", "true") // Enable HTTP/2 client + .put("reactor.enable-http2-compression", "true") // Enable compression + .put("reactor.payload-compression-threshold", "1kB") // 1kB (minimum allowed by @Min(1024)) + .put("reactor.compression-ratio-threshold", "0.01") // 1% compression savings threshold + .put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true") + .put("experimental.internal-communication.task-update-request-thrift-serde-enabled", "true") + .build(); + + try (DistributedQueryRunner queryRunner = createQueryRunner(properties)) { + // Run multiple queries to test different data sizes and patterns + String[] queries = { + "SELECT count(*) FROM tpch.tiny.nation", + "SELECT * FROM tpch.tiny.region", + "SELECT n.name, r.name FROM tpch.tiny.nation n JOIN tpch.tiny.region r ON n.regionkey = r.regionkey", + "SELECT * FROM tpch.tiny.nation ORDER BY nationkey" + }; + + for (String query : queries) { + QueryId queryId = queryRunner.executeWithQueryId(queryRunner.getDefaultSession(), query).getQueryId(); + assertTrue(queryRunner.getQueryInfo(queryId).getState().isDone()); + } + } + } +}