|
38 | 38 | import java.util.concurrent.CompletableFuture; |
39 | 39 | import java.util.concurrent.ExecutionException; |
40 | 40 | import java.util.concurrent.TimeUnit; |
| 41 | +import java.util.concurrent.atomic.AtomicBoolean; |
41 | 42 | import java.util.concurrent.atomic.AtomicInteger; |
42 | 43 | import java.util.function.Function; |
43 | 44 | import lombok.Cleanup; |
@@ -1045,11 +1046,14 @@ public void testSeekWillNotEncounteredFencedError() throws Exception { |
1045 | 1046 | // Create a pulsar client with a subscription fenced counter. |
1046 | 1047 | ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); |
1047 | 1048 | AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); |
| 1049 | + // Count switch: Default off, turn on again before seek starts. |
| 1050 | + final AtomicBoolean countAfterSeek = new AtomicBoolean(false); |
1048 | 1051 | @Cleanup |
1049 | 1052 | PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> |
1050 | 1053 | new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { |
1051 | 1054 | protected void handleError(CommandError error) { |
1052 | | - if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { |
| 1055 | + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced") |
| 1056 | + && countAfterSeek.get()) { |
1053 | 1057 | receivedFencedErrorCounter.incrementAndGet(); |
1054 | 1058 | } |
1055 | 1059 | super.handleError(error); |
@@ -1086,10 +1090,9 @@ protected void handleError(CommandError error) { |
1086 | 1090 | assertNotNull(msg); |
1087 | 1091 | consumer.acknowledge(msg); |
1088 | 1092 | } |
| 1093 | + countAfterSeek.set(true); |
1089 | 1094 | consumer.seek(msgId1); |
1090 | | - Awaitility.await().untilAsserted(() -> { |
1091 | | - assertTrue(consumer.isConnected()); |
1092 | | - }); |
| 1095 | + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); |
1093 | 1096 | assertEquals(receivedFencedErrorCounter.get(), 0); |
1094 | 1097 |
|
1095 | 1098 | // cleanup. |
|
0 commit comments