Skip to content

Commit 34b0e0b

Browse files
committed
Absolute deadline propagation grpc#11966
1 parent c340f4a commit 34b0e0b

File tree

5 files changed

+84
-21
lines changed

5 files changed

+84
-21
lines changed

examples/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
3030
implementation "io.grpc:grpc-services:${grpcVersion}"
3131
implementation "io.grpc:grpc-stub:${grpcVersion}"
32+
implementation "io.grpc:grpc-core:${grpcVersion}"
3233
compileOnly "org.apache.tomcat:annotations-api:6.0.53"
3334

3435
// examples/advanced need this for JsonFormat
@@ -45,7 +46,7 @@ dependencies {
4546
protobuf {
4647
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
4748
plugins {
48-
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
49+
grpc { artifact = "io.grpc:protoc-gen-grpc-java:1.70.0" }
4950
}
5051
generateProtoTasks {
5152
all()*.plugins { grpc {} }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.grpc.examples.deadline;
2+
3+
import java.util.concurrent.Executors;
4+
import java.util.concurrent.ScheduledExecutorService;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import io.grpc.Context;
8+
import io.grpc.Metadata;
9+
import io.grpc.ServerStreamTracer;
10+
import io.grpc.internal.GrpcUtil;
11+
12+
class AbsoluteDeadlineServerStreamTracerFactory extends ServerStreamTracer.Factory {
13+
public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
14+
Metadata.ASCII_STRING_MARSHALLER);
15+
public static final ServerStreamTracer NOOP_STREAM_TRACER = new ServerStreamTracer() {
16+
};
17+
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
18+
19+
@Override
20+
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
21+
22+
headers.discardAll(GrpcUtil.TIMEOUT_KEY);
23+
24+
return new ServerStreamTracer() {
25+
@Override
26+
public Context filterContext(Context context) {
27+
return context.withDeadlineAfter(300, TimeUnit.MILLISECONDS, scheduler);
28+
}
29+
};
30+
}
31+
}

examples/src/main/java/io/grpc/examples/deadline/DeadlineClient.java

-20
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,10 @@ public static void main(String[] args) throws Exception {
7979
try {
8080
DeadlineClient client = new DeadlineClient(channel);
8181

82-
// The server takes 500ms to process the call, so setting a deadline further in the future we
83-
// should get a successful response.
84-
logger.info("Calling server with a generous deadline, expected to work");
85-
client.greet("deadline client", 1000);
86-
8782
// A smaller deadline will result in us getting a DEADLINE_EXCEEDED error.
8883
logger.info(
8984
"Calling server with an unrealistic (300ms) deadline, expecting a DEADLINE_EXCEEDED");
9085
client.greet("deadline client", 300);
91-
92-
// Including the "propagate" magic string in the request will cause the server to call itself
93-
// to simulate a situation where a server needs to call another server to satisfy the original
94-
// request. This will double the time it takes to respond to the client request, but with
95-
// an increased deadline we should get a successful response.
96-
logger.info("Calling server with propagation and a generous deadline, expected to work");
97-
client.greet("deadline client [propagate]", 2000);
98-
99-
// With this propagated call we reduce the deadline making it impossible for the both the
100-
// first server call and the propagated one to succeed. You should see the call fail with
101-
// DEADLINE_EXCEEDED, and you should also see DEADLINE_EXCEEDED in the server output as it
102-
// runs out of time waiting for the propagated call to finish.
103-
logger.info(
104-
"Calling server with propagation and a generous deadline, expecting a DEADLINE_EXCEEDED");
105-
client.greet("deadline client [propagate]", 1000);
10686
} finally {
10787
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
10888
}

examples/src/main/java/io/grpc/examples/deadline/DeadlineServer.java

+6
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import io.grpc.InsecureChannelCredentials;
2121
import io.grpc.InsecureServerCredentials;
2222
import io.grpc.Server;
23+
import io.grpc.ServerInterceptors;
2324
import io.grpc.examples.helloworld.GreeterGrpc;
2425
import io.grpc.examples.helloworld.HelloReply;
2526
import io.grpc.examples.helloworld.HelloRequest;
2627
import io.grpc.stub.StreamObserver;
2728
import java.io.IOException;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
2831
import java.util.concurrent.TimeUnit;
2932
import java.util.logging.Logger;
3033

@@ -38,6 +41,9 @@ private void start() throws IOException {
3841
int port = 50051;
3942
SlowGreeter slowGreeter = new SlowGreeter();
4043
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
44+
/*.addService(ServerInterceptors.intercept(slowGreeter, new DeadlineServerInterceptor(
45+
Executors.newScheduledThreadPool(1))))*/
46+
.addStreamTracerFactory(new AbsoluteDeadlineServerStreamTracerFactory())
4147
.addService(slowGreeter)
4248
.build()
4349
.start();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.grpc.examples.deadline;
2+
3+
import java.util.concurrent.ScheduledExecutorService;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import io.grpc.Context;
7+
import io.grpc.Contexts;
8+
import io.grpc.Metadata;
9+
import io.grpc.ServerCall;
10+
import io.grpc.ServerCallHandler;
11+
import io.grpc.ServerInterceptor;
12+
import io.grpc.Status;
13+
14+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
15+
import static io.grpc.Contexts.statusFromCancelled;
16+
import static io.grpc.Status.DEADLINE_EXCEEDED;
17+
18+
class DeadlineServerInterceptor implements ServerInterceptor {
19+
public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
20+
Metadata.ASCII_STRING_MARSHALLER);
21+
22+
private final ScheduledExecutorService scheduler;
23+
24+
public DeadlineServerInterceptor(ScheduledExecutorService scheduler) {
25+
this.scheduler = scheduler;
26+
}
27+
28+
@Override
29+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
30+
ServerCallHandler<ReqT, RespT> next) {
31+
32+
Context.CancellableContext context = Context.current()
33+
.fork()
34+
.withDeadlineAfter(100, TimeUnit.MILLISECONDS, scheduler);
35+
36+
context.addListener(ctx -> {
37+
Status status = statusFromCancelled(ctx);
38+
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
39+
call.close(status, new Metadata());
40+
}
41+
}, directExecutor());
42+
43+
return Contexts.interceptCall(context, call, headers, next);
44+
}
45+
}

0 commit comments

Comments
 (0)