Skip to content

Commit 59001cc

Browse files
vvasilevboschthjaeckle
authored andcommitted
* add possibility to configure whether server address should be resolved by ditto in mqtt connection * fix retry timeout strategy unable increase retry count and apply effective backoff Signed-off-by: Vasil Vasilev <[email protected]>
1 parent 30db8fe commit 59001cc

File tree

6 files changed

+67
-11
lines changed

6 files changed

+67
-11
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultMqttConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ final class DefaultMqttConfig implements MqttConfig {
4242
private final int maxQueueSize;
4343
private final int eventLoopThreads;
4444
private final boolean cleanSession;
45+
private final boolean shouldResolveServerAddress;
4546
private final boolean reconnectForRedelivery;
4647
private final Duration reconnectForRedeliveryDelay;
4748
private final SessionExpiryInterval sessionExpiryInterval;
@@ -54,6 +55,7 @@ final class DefaultMqttConfig implements MqttConfig {
5455
private DefaultMqttConfig(final ScopedConfig config) {
5556
eventLoopThreads = config.getNonNegativeIntOrThrow(MqttConfigValue.EVENT_LOOP_THREADS);
5657
cleanSession = config.getBoolean(MqttConfigValue.CLEAN_SESSION.getConfigPath());
58+
shouldResolveServerAddress = config.getBoolean(MqttConfigValue.SHOULD_RESOLVE_SERVER_ADDRESS.getConfigPath());
5759
reconnectForRedelivery = config.getBoolean(MqttConfigValue.RECONNECT_FOR_REDELIVERY.getConfigPath());
5860
reconnectForRedeliveryDelay =
5961
config.getNonNegativeDurationOrThrow(MqttConfigValue.RECONNECT_FOR_REDELIVERY_DELAY);
@@ -108,6 +110,11 @@ public boolean isCleanSession() {
108110
return cleanSession;
109111
}
110112

113+
@Override
114+
public boolean shouldResolveServerAddress() {
115+
return shouldResolveServerAddress;
116+
}
117+
111118
@Override
112119
public boolean shouldReconnectForRedelivery() {
113120
return reconnectForRedelivery;

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/MqttConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public interface MqttConfig {
4444
*/
4545
boolean isCleanSession();
4646

47+
boolean shouldResolveServerAddress();
48+
4749
/**
4850
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
4951
*
@@ -136,6 +138,11 @@ enum MqttConfigValue implements KnownConfigValue {
136138
*/
137139
CLEAN_SESSION("clean-session", false),
138140

141+
/**
142+
* Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand.
143+
*/
144+
SHOULD_RESOLVE_SERVER_ADDRESS("should-resolve-server-address", true),
145+
139146
/**
140147
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
141148
*/

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,8 @@
7474
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
7575
import com.typesafe.config.Config;
7676

77-
import scala.concurrent.ExecutionContextExecutor;
78-
7977
import io.reactivex.disposables.Disposable;
78+
import scala.concurrent.ExecutionContextExecutor;
8079

8180
/**
8281
* Actor for handling connection to an MQTT broker for protocol versions 3 or 5.
@@ -89,6 +88,7 @@ public final class MqttClientActor extends BaseClientActor {
8988
private final GenericMqttClientFactory genericMqttClientFactory;
9089
@Nullable private GenericMqttClient genericMqttClient;
9190
private final AtomicBoolean automaticReconnect;
91+
private final RetryTimeoutStrategy retryTimeoutStrategy;
9292
@Nullable private ActorRef publishingActorRef;
9393
private final List<ActorRef> mqttConsumerActorRefs;
9494
@Nullable private Disposable unsolicitedPublishesAutoAckSubscription;
@@ -105,6 +105,8 @@ private MqttClientActor(final Connection connection,
105105
final var connectivityConfig = connectivityConfig();
106106
final var connectionConfig = connectivityConfig.getConnectionConfig();
107107
mqttConfig = connectionConfig.getMqttConfig();
108+
retryTimeoutStrategy = RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy(
109+
mqttConfig.getReconnectBackOffConfig().getTimeoutConfig());
108110

109111
mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig);
110112

@@ -312,7 +314,6 @@ private static String getClientId(final ClientRole clientRole,
312314
private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
313315
return (context, clientRole) -> {
314316
final var mqttClientReconnector = context.getReconnector();
315-
final var retryTimeoutStrategy = getRetryTimeoutStrategy();
316317

317318
if (0 == mqttClientReconnector.getAttempts()) {
318319
retryTimeoutStrategy.reset();
@@ -363,11 +364,6 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
363364
};
364365
}
365366

366-
private RetryTimeoutStrategy getRetryTimeoutStrategy() {
367-
final var reconnectBackOffConfig = mqttConfig.getReconnectBackOffConfig();
368-
return RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy(reconnectBackOffConfig.getTimeoutConfig());
369-
}
370-
371367
private static boolean isMqttClientInConnectingState(final MqttClientConfig mqttClientConfig) {
372368
return MqttClientState.CONNECTING == mqttClientConfig.getState();
373369
}

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactory.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ private static MqttClientBuilder getGenericMqttClientBuilder(
115115
final var mqttConfig = hiveMqttClientProperties.getMqttConfig();
116116

117117
return MqttClient.builder()
118-
.serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties)))
118+
.serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties),
119+
hiveMqttClientProperties.getMqttConfig().shouldResolveServerAddress()))
119120
.executorConfig(getMqttClientExecutorConfig(mqttConfig.getEventLoopThreads()))
120121
.sslConfig(getMqttClientSslConfig(hiveMqttClientProperties).orElse(null))
121122
.addConnectedListener(getConnectedListener(
@@ -134,8 +135,9 @@ private static URI getConnectionUri(final HiveMqttClientProperties hiveMqttClien
134135
return sshTunnelState.getURI(hiveMqttClientProperties.getMqttConnection());
135136
}
136137

137-
private static InetSocketAddress getInetSocketAddress(final URI connectionUri) {
138-
return new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort());
138+
private static InetSocketAddress getInetSocketAddress(final URI connectionUri, final boolean shouldResolveServerAddress) {
139+
return shouldResolveServerAddress ? new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort()) :
140+
InetSocketAddress.createUnresolved(connectionUri.getHost(), connectionUri.getPort());
139141
}
140142

141143
private static MqttClientExecutorConfig getMqttClientExecutorConfig(final int eventLoopThreadNumber) {

connectivity/service/src/main/resources/connectivity.conf

+6
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,12 @@ ditto {
438438
clean-session = false
439439
clean-session = ${?CONNECTIVITY_MQTT_CLEAN_SESSION}
440440

441+
# Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand.
442+
# When true, the address will be resolved before passing it to the used mqtt client.
443+
# When false, the address will be unresolved and will rely on the used mqtt client to resolve it when needed.
444+
should-resolve-server-address = true
445+
should-resolve-server-address = ${?CONNECTIVITY_MQTT_SHOULD_RESOLVE_SERVER_ADDRESS}
446+
441447
# Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
442448
reconnect-for-redelivery = false
443449
reconnect-for-redelivery = ${?CONNECTIVITY_MQTT_RECONNECT_FOR_REDELIVERY}

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public void before() {
8383
Mockito.when(mqttConnection.getPassword()).thenReturn(Optional.of(PASSWORD));
8484

8585
Mockito.when(mqttConfig.getEventLoopThreads()).thenReturn(EVENT_LOOP_THREAD_NUMBER);
86+
Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.TRUE);
8687

8788
final var connectionConfig = Mockito.mock(ConnectionConfig.class);
8889
Mockito.when(connectionConfig.getMqttConfig()).thenReturn(mqttConfig);
@@ -396,4 +397,41 @@ public void getMqtt5ClientWithoutLastWillWithSslReturnsExpected() throws NoMqttC
396397
});
397398
}
398399

400+
@Test
401+
public void getMqttClientWithShouldResolveServerAddressFalseAddressShouldBeUnresolved()
402+
throws NoMqttConnectionException {
403+
Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.FALSE);
404+
final var hiveMqttClientProperties = HiveMqttClientProperties.builder()
405+
.withMqttConnection(mqttConnection)
406+
.withConnectivityConfig(connectivityConfig)
407+
.withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig))
408+
.withSshTunnelStateSupplier(sshTunnelStateSupplier)
409+
.withConnectionLogger(connectionLogger)
410+
.withActorUuid(ACTOR_UUID)
411+
.withClientConnectedListener(mqttClientConnectedListener)
412+
.withClientDisconnectedListener(mqttClientDisconnectedListener)
413+
.build();
414+
415+
final var mqtt3ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties,
416+
MQTT_CLIENT_IDENTIFIER,
417+
ClientRole.CONSUMER_PUBLISHER);
418+
419+
final var mqtt3ClientConfig = mqtt3ClientUnderTest.getConfig();
420+
softly.assertThat(mqtt3ClientConfig.getTransportConfig())
421+
.as("transport config")
422+
.satisfies(transportConfig -> {
423+
softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue();
424+
});
425+
426+
final var mqtt5ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties,
427+
MQTT_CLIENT_IDENTIFIER,
428+
ClientRole.CONSUMER_PUBLISHER);
429+
430+
final var mqtt5ClientConfig = mqtt5ClientUnderTest.getConfig();
431+
softly.assertThat(mqtt5ClientConfig.getTransportConfig())
432+
.as("transport config")
433+
.satisfies(transportConfig -> {
434+
softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue();
435+
});
436+
}
399437
}

0 commit comments

Comments
 (0)