Skip to content

Commit 1980523

Browse files
ZhenLianejona86
authored andcommitted
netty: Add Executor When Creating SslContext
This PR is to add one more Executor parameter when creating the SslContext. In Netty, we already have this implementation for passing Executor when creating SslContext: netty/netty#8847. This extra Executor is used to take some time-consuming tasks when doing SSL handshake. However, in current gRPC implementation, we are not using this API. In this PR, the relevant changes are: 1. get the executorPool from ChannelBuilder or ServerBuilder 2. pass the executorPool all the way down to ClientTlsHandler 3. fill executorPool in when creating SslHandler
1 parent dd8165b commit 1980523

File tree

10 files changed

+200
-30
lines changed

10 files changed

+200
-30
lines changed

core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,4 +657,11 @@ private T thisT() {
657657
T thisT = (T) this;
658658
return thisT;
659659
}
660+
661+
/**
662+
* Returns the internal offload executor pool for offloading tasks.
663+
*/
664+
protected ObjectPool<? extends Executor> getOffloadExecutorPool() {
665+
return this.offloadExecutorPool;
666+
}
660667
}

core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,11 @@ public List<ServerServiceDefinition> getServices() {
297297
return null;
298298
}
299299
}
300+
301+
/**
302+
* Returns the internal ExecutorPool for offloading tasks.
303+
*/
304+
protected ObjectPool<? extends Executor> getExecutorPool() {
305+
return this.executorPool;
306+
}
300307
}

netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public static ChannelHandler grpcNegotiationHandler(GrpcHttp2ConnectionHandler n
159159

160160
public static ChannelHandler clientTlsHandler(
161161
ChannelHandler next, SslContext sslContext, String authority) {
162-
return new ClientTlsHandler(next, sslContext, authority);
162+
return new ClientTlsHandler(next, sslContext, authority, null);
163163
}
164164

165165
public static class ProtocolNegotiationHandler

netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.net.SocketAddress;
5252
import java.util.HashMap;
5353
import java.util.Map;
54+
import java.util.concurrent.Executor;
5455
import java.util.concurrent.ScheduledExecutorService;
5556
import java.util.concurrent.TimeUnit;
5657
import javax.annotation.CheckReturnValue;
@@ -398,7 +399,8 @@ protected ClientTransportFactory buildTransportFactory() {
398399
throw new RuntimeException(ex);
399400
}
400401
}
401-
negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext);
402+
negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext,
403+
this.getOffloadExecutorPool());
402404
}
403405

404406
return new NettyTransportFactory(
@@ -441,14 +443,15 @@ void overrideAuthorityChecker(@Nullable OverrideAuthorityChecker authorityChecke
441443
@CheckReturnValue
442444
static ProtocolNegotiator createProtocolNegotiatorByType(
443445
NegotiationType negotiationType,
444-
SslContext sslContext) {
446+
SslContext sslContext,
447+
ObjectPool<? extends Executor> executorPool) {
445448
switch (negotiationType) {
446449
case PLAINTEXT:
447450
return ProtocolNegotiators.plaintext();
448451
case PLAINTEXT_UPGRADE:
449452
return ProtocolNegotiators.plaintextUpgrade();
450453
case TLS:
451-
return ProtocolNegotiators.tls(sslContext);
454+
return ProtocolNegotiators.tls(sslContext, executorPool);
452455
default:
453456
throw new IllegalArgumentException("Unsupported negotiationType: " + negotiationType);
454457
}

netty/src/main/java/io/grpc/netty/NettyServerBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,8 +533,9 @@ protected List<NettyServer> buildTransportServers(
533533

534534
ProtocolNegotiator negotiator = protocolNegotiator;
535535
if (negotiator == null) {
536-
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
537-
ProtocolNegotiators.serverPlaintext();
536+
negotiator = sslContext != null
537+
? ProtocolNegotiators.serverTls(sslContext, this.getExecutorPool())
538+
: ProtocolNegotiators.serverPlaintext();
538539
}
539540

540541
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());

netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.grpc.Status;
3434
import io.grpc.internal.GrpcAttributes;
3535
import io.grpc.internal.GrpcUtil;
36+
import io.grpc.internal.ObjectPool;
3637
import io.netty.channel.ChannelDuplexHandler;
3738
import io.netty.channel.ChannelFutureListener;
3839
import io.netty.channel.ChannelHandler;
@@ -58,6 +59,7 @@
5859
import java.net.SocketAddress;
5960
import java.net.URI;
6061
import java.util.Arrays;
62+
import java.util.concurrent.Executor;
6163
import java.util.logging.Level;
6264
import java.util.logging.Logger;
6365
import javax.annotation.Nullable;
@@ -106,20 +108,35 @@ public static ProtocolNegotiator serverPlaintext() {
106108

107109
/**
108110
* Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
111+
* @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
109112
*/
110-
public static ProtocolNegotiator serverTls(final SslContext sslContext) {
113+
public static ProtocolNegotiator serverTls(final SslContext sslContext,
114+
final ObjectPool<? extends Executor> executorPool) {
111115
Preconditions.checkNotNull(sslContext, "sslContext");
116+
final Executor executor;
117+
if (executorPool != null) {
118+
// The handlers here can out-live the {@link ProtocolNegotiator}.
119+
// To keep their own reference to executor from executorPool, we use an extra (unused)
120+
// reference here forces the executor to stay alive, which prevents it from being re-created
121+
// for every connection.
122+
executor = executorPool.getObject();
123+
} else {
124+
executor = null;
125+
}
112126
return new ProtocolNegotiator() {
113127
@Override
114128
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
115129
ChannelHandler gnh = new GrpcNegotiationHandler(handler);
116-
ChannelHandler sth = new ServerTlsHandler(gnh, sslContext);
130+
ChannelHandler sth = new ServerTlsHandler(gnh, sslContext, executorPool);
117131
return new WaitUntilActiveHandler(sth);
118132
}
119133

120134
@Override
121-
public void close() {}
122-
135+
public void close() {
136+
if (executorPool != null && executor != null) {
137+
executorPool.returnObject(executor);
138+
}
139+
}
123140

124141
@Override
125142
public AsciiString scheme() {
@@ -128,22 +145,37 @@ public AsciiString scheme() {
128145
};
129146
}
130147

148+
/**
149+
* Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
150+
*/
151+
public static ProtocolNegotiator serverTls(final SslContext sslContext) {
152+
return serverTls(sslContext, null);
153+
}
154+
131155
static final class ServerTlsHandler extends ChannelInboundHandlerAdapter {
156+
private Executor executor;
132157
private final ChannelHandler next;
133158
private final SslContext sslContext;
134159

135160
private ProtocolNegotiationEvent pne = ProtocolNegotiationEvent.DEFAULT;
136161

137-
ServerTlsHandler(ChannelHandler next, SslContext sslContext) {
162+
ServerTlsHandler(ChannelHandler next,
163+
SslContext sslContext,
164+
final ObjectPool<? extends Executor> executorPool) {
138165
this.sslContext = checkNotNull(sslContext, "sslContext");
139166
this.next = checkNotNull(next, "next");
167+
if (executorPool != null) {
168+
this.executor = executorPool.getObject();
169+
}
140170
}
141171

142172
@Override
143173
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
144174
super.handlerAdded(ctx);
145175
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
146-
ctx.pipeline().addBefore(ctx.name(), null, new SslHandler(sslEngine, false));
176+
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, this.executor != null
177+
? new SslHandler(sslEngine, false, this.executor)
178+
: new SslHandler(sslEngine, false));
147179
}
148180

149181
@Override
@@ -259,11 +291,18 @@ protected void userEventTriggered0(ChannelHandlerContext ctx, Object evt) throws
259291

260292
static final class ClientTlsProtocolNegotiator implements ProtocolNegotiator {
261293

262-
public ClientTlsProtocolNegotiator(SslContext sslContext) {
294+
public ClientTlsProtocolNegotiator(SslContext sslContext,
295+
ObjectPool<? extends Executor> executorPool) {
263296
this.sslContext = checkNotNull(sslContext, "sslContext");
297+
this.executorPool = executorPool;
298+
if (this.executorPool != null) {
299+
this.executor = this.executorPool.getObject();
300+
}
264301
}
265302

266303
private final SslContext sslContext;
304+
private final ObjectPool<? extends Executor> executorPool;
305+
private Executor executor;
267306

268307
@Override
269308
public AsciiString scheme() {
@@ -273,26 +312,34 @@ public AsciiString scheme() {
273312
@Override
274313
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
275314
ChannelHandler gnh = new GrpcNegotiationHandler(grpcHandler);
276-
ChannelHandler cth = new ClientTlsHandler(gnh, sslContext, grpcHandler.getAuthority());
315+
ChannelHandler cth = new ClientTlsHandler(gnh, sslContext, grpcHandler.getAuthority(),
316+
this.executor);
277317
return new WaitUntilActiveHandler(cth);
278318
}
279319

280320
@Override
281-
public void close() {}
321+
public void close() {
322+
if (this.executorPool != null && this.executor != null) {
323+
this.executorPool.returnObject(this.executor);
324+
}
325+
}
282326
}
283327

284328
static final class ClientTlsHandler extends ProtocolNegotiationHandler {
285329

286330
private final SslContext sslContext;
287331
private final String host;
288332
private final int port;
333+
private Executor executor;
289334

290-
ClientTlsHandler(ChannelHandler next, SslContext sslContext, String authority) {
335+
ClientTlsHandler(ChannelHandler next, SslContext sslContext, String authority,
336+
Executor executor) {
291337
super(next);
292338
this.sslContext = checkNotNull(sslContext, "sslContext");
293339
HostPort hostPort = parseAuthority(authority);
294340
this.host = hostPort.host;
295341
this.port = hostPort.port;
342+
this.executor = executor;
296343
}
297344

298345
@Override
@@ -301,7 +348,9 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
301348
SSLParameters sslParams = sslEngine.getSSLParameters();
302349
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
303350
sslEngine.setSSLParameters(sslParams);
304-
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, new SslHandler(sslEngine, false));
351+
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, this.executor != null
352+
? new SslHandler(sslEngine, false, this.executor)
353+
: new SslHandler(sslEngine, false));
305354
}
306355

307356
@Override
@@ -363,13 +412,24 @@ static HostPort parseAuthority(String authority) {
363412
return new HostPort(host, port);
364413
}
365414

415+
/**
416+
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
417+
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
418+
* may happen immediately, even before the TLS Handshake is complete.
419+
* @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
420+
*/
421+
public static ProtocolNegotiator tls(SslContext sslContext,
422+
ObjectPool<? extends Executor> executorPool) {
423+
return new ClientTlsProtocolNegotiator(sslContext, executorPool);
424+
}
425+
366426
/**
367427
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
368428
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
369429
* may happen immediately, even before the TLS Handshake is complete.
370430
*/
371431
public static ProtocolNegotiator tls(SslContext sslContext) {
372-
return new ClientTlsProtocolNegotiator(sslContext);
432+
return tls(sslContext, null);
373433
}
374434

375435
/** A tuple of (host, port). */

netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package io.grpc.netty;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
2021
import static org.junit.Assert.assertTrue;
2122
import static org.mockito.Mockito.mock;
2223

2324
import io.grpc.ManagedChannel;
2425
import io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker;
26+
import io.grpc.netty.NettyTestUtil.TrackingObjectPoolForTest;
2527
import io.netty.channel.Channel;
2628
import io.netty.channel.ChannelFactory;
2729
import io.netty.channel.EventLoopGroup;
@@ -143,26 +145,42 @@ public void failIfSslContextIsNotClient() {
143145
public void createProtocolNegotiatorByType_plaintext() {
144146
ProtocolNegotiator negotiator = NettyChannelBuilder.createProtocolNegotiatorByType(
145147
NegotiationType.PLAINTEXT,
146-
noSslContext);
148+
noSslContext, null);
147149
// just check that the classes are the same, and that negotiator is not null.
148150
assertTrue(negotiator instanceof ProtocolNegotiators.PlaintextProtocolNegotiator);
151+
negotiator.close();
149152
}
150153

151154
@Test
152155
public void createProtocolNegotiatorByType_plaintextUpgrade() {
153156
ProtocolNegotiator negotiator = NettyChannelBuilder.createProtocolNegotiatorByType(
154157
NegotiationType.PLAINTEXT_UPGRADE,
155-
noSslContext);
158+
noSslContext, null);
156159
// just check that the classes are the same, and that negotiator is not null.
157160
assertTrue(negotiator instanceof ProtocolNegotiators.PlaintextUpgradeProtocolNegotiator);
161+
negotiator.close();
158162
}
159163

160164
@Test
161165
public void createProtocolNegotiatorByType_tlsWithNoContext() {
162166
thrown.expect(NullPointerException.class);
163167
NettyChannelBuilder.createProtocolNegotiatorByType(
164168
NegotiationType.TLS,
165-
noSslContext);
169+
noSslContext, null);
170+
}
171+
172+
@Test
173+
public void createProtocolNegotiatorByType_tlsWithExecutor() throws Exception {
174+
TrackingObjectPoolForTest executorPool = new TrackingObjectPoolForTest();
175+
assertEquals(false, executorPool.isInUse());
176+
SslContext localSslContext = GrpcSslContexts.forClient().build();
177+
ProtocolNegotiator negotiator = NettyChannelBuilder.createProtocolNegotiatorByType(
178+
NegotiationType.TLS,
179+
localSslContext, executorPool);
180+
assertEquals(true, executorPool.isInUse());
181+
assertNotNull(negotiator);
182+
negotiator.close();
183+
assertEquals(false, executorPool.isInUse());
166184
}
167185

168186
@Test

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import io.grpc.internal.TransportTracer;
6666
import io.grpc.internal.testing.TestUtils;
6767
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
68+
import io.grpc.netty.NettyTestUtil.TrackingObjectPoolForTest;
6869
import io.netty.channel.Channel;
6970
import io.netty.channel.ChannelConfig;
7071
import io.netty.channel.ChannelDuplexHandler;
@@ -671,6 +672,51 @@ public void keepAliveDisabled_shouldNotSetTcpUserTimeout() throws Exception {
671672
}
672673
}
673674

675+
/**
676+
* Verifies that we can successfully build a server and client negotiator with tls and the
677+
* executor passing in, and without resource leak after closing the negotiator.
678+
*/
679+
@Test
680+
public void tlsNegotiationServerExecutorShouldSucceed() throws Exception {
681+
// initialize the client and server Executor pool
682+
TrackingObjectPoolForTest serverExecutorPool = new TrackingObjectPoolForTest();
683+
TrackingObjectPoolForTest clientExecutorPool = new TrackingObjectPoolForTest();
684+
assertEquals(false, serverExecutorPool.isInUse());
685+
assertEquals(false, clientExecutorPool.isInUse());
686+
687+
File serverCert = TestUtils.loadCert("server1.pem");
688+
File serverKey = TestUtils.loadCert("server1.key");
689+
SslContext sslContext = GrpcSslContexts.forServer(serverCert, serverKey)
690+
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
691+
.clientAuth(ClientAuth.NONE)
692+
.build();
693+
negotiator = ProtocolNegotiators.serverTls(sslContext, serverExecutorPool);
694+
startServer();
695+
// after starting the server, the Executor in the server pool should be used
696+
assertEquals(true, serverExecutorPool.isInUse());
697+
698+
File caCert = TestUtils.loadCert("ca.pem");
699+
File clientCert = TestUtils.loadCert("client.pem");
700+
File clientKey = TestUtils.loadCert("client.key");
701+
SslContext clientContext = GrpcSslContexts.forClient()
702+
.trustManager(caCert)
703+
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
704+
.keyManager(clientCert, clientKey)
705+
.build();
706+
ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext, clientExecutorPool);
707+
// after starting the client, the Executor in the client pool should be used
708+
assertEquals(true, clientExecutorPool.isInUse());
709+
final NettyClientTransport transport = newTransport(negotiator);
710+
callMeMaybe(transport.start(clientTransportListener));
711+
Rpc rpc = new Rpc(transport).halfClose();
712+
rpc.waitForResponse();
713+
// closing the negotiators should return the executors back to pool, and release the resource
714+
negotiator.close();
715+
assertEquals(false, clientExecutorPool.isInUse());
716+
this.negotiator.close();
717+
assertEquals(false, serverExecutorPool.isInUse());
718+
}
719+
674720
private Throwable getRootCause(Throwable t) {
675721
if (t.getCause() == null) {
676722
return t;

0 commit comments

Comments
 (0)