Skip to content

Commit

Permalink
core: change serverimpl,servercallimpl's internalclose to cancel stre…
Browse files Browse the repository at this point in the history
…am (#4038)

The HTTP/2 error code will be INTERNAL_ERROR for all cancel statuses,
except for DEADLINE_EXCEEDED and CANCELLED, which are mapped to
CANCELLED.
  • Loading branch information
ramaraochavali authored and zpencer committed Feb 22, 2018
1 parent 887217e commit 48ca452
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 17 deletions.
4 changes: 3 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.grpc.ServerCall;
import io.grpc.Status;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
Expand Down Expand Up @@ -205,7 +206,8 @@ public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
* on.
*/
private void internalClose(Status internalError) {
stream.close(internalError, new Metadata());
log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
stream.cancel(internalError);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,7 @@ void setListener(ServerStreamListener listener) {
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose() {
// TODO(ejona86): this is not thread-safe :)
stream.close(Status.UNKNOWN, new Metadata());
stream.cancel(Status.INTERNAL);
}

@Override
Expand Down
9 changes: 3 additions & 6 deletions core/src/test/java/io/grpc/internal/ServerCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ private void sendMessage_serverSendsOne_closeOnSecondCall(
verify(stream, times(1)).writeMessage(any(InputStream.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

@Test
Expand All @@ -221,7 +220,7 @@ private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(
serverCall.sendMessage(1L);
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
verify(stream, times(1)).close(any(Status.class), any(Metadata.class));
verify(stream, times(1)).cancel(any(Status.class));

// App runs to completion but everything is ignored
serverCall.sendMessage(1L);
Expand Down Expand Up @@ -255,11 +254,9 @@ private void serverSendsOne_okFailsOnMissingResponse(
CompressorRegistry.getDefaultInstance());
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

@Test
Expand Down
18 changes: 12 additions & 6 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ public void messageRead_errorCancelsCall() throws Exception {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand All @@ -1133,7 +1133,7 @@ public void messageRead_runtimeExceptionCancelsCall() throws Exception {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand All @@ -1156,7 +1156,7 @@ public void halfClosed_errorCancelsCall() {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand All @@ -1179,7 +1179,7 @@ public void halfClosed_runtimeExceptionCancelsCall() {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand All @@ -1202,7 +1202,7 @@ public void onReady_errorCancelsCall() {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand All @@ -1225,7 +1225,7 @@ public void onReady_runtimeExceptionCancelsCall() {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateNotLeaked();
ensureServerStateIsCancelled();
}
}

Expand Down Expand Up @@ -1396,6 +1396,12 @@ private void ensureServerStateNotLeaked() {
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

private void ensureServerStateIsCancelled() {
verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.INTERNAL, statusCaptor.getValue());
assertNull(statusCaptor.getValue().getCause());
}

private static class SimpleServer implements io.grpc.internal.InternalServer {
ServerListener listener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void onCompleted() {
.onNext(StreamingInputCallRequest.getDefaultInstance());

assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS));
assertEquals(Status.UNKNOWN, Status.fromThrowable(throwableRef.get()));
assertEquals(Status.CANCELLED.getCode(), Status.fromThrowable(throwableRef.get()).getCode());
assertNull(responseRef.get());
}
}
7 changes: 6 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,13 @@ private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand c
ChannelPromise promise) {
// Notify the listener if we haven't already.
cmd.stream().transportReportStatus(cmd.reason());
Http2Error http2Error = Http2Error.INTERNAL_ERROR;
if (Status.DEADLINE_EXCEEDED.getCode().equals(cmd.reason().getCode()) || Status.CANCELLED
.getCode().equals(cmd.reason().getCode())) {
http2Error = Http2Error.CANCEL;
}
// Terminate the stream.
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
encoder().writeRstStream(ctx, cmd.stream().id(), http2Error.code(), promise);
}

private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
Expand Down

0 comments on commit 48ca452

Please sign in to comment.