Skip to content

Commit 8828734

Browse files
Dream95lhotari
andauthored
[fix][broker] Flaky-test: TopicTransactionBufferTest.testMessagePublishInOrder (#24826)
Signed-off-by: Dream95 <[email protected]> Co-authored-by: Lari Hotari <[email protected]>
1 parent 678db6b commit 8828734

File tree

2 files changed

+6
-17
lines changed

2 files changed

+6
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
232232
return CompletableFuture.completedFuture(null);
233233
}
234234

235+
@VisibleForTesting
236+
public void setPublishFuture(CompletableFuture<Position> publishFuture) {
237+
this.publishFuture = publishFuture;
238+
}
239+
235240
@VisibleForTesting
236241
public CompletableFuture<Position> getPublishFuture() {
237242
return publishFuture;
@@ -298,7 +303,7 @@ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceI
298303
"Transaction Buffer recover failed, the current state is: " + getState()));
299304
}
300305
}).whenComplete(((position, throwable) -> buffer.release()));
301-
publishFuture = future;
306+
setPublishFuture(future);
302307
return future;
303308
}
304309

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,20 @@
1818
*/
1919
package org.apache.pulsar.broker.transaction.buffer.utils;
2020

21-
import java.util.concurrent.CompletableFuture;
2221
import lombok.Setter;
23-
import org.apache.bookkeeper.mledger.Position;
2422
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2523
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
2624

2725
public class TransactionBufferTestImpl extends TopicTransactionBuffer {
28-
@Setter
29-
public CompletableFuture<Void> transactionBufferFuture = null;
3026
@Setter
3127
public State state = null;
32-
@Setter
33-
public CompletableFuture<Position> publishFuture = null;
3428

3529
public TransactionBufferTestImpl(PersistentTopic topic) {
3630
super(topic);
3731
}
3832

39-
@Override
40-
public CompletableFuture<Void> getTransactionBufferFuture() {
41-
return transactionBufferFuture == null ? super.getTransactionBufferFuture() : transactionBufferFuture;
42-
}
43-
4433
@Override
4534
public State getState() {
4635
return state == null ? super.getState() : state;
4736
}
48-
49-
@Override
50-
public CompletableFuture<Position> getPublishFuture() {
51-
return publishFuture == null ? super.getPublishFuture() : publishFuture;
52-
}
5337
}

0 commit comments

Comments
 (0)