15
15
import java .time .Instant ;
16
16
import java .util .Optional ;
17
17
import java .util .concurrent .CompletableFuture ;
18
+ import java .util .concurrent .CompletionException ;
18
19
import java .util .concurrent .CompletionStage ;
19
20
20
21
import javax .annotation .Nullable ;
24
25
import org .apache .pekko .japi .pf .ReceiveBuilder ;
25
26
import org .apache .pekko .persistence .RecoveryCompleted ;
26
27
import org .eclipse .ditto .base .model .acks .DittoAcknowledgementLabel ;
28
+ import org .eclipse .ditto .base .model .exceptions .DittoInternalErrorException ;
27
29
import org .eclipse .ditto .base .model .exceptions .DittoRuntimeException ;
28
30
import org .eclipse .ditto .base .model .exceptions .DittoRuntimeExceptionBuilder ;
29
31
import org .eclipse .ditto .base .model .headers .DittoHeaders ;
@@ -153,7 +155,7 @@ public void onStagedQuery(final Command<?> command, final CompletionStage<WithDi
153
155
@ Nullable final StartedSpan startedSpan ) {
154
156
final ActorRef sender = getSender ();
155
157
response .whenComplete ((r , throwable ) -> {
156
- if (throwable instanceof DittoRuntimeException dittoRuntimeException ) {
158
+ if (throwable != null && unwrapThrowable ( throwable ) instanceof DittoRuntimeException dittoRuntimeException ) {
157
159
notifySender (sender , dittoRuntimeException );
158
160
} else {
159
161
doOnQuery (command , r , sender );
@@ -164,6 +166,12 @@ public void onStagedQuery(final Command<?> command, final CompletionStage<WithDi
164
166
});
165
167
}
166
168
169
+ private static Throwable unwrapThrowable (final Throwable throwable ) {
170
+ if (throwable instanceof CompletionException ) {
171
+ return throwable .getCause () != null ? throwable .getCause () : throwable ;
172
+ }
173
+ return throwable ;
174
+ }
167
175
168
176
private void doOnQuery (final Command <?> command , final WithDittoHeaders response , final ActorRef sender ) {
169
177
if (response .getDittoHeaders ().didLiveChannelConditionMatch ()) {
0 commit comments