Skip to content

Commit 5b6e8d0

Browse files
authored
Merge pull request #854 from bosch-io/feature/soft-failure-revocation-check
Introduce SilentlyFailingRevocationChecker
2 parents e32e981 + 0d8dfe6 commit 5b6e8d0

38 files changed

+447
-225
lines changed

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BasePublisherActor.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,15 @@
5656
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
5757
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectivityConfig;
5858
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
59+
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringConfig;
60+
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringLoggerConfig;
5961
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
6062
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
6163
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressStatus;
6264
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
6365
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitorRegistry;
6466
import org.eclipse.ditto.services.connectivity.messaging.monitoring.DefaultConnectionMonitorRegistry;
67+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
6568
import org.eclipse.ditto.services.connectivity.util.ConnectivityMdcEntryKey;
6669
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
6770
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
@@ -94,6 +97,7 @@ public abstract class BasePublisherActor<T extends PublishTarget> extends Abstra
9497

9598
protected final ConnectivityConfig connectivityConfig;
9699
protected final ConnectionConfig connectionConfig;
100+
protected final ConnectionLogger connectionLogger;
97101

98102
/**
99103
* Common logger for all sub-classes of BasePublisherActor as its MDC already contains the connection ID.
@@ -119,14 +123,16 @@ protected BasePublisherActor(final Connection connection) {
119123

120124
connectivityConfig = getConnectivityConfig();
121125
connectionConfig = connectivityConfig.getConnectionConfig();
122-
connectionMonitorRegistry =
123-
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig());
126+
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
127+
final MonitoringLoggerConfig loggerConfig = monitoringConfig.logger();
128+
this.connectionLogger = ConnectionLogger.getInstance(connection.getId(), loggerConfig);
129+
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
124130
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(connection.getId());
125131
responsePublishedMonitor = connectionMonitorRegistry.forResponsePublished(connection.getId());
126132
responseAcknowledgedMonitor = connectionMonitorRegistry.forResponseAcknowledged(connection.getId());
127133
replyTargets = connection.getSources().stream().map(Source::getReplyTarget).collect(Collectors.toList());
128134
acknowledgementSizeBudget = connectionConfig.getAcknowledgementConfig().getIssuedMaxBytes();
129-
logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
135+
this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
130136
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId());
131137

132138
connectionIdResolver = PlaceholderFactory.newExpressionResolver(PlaceholderFactory.newConnectionIdPlaceholder(),

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ protected void allocateResourcesOnConnection(final ClientConnected clientConnect
274274
jmsConnection.addConnectionListener(connectionListener);
275275
jmsSession = c.session;
276276
} else {
277-
logger.info("ClientConnected was not JmsConnected as expected, ignoring as this probably was a reconnection");
277+
logger.info(
278+
"ClientConnected was not JmsConnected as expected, ignoring as this probably was a reconnection");
278279
}
279280
}
280281

@@ -419,7 +420,8 @@ private ActorRef startConnectionHandlingActor(final String suffix, final Connect
419420
final String namePrefix =
420421
JMSConnectionHandlingActor.ACTOR_NAME_PREFIX + escapeActorName(connectionId() + "-" + suffix);
421422
final Props props =
422-
JMSConnectionHandlingActor.propsWithOwnDispatcher(connection, this, jmsConnectionFactory);
423+
JMSConnectionHandlingActor.propsWithOwnDispatcher(connection, this, jmsConnectionFactory,
424+
connectionLogger);
423425
return startChildActorConflictFree(namePrefix, props);
424426
}
425427

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpPublisherActor.java

-13
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,8 @@
5252
import org.eclipse.ditto.services.connectivity.messaging.backoff.BackOffActor;
5353
import org.eclipse.ditto.services.connectivity.messaging.config.Amqp10Config;
5454
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
55-
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringConfig;
56-
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringLoggerConfig;
5755
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
5856
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
59-
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
60-
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLoggerRegistry;
6157
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
6258
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
6359
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
@@ -99,7 +95,6 @@ public final class AmqpPublisherActor extends BasePublisherActor<AmqpTarget> {
9995
private final Map<Destination, MessageProducer> staticTargets;
10096
private final int producerCacheSize;
10197
private final ActorRef backOffActor;
102-
private final ConnectionLogger connectionLogger;
10398
private final SourceQueueWithComplete<Pair<ExternalMessage, AmqpMessageContext>> sourceQueue;
10499
private final KillSwitch killSwitch;
105100

@@ -132,7 +127,6 @@ private AmqpPublisherActor(final Connection connection, final Session session,
132127
producerCacheSize = checkArgument(config.getProducerCacheSize(), i -> i > 0,
133128
() -> "producer-cache-size must be 1 or more");
134129

135-
connectionLogger = getConnectionLogger(connection);
136130
backOffActor = getContext().actorOf(BackOffActor.props(config.getBackOffConfig()));
137131
isInBackOffMode = false;
138132
}
@@ -154,13 +148,6 @@ private static CompletableFuture<Object> triggerPublishAsync(
154148
.thenApply(aVoid -> Done.done());
155149
}
156150

157-
private ConnectionLogger getConnectionLogger(final Connection connection) {
158-
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
159-
final MonitoringLoggerConfig loggerConfig = monitoringConfig.logger();
160-
final ConnectionLoggerRegistry connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(loggerConfig);
161-
return connectionLoggerRegistry.forConnection(connection.getId());
162-
}
163-
164151
/**
165152
* Creates Akka configuration object {@link Props} for this {@code AmqpPublisherActor}.
166153
*

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/ConnectionBasedJmsConnectionFactory.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.eclipse.ditto.model.connectivity.Connection;
4040
import org.eclipse.ditto.services.connectivity.messaging.config.Amqp10Config;
4141
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;
42+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
4243
import org.slf4j.LoggerFactory;
4344

4445
/**
@@ -70,8 +71,9 @@ public static ConnectionBasedJmsConnectionFactory getInstance(final Amqp10Config
7071
}
7172

7273
@Override
73-
public JmsConnection createConnection(final Connection connection, final ExceptionListener exceptionListener)
74-
throws JMSException, NamingException {
74+
public JmsConnection createConnection(final Connection connection, final ExceptionListener exceptionListener,
75+
final ConnectionLogger connectionLogger) throws JMSException, NamingException {
76+
7577
checkNotNull(connection, "Connection");
7678
checkNotNull(exceptionListener, "Exception Listener");
7779

@@ -80,7 +82,8 @@ public JmsConnection createConnection(final Connection connection, final Excepti
8082
(org.apache.qpid.jms.JmsConnectionFactory) ctx.lookup(connection.getId().toString());
8183

8284
if (isSecuredConnection(connection) && connection.isValidateCertificates()) {
83-
cf.setSslContext(SSLContextCreator.fromConnection(connection, null).withoutClientCertificate());
85+
cf.setSslContext(SSLContextCreator.fromConnection(connection, null, connectionLogger)
86+
.withoutClientCertificate());
8487
}
8588

8689
@SuppressWarnings("squid:S2095") final JmsConnection jmsConnection = (JmsConnection) cf.createConnection();

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/JMSConnectionHandlingActor.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
3939
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
4040
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
41+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
4142
import org.eclipse.ditto.services.utils.akka.LogUtil;
4243
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
4344
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
@@ -87,16 +88,18 @@ public final class JMSConnectionHandlingActor extends AbstractActor {
8788
private final Connection connection;
8889
private final ExceptionListener exceptionListener;
8990
private final JmsConnectionFactory jmsConnectionFactory;
91+
private final ConnectionLogger connectionLogger;
9092

9193
@Nullable private Session currentSession = null;
9294

9395
@SuppressWarnings("unused")
9496
private JMSConnectionHandlingActor(final Connection connection, final ExceptionListener exceptionListener,
95-
final JmsConnectionFactory jmsConnectionFactory) {
97+
final JmsConnectionFactory jmsConnectionFactory, final ConnectionLogger connectionLogger) {
9698

9799
this.connection = checkNotNull(connection, "connection");
98100
this.exceptionListener = exceptionListener;
99101
this.jmsConnectionFactory = jmsConnectionFactory;
102+
this.connectionLogger = connectionLogger;
100103
}
101104

102105
/**
@@ -105,17 +108,19 @@ private JMSConnectionHandlingActor(final Connection connection, final ExceptionL
105108
* @param connection the connection
106109
* @param exceptionListener the exception listener
107110
* @param jmsConnectionFactory the jms connection factory
111+
* @param connectionLogger used to log failures during certificate validation.
108112
* @return the Akka configuration Props object.
109113
*/
110114
static Props props(final Connection connection, final ExceptionListener exceptionListener,
111-
final JmsConnectionFactory jmsConnectionFactory) {
115+
final JmsConnectionFactory jmsConnectionFactory, final ConnectionLogger connectionLogger) {
112116

113-
return Props.create(JMSConnectionHandlingActor.class, connection, exceptionListener, jmsConnectionFactory);
117+
return Props.create(JMSConnectionHandlingActor.class, connection, exceptionListener, jmsConnectionFactory,
118+
connectionLogger);
114119
}
115120

116121
static Props propsWithOwnDispatcher(final Connection connection, final ExceptionListener exceptionListener,
117-
final JmsConnectionFactory jmsConnectionFactory) {
118-
return props(connection, exceptionListener, jmsConnectionFactory)
122+
final JmsConnectionFactory jmsConnectionFactory, final ConnectionLogger connectionLogger) {
123+
return props(connection, exceptionListener, jmsConnectionFactory, connectionLogger)
119124
.withDispatcher(DISPATCHER_NAME);
120125
}
121126

@@ -362,7 +367,7 @@ private JmsConnection createJmsConnection() {
362367
ConnectionBasedJmsConnectionFactory
363368
.buildAmqpConnectionUriFromConnection(connection, amqp10Config));
364369
}
365-
return jmsConnectionFactory.createConnection(connection, exceptionListener);
370+
return jmsConnectionFactory.createConnection(connection, exceptionListener, connectionLogger);
366371
});
367372
}
368373

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/JmsConnectionFactory.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.qpid.jms.JmsConnection;
2020
import org.eclipse.ditto.model.connectivity.Connection;
21+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
2122

2223
/**
2324
* Creates a new {@link javax.jms.Connection}.
@@ -29,11 +30,12 @@ public interface JmsConnectionFactory {
2930
*
3031
* @param connection the connection to use for the returned JMS Connection.
3132
* @param exceptionListener the ExceptionListener to configure for the returned JMS Connection.
33+
* @param connectionLogger used to log failures during certificate validation.
3234
* @return the JMS Connection.
3335
* @throws javax.jms.JMSException if the context could not be created.
3436
* @throws javax.naming.NamingException if the identifier of {@code connection} could not be found in the Context.
3537
*/
36-
JmsConnection createConnection(Connection connection, ExceptionListener exceptionListener)
37-
throws JMSException, NamingException;
38+
JmsConnection createConnection(Connection connection, ExceptionListener exceptionListener,
39+
ConnectionLogger connectionLogger) throws JMSException, NamingException;
3840

3941
}

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/httppush/DefaultHttpPushFactory.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.eclipse.ditto.model.connectivity.ConnectionId;
2525
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;
2626
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;
27+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
2728

2829
import akka.actor.ActorSystem;
2930
import akka.event.LoggingAdapter;
@@ -74,15 +75,16 @@ private DefaultHttpPushFactory(final ConnectionId connectionId, final Uri baseUr
7475
this.httpsConnectionContext = httpsConnectionContext;
7576
}
7677

77-
static HttpPushFactory of(final Connection connection, final HttpPushConfig httpPushConfig) {
78+
static HttpPushFactory of(final Connection connection, final HttpPushConfig httpPushConfig,
79+
final ConnectionLogger connectionLogger) {
7880
final ConnectionId connectionId = connection.getId();
7981
final Uri baseUri = Uri.create(connection.getUri());
8082
final int parallelism = parseParallelism(connection.getSpecificConfig());
8183

8284
final HttpsConnectionContext httpsConnectionContext;
8385
if (HttpPushValidator.isSecureScheme(baseUri.getScheme())) {
8486
final SSLContextCreator sslContextCreator =
85-
SSLContextCreator.fromConnection(connection, DittoHeaders.empty());
87+
SSLContextCreator.fromConnection(connection, DittoHeaders.empty(), connectionLogger);
8688
final SSLContext sslContext = connection.getCredentials()
8789
.map(credentials -> credentials.accept(sslContextCreator))
8890
.orElse(sslContextCreator.withoutClientCertificate());

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/httppush/HttpPublisherActor.java

-13
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,8 @@
5050
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
5151
import org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor;
5252
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;
53-
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringConfig;
54-
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringLoggerConfig;
5553
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
5654
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
57-
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
58-
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLoggerRegistry;
5955
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
6056
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
6157
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
@@ -115,7 +111,6 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {
115111

116112
private final HttpPushFactory factory;
117113

118-
private final ConnectionLogger connectionLogger;
119114
private final Materializer materializer;
120115
private final SourceQueue<Pair<HttpRequest, HttpPushContext>> sourceQueue;
121116
private final KillSwitch killSwitch;
@@ -127,7 +122,6 @@ private HttpPublisherActor(final Connection connection, final HttpPushFactory fa
127122

128123
final HttpPushConfig config = connectionConfig.getHttpPushConfig();
129124

130-
connectionLogger = getConnectionLogger(connection);
131125
materializer = Materializer.createMaterializer(this::getContext);
132126
final Pair<Pair<SourceQueueWithComplete<Pair<HttpRequest, HttpPushContext>>, UniqueKillSwitch>,
133127
CompletionStage<Done>> materialized =
@@ -149,13 +143,6 @@ static Props props(final Connection connection, final HttpPushFactory factory) {
149143
return Props.create(HttpPublisherActor.class, connection, factory);
150144
}
151145

152-
private ConnectionLogger getConnectionLogger(final Connection connection) {
153-
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
154-
final MonitoringLoggerConfig loggerConfig = monitoringConfig.logger();
155-
final ConnectionLoggerRegistry connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(loggerConfig);
156-
return connectionLoggerRegistry.forConnection(connection.getId());
157-
}
158-
159146
@Override
160147
public void postStop() throws Exception {
161148
killSwitch.shutdown();

services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/httppush/HttpPushClientActor.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import org.eclipse.ditto.services.connectivity.messaging.BaseClientActor;
3636
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
3737
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;
38+
import org.eclipse.ditto.services.connectivity.messaging.config.MonitoringLoggerConfig;
3839
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
3940
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
4041
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;
42+
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
4143
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
4244
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection;
4345

@@ -54,6 +56,7 @@ public final class HttpPushClientActor extends BaseClientActor {
5456
private static final int PROXY_CONNECT_TIMEOUT_SECONDS = 15;
5557

5658
private final HttpPushFactory factory;
59+
private final ConnectionLogger connectionLogger;
5760

5861
@Nullable
5962
private ActorRef httpPublisherActor;
@@ -63,12 +66,15 @@ public final class HttpPushClientActor extends BaseClientActor {
6366
private HttpPushClientActor(final Connection connection, final ActorRef connectionActor) {
6467
super(connection, ActorRef.noSender(), connectionActor);
6568

66-
httpPushConfig = DittoConnectivityConfig.of(
69+
final DittoConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(
6770
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
68-
)
71+
);
72+
httpPushConfig = connectivityConfig
6973
.getConnectionConfig()
7074
.getHttpPushConfig();
71-
factory = HttpPushFactory.of(connection, httpPushConfig);
75+
final MonitoringLoggerConfig loggerConfig = connectivityConfig.getMonitoringConfig().logger();
76+
connectionLogger = ConnectionLogger.getInstance(connection.getId(), loggerConfig);
77+
factory = HttpPushFactory.of(connection, httpPushConfig, connectionLogger);
7278
}
7379

7480
/**
@@ -153,7 +159,7 @@ private CompletionStage<Status.Status> testSSL(final Connection connection, fina
153159
} else {
154160
// check without HTTP proxy
155161
final SSLContextCreator sslContextCreator =
156-
SSLContextCreator.fromConnection(connection, DittoHeaders.empty());
162+
SSLContextCreator.fromConnection(connection, DittoHeaders.empty(), connectionLogger);
157163
final SSLSocketFactory socketFactory = connection.getCredentials()
158164
.map(credentials -> credentials.accept(sslContextCreator))
159165
.orElse(sslContextCreator.withoutClientCertificate()).getSocketFactory();

0 commit comments

Comments
 (0)