20
20
import java .util .HashMap ;
21
21
import java .util .Map ;
22
22
import java .util .Objects ;
23
+ import java .util .function .Function ;
23
24
import java .util .function .Supplier ;
24
25
25
26
import javax .net .ssl .SSLPeerUnverifiedException ;
41
42
import org .eclipse .hono .service .auth .device .UsernamePasswordCredentials ;
42
43
import org .eclipse .hono .service .auth .device .X509AuthProvider ;
43
44
import org .eclipse .hono .service .limiting .ConnectionLimitManager ;
44
- import org .eclipse .hono .service .plan .ResourceLimitChecks ;
45
45
import org .eclipse .hono .tracing .TracingHelper ;
46
46
import org .eclipse .hono .util .AuthenticationConstants ;
47
47
import org .eclipse .hono .util .Constants ;
69
69
* <p>
70
70
* On successful authentication of the device, a {@link Device} reflecting the device's credentials (as obtained from
71
71
* the Credentials service) is stored in the attachments record of the {@code ProtonConnection} under key
72
- * {@link AmqpAdapterConstants#KEY_CLIENT_DEVICE}. The credentials supplied by the client is verified against the credentials that
73
- * the credentials service has on record for the device.
72
+ * {@link AmqpAdapterConstants#KEY_CLIENT_DEVICE}. The credentials supplied by the client are verified against
73
+ * the credentials that the Credentials service has on record for the device.
74
74
* <p>
75
75
* This factory supports the SASL PLAIN and SASL EXTERNAL mechanisms for authenticating devices.
76
76
*/
77
77
public class AmqpAdapterSaslAuthenticatorFactory implements ProtonSaslAuthenticatorFactory {
78
78
79
79
private final ProtocolAdapterProperties config ;
80
80
private final TenantClientFactory tenantClientFactory ;
81
- private final CredentialsClientFactory credentialsClientFactory ;
82
- private final Tracer tracer ;
83
81
private final Supplier <Span > spanFactory ;
84
- private final ConnectionLimitManager connectionLimitManager ;
85
- private final ResourceLimitChecks resourceLimitChecks ;
82
+ private final ConnectionLimitManager adapterConnectionLimit ;
83
+ private final Function <TenantObject , Future <Void >> tenantConnectionLimit ;
84
+ private final DeviceCertificateValidator certValidator ;
85
+ private final HonoClientBasedAuthProvider <UsernamePasswordCredentials > usernamePasswordAuthProvider ;
86
+ private final HonoClientBasedAuthProvider <SubjectDnCredentials > clientCertAuthProvider ;
86
87
87
88
/**
88
89
* Creates a new SASL authenticator factory for an authentication provider. If the AMQP adapter supports
@@ -95,10 +96,9 @@ public class AmqpAdapterSaslAuthenticatorFactory implements ProtonSaslAuthentica
95
96
* @param tracer The tracer instance.
96
97
* @param spanFactory The factory to use for creating and starting an OpenTracing span to
97
98
* trace the authentication of the device.
98
- * @param connectionLimitManager The connection limit manager to use to monitor the number of connections.
99
- * @param resourceLimitChecks The resource limit checks instance to check if the maximum number of connections are
100
- * exceeded or not.
101
- *
99
+ * @param adapterConnectionLimit The adapter level connection limit to enforce.
100
+ * @param tenantConnectionLimit The tenant level connection limit to enforce. The function must return
101
+ * a succeeded future if the connection limit has not been reached yet.
102
102
* @throws NullPointerException if any of the parameters are null.
103
103
*/
104
104
public AmqpAdapterSaslAuthenticatorFactory (
@@ -107,69 +107,43 @@ public AmqpAdapterSaslAuthenticatorFactory(
107
107
final ProtocolAdapterProperties config ,
108
108
final Tracer tracer ,
109
109
final Supplier <Span > spanFactory ,
110
- final ConnectionLimitManager connectionLimitManager ,
111
- final ResourceLimitChecks resourceLimitChecks ) {
110
+ final ConnectionLimitManager adapterConnectionLimit ,
111
+ final Function <TenantObject , Future <Void >> tenantConnectionLimit ) {
112
+
112
113
114
+ Objects .requireNonNull (credentialsClientFactory , "Credentials client cannot be null" );
115
+ Objects .requireNonNull (tracer );
113
116
this .tenantClientFactory = Objects .requireNonNull (tenantClientFactory , "Tenant client factory cannot be null" );
114
- this .credentialsClientFactory = Objects .requireNonNull (credentialsClientFactory , "Credentials client factory cannot be null" );
115
117
this .config = Objects .requireNonNull (config , "configuration cannot be null" );
116
- this .tracer = Objects .requireNonNull (tracer );
117
118
this .spanFactory = Objects .requireNonNull (spanFactory );
118
- this .connectionLimitManager = Objects .requireNonNull (connectionLimitManager );
119
- this .resourceLimitChecks = Objects .requireNonNull (resourceLimitChecks );
119
+ this .adapterConnectionLimit = Objects .requireNonNull (adapterConnectionLimit );
120
+ this .tenantConnectionLimit = Objects .requireNonNull (tenantConnectionLimit );
121
+ this .certValidator = new DeviceCertificateValidator ();
122
+ this .clientCertAuthProvider = new X509AuthProvider (credentialsClientFactory , config , tracer );
123
+ this .usernamePasswordAuthProvider = new UsernamePasswordAuthProvider (credentialsClientFactory , config , tracer );
120
124
}
121
125
122
126
@ Override
123
127
public ProtonSaslAuthenticator create () {
124
- return new AmqpAdapterSaslAuthenticator (
125
- tenantClientFactory ,
126
- credentialsClientFactory ,
127
- config ,
128
- tracer ,
129
- spanFactory .get (),
130
- connectionLimitManager ,
131
- resourceLimitChecks );
128
+ return new AmqpAdapterSaslAuthenticator (spanFactory .get ());
132
129
}
133
130
134
131
/**
135
132
* Manage the SASL authentication process for the AMQP adapter.
136
133
*/
137
- static final class AmqpAdapterSaslAuthenticator implements ProtonSaslAuthenticator {
134
+ final class AmqpAdapterSaslAuthenticator implements ProtonSaslAuthenticator {
138
135
139
- private static final Logger LOG = LoggerFactory .getLogger (AmqpAdapterSaslAuthenticator . class );
136
+ private final Logger LOG = LoggerFactory .getLogger (getClass () );
140
137
141
- private final ProtocolAdapterProperties config ;
142
- private final TenantClientFactory tenantClientFactory ;
143
- private final CredentialsClientFactory credentialsClientFactory ;
144
- private final Tracer tracer ;
145
138
private final Span currentSpan ;
146
- private final ConnectionLimitManager connectionLimitManager ;
147
- private final ResourceLimitChecks resourceLimitChecks ;
148
139
149
140
private Sasl sasl ;
150
141
private boolean succeeded ;
151
142
private ProtonConnection protonConnection ;
152
143
private Certificate [] peerCertificateChain ;
153
- private HonoClientBasedAuthProvider <UsernamePasswordCredentials > usernamePasswordAuthProvider ;
154
- private HonoClientBasedAuthProvider <SubjectDnCredentials > clientCertAuthProvider ;
155
- private DeviceCertificateValidator certValidator ;
156
-
157
- AmqpAdapterSaslAuthenticator (
158
- final TenantClientFactory tenantClientFactory ,
159
- final CredentialsClientFactory credentialsClientFactory ,
160
- final ProtocolAdapterProperties config ,
161
- final Tracer tracer ,
162
- final Span currentSpan ,
163
- final ConnectionLimitManager connectionLimitManager ,
164
- final ResourceLimitChecks resourceLimitChecks ) {
165
-
166
- this .tenantClientFactory = tenantClientFactory ;
167
- this .credentialsClientFactory = credentialsClientFactory ;
168
- this .config = config ;
169
- this .tracer = tracer ;
144
+
145
+ AmqpAdapterSaslAuthenticator (final Span currentSpan ) {
170
146
this .currentSpan = currentSpan ;
171
- this .connectionLimitManager = connectionLimitManager ;
172
- this .resourceLimitChecks = resourceLimitChecks ;
173
147
}
174
148
175
149
@ Override
@@ -200,7 +174,7 @@ public void process(final Handler<Boolean> completionHandler) {
200
174
return ;
201
175
}
202
176
203
- if (connectionLimitManager .isLimitExceeded ()) {
177
+ if (adapterConnectionLimit .isLimitExceeded ()) {
204
178
LOG .debug ("Connection limit exceeded, reject connection request" );
205
179
sasl .done (SaslOutcome .PN_SASL_TEMP );
206
180
completionHandler .handle (Boolean .TRUE );
@@ -272,14 +246,13 @@ private void verifyPlain(final byte[] saslResponse, final Handler<AsyncResult<De
272
246
currentSpan .log (items );
273
247
274
248
final Future <DeviceUser > authenticationTracker = Future .future ();
275
- getUsernamePasswordAuthProvider () .authenticate (credentials , currentSpan .context (),
249
+ usernamePasswordAuthProvider .authenticate (credentials , currentSpan .context (),
276
250
authenticationTracker );
277
251
authenticationTracker
278
252
.compose (user -> getTenantObject (credentials .getTenantId ())
279
- .compose (tenant -> CompositeFuture
280
- .all (checkTenantIsEnabled (tenant ),
281
- resourceLimitChecks .isConnectionLimitExceeded (tenant ))
282
- .map (ok -> user )))
253
+ .compose (tenant -> CompositeFuture .all (
254
+ checkTenantIsEnabled (tenant ),
255
+ tenantConnectionLimit .apply (tenant )).map (ok -> user )))
283
256
.setHandler (completer );
284
257
}
285
258
} catch (CredentialException e ) {
@@ -309,7 +282,7 @@ private void verifyExternal(final Handler<AsyncResult<DeviceUser>> completer) {
309
282
.compose (tenant -> {
310
283
try {
311
284
final TrustAnchor trustAnchor = tenantTracker .result ().getTrustAnchor ();
312
- return getValidator () .validate (Collections .singletonList (deviceCert ), trustAnchor );
285
+ return certValidator .validate (Collections .singletonList (deviceCert ), trustAnchor );
313
286
} catch (final GeneralSecurityException e ) {
314
287
LOG .debug ("cannot retrieve trust anchor of tenant [{}]" , tenant .getTenantId (), e );
315
288
return Future .failedFuture (new CredentialException ("validation of client certificate failed" ));
@@ -319,13 +292,12 @@ private void verifyExternal(final Handler<AsyncResult<DeviceUser>> completer) {
319
292
final Future <DeviceUser > user = Future .future ();
320
293
final String tenantId = tenantTracker .result ().getTenantId ();
321
294
final SubjectDnCredentials credentials = SubjectDnCredentials .create (tenantId , deviceCert .getSubjectX500Principal ());
322
- getCertificateAuthProvider () .authenticate (credentials , currentSpan .context (), user );
295
+ clientCertAuthProvider .authenticate (credentials , currentSpan .context (), user );
323
296
return user ;
324
297
})
325
- .compose (user -> CompositeFuture
326
- .all (checkTenantIsEnabled (tenantTracker .result ()),
327
- resourceLimitChecks .isConnectionLimitExceeded (tenantTracker .result ()))
328
- .map (ok -> user ))
298
+ .compose (user -> CompositeFuture .all (
299
+ checkTenantIsEnabled (tenantTracker .result ()),
300
+ tenantConnectionLimit .apply (tenantTracker .result ())).map (ok -> user ))
329
301
.setHandler (completer );
330
302
}
331
303
}
@@ -347,26 +319,5 @@ private Future<TenantObject> checkTenantIsEnabled(final TenantObject tenant) {
347
319
return Future .failedFuture (new CredentialException ("AMQP adapter is disabled for tenant" ));
348
320
}
349
321
}
350
-
351
- private HonoClientBasedAuthProvider <UsernamePasswordCredentials > getUsernamePasswordAuthProvider () {
352
- if (usernamePasswordAuthProvider == null ) {
353
- usernamePasswordAuthProvider = new UsernamePasswordAuthProvider (credentialsClientFactory , config , tracer );
354
- }
355
- return usernamePasswordAuthProvider ;
356
- }
357
-
358
- private HonoClientBasedAuthProvider <SubjectDnCredentials > getCertificateAuthProvider () {
359
- if (clientCertAuthProvider == null ) {
360
- clientCertAuthProvider = new X509AuthProvider (credentialsClientFactory , config , tracer );
361
- }
362
- return clientCertAuthProvider ;
363
- }
364
-
365
- private DeviceCertificateValidator getValidator () {
366
- if (certValidator == null ) {
367
- certValidator = new DeviceCertificateValidator ();
368
- }
369
- return certValidator ;
370
- }
371
322
}
372
323
}
0 commit comments