4848import io .grpc .SecurityLevel ;
4949import io .grpc .ServerCall ;
5050import io .grpc .Status ;
51+ import io .grpc .StatusRuntimeException ;
5152import io .grpc .internal .ServerCallImpl .ServerStreamListenerImpl ;
5253import io .perfmark .PerfMark ;
5354import java .io .ByteArrayInputStream ;
55+ import java .io .IOException ;
5456import java .io .InputStream ;
5557import java .io .InputStreamReader ;
5658import org .junit .Before ;
@@ -69,6 +71,8 @@ public class ServerCallImplTest {
6971
7072 @ Mock private ServerStream stream ;
7173 @ Mock private ServerCall .Listener <Long > callListener ;
74+ @ Mock private StreamListener .MessageProducer messageProducer ;
75+ @ Mock private InputStream message ;
7276
7377 private final CallTracer serverCallTracer = CallTracer .getDefaultFactory ().create ();
7478 private ServerCallImpl <Long , Long > call ;
@@ -493,6 +497,43 @@ public void streamListener_unexpectedRuntimeException() {
493497 assertThat (e ).hasMessageThat ().isEqualTo ("unexpected exception" );
494498 }
495499
500+ @ Test
501+ public void streamListener_statusRuntimeException () throws IOException {
502+ MethodDescriptor <Long , Long > failingParseMethod = MethodDescriptor .<Long , Long >newBuilder ()
503+ .setType (MethodType .UNARY )
504+ .setFullMethodName ("service/method" )
505+ .setRequestMarshaller (new LongMarshaller () {
506+ @ Override
507+ public Long parse (InputStream stream ) {
508+ throw new StatusRuntimeException (Status .RESOURCE_EXHAUSTED
509+ .withDescription ("Decompressed gRPC message exceeds maximum size" ));
510+ }
511+ })
512+ .setResponseMarshaller (new LongMarshaller ())
513+ .build ();
514+
515+ call = new ServerCallImpl <>(stream , failingParseMethod , requestHeaders , context ,
516+ DecompressorRegistry .getDefaultInstance (), CompressorRegistry .getDefaultInstance (),
517+ serverCallTracer , PerfMark .createTag ());
518+
519+ ServerStreamListenerImpl <Long > streamListener =
520+ new ServerCallImpl .ServerStreamListenerImpl <>(call , callListener , context );
521+
522+ when (messageProducer .next ()).thenReturn (message , (InputStream ) null );
523+ streamListener .messagesAvailable (messageProducer );
524+ ArgumentCaptor <Status > statusCaptor = ArgumentCaptor .forClass (Status .class );
525+ ArgumentCaptor <Metadata > metadataCaptor = ArgumentCaptor .forClass (Metadata .class );
526+
527+ verify (stream ).close (statusCaptor .capture (), metadataCaptor .capture ());
528+ Status status = statusCaptor .getValue ();
529+ assertEquals (Status .RESOURCE_EXHAUSTED .getCode (), status .getCode ());
530+ assertEquals ("Decompressed gRPC message exceeds maximum size" , status .getDescription ());
531+
532+ streamListener .halfClosed ();
533+ verify (callListener , never ()).onHalfClose ();
534+ verify (callListener , never ()).onMessage (any ());
535+ }
536+
496537 private static class LongMarshaller implements Marshaller <Long > {
497538 @ Override
498539 public InputStream stream (Long value ) {
0 commit comments