Skip to content

Commit 11e2c2b

Browse files
Stefan Mautethjaeckle
Stefan Maute
authored andcommittedOct 26, 2021
fixed a bug where an additional source status is shown when a mqtt source has two or more addresses;
add method to determine the number of consumers in BaseClientActor and RetrieveConnectionStatusAggregatorActor because number of consumers is calculated differently for mqtt sources; Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
1 parent d8d2f19 commit 11e2c2b

File tree

6 files changed

+48
-14
lines changed

6 files changed

+48
-14
lines changed
 

‎connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -1407,11 +1407,9 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
14071407
" Forwarding to consumers and publishers.", command.getEntityId(),
14081408
sender);
14091409

1410+
// only one PublisherActor is started for all targets (if targets are present)
14101411
final int numberOfProducers = connection.getTargets().isEmpty() ? 0 : 1;
1411-
final int numberOfConsumers = connection.getSources()
1412-
.stream()
1413-
.mapToInt(source -> source.getConsumerCount() * source.getAddresses().size())
1414-
.sum();
1412+
final int numberOfConsumers = determineNumberOfConsumers();
14151413
int expectedNumberOfChildren = numberOfProducers + numberOfConsumers;
14161414
if (getSshTunnelState().isEnabled()) {
14171415
expectedNumberOfChildren++;
@@ -1470,6 +1468,18 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
14701468
return stay();
14711469
}
14721470

1471+
/**
1472+
* Determines the number of consumers.
1473+
*
1474+
* @return the number of consumers.
1475+
*/
1476+
protected int determineNumberOfConsumers() {
1477+
return connection.getSources()
1478+
.stream()
1479+
.mapToInt(source -> source.getConsumerCount() * source.getAddresses().size())
1480+
.sum();
1481+
}
1482+
14731483
private void retrieveAddressStatusFromChildren(final RetrieveConnectionStatus command, final ActorRef sender,
14741484
final List<ActorRef> childrenToAsk) {
14751485
childrenToAsk.forEach(child -> {

‎connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/RetrieveConnectionStatusAggregatorActor.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.eclipse.ditto.base.model.headers.DittoHeaders;
2626
import org.eclipse.ditto.connectivity.model.Connection;
27+
import org.eclipse.ditto.connectivity.model.ConnectionType;
2728
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
2829
import org.eclipse.ditto.connectivity.model.ResourceStatus;
2930
import org.eclipse.ditto.connectivity.model.SshTunnel;
@@ -75,19 +76,16 @@ private RetrieveConnectionStatusAggregatorActor(final Connection connection,
7576
configuredClientCount = connection.getClientCount();
7677
// one response per client actor
7778
expectedResponses.put(ResourceStatus.ResourceType.CLIENT, configuredClientCount);
79+
7880
if (ConnectivityStatus.OPEN.equals(connection.getConnectionStatus())) {
7981
// one response per source/target
8082
expectedResponses.put(ResourceStatus.ResourceType.TARGET,
8183
connection.getTargets()
8284
.stream()
8385
.mapToInt(target -> configuredClientCount)
8486
.sum());
85-
expectedResponses.put(ResourceStatus.ResourceType.SOURCE,
86-
connection.getSources()
87-
.stream()
88-
.mapToInt(source -> configuredClientCount * source.getConsumerCount() *
89-
source.getAddresses().size())
90-
.sum());
87+
expectedResponses.put(ResourceStatus.ResourceType.SOURCE, determineSourceCount(connection));
88+
9189
if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
9290
expectedResponses.put(ResourceStatus.ResourceType.SSH_TUNNEL, configuredClientCount);
9391
}
@@ -240,4 +238,23 @@ private static ConnectivityStatus calculateOverallLiveStatus(final ConnectivityS
240238
private void stopSelf() {
241239
getContext().stop(getSelf());
242240
}
241+
242+
private int determineSourceCount(final Connection connection) {
243+
final int sourceCount;
244+
if(connection.getConnectionType().equals(ConnectionType.MQTT)) {
245+
// for mqtt only one consumer actor for all addresses of a source is started.
246+
sourceCount = connection.getSources()
247+
.stream()
248+
.mapToInt(source -> configuredClientCount * source.getConsumerCount())
249+
.sum();
250+
} else {
251+
sourceCount = connection.getSources()
252+
.stream()
253+
.mapToInt(source -> configuredClientCount * source.getConsumerCount() * source.getAddresses().size())
254+
.sum();
255+
}
256+
257+
return sourceCount;
258+
}
259+
243260
}

‎connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java

+7
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,13 @@ private String distinguishClientIdIfNecessary(final String configuredClientId) {
591591
}
592592
}
593593

594+
@Override
595+
protected int determineNumberOfConsumers() {
596+
return connection.getSources()
597+
.stream()
598+
.mapToInt(Source::getConsumerCount)
599+
.sum();
600+
}
594601

595602
static class MqttClientConnected extends AbstractWithOrigin implements ClientConnected {
596603

‎connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/HiveMqtt3ClientActor.java

+1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ ActorRef startPublisherActor(final Connection connection, final Mqtt3AsyncClient
124124
final Props publisherActorProps =
125125
HiveMqtt3PublisherActor.props(connection, client, isDryRun(), getDefaultClientId(),
126126
connectivityStatusResolver);
127+
127128
return startChildActorConflictFree(HiveMqtt3PublisherActor.NAME, publisherActorProps);
128129
}
129130

‎connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -470,9 +470,8 @@ protected void processPingCommand(final PingCommand ping) {
470470
}
471471

472472
private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence correlationId) {
473-
final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId, DittoHeaders.newBuilder()
474-
.correlationId(correlationId)
475-
.build());
473+
final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId,
474+
DittoHeaders.newBuilder().correlationId(correlationId).build());
476475
Patterns.ask(getSelf(), retrieveConnectionStatus, SELF_RETRIEVE_CONNECTION_STATUS_TIMEOUT)
477476
.whenComplete((response, throwable) -> {
478477
if (response instanceof RetrieveConnectionStatusResponse) {

‎connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ public void sendsConnectionOpenedAnnouncementAfterReconnect() {
499499

500500
andConnectionFails(dummyClientActor, getRef());
501501
// not expecting a closed announcement after connection failure, since it's not possible to send a message
502-
// if a connecting is failed and thus not connected
502+
// if connecting is failed and thus not connected
503503

504504
andConnectionSuccessful(dummyClientActor, getRef());
505505

0 commit comments

Comments
 (0)
Please sign in to comment.