Skip to content

Commit

Permalink
[BEAM-11227] Trying ChannelBuilder's executor option
Browse files Browse the repository at this point in the history
This is suggestion by Sanjay from gRPC team
grpc/grpc-java#8174 (comment)
  • Loading branch information
suztomo committed May 13, 2021
1 parent a942baf commit d19e324
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
Expand Down Expand Up @@ -125,6 +126,15 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
LOG.info("Starting testMultipleClientsFailingIsHandledGracefullyByServer");
Collection<Callable<Void>> tasks = new ArrayList<>();
ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new ConcurrentLinkedQueue<>();
ExecutorService channelExecutor =
Executors.newCachedThreadPool(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "BeamFnLoggingServiceTest-channel-executor");
}
});

try (BeamFnLoggingService service =
new BeamFnLoggingService(
findOpenPort(),
Expand All @@ -139,6 +149,7 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
CountDownLatch waitForTermination = new CountDownLatch(1);
ManagedChannel channel =
InProcessChannelBuilder.forName(service.getApiServiceDescriptor().getUrl())
.executor(channelExecutor)
.build();
StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
BeamFnLoggingGrpc.newStub(channel)
Expand Down Expand Up @@ -175,6 +186,7 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
LOG.info("executorService terminated");
} finally {
LOG.info("Finished testMultipleClientsFailingIsHandledGracefullyByServer");
channelExecutor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
Expand Down Expand Up @@ -106,6 +107,16 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new ConcurrentLinkedQueue<>();
GrpcLoggingService service =
GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));

ExecutorService channelExecutor =
Executors.newCachedThreadPool(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "GrpcLoggingServiceTest-channel-executor");
}
});

try (GrpcFnServer<GrpcLoggingService> server =
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {

Expand All @@ -116,7 +127,8 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
() -> {
CountDownLatch waitForTermination = new CountDownLatch(1);
String url = server.getApiServiceDescriptor().getUrl();
ManagedChannel channel = InProcessChannelBuilder.forName(url).build();
ManagedChannel channel =
InProcessChannelBuilder.forName(url).executor(channelExecutor).build();
StreamObserver<LogEntry.List> outboundObserver =
BeamFnLoggingGrpc.newStub(channel)
.logging(
Expand All @@ -141,6 +153,7 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
LOG.info("executorService terminated");
} finally {
LOG.info("Finishing testMultipleClientsFailingIsHandledGracefullyByServer");
channelExecutor.shutdown();
}
}

Expand Down

0 comments on commit d19e324

Please sign in to comment.