Skip to content

Commit 894e240

Browse files
authored
Merge pull request #901 from bosch-io/bugfix/http-publish-metrics
Fix missing metrics for successfully published messages via HTTP connection
2 parents 4551855 + ac34473 commit 894e240

File tree

4 files changed

+167
-43
lines changed

4 files changed

+167
-43
lines changed

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/Sending.java

+60-28
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,17 @@ final class Sending implements SendingOrDropped {
7676
public Optional<CompletionStage<CommandResponse>> monitorAndAcknowledge(
7777
final ExceptionToAcknowledgementConverter exceptionConverter) {
7878

79-
return Optional.of(futureResponse
80-
.thenApply(response -> acknowledge(response, exceptionConverter))
81-
.thenApply(this::monitor)
82-
.exceptionally(error -> {
83-
final Acknowledgement result = handleException(getRootCause(error), exceptionConverter);
84-
monitor(result);
85-
return result;
79+
return Optional.of(futureResponse.thenApply(response -> acknowledge(response, exceptionConverter))
80+
.handle((response, error) -> {
81+
if (error != null) {
82+
final Acknowledgement errorAck = handleException(getRootCause(error), exceptionConverter);
83+
if (errorAck != null) {
84+
monitorAcknowledgementWitFailedSending(errorAck);
85+
}
86+
return errorAck;
87+
} else {
88+
return this.monitor(response);
89+
}
8690
}));
8791
}
8892

@@ -132,7 +136,11 @@ private static boolean isAcknowledgement(final CommandResponse<?> commandRespons
132136
}
133137

134138
private void monitorAcknowledgement(final Acknowledgement acknowledgement) {
135-
new AcknowledgementMonitoring(acknowledgement).monitor();
139+
new AcknowledgementMonitoring(acknowledgement, true).monitor();
140+
}
141+
142+
private void monitorAcknowledgementWitFailedSending(final Acknowledgement acknowledgement) {
143+
new AcknowledgementMonitoring(acknowledgement, false).monitor();
136144
}
137145

138146
private boolean isTargetIssuesLiveResponse() {
@@ -215,46 +223,59 @@ private static Exception getRootCause(final Throwable throwable) {
215223

216224
private abstract class CommandResponseMonitoring<T extends CommandResponse<?>> {
217225

218-
private final T cmdResponse;
226+
protected final T cmdResponse;
219227

220228
protected CommandResponseMonitoring(final T cmdResponse) {
221229
this.cmdResponse = cmdResponse;
222230
}
223231

224-
void monitor() {
225-
if (isSendSuccess()) {
226-
monitorSendSuccess();
227-
} else {
228-
monitorSendFailure(getExceptionFor(cmdResponse));
229-
}
230-
}
232+
abstract void monitor();
231233

232-
private boolean isSendSuccess() {
233-
final HttpStatusCode statusCode = cmdResponse.getStatusCode();
234-
return !(statusCode.isClientError() || statusCode.isInternalError());
234+
protected void monitorSendSuccess() {
235+
sendingContext.getPublishedMonitor().success(sendingContext.getExternalMessage());
235236
}
236237

237-
private void monitorSendSuccess() {
238-
final ConnectionMonitor publishedMonitor = sendingContext.getPublishedMonitor();
239-
publishedMonitor.success(sendingContext.getExternalMessage());
240-
sendingContext.getAcknowledgedMonitor().ifPresent(ackMonitor ->
241-
ackMonitor.success(sendingContext.getExternalMessage()));
238+
protected void monitorAckSuccess() {
239+
sendingContext.getAcknowledgedMonitor().ifPresent(
240+
ackMonitor -> ackMonitor.success(sendingContext.getExternalMessage())
241+
);
242242
}
243243

244-
abstract DittoRuntimeException getExceptionFor(T response);
245-
246-
private void monitorSendFailure(final DittoRuntimeException messageSendingFailedException) {
244+
protected void monitorAckFailure() {
245+
final DittoRuntimeException messageSendingFailedException = getExceptionFor(cmdResponse);
247246
sendingContext.getAcknowledgedMonitor()
248247
.ifPresent(acknowledgedMonitor -> acknowledgedMonitor.failure(sendingContext.getExternalMessage(),
249248
messageSendingFailedException));
250249
}
251250

251+
abstract DittoRuntimeException getExceptionFor(T response);
252+
252253
}
253254

254255
private final class AcknowledgementMonitoring extends CommandResponseMonitoring<Acknowledgement> {
255256

256-
AcknowledgementMonitoring(final Acknowledgement acknowledgement) {
257+
private final boolean sendSuccess;
258+
259+
AcknowledgementMonitoring(final Acknowledgement acknowledgement, final boolean sendSuccess) {
257260
super(acknowledgement);
261+
this.sendSuccess = sendSuccess;
262+
}
263+
264+
@Override
265+
void monitor() {
266+
final HttpStatusCode statusCode = cmdResponse.getStatusCode();
267+
if (statusCode.isInternalError()) {
268+
if (sendSuccess) {
269+
monitorSendSuccess();
270+
}
271+
monitorAckFailure();
272+
} else if (statusCode.isClientError()) {
273+
monitorSendSuccess();
274+
monitorAckFailure();
275+
} else {
276+
monitorSendSuccess();
277+
monitorAckSuccess();
278+
}
258279
}
259280

260281
@Override
@@ -274,6 +295,17 @@ private final class LiveResponseMonitoring extends CommandResponseMonitoring<Com
274295
super(cmdResponse);
275296
}
276297

298+
@Override
299+
void monitor() {
300+
final HttpStatusCode statusCode = cmdResponse.getStatusCode();
301+
monitorSendSuccess();
302+
if (HttpStatusCode.REQUEST_TIMEOUT.equals(statusCode)) {
303+
monitorAckFailure();
304+
} else {
305+
monitorAckSuccess();
306+
}
307+
}
308+
277309
@Override
278310
DittoRuntimeException getExceptionFor(final CommandResponse<?> response) {
279311
return MessageSendingFailedException.newBuilder()

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/httppush/HttpPublisherActor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -326,17 +326,17 @@ private CompletionStage<CommandResponse<?>> toCommandResponseOrAcknowledgement(f
326326
} else if (parsedResponse instanceof MessageCommandResponse) {
327327
result = parsedResponse;
328328
} else {
329-
result = null;
329+
result = Acknowledgement.of(NO_ACK_LABEL, entityIdWithType, statusCode, dittoHeaders, body);
330330
}
331331
} else {
332-
result = null;
332+
result = Acknowledgement.of(NO_ACK_LABEL, entityIdWithType, statusCode, dittoHeaders, body);
333333
}
334334
} else {
335335
// There is an issued ack declared but its not live-response => handle response as acknowledgement.
336336
result = Acknowledgement.of(label, entityIdWithType, statusCode, dittoHeaders, body);
337337
}
338338

339-
if (result != null && isMessageCommand) {
339+
if (result instanceof MessageCommandResponse && isMessageCommand) {
340340
// Do only return command response for live commands with a correct response.
341341
validateLiveResponse(result, (MessageCommand<?, ?>) signal);
342342
}

services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/SendingTest.java

+47-12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.CompletionStage;
2222

23+
import org.assertj.core.api.SoftAssertions;
2324
import org.eclipse.ditto.json.JsonObject;
2425
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
2526
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
@@ -168,7 +169,7 @@ public void monitorAcknowledgementSendSuccessKeepOriginalResponse() {
168169
}
169170

170171
@Test
171-
public void monitorAcknowledgementSendFailureKeepOriginalResponse() {
172+
public void monitorAcknowledgementSendSuccessInCaseOfHandledException() {
172173
final var acknowledgement = Mockito.mock(Acknowledgement.class);
173174
final var acknowledgementPayload = JsonObject.newBuilder().set("foo", "bar").build();
174175
final var acknowledgementStatusCode = HttpStatusCode.INTERNAL_SERVER_ERROR;
@@ -185,12 +186,53 @@ public void monitorAcknowledgementSendFailureKeepOriginalResponse() {
185186

186187
final Optional<CompletionStage<CommandResponse>> result = underTest.monitorAndAcknowledge(exceptionConverter);
187188

188-
Mockito.verifyNoInteractions(publishedMonitor);
189+
Mockito.verify(publishedMonitor).success(externalMessage);
189190
Mockito.verify(acknowledgedMonitor).failure(externalMessage, expectedException);
190191
assertThat(result)
191192
.hasValueSatisfying(resultFuture -> assertThat(resultFuture).isCompletedWithValue(acknowledgement));
192193
}
193194

195+
@Test
196+
public void monitorAcknowledgementSendFailureInCaseOfUnhandledException() {
197+
final var source = Mockito.mock(Signal.class);
198+
final var thingId = ThingId.generateRandom();
199+
Mockito.when(source.getEntityId()).thenReturn(thingId);
200+
Mockito.when(source.getDittoHeaders()).thenReturn(dittoHeaders);
201+
Mockito.when(mappedOutboundSignal.getSource()).thenReturn(source);
202+
Mockito.when(autoAckTarget.getIssuedAcknowledgementLabel()).thenReturn(Optional.of(ACKNOWLEDGEMENT_LABEL));
203+
final var thrownException = new IllegalStateException("Test");
204+
final var acknowledgementPayload = JsonObject.newBuilder()
205+
.set("message", "Encountered <IllegalStateException>.")
206+
.set("description", "Test")
207+
.build();
208+
final var acknowledgementStatusCode = HttpStatusCode.INTERNAL_SERVER_ERROR;
209+
final var expectedException = MessageSendingFailedException.newBuilder()
210+
.statusCode(acknowledgementStatusCode)
211+
.message("Received negative acknowledgement for label <" + ACKNOWLEDGEMENT_LABEL + ">.")
212+
.description("Payload: " + acknowledgementPayload)
213+
.build();
214+
final CompletableFuture<CommandResponse<?>> failedFuture = new CompletableFuture<>();
215+
failedFuture.completeExceptionally(thrownException);
216+
final Sending underTest = new Sending(sendingContext, failedFuture, connectionIdResolver, logger);
217+
218+
final Optional<CompletionStage<CommandResponse>> result = underTest.monitorAndAcknowledge(exceptionConverter);
219+
220+
Mockito.verify(publishedMonitor).exception(externalMessage, thrownException);
221+
Mockito.verify(acknowledgedMonitor).failure(externalMessage, expectedException);
222+
assertThat(result).hasValueSatisfying(
223+
resultFuture -> assertThat(resultFuture).isCompletedWithValueMatching(response -> {
224+
final SoftAssertions softly = new SoftAssertions();
225+
softly.assertThat(response).isInstanceOf(Acknowledgement.class);
226+
final Acknowledgement ack = (Acknowledgement) response;
227+
softly.assertThat(ack.getLabel().toString()).isEqualTo(ACKNOWLEDGEMENT_LABEL.toString());
228+
softly.assertThat(ack.getEntity()).contains(acknowledgementPayload);
229+
softly.assertThat(ack.getEntityId().toString()).isEqualTo(thingId.toString());
230+
softly.assertThat(ack.getStatusCode()).isEqualTo(acknowledgementStatusCode);
231+
softly.assertAll();
232+
return true;
233+
}));
234+
}
235+
194236
@Test
195237
public void monitorLiveResponseSendSuccessKeepOriginalResponse() {
196238
final var issuedAckLabel = DittoAcknowledgementLabel.LIVE_RESPONSE;
@@ -215,23 +257,16 @@ public void monitorLiveResponseSendFailureKeepOriginalResponse() {
215257
final var issuedAckLabel = DittoAcknowledgementLabel.LIVE_RESPONSE;
216258
Mockito.when(autoAckTarget.getIssuedAcknowledgementLabel()).thenReturn(Optional.of(issuedAckLabel));
217259
final CommandResponse<?> commandResponse = Mockito.mock(CommandResponse.class);
218-
final var commandResponsePayload = JsonObject.newBuilder().set("foo", "bar").build();
219260
final var commandResponseStatusCode = HttpStatusCode.CONFLICT;
220261
Mockito.when(commandResponse.getStatusCode()).thenReturn(commandResponseStatusCode);
221-
Mockito.when(commandResponse.toJson()).thenReturn(commandResponsePayload);
222-
final var expectedException = MessageSendingFailedException.newBuilder()
223-
.statusCode(commandResponseStatusCode)
224-
.message("Received negative acknowledgement for label <" + issuedAckLabel + ">.")
225-
.description("Payload: " + commandResponsePayload)
226-
.build();
227262
final Sending underTest =
228263
new Sending(sendingContext, CompletableFuture.completedStage(commandResponse), connectionIdResolver,
229264
logger);
230265

231266
final Optional<CompletionStage<CommandResponse>> result = underTest.monitorAndAcknowledge(exceptionConverter);
232267

233-
Mockito.verifyNoInteractions(publishedMonitor);
234-
Mockito.verify(acknowledgedMonitor).failure(externalMessage, expectedException);
268+
Mockito.verify(publishedMonitor).success(externalMessage);
269+
Mockito.verify(acknowledgedMonitor).success(externalMessage);
235270
assertThat(result)
236271
.hasValueSatisfying(resultFuture -> assertThat(resultFuture).isCompletedWithValue(commandResponse));
237272
}
@@ -271,4 +306,4 @@ public void monitorAndAcknowledgeWhenFutureResponseTerminatedExceptionallyAndNoA
271306
assertThat(result).hasValueSatisfying(resultFuture -> assertThat(resultFuture).isCompletedWithValue(null));
272307
}
273308

274-
}
309+
}

services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/httppush/HttpPublisherActorTest.java

+57
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,63 @@ public void testMessageCommandHttpPushCreatesCommandResponse() {
270270
}};
271271
}
272272

273+
@Test
274+
public void testMessageCommandHttpPushWithNonLiveResponseIssuedAcknowledgement() {
275+
new TestKit(actorSystem) {{
276+
final String contentType = "application/json";
277+
final StatusCode statusCode = StatusCodes.IM_A_TEAPOT;
278+
final JsonValue jsonResponse = JsonFactory.readFrom("{ \"foo\": true }");
279+
httpPushFactory = mockHttpPushFactory(contentType, statusCode, jsonResponse.toString());
280+
final AcknowledgementLabel autoAckLabel = AcknowledgementLabel.of("foo:bar");
281+
final Target target = ConnectivityModelFactory.newTargetBuilder()
282+
.address(getOutboundAddress())
283+
.originalAddress(getOutboundAddress())
284+
.authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT)
285+
.headerMapping(TestConstants.HEADER_MAPPING)
286+
.issuedAcknowledgementLabel(autoAckLabel)
287+
.topics(Topic.LIVE_MESSAGES)
288+
.build();
289+
290+
final Props props = getPublisherActorProps();
291+
final ActorRef publisherActor = childActorOf(props);
292+
publisherCreated(this, publisherActor);
293+
294+
final MessageDirection messageDirection = MessageDirection.FROM;
295+
final String messageSubject = "please-respond";
296+
final Message<?> message = Message.newBuilder(
297+
MessageHeaders.newBuilder(messageDirection, TestConstants.Things.THING_ID, messageSubject)
298+
.build()
299+
).build();
300+
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
301+
.correlationId(TestConstants.CORRELATION_ID)
302+
.putHeader("device_id", "ditto:thing")
303+
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.LIVE_RESPONSE),
304+
AcknowledgementRequest.of(autoAckLabel))
305+
.build();
306+
final Signal<?> source = SendThingMessage.of(TestConstants.Things.THING_ID, message, dittoHeaders);
307+
final OutboundSignal outboundSignal =
308+
OutboundSignalFactory.newOutboundSignal(source, Collections.singletonList(target));
309+
final ExternalMessage externalMessage =
310+
ExternalMessageFactory.newExternalMessageBuilder(Collections.emptyMap())
311+
.withText("payload")
312+
.build();
313+
final Adaptable adaptable = DittoProtocolAdapter.newInstance().toAdaptable(source);
314+
final OutboundSignal.Mapped mapped =
315+
OutboundSignalFactory.newMappedOutboundSignal(outboundSignal, adaptable, externalMessage);
316+
317+
publisherActor.tell(
318+
OutboundSignalFactory.newMultiMappedOutboundSignal(Collections.singletonList(mapped), getRef()),
319+
getRef());
320+
321+
final Acknowledgements acknowledgements = expectMsgClass(Acknowledgements.class);
322+
assertThat(acknowledgements).hasSize(1);
323+
final Acknowledgement acknowledgement = acknowledgements.getAcknowledgement(autoAckLabel).get();
324+
assertThat(acknowledgement).isNotNull();
325+
assertThat(acknowledgement.getStatusCode().toInt()).isEqualTo(statusCode.intValue());
326+
assertThat(acknowledgement.getEntityId().toString()).hasToString(TestConstants.Things.THING_ID.toString());
327+
}};
328+
}
329+
273330
@Test
274331
public void testMessageCommandHttpPushCreatesCommandResponseFromProtocolMessage() {
275332
new TestKit(actorSystem) {{

0 commit comments

Comments
 (0)