@@ -74,12 +74,15 @@ public class LeaderElection implements Closeable, SessionListenableProvider {
74
74
private final SemaphoreObserver semaphoreObserver ;
75
75
76
76
private final CountDownLatch startingLatch = new CountDownLatch (1 );
77
- private AtomicReference <State > state = new AtomicReference <>(State .INITIAL );
78
- private AtomicReference <Future <Status >> initializingTask = new AtomicReference <>(null );
77
+ private final AtomicReference <State > state = new AtomicReference <>(State .INITIAL );
78
+ private final AtomicReference <Future <Status >> initializingTask = new AtomicReference <>(null );
79
79
private Future <Void > electionTask = null ;
80
80
private volatile boolean autoRequeue = false ;
81
81
private volatile boolean isLeader = false ;
82
82
83
+ /**
84
+ * Internal state
85
+ */
83
86
private enum State {
84
87
INITIAL ,
85
88
STARTING ,
@@ -91,10 +94,10 @@ private enum State {
91
94
/**
92
95
* Creates a new LeaderElection instance with default settings.
93
96
*
94
- * @param client the coordination client to use
95
- * @param coordinationNodePath path to the coordination node
96
- * @param electionName name of the election (must be unique per coordination node)
97
- * @param data optional data to associate with the leader (visible to all participants)
97
+ * @param client the coordination client to use
98
+ * @param coordinationNodePath path to the coordination node
99
+ * @param electionName name of the election (must be unique per coordination node)
100
+ * @param data optional data to associate with the leader (visible to all participants)
98
101
* @param leaderElectionListener callback for leadership events
99
102
*/
100
103
public LeaderElection (
@@ -118,12 +121,12 @@ public LeaderElection(
118
121
/**
119
122
* Creates a new LeaderElection instance with custom settings.
120
123
*
121
- * @param client the coordination client to use
122
- * @param coordinationNodePath path to the coordination node
123
- * @param electionName name of the election (must be unique per coordination node)
124
- * @param data optional data to associate with the leader (visible to all participants)
124
+ * @param client the coordination client to use
125
+ * @param coordinationNodePath path to the coordination node
126
+ * @param electionName name of the election (must be unique per coordination node)
127
+ * @param data optional data to associate with the leader (visible to all participants)
125
128
* @param leaderElectionListener callback for leadership events
126
- * @param settings configuration settings for the election process
129
+ * @param settings configuration settings for the election process
127
130
* @throws NullPointerException if any required parameter is null
128
131
*/
129
132
public LeaderElection (
@@ -185,6 +188,8 @@ public void start() {
185
188
"Leader election may be started only once"
186
189
);
187
190
191
+ logger .debug ("Starting leader election '{}' initialization" , electionName );
192
+
188
193
CompletableFuture <Status > connectionTask = executeWithRetry (coordinationSession ::connect );
189
194
CompletableFuture <Status > semaphoreCreateTask = executeWithRetry (
190
195
() -> coordinationSession .createSemaphore (electionName , MAX_LEASE )
@@ -459,12 +464,16 @@ private synchronized boolean stopInternal(State terminationState) {
459
464
} catch (Exception e ) {
460
465
logger .warn ("Error closing semaphore observer for {}: {}" , electionName , e .getMessage ());
461
466
}
462
-
463
467
try {
464
468
blockingExecutor .shutdown ();
465
469
} catch (Exception e ) {
466
470
logger .warn ("Error shutting down executor for {}: {}" , electionName , e .getMessage ());
467
471
}
472
+ try {
473
+ coordinationSession .close ();
474
+ } catch (Exception e ) {
475
+ logger .warn ("Error closing session for {}: {}" , electionName , e .getMessage ());
476
+ }
468
477
return true ;
469
478
}
470
479
}
0 commit comments