|
24 | 24 | import org.apache.pekko.japi.pf.ReceiveBuilder;
|
25 | 25 | import org.apache.pekko.persistence.RecoveryCompleted;
|
26 | 26 | import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
|
| 27 | +import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException; |
| 28 | +import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; |
27 | 29 | import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
|
28 | 30 | import org.eclipse.ditto.base.model.headers.DittoHeaders;
|
29 | 31 | import org.eclipse.ditto.base.model.headers.LiveChannelTimeoutStrategy;
|
@@ -151,14 +153,26 @@ public void onQuery(final Command<?> command, final WithDittoHeaders response) {
|
151 | 153 | public void onStagedQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response,
|
152 | 154 | @Nullable final StartedSpan startedSpan) {
|
153 | 155 | final ActorRef sender = getSender();
|
154 |
| - response.thenAccept(r -> { |
155 |
| - doOnQuery(command, r, sender); |
| 156 | + response.handle((r, throwable) -> { |
| 157 | + if (throwable != null ) { |
| 158 | + final DittoRuntimeException exception = DittoRuntimeException.asDittoRuntimeException(throwable, t -> |
| 159 | + DittoInternalErrorException.newBuilder() |
| 160 | + .cause(t) |
| 161 | + .dittoHeaders(command.getDittoHeaders()) |
| 162 | + .build() |
| 163 | + ); |
| 164 | + notifySender(sender, exception); |
| 165 | + } else { |
| 166 | + doOnQuery(command, r, sender); |
| 167 | + } |
156 | 168 | if (startedSpan != null) {
|
157 | 169 | startedSpan.finish();
|
158 | 170 | }
|
| 171 | + return null; |
159 | 172 | });
|
160 | 173 | }
|
161 | 174 |
|
| 175 | + |
162 | 176 | private void doOnQuery(final Command<?> command, final WithDittoHeaders response, final ActorRef sender) {
|
163 | 177 | if (response.getDittoHeaders().didLiveChannelConditionMatch()) {
|
164 | 178 | final var liveChannelTimeoutStrategy = response.getDittoHeaders()
|
|
0 commit comments