|
38 | 38 | import java.util.concurrent.CompletableFuture; |
39 | 39 | import java.util.concurrent.ExecutionException; |
40 | 40 | import java.util.concurrent.TimeUnit; |
| 41 | +import java.util.concurrent.atomic.AtomicBoolean; |
41 | 42 | import java.util.concurrent.atomic.AtomicInteger; |
42 | 43 | import java.util.function.Function; |
43 | 44 | import lombok.Cleanup; |
@@ -1035,11 +1036,14 @@ public void testSeekWillNotEncounteredFencedError() throws Exception { |
1035 | 1036 | // Create a pulsar client with a subscription fenced counter. |
1036 | 1037 | ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); |
1037 | 1038 | AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); |
| 1039 | + // Count switch: Default off, turn on again before seek starts. |
| 1040 | + final AtomicBoolean countAfterSeek = new AtomicBoolean(false); |
1038 | 1041 | @Cleanup |
1039 | 1042 | PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> |
1040 | 1043 | new ClientCnx(conf, eventLoopGroup) { |
1041 | 1044 | protected void handleError(CommandError error) { |
1042 | | - if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { |
| 1045 | + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced") |
| 1046 | + && countAfterSeek.get()) { |
1043 | 1047 | receivedFencedErrorCounter.incrementAndGet(); |
1044 | 1048 | } |
1045 | 1049 | super.handleError(error); |
@@ -1076,10 +1080,9 @@ protected void handleError(CommandError error) { |
1076 | 1080 | assertNotNull(msg); |
1077 | 1081 | consumer.acknowledge(msg); |
1078 | 1082 | } |
| 1083 | + countAfterSeek.set(true); |
1079 | 1084 | consumer.seek(msgId1); |
1080 | | - Awaitility.await().untilAsserted(() -> { |
1081 | | - assertTrue(consumer.isConnected()); |
1082 | | - }); |
| 1085 | + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); |
1083 | 1086 | assertEquals(receivedFencedErrorCounter.get(), 0); |
1084 | 1087 |
|
1085 | 1088 | // cleanup. |
|
0 commit comments