146
146
import java .util .Map ;
147
147
import java .util .Set ;
148
148
import java .util .concurrent .CompletableFuture ;
149
+ import java .util .concurrent .CyclicBarrier ;
149
150
import java .util .concurrent .ExecutionException ;
150
151
import java .util .concurrent .ExecutorService ;
151
152
import java .util .concurrent .Semaphore ;
@@ -230,6 +231,9 @@ public class ApiKeyServiceTests extends ESTestCase {
230
231
"search": [ {"names": ["logs"]} ],
231
232
"replication": [ {"names": ["archive"]} ]
232
233
}""" );
234
+
235
+ private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000 ;
236
+
233
237
private ThreadPool threadPool ;
234
238
private Client client ;
235
239
private SecurityIndexManager securityIndex ;
@@ -245,7 +249,7 @@ public void createThreadPool() {
245
249
Settings .EMPTY ,
246
250
SECURITY_CRYPTO_THREAD_POOL_NAME ,
247
251
1 ,
248
- 1000 ,
252
+ TEST_THREADPOOL_QUEUE_SIZE ,
249
253
"xpack.security.crypto.thread_pool" ,
250
254
EsExecutors .TaskTrackingConfig .DO_NOT_TRACK
251
255
)
@@ -268,6 +272,90 @@ public void setupMocks() {
268
272
doAnswer (invocation -> Instant .now ()).when (clock ).instant ();
269
273
}
270
274
275
+ public void testFloodThreadpool () throws Exception {
276
+ // We're going to be blocking the security-crypto threadpool so we need a new one for the client
277
+ ThreadPool clientThreadpool = new TestThreadPool (
278
+ this .getTestName (),
279
+ new FixedExecutorBuilder (
280
+ Settings .EMPTY ,
281
+ this .getTestName (),
282
+ 1 ,
283
+ 100 ,
284
+ "no_settings_used" ,
285
+ EsExecutors .TaskTrackingConfig .DO_NOT_TRACK
286
+ )
287
+ );
288
+ try {
289
+ when (client .threadPool ()).thenReturn (clientThreadpool );
290
+
291
+ // setup copied from testAuthenticateWithApiKey
292
+ final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
293
+ final ApiKeyService service = createApiKeyService (settings );
294
+
295
+ final String id = randomAlphaOfLength (12 );
296
+ final String key = randomAlphaOfLength (16 );
297
+
298
+ final User user , authUser ;
299
+ if (randomBoolean ()) {
300
+ user =
new User (
"hulk" ,
new String [] {
"superuser" },
"Bruce Banner" ,
"[email protected] " ,
Map .
of (),
true );
301
+ authUser = new User ("authenticated_user" , "other" );
302
+ } else {
303
+ user =
new User (
"hulk" ,
new String [] {
"superuser" },
"Bruce Banner" ,
"[email protected] " ,
Map .
of (),
true );
304
+ authUser = null ;
305
+ }
306
+ final ApiKey .Type type = randomFrom (ApiKey .Type .values ());
307
+ final Map <String , Object > metadata = mockKeyDocument (id , key , user , authUser , false , Duration .ofSeconds (3600 ), null , type );
308
+
309
+ // Block the security crypto threadpool
310
+ CyclicBarrier barrier = new CyclicBarrier (2 );
311
+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> safeAwait (barrier ));
312
+ // Now fill it up while the one thread is blocked
313
+ for (int i = 0 ; i < TEST_THREADPOOL_QUEUE_SIZE ; i ++) {
314
+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> {});
315
+ }
316
+
317
+ // Check that it's full
318
+ for (var stat : threadPool .stats ().stats ()) {
319
+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
320
+ assertThat (stat .queue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
321
+ assertThat (stat .rejected (), equalTo (0L ));
322
+ }
323
+ }
324
+
325
+ // now try to auth with an API key
326
+ final AuthenticationResult <User > auth = tryAuthenticate (service , id , key , type );
327
+ assertThat (auth .getStatus (), is (AuthenticationResult .Status .TERMINATE ));
328
+
329
+ // Make sure one was rejected and the queue is still full
330
+ for (var stat : threadPool .stats ().stats ()) {
331
+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
332
+ assertThat (stat .queue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
333
+ assertThat (stat .rejected (), equalTo (1L ));
334
+ }
335
+ }
336
+ ListenableFuture <CachedApiKeyHashResult > cachedValue = service .getApiKeyAuthCache ().get (id );
337
+ assertThat ("since the request was rejected, there should be no cache entry for this key" , cachedValue , nullValue ());
338
+
339
+ // unblock the threadpool
340
+ safeAwait (barrier );
341
+
342
+ // wait for the threadpool queue to drain & check that the stats as as expected
343
+ flushThreadPoolExecutor (threadPool , SECURITY_CRYPTO_THREAD_POOL_NAME );
344
+ for (var stat : threadPool .stats ().stats ()) {
345
+ if (stat .name ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
346
+ assertThat (stat .rejected (), equalTo (1L ));
347
+ assertThat (stat .queue (), equalTo (0 ));
348
+ }
349
+ }
350
+
351
+ // try to authenticate again with the same key - if this hangs, check the future caching
352
+ final AuthenticationResult <User > shouldSucceed = tryAuthenticate (service , id , key , type );
353
+ assertThat (shouldSucceed .getStatus (), is (AuthenticationResult .Status .SUCCESS ));
354
+ } finally {
355
+ terminate (clientThreadpool );
356
+ }
357
+ }
358
+
271
359
public void testCreateApiKeyUsesBulkIndexAction () throws Exception {
272
360
final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
273
361
final ApiKeyService service = createApiKeyService (settings );
0 commit comments