Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

absolute deadline propagation #11966

Open
macrergate opened this issue Mar 17, 2025 · 3 comments
Open

absolute deadline propagation #11966

macrergate opened this issue Mar 17, 2025 · 3 comments
Labels

Comments

@macrergate
Copy link

macrergate commented Mar 17, 2025

Currently grpc deadlines are propagated from client to server via grpc-timeout header as relative value(timeout).
We believe we have pretty good clock synchronization between our servers (up to several milliseconds) and are interesting in propagation of the absolute deadline in some custom header. Is there a way to do it by customizing grpc-java server and client logic in interceptors or something else, but without forking grpc-java implementation?

@macrergate
Copy link
Author

macrergate commented Mar 20, 2025

Is this the right way to go?

package io.grpc.examples.deadline;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.Contexts.statusFromCancelled;
import static io.grpc.Status.DEADLINE_EXCEEDED;

class DeadlineServerInterceptor implements ServerInterceptor {
    public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
            Metadata.ASCII_STRING_MARSHALLER);

    private final ScheduledExecutorService scheduler;

    public DeadlineServerInterceptor(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
                                                                 ServerCallHandler<ReqT, RespT> next) {

        long deadlineMillis = Long.parseLong(headers.get(GRPC_DEADLINE));
        long timeout = deadlineMillis - System.currentTimeMillis();

        Context.CancellableContext context = Context.current()
                .fork()
                .withDeadlineAfter(timeout, TimeUnit.MILLISECONDS, scheduler);

        context.addListener(ctx -> {
            Status status = statusFromCancelled(ctx);
            if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
                call.close(status, new Metadata());
            }
        }, directExecutor());

        return Contexts.interceptCall(context, call, headers, next);
    }
}

@macrergate
Copy link
Author

macrergate commented Mar 20, 2025

I have found another approach and it looks better than the former - using ServerStreamTracer.Factory. In this case I'm getting in before original deadline context creation in ServerImpl

package io.grpc.examples.deadline;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.GrpcUtil;

class AbsoluteDeadlineServerStreamTracerFactory extends ServerStreamTracer.Factory {
    public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
            Metadata.ASCII_STRING_MARSHALLER);
    public static final ServerStreamTracer NOOP_STREAM_TRACER = new ServerStreamTracer() {
    };
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @Override
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
        String deadlineString = headers.get(GRPC_DEADLINE);

        if (deadlineString == null) {
            return NOOP_STREAM_TRACER;
        }

        long deadlineMillis = Long.parseLong(deadlineString);
        headers.discardAll(GrpcUtil.TIMEOUT_KEY);

        return new ServerStreamTracer() {
            @Override
            public Context filterContext(Context context) {
                long timeout = deadlineMillis - System.currentTimeMillis();

                return context.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS, scheduler);
            }
        };
    }
}

Any thoughts and criticism are greatly appreciated.

@kannanjgithub
Copy link
Contributor

Yes, the 2nd solution is better, because in the 1st solution you are creating a forked cancellable context and so the cancellation will be called twice. Also it handles closing the call when cancellation due to the timeout occurs.
The 2nd solution just replaced the header timeout via filtering contexts and the ServerStreamCancellationListener will do the job of closing the call when the timeout occurs. There are no multiple contexts for the same purpose. The only caveat is that the timeout key from GrpcUtil is internal.

kannanjgithub added a commit to kannanjgithub/grpc-java that referenced this issue Mar 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants