|
72 | 72 | import java.util.HashSet; |
73 | 73 | import java.util.Set; |
74 | 74 | import java.util.concurrent.CompletableFuture; |
| 75 | +import java.util.concurrent.CountDownLatch; |
75 | 76 | import java.util.concurrent.TimeUnit; |
76 | 77 | import java.util.concurrent.TimeoutException; |
77 | 78 | import java.util.concurrent.atomic.AtomicInteger; |
@@ -328,7 +329,7 @@ public void shouldSubscribeWithAnonymousSubscriberAfterServiceRestart() throws E |
328 | 329 | Eventually.assertDeferred(() -> isTopicServiceRunning(member), is(true)); |
329 | 330 |
|
330 | 331 | try (Publisher<Message> publisher = topic.createPublisher(); |
331 | | - Subscriber<Message> subscriber = topic.createSubscriber()) |
| 332 | + PagedTopicSubscriber<Message> subscriber = (PagedTopicSubscriber<Message>) topic.createSubscriber()) |
332 | 333 | { |
333 | 334 | System.err.println("Publishing " + cMsgTotal + " messages of " + cbMessage + " bytes"); |
334 | 335 | for (int i = 0; i < cMsgTotal; i++) |
@@ -361,9 +362,21 @@ public void shouldSubscribeWithAnonymousSubscriberAfterServiceRestart() throws E |
361 | 362 | assertThat(element, is(notNullValue())); |
362 | 363 | } |
363 | 364 |
|
| 365 | + // A latch to catch the subscriber disconnect |
| 366 | + CountDownLatch latch = new CountDownLatch(1); |
| 367 | + subscriber.addStateListener((subscriber1, nNewState, nPrevState) -> |
| 368 | + { |
| 369 | + if (nNewState == PagedTopicSubscriber.STATE_DISCONNECTED) |
| 370 | + { |
| 371 | + latch.countDown(); |
| 372 | + } |
| 373 | + }); |
| 374 | + |
364 | 375 | restartService(topic); |
365 | 376 |
|
366 | | - assertThat(((PagedTopicSubscriber<Message>) subscriber).getState(), is(PagedTopicSubscriber.STATE_DISCONNECTED)); |
| 377 | + // The subscriber should have disconnected at least once, it may already be reconnected |
| 378 | + // so we cannot just check its state |
| 379 | + assertThat(latch.await(5, TimeUnit.MINUTES), is(true)); |
367 | 380 |
|
368 | 381 | System.err.println("Subscriber receiving remaining " + (cMsgTotal - m) + " messages of " + cbMessage + " bytes"); |
369 | 382 | for ( ; m < cMsgTotal; m++) |
|
0 commit comments