diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java index 4dd60976..cf3d9383 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java @@ -142,7 +142,7 @@ public CompletableFuture handleHeartBeat(HeartBeatRequest req //first change to candidate, and notify the state-maintainer thread changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; - //TOOD notify + stateMaintainer.wakeup(); return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode())); } } @@ -224,6 +224,7 @@ public CompletableFuture handleVote(VoteRequest request, boolean s //stepped down by larger term changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; + stateMaintainer.wakeup(); //only can handleVote when the term is consistent return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY)); } @@ -624,6 +625,7 @@ public CompletableFuture handleTakeLeadership( takeLeadershipTask.update(request, response); changeRoleToCandidate(targetTerm); needIncreaseTermImmediately = true; + stateMaintainer.wakeup(); return response; } } @@ -692,7 +694,6 @@ public class StateMaintainer extends ShutdownAbleThread { public StateMaintainer(String name, Logger logger) { super(name, logger); } - @Override public void doWork() { try { @@ -700,7 +701,7 @@ public void doWork() { DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); DLedgerLeaderElector.this.maintainState(); } - sleep(10); + waitForRunning(10); } catch (Throwable t) { DLedgerLeaderElector.logger.error("Error in heartbeat", t); }