Skip to content

Commit

Permalink
Partially revert "core: change serverimpl,servercallimpl's internalcl…
Browse files Browse the repository at this point in the history
…ose to cancel stream (grpc#4038)"

This partially reverts commit 48ca452.
It leaves the changes to ServerCallImpl and test.

This also partially reverts "Lint fixes" commit
3002a23 which removed unused variables
which are now necessary again.
  • Loading branch information
ejona86 committed Mar 2, 2018
1 parent 066ad3c commit 5c550de
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 17 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ void setListener(ServerStreamListener listener) {
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose() {
stream.cancel(Status.INTERNAL);
// TODO(ejona86): this is not thread-safe :)
stream.close(Status.UNKNOWN, new Metadata());
}

@Override
Expand Down
19 changes: 10 additions & 9 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ public void messageRead_errorCancelsCall() throws Exception {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand All @@ -1135,7 +1135,7 @@ public void messageRead_runtimeExceptionCancelsCall() throws Exception {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand All @@ -1158,7 +1158,7 @@ public void halfClosed_errorCancelsCall() {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand All @@ -1181,7 +1181,7 @@ public void halfClosed_runtimeExceptionCancelsCall() {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand All @@ -1204,7 +1204,7 @@ public void onReady_errorCancelsCall() {
fail("Expected exception");
} catch (TestError t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand All @@ -1227,7 +1227,7 @@ public void onReady_runtimeExceptionCancelsCall() {
fail("Expected exception");
} catch (RuntimeException t) {
assertSame(expectedT, t);
ensureServerStateIsCancelled();
ensureServerStateNotLeaked();
}
}

Expand Down Expand Up @@ -1430,10 +1430,11 @@ private void verifyExecutorsReturned() {
verifyNoMoreInteractions(executorPool);
}

private void ensureServerStateIsCancelled() {
verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.INTERNAL, statusCaptor.getValue());
private void ensureServerStateNotLeaked() {
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
assertEquals(Status.UNKNOWN, statusCaptor.getValue());
assertNull(statusCaptor.getValue().getCause());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

private static class SimpleServer implements io.grpc.internal.InternalServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void onCompleted() {
.onNext(StreamingInputCallRequest.getDefaultInstance());

assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS));
assertEquals(Status.CANCELLED.getCode(), Status.fromThrowable(throwableRef.get()).getCode());
assertEquals(Status.UNKNOWN, Status.fromThrowable(throwableRef.get()));
assertNull(responseRef.get());
}
}
7 changes: 1 addition & 6 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -651,13 +651,8 @@ private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand c
ChannelPromise promise) {
// Notify the listener if we haven't already.
cmd.stream().transportReportStatus(cmd.reason());
Http2Error http2Error = Http2Error.INTERNAL_ERROR;
if (Status.DEADLINE_EXCEEDED.getCode().equals(cmd.reason().getCode()) || Status.CANCELLED
.getCode().equals(cmd.reason().getCode())) {
http2Error = Http2Error.CANCEL;
}
// Terminate the stream.
encoder().writeRstStream(ctx, cmd.stream().id(), http2Error.code(), promise);
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
}

private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
Expand Down

0 comments on commit 5c550de

Please sign in to comment.