Skip to content

Commit

Permalink
[BEAM-11227] Catch throwable in onError
Browse files Browse the repository at this point in the history
Trying sanjaypujare's advice:
grpc/grpc-java#8174 (comment)
  • Loading branch information
suztomo committed May 14, 2021
1 parent a14279b commit cf73ac1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,32 @@ private class InboundObserver implements StreamObserver<BeamFnApi.LogEntry.List>

@Override
public void onNext(BeamFnApi.LogEntry.List value) {
DataflowWorkerLoggingMDC.setSdkHarnessId(sdkWorkerId);
for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
clientLogger.accept(logEntry);
try {
DataflowWorkerLoggingMDC.setSdkHarnessId(sdkWorkerId);
for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
clientLogger.accept(logEntry);
}
DataflowWorkerLoggingMDC.setSdkHarnessId(null);
LOG.debug("onNext succeeded");
} catch (Throwable ex) {
LOG.error("Throwable detected in onNext", ex);
throw ex;
}
DataflowWorkerLoggingMDC.setSdkHarnessId(null);
}

@Override
public void onError(Throwable t) {
LOG.warn("Logging client failed unexpectedly. ClientId: {}", sdkWorkerId, t);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
completeIfNotNull(connectedClients.remove(this));
try {
LOG.warn("Logging client failed unexpectedly. ClientId: {}", sdkWorkerId, t);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
completeIfNotNull(connectedClients.remove(this));
LOG.debug("onError succeeded");
} catch (Throwable ex) {
LOG.error("Throwable detected in onError", ex);
throw ex;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,30 @@ private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> outboundObse
private class InboundObserver implements StreamObserver<BeamFnApi.LogEntry.List> {
@Override
public void onNext(BeamFnApi.LogEntry.List value) {
for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
logWriter.log(logEntry);
try {
for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
logWriter.log(logEntry);
}
LOG.debug("onNext succeeded");
} catch (Throwable ex) {
LOG.error("Throwable detected in onNext", ex);
throw ex;
}
}

@Override
public void onError(Throwable t) {
LOG.warn("Logging client failed unexpectedly.", t);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
completeIfNotNull(connectedClients.remove(this));
try {
LOG.warn("Logging client failed unexpectedly.", t);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
completeIfNotNull(connectedClients.remove(this));
LOG.debug("onError succeeded");
} catch (Throwable ex) {
LOG.error("Throwable detected in onError", ex);
throw ex;
}
}

@Override
Expand Down

0 comments on commit cf73ac1

Please sign in to comment.