NullPointerException when calling onError(StatusRuntimeException) #11973

Open
ulfjack opened this issue Mar 21, 2025 · 5 comments
@ulfjack
Contributor

ulfjack commented Mar 21, 2025

What version of gRPC-Java are you using?

v1.71.0

What is your environment?

Linux

What did you expect to see?

I was not expecting onError to throw NullPointerException.

What did you see instead?

It threw a NullPointerException (see the stack trace below).

Steps to reproduce the bug

We have been seeing rare NullPointerExceptions or ArrayIndexOutOfBoundsExceptions when calling onError on an incoming server call. This happens several times per day across all of our services, which translates to roughly one in a few billion calls. The exception looks like this:

java.lang.NullPointerException: Cannot invoke "io.grpc.Metadata$LazyValue.toBytes()" because "value" is null
	at io.grpc.Metadata.valueAsBytes(Metadata.java:183)
	at io.grpc.Metadata.serialize(Metadata.java:474)
	at io.grpc.InternalMetadata.serialize(InternalMetadata.java:79)
	at io.grpc.internal.TransportFrameUtil.toHttp2Headers(TransportFrameUtil.java:51)
	at io.grpc.netty.Utils.convertTrailers(Utils.java:327)
	at io.grpc.netty.NettyServerStream$Sink.writeTrailers(NettyServerStream.java:131)
	at io.grpc.internal.AbstractServerStream.close(AbstractServerStream.java:133)
	at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:227)
	at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:213)
	at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onError(ServerCalls.java:389)

To debug this, we added some code that uses reflection to dump the internal structure of the Metadata object whose serialization throws. Here are some examples:

[grpc-status,14,grpc-message,Channel shutdown invoked,null,null,null,null]
[grpc-status,14,grpc-message,Network closed for unknown reason,null,null,null,null]
[null,null,grpc-status,14,grpc-message,Channel shutdown invoked,null,null]
[grpc-status,14,grpc-message,Channel shutdown invoked,null,null,grpc-message,Channel shutdown invoked]
[grpc-status,14,grpc-message,Channel shutdown invoked,null,null,null,null]
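
For reference, the dumps above were produced with something like the following sketch. It assumes io.grpc.Metadata keeps its entries in a private Object[] field named namesAndValues with alternating name/value slots; that is a grpc-java internal detail and may change between versions.

import io.grpc.Metadata;
import java.lang.reflect.Field;
import java.util.StringJoiner;

final class MetadataDumper {
  // Dumps the raw name/value array of a Metadata instance via reflection.
  // "namesAndValues" is not public API; this is debugging code only.
  static String dump(Metadata metadata) throws ReflectiveOperationException {
    Field field = Metadata.class.getDeclaredField("namesAndValues");
    field.setAccessible(true);
    Object[] namesAndValues = (Object[]) field.get(metadata);
    if (namesAndValues == null) {
      return "[]";
    }
    StringJoiner joiner = new StringJoiner(",", "[", "]");
    for (Object entry : namesAndValues) {
      // Names and eagerly serialized values are byte[]; unused slots are null,
      // and lazily serialized values are internal wrapper objects.
      joiner.add(entry instanceof byte[] ? new String((byte[]) entry) : String.valueOf(entry));
    }
    return joiner.toString();
  }
}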

Some of these look perfectly valid, while others have the expected content in an unusual order, with null elements at the beginning or between entries. The documentation for Metadata clearly says that it is mutable and not thread-safe, and the corruption appears to be the result of multi-threaded mutation of the Metadata object.

We initially found this surprising: we're not setting or mutating metadata anywhere.

After reviewing the grpc-java source code, we discovered the following sequence of events:

  • We call io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onError,
  • which calls Metadata metadata = Status.trailersFromThrowable(t);
  • and then calls io.grpc.internal.ServerCallImpl.close (ServerCallImpl.java:213),
  • which calls io.grpc.internal.ServerCallImpl.closeInternal (ServerCallImpl.java:227),
  • which calls io.grpc.internal.AbstractServerStream.close (AbstractServerStream.java:133),
  • which calls addStatusToTrailers, which mutates the Metadata object.
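
For context, addStatusToTrailers in AbstractServerStream looks roughly like this (paraphrased from the v1.71.0 source; details may differ by version). Note that it writes into the caller-supplied trailers in place:

private void addStatusToTrailers(Metadata trailers, Status status) {
  // Paraphrased sketch: drop any existing status/message entries, then
  // re-add them, mutating the shared Metadata object in place.
  trailers.discardAll(InternalStatus.CODE_KEY);
  trailers.discardAll(InternalStatus.MESSAGE_KEY);
  trailers.put(InternalStatus.CODE_KEY, status);
  if (status.getDescription() != null) {
    trailers.put(InternalStatus.MESSAGE_KEY, status.getDescription());
  }
}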

The Metadata object mutated here is the Metadata object stored in the StatusRuntimeException that we passed to onError. Who owns that instance?

What actually happens in our code is that we make outgoing gRPC calls to other services which fail with a StatusRuntimeException, and then we use the same instance to reply to multiple incoming gRPC calls on different threads. When we see the concurrent mutation of the Metadata object, that's because the onError call unconditionally mutates the Metadata, which is now shared across multiple threads.

grpc-java is racing with itself, and this happens even when the application code never sets or modifies Metadata objects. This is not great.

Personally, I think it's surprising and problematic for the onError call to mutate the Metadata attached to a StatusRuntimeException (SRE). I would prefer StatusRuntimeException to be effectively immutable.

At a minimum, it would be good to explicitly document that StatusRuntimeException is not thread-safe and should not be shared across threads (unfortunately, we end up storing it in Future objects, which is difficult to fix on our side). This would - at least - have pointed us in the right direction earlier.
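
In the meantime, a per-call defensive copy on the application side sidesteps the race. A minimal sketch, assuming it is enough to duplicate the cached exception's status and trailers (copyForReply is a hypothetical helper, not grpc-java API):

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

final class StatusCopies {
  // Builds a fresh StatusRuntimeException with its own Metadata so that
  // grpc-java's close path mutates a private copy rather than an instance
  // shared across threads.
  static StatusRuntimeException copyForReply(StatusRuntimeException cached) {
    Metadata copy = new Metadata();
    Metadata original = Status.trailersFromThrowable(cached);
    if (original != null) {
      copy.merge(original); // Metadata.merge copies all entries over
    }
    return Status.fromThrowable(cached).asRuntimeException(copy);
  }
}

Each server handler would then call responseObserver.onError(StatusCopies.copyForReply(error)) instead of passing the shared instance.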

Other options:

  • Make the SRE effectively immutable by copying the metadata whenever it's returned from getTrailers, but this would have performance implications.
  • Make onError not mutate the passed-in Metadata object (see the sketch after this list):
    • Copy the trailers if set (performance?).
    • Don't update the Metadata if the status fields match the corresponding Status (not a complete solution).
  • Make outgoing client calls throw an SRE without a Metadata object; I believe the data is currently duplicated between the Metadata and the Status, even if no other metadata is set (not a complete solution).
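
One possible shape for that second option, sketched as a hypothetical change to the server-side close path (not the actual grpc-java code): stamp the status onto a private copy of the caller's trailers rather than onto the shared instance.

// Hypothetical variant of the close path: copy the caller-supplied trailers
// before adding the status entries, so a Metadata object shared across calls
// is never mutated.
Metadata privateTrailers = new Metadata();
privateTrailers.merge(trailers);              // defensive copy
addStatusToTrailers(privateTrailers, status); // mutate only the copy
// ... then serialize privateTrailers instead of trailers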
@ulfjack
Contributor Author

ulfjack commented Mar 22, 2025

Here is a test to reproduce the problem. First, the proto definition:

syntax = "proto3";

package com.engflow.testing.echo;

import "google/protobuf/wrappers.proto";

option java_multiple_files = true;
option java_package = "com.engflow.testing.echo";

service EchoService {
  rpc Echo(google.protobuf.StringValue) returns (google.protobuf.StringValue) {}
}
And the Java test:

package com.engflow.grpc.echo;

import com.engflow.testing.echo.EchoServiceGrpc;
import com.engflow.testing.echo.EchoServiceGrpc.EchoServiceImplBase;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.StringValue;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import org.junit.Test;

public class MultithreadedServerTest {
  // A single StatusRuntimeException instance shared by every incoming call.
  private static final StatusRuntimeException error =
      Status.INTERNAL.asRuntimeException(new Metadata());

  private ManagedChannel createServer() throws Exception {
    var server =
        NettyServerBuilder.forPort(0)
            .addService(
                new EchoServiceImplBase() {
                  @Override
                  public void echo(
                      StringValue request, StreamObserver<StringValue> responseObserver) {
                    // Replying with the shared instance lets each call's close
                    // path mutate the same Metadata concurrently.
                    responseObserver.onError(error);
                  }
                })
            .build();
    server.start();

    return NettyChannelBuilder.forAddress("localhost", server.getPort())
        .negotiationType(NegotiationType.PLAINTEXT)
        .build();
  }

  @Test(timeout = 10_000L)
  public void multithreaded() throws Exception {
    var channel = createServer();
    ListeningExecutorService executorService =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
    EchoServiceGrpc.EchoServiceBlockingStub stub = EchoServiceGrpc.newBlockingStub(channel);
    CountDownLatch latch = new CountDownLatch(1);
    List<ListenableFuture<?>> futures = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      var future =
          executorService.submit(
              () -> {
                Uninterruptibles.awaitUninterruptibly(latch);
                stub.echo(StringValue.newBuilder().setValue("foo").build());
              });
      futures.add(future);
    }
    latch.countDown(); // release all workers at once to make the race likely
    Futures.whenAllComplete(futures).run(() -> {}, MoreExecutors.directExecutor()).get();
  }
}

@ulfjack
Contributor Author

ulfjack commented Mar 22, 2025

Note that the test hangs when this happens: the onError method marks the call as closed before Metadata.serialize throws, so the exception escapes before the call is actually closed on the wire, and the caller waits indefinitely.

Also, I was not able to reproduce this with the in-process transport (InProcessServer); presumably it doesn't modify the Metadata?

@kannanjgithub
Contributor

The code is racing, but these are application threads that should synchronize their writes to StreamObserver.onError, as the documentation says:

Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls.

@ulfjack
Contributor Author

ulfjack commented Mar 24, 2025

These are writes to different StreamObserver instances, so this comment does not apply.

@kannanjgithub
Contributor

Yes. Agreed that this breaks the documented thread-safety behavior and needs fixing.
