Skip to content

Commit 4a4609a

Browse files
committed
#2115: fix exceptions during outbound mapping not being recorded via "metrics"
1 parent ca3baa5 commit 4a4609a

File tree

7 files changed

+40
-6
lines changed

7 files changed

+40
-6
lines changed

connectivity/model/src/main/java/org/eclipse/ditto/connectivity/model/MetricType.java

+5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public enum MetricType {
4343
*/
4444
MAPPED("mapped", MetricDirection.INBOUND, MetricDirection.OUTBOUND),
4545

46+
/**
47+
* Counts others/uncategorized for messages.
48+
*/
49+
OTHER("other", MetricDirection.INBOUND, MetricDirection.OUTBOUND),
50+
4651
/**
4752
* Counts messages that were dropped (not published by intention e.g. because no reply-to address was given).
4853
*/

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotEmpty;
1616
import static org.eclipse.ditto.connectivity.model.MetricType.DROPPED;
1717
import static org.eclipse.ditto.connectivity.model.MetricType.MAPPED;
18+
import static org.eclipse.ditto.connectivity.model.MetricType.OTHER;
1819

1920
import java.io.PrintWriter;
2021
import java.io.StringWriter;
@@ -157,6 +158,7 @@ public final class OutboundMappingProcessorActor
157158
private final ConnectionMonitor responseDispatchedMonitor;
158159
private final ConnectionMonitor responseDroppedMonitor;
159160
private final ConnectionMonitor responseMappedMonitor;
161+
private final ConnectionMonitor responseOtherMonitor;
160162
private final SignalEnrichmentFacade signalEnrichmentFacade;
161163
private final int processorPoolSize;
162164
private final DittoRuntimeExceptionToErrorResponseFunction toErrorResponseFunction;
@@ -187,6 +189,7 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
187189
responseDispatchedMonitor = connectionMonitorRegistry.forResponseDispatched(this.connection);
188190
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(this.connection);
189191
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(this.connection);
192+
responseOtherMonitor = connectionMonitorRegistry.forResponseOther(this.connection);
190193
signalEnrichmentFacade = ConnectivitySignalEnrichmentProvider.get(system, dittoExtensionConfig).getFacade(this.connection.getId());
191194
this.processorPoolSize = determinePoolSize(processorPoolSize, mappingConfig.getMaxPoolSize());
192195
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(DittoHeadersValidator.get(system, dittoExtensionConfig));
@@ -661,15 +664,13 @@ private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
661664
})
662665
.onError((mapperId, exception, topicPath, unused) -> {
663666
if (exception instanceof DittoRuntimeException e) {
664-
monitorsForOther.forEach(monitor ->
665-
monitor.getLogger().failure(infoProvider, e));
667+
monitorsForOther.forEach(monitor -> monitor.failure(infoProvider, e));
666668
logger.withCorrelationId(e)
667669
.info("Got DittoRuntimeException during processing Signal: {} - {}",
668670
e.getMessage(),
669671
e.getDescription().orElse(""));
670672
} else {
671-
monitorsForOther.forEach(monitor ->
672-
monitor.getLogger().exception(infoProvider, exception));
673+
monitorsForOther.forEach(monitor -> monitor.exception(infoProvider, exception));
673674
logger.withCorrelationId(outbound.getSource())
674675
.warning("Got unexpected exception during processing Signal <{}>.",
675676
exception.getMessage());
@@ -696,7 +697,7 @@ private Set<ConnectionMonitor> getMonitorsForMappedSignal(final OutboundSignal o
696697

697698
private Set<ConnectionMonitor> getMonitorsForOther(final OutboundSignal outbound) {
698699

699-
return getMonitorsForOutboundSignal(outbound, MAPPED, LogType.OTHER, responseMappedMonitor);
700+
return getMonitorsForOutboundSignal(outbound, OTHER, LogType.OTHER, responseOtherMonitor);
700701
}
701702

702703
private Set<ConnectionMonitor> getMonitorsForOutboundSignal(final OutboundSignal outbound,

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java

+9
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public interface ConnectionMonitorRegistry<T> {
170170
*/
171171
T forResponseMapped(Connection connection);
172172

173+
/**
174+
* Gets counter for {@link org.eclipse.ditto.connectivity.model.MetricDirection#OUTBOUND}/{@link
175+
* org.eclipse.ditto.connectivity.model.MetricType#OTHER} messages for responses.
176+
*
177+
* @param connection connection
178+
* @return the response mapped counter
179+
*/
180+
T forResponseOther(Connection connection);
181+
173182
/**
174183
* Gets counter for {@link org.eclipse.ditto.connectivity.model.MetricDirection#OUTBOUND}/{@link
175184
* org.eclipse.ditto.connectivity.model.MetricType#PUBLISHED} messages for responses.

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/DefaultConnectionMonitorRegistry.java

+8
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,14 @@ public ConnectionMonitor forResponseMapped(final Connection connection) {
183183
.build();
184184
}
185185

186+
@Override
187+
public ConnectionMonitor forResponseOther(final Connection connection) {
188+
return DefaultConnectionMonitor.builder(
189+
connectionCounterRegistry.forResponseOther(connection),
190+
connectionLoggerRegistry.forResponseOther(connection))
191+
.build();
192+
}
193+
186194
@Override
187195
public ConnectionMonitor forResponsePublished(final Connection connection) {
188196
return DefaultConnectionMonitor.builder(

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java

+5
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,11 @@ public ConnectionLogger forResponseMapped(final Connection connection) {
464464
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.MAPPED, RESPONSES_ADDRESS);
465465
}
466466

467+
@Override
468+
public ConnectionLogger forResponseOther(final Connection connection) {
469+
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.OTHER, RESPONSES_ADDRESS);
470+
}
471+
467472
@Override
468473
public ConnectionLogger forResponsePublished(final Connection connection) {
469474
return getLogger(connection.getId(), LogCategory.RESPONSE, LogType.PUBLISHED, RESPONSES_ADDRESS);

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectivityCounterRegistry.java

+5
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ public ConnectionMetricsCounter forResponseMapped(final Connection connection) {
246246
return getCounter(connection, MetricType.MAPPED, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);
247247
}
248248

249+
@Override
250+
public ConnectionMetricsCounter forResponseOther(final Connection connection) {
251+
return getCounter(connection, MetricType.OTHER, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);
252+
}
253+
249254
@Override
250255
public ConnectionMetricsCounter forResponsePublished(final Connection connection) {
251256
return getCounter(connection, MetricType.PUBLISHED, MetricDirection.OUTBOUND, RESPONSES_ADDRESS);

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectivityCounterRegistryTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.eclipse.ditto.connectivity.model.MetricType.ENFORCED;
2424
import static org.eclipse.ditto.connectivity.model.MetricType.FILTERED;
2525
import static org.eclipse.ditto.connectivity.model.MetricType.MAPPED;
26+
import static org.eclipse.ditto.connectivity.model.MetricType.OTHER;
2627
import static org.eclipse.ditto.connectivity.model.MetricType.PUBLISHED;
2728
import static org.eclipse.ditto.connectivity.model.MetricType.THROTTLED;
2829
import static org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MeasurementWindow.ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION;
@@ -208,7 +209,7 @@ public void testThrottlingCounterRegistered() {
208209

209210
final SourceMetrics sourceMetrics = counterRegistry.aggregateSourceMetrics(connection.getId());
210211

211-
final MetricType[] expectedTypes = {CONSUMED, MAPPED, DROPPED, ACKNOWLEDGED, ENFORCED, THROTTLED};
212+
final MetricType[] expectedTypes = {OTHER, CONSUMED, MAPPED, DROPPED, ACKNOWLEDGED, ENFORCED, THROTTLED};
212213
connection.getSources()
213214
.stream()
214215
.flatMap(source -> source.getAddresses().stream())

0 commit comments

Comments
 (0)