Skip to content

Commit 0f581ff

Browse files
Stefan Mautethjaeckle
Stefan Maute
authored andcommitted
align sourceStatus presentation for status "unknown/failure/misconfiguration" with status open for mqtt connection;
don't split up source addresses for mqtt sources; Signed-off-by: Stefan Maute <[email protected]>
1 parent 11e2c2b commit 0f581ff

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.function.Predicate;
4343
import java.util.regex.Pattern;
4444
import java.util.stream.Collectors;
45+
import java.util.stream.Stream;
4546
import java.util.stream.StreamSupport;
4647

4748
import javax.annotation.Nullable;
@@ -1426,9 +1427,7 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
14261427
.info("Responding early with static 'CLOSED' ResourceStatus for all sub-sources and " +
14271428
"-targets and SSH tunnel, because some children could not be started, due to a " +
14281429
"live status <{}> in the client actor.", clientConnectionStatus);
1429-
connection.getSources().stream()
1430-
.map(Source::getAddresses)
1431-
.flatMap(Collection::stream)
1430+
getSourceAddresses()
14321431
.map(sourceAddress -> ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
14331432
ConnectivityStatus.CLOSED,
14341433
sourceAddress,
@@ -1469,7 +1468,7 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
14691468
}
14701469

14711470
/**
1472-
* Determines the number of consumers.
1471+
* Determine the number of consumers.
14731472
*
14741473
* @return the number of consumers.
14751474
*/
@@ -1480,6 +1479,17 @@ protected int determineNumberOfConsumers() {
14801479
.sum();
14811480
}
14821481

1482+
/**
1483+
* Get the source addresses as stream of strings.
1484+
*
1485+
* @return the stream of source addresses.
1486+
*/
1487+
protected Stream<String> getSourceAddresses() {
1488+
return connection.getSources().stream()
1489+
.map(Source::getAddresses)
1490+
.flatMap(Collection::stream);
1491+
}
1492+
14831493
private void retrieveAddressStatusFromChildren(final RetrieveConnectionStatus command, final ActorRef sender,
14841494
final List<ActorRef> childrenToAsk) {
14851495
childrenToAsk.forEach(child -> {

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java

+14
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.function.Consumer;
27+
import java.util.stream.Stream;
2728

2829
import javax.annotation.Nullable;
2930

@@ -591,6 +592,9 @@ private String distinguishClientIdIfNecessary(final String configuredClientId) {
591592
}
592593
}
593594

595+
/*
596+
* For MQTT connections only one Consumer Actor for all addresses is started.
597+
*/
594598
@Override
595599
protected int determineNumberOfConsumers() {
596600
return connection.getSources()
@@ -599,6 +603,16 @@ protected int determineNumberOfConsumers() {
599603
.sum();
600604
}
601605

606+
/*
607+
* For MQTT connections only one Consumer Actor for all addresses is started.
608+
*/
609+
@Override
610+
protected Stream<String> getSourceAddresses() {
611+
return connection.getSources().stream()
612+
.map(Source::getAddresses)
613+
.map(sourceAddresses -> String.join(";", sourceAddresses));
614+
}
615+
602616
static class MqttClientConnected extends AbstractWithOrigin implements ClientConnected {
603617

604618
MqttClientConnected(@Nullable final ActorRef origin) {

0 commit comments

Comments
 (0)