diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index c3961dca6d4c6..0c777afaa26c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -232,6 +232,11 @@ public CompletableFuture getTransactionMeta(TxnID txnID) { return CompletableFuture.completedFuture(null); } + @VisibleForTesting + public void setPublishFuture(CompletableFuture publishFuture) { + this.publishFuture = publishFuture; + } + @VisibleForTesting public CompletableFuture getPublishFuture() { return publishFuture; @@ -298,7 +303,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI "Transaction Buffer recover failed, the current state is: " + getState())); } }).whenComplete(((position, throwable) -> buffer.release())); - publishFuture = future; + setPublishFuture(future); return future; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java index 18724e13f6743..b1168d0850166 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java @@ -18,36 +18,20 @@ */ package org.apache.pulsar.broker.transaction.buffer.utils; -import java.util.concurrent.CompletableFuture; import lombok.Setter; -import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; public class TransactionBufferTestImpl extends TopicTransactionBuffer { - @Setter - public CompletableFuture transactionBufferFuture = null; @Setter public State state = null; - @Setter - public CompletableFuture publishFuture = null; public TransactionBufferTestImpl(PersistentTopic topic) { super(topic); } - @Override - public CompletableFuture getTransactionBufferFuture() { - return transactionBufferFuture == null ? super.getTransactionBufferFuture() : transactionBufferFuture; - } - @Override public State getState() { return state == null ? super.getState() : state; } - - @Override - public CompletableFuture getPublishFuture() { - return publishFuture == null ? super.getPublishFuture() : publishFuture; - } }