11
11
import java .util .concurrent .Executors ;
12
12
import java .util .concurrent .Future ;
13
13
import java .util .concurrent .ScheduledExecutorService ;
14
+ import java .util .concurrent .ThreadFactory ;
14
15
import java .util .concurrent .atomic .AtomicReference ;
15
16
import java .util .function .Supplier ;
16
17
import java .util .stream .Collectors ;
17
18
import java .util .stream .Stream ;
18
19
19
20
import com .google .common .base .Preconditions ;
21
+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
20
22
import org .slf4j .Logger ;
21
23
import org .slf4j .LoggerFactory ;
22
24
35
37
import tech .ydb .core .Status ;
36
38
import tech .ydb .core .StatusCode ;
37
39
38
- // TODO: документцаия / логгирование / рекомендации по коду
40
+ /**
41
+ * A distributed leader election implementation using coordination services.
42
+ * This class provides a mechanism for multiple instances to compete for leadership
43
+ * of a named resource, with exactly one instance becoming the leader at any time.
44
+ *
45
+ * <p>The election process uses a semaphore-based approach where:
46
+ * <ul>
47
+ * <li>The leader holds the semaphore lock</li>
48
+ * <li>Other participants wait in a queue</li>
49
+ * <li>Leadership can be voluntarily released or lost due to session issues</li>
50
+ * </ul>
51
+ *
52
+ * <p>Thread safety: This class is thread-safe. All public methods can be called
53
+ * from multiple threads concurrently.
54
+ */
39
55
public class LeaderElection implements Closeable , SessionListenableProvider {
40
56
private static final Logger logger = LoggerFactory .getLogger (LeaderElection .class );
57
+ private static final ThreadFactory threadFactory = new ThreadFactoryBuilder ()
58
+ .setNameFormat ("ydb-leader-election-%d" )
59
+ .setDaemon (true )
60
+ .build ();
41
61
private static final long MAX_LEASE = 1L ;
42
62
43
63
private final LeaderElectionListener leaderElectionListener ;
@@ -68,6 +88,15 @@ private enum State {
68
88
CLOSED
69
89
}
70
90
91
+ /**
92
+ * Creates a new LeaderElection instance with default settings.
93
+ *
94
+ * @param client the coordination client to use
95
+ * @param coordinationNodePath path to the coordination node
96
+ * @param electionName name of the election (must be unique per coordination node)
97
+ * @param data optional data to associate with the leader (visible to all participants)
98
+ * @param leaderElectionListener callback for leadership events
99
+ */
71
100
public LeaderElection (
72
101
CoordinationClient client ,
73
102
String coordinationNodePath ,
@@ -86,6 +115,17 @@ public LeaderElection(
86
115
);
87
116
}
88
117
118
+ /**
119
+ * Creates a new LeaderElection instance with custom settings.
120
+ *
121
+ * @param client the coordination client to use
122
+ * @param coordinationNodePath path to the coordination node
123
+ * @param electionName name of the election (must be unique per coordination node)
124
+ * @param data optional data to associate with the leader (visible to all participants)
125
+ * @param leaderElectionListener callback for leadership events
126
+ * @param settings configuration settings for the election process
127
+ * @throws NullPointerException if any required parameter is null
128
+ */
89
129
public LeaderElection (
90
130
CoordinationClient client ,
91
131
String coordinationNodePath ,
@@ -94,21 +134,28 @@ public LeaderElection(
94
134
LeaderElectionListener leaderElectionListener ,
95
135
LeaderElectionSettings settings
96
136
) {
137
+ Preconditions .checkNotNull (client , "CoordinationClient cannot be null" );
138
+ Preconditions .checkNotNull (coordinationNodePath , "Coordination node path cannot be null" );
139
+ Preconditions .checkNotNull (electionName , "Election name cannot be null" );
140
+ Preconditions .checkNotNull (leaderElectionListener , "LeaderElectionListener cannot be null" );
141
+ Preconditions .checkNotNull (settings , "LeaderElectionSettings cannot be null" );
142
+
97
143
this .coordinationNodePath = coordinationNodePath ;
98
144
this .electionName = electionName ;
99
145
this .data = data ;
100
146
this .leaderElectionListener = leaderElectionListener ;
101
147
this .scheduledExecutor = settings .getScheduledExecutor ();
102
- this .blockingExecutor = Executors .newSingleThreadExecutor (); // TODO: thread factory
148
+ this .blockingExecutor = Executors .newSingleThreadExecutor (threadFactory );
103
149
this .retryPolicy = settings .getRetryPolicy ();
104
150
105
151
this .coordinationSession = client .createSession (coordinationNodePath );
106
152
this .sessionListenable = new ListenableContainer <>();
107
153
coordinationSession .addStateListener (sessionState -> {
108
- if (sessionState == CoordinationSession .State .LOST || sessionState == CoordinationSession .State .CLOSED ) {
154
+ if (!state .get ().equals (State .CLOSED ) && (sessionState == CoordinationSession .State .LOST ||
155
+ sessionState == CoordinationSession .State .CLOSED )) {
109
156
logger .error ("Coordination session unexpectedly changed to {} state, marking election as FAILED" ,
110
157
sessionState );
111
- state . set (State .FAILED );
158
+ stopInternal (State .FAILED );
112
159
}
113
160
sessionListenable .notifyListeners (sessionState );
114
161
});
@@ -127,6 +174,11 @@ public LeaderElection(
127
174
);
128
175
}
129
176
177
+ /**
178
+ * Starts the leader election process.
179
+ *
180
+ * @throws IllegalStateException if the election is already started or closed
181
+ */
130
182
public void start () {
131
183
Preconditions .checkState (
132
184
state .compareAndSet (State .INITIAL , State .STARTING ),
@@ -159,9 +211,7 @@ public void start() {
159
211
return semaphoreStatus ;
160
212
}).exceptionally (ex -> {
161
213
logger .error ("Leader election initializing task failed" , ex );
162
- state .set (State .FAILED );
163
- semaphoreObserver .close ();
164
- startingLatch .countDown ();
214
+ stopInternal (State .FAILED );
165
215
return Status .of (StatusCode .CLIENT_INTERNAL_ERROR );
166
216
});
167
217
@@ -176,20 +226,30 @@ private CompletableFuture<Status> executeWithRetry(Supplier<CompletableFuture<St
176
226
return new RetryableTask ("leaderElectionInitialize" , taskSupplier , scheduledExecutor , retryPolicy ).execute ();
177
227
}
178
228
229
+ /**
230
+ * Enables automatic requeueing when leadership is lost.
231
+ * If called before start election will be started immediately.
232
+ */
179
233
public void autoRequeue () {
180
234
autoRequeue = true ;
181
235
}
182
236
237
+ /**
238
+ * Checks if this instance is currently the leader.
239
+ *
240
+ * @return true if this instance is the leader, false otherwise
241
+ */
183
242
public boolean isLeader () {
184
243
return isLeader ;
185
244
}
186
245
187
246
/**
188
247
* Re-queue an attempt for leadership. If this instance is already queued, nothing
189
248
* happens and false is returned. If the instance was not queued, it is re-queued and true
190
- * is returned
249
+ * is returned.
191
250
*
192
- * @return true if re-enqueue was successful
251
+ * @return true if reenqueue was successful
252
+ * @throws IllegalStateException if the election is not in STARTED or STARTING state
193
253
*/
194
254
public boolean requeue () {
195
255
State localState = state .get ();
@@ -201,6 +261,11 @@ public boolean requeue() {
201
261
return enqueueElection ();
202
262
}
203
263
264
+ /**
265
+ * Interrupts the current leadership attempt if one is in progress.
266
+ *
267
+ * @return true if leadership was interrupted, false if no attempt was in progress
268
+ */
204
269
public synchronized boolean interruptLeadership () {
205
270
Future <?> localTask = electionTask ;
206
271
if (localTask != null ) {
@@ -231,11 +296,16 @@ public Void call() throws Exception {
231
296
return false ;
232
297
}
233
298
299
+ /**
300
+ * Main work loop for leadership acquisition and maintenance.
301
+ *
302
+ * @throws Exception if the leadership attempt fails
303
+ */
234
304
private void doWork () throws Exception {
235
305
isLeader = false ;
236
306
237
307
try {
238
- waitStartedState ();
308
+ waitStartedStateOrFail ();
239
309
lock .tryAcquire (
240
310
null ,
241
311
true ,
@@ -248,7 +318,7 @@ private void doWork() throws Exception {
248
318
Thread .currentThread ().interrupt ();
249
319
throw e ;
250
320
} catch (Throwable e ) {
251
- logger .debug ( "takeLeadership exception " , e );
321
+ logger .error ( "Unexpected error in takeLeadership " , e );
252
322
}
253
323
} catch (InterruptedException e ) {
254
324
Thread .currentThread ().interrupt ();
@@ -270,7 +340,7 @@ private void doWork() throws Exception {
270
340
}
271
341
}
272
342
273
- private void waitStartedState () throws InterruptedException {
343
+ private void waitStartedStateOrFail () throws InterruptedException {
274
344
State localState = state .get ();
275
345
if (localState == State .STARTING ) {
276
346
startingLatch .await ();
@@ -295,9 +365,10 @@ private boolean isQueued() {
295
365
}
296
366
297
367
/**
298
- * Не гарантированы все, кроме лидера
368
+ * Gets all participants in the election.
369
+ * Note: Due to observer limitations, waiters may be visible only eventually (after lease changes).
299
370
*
300
- * @return
371
+ * @return list of election participants (owners and visible waiters)
301
372
*/
302
373
public List <ElectionParticipant > getParticipants () {
303
374
SemaphoreDescription semaphoreDescription = semaphoreObserver .getCachedData ();
@@ -313,6 +384,11 @@ public List<ElectionParticipant> getParticipants() {
313
384
).collect (Collectors .toList ());
314
385
}
315
386
387
+ /**
388
+ * Gets the current leader if one exists.
389
+ *
390
+ * @return Optional containing the current leader, or empty if no leader exists
391
+ */
316
392
public Optional <ElectionParticipant > getCurrentLeader () {
317
393
SemaphoreDescription semaphoreDescription = semaphoreObserver .getCachedData ();
318
394
if (semaphoreDescription == null ) {
@@ -336,18 +412,59 @@ public Listenable<CoordinationSession.State> getSessionListenable() {
336
412
return sessionListenable ;
337
413
}
338
414
415
+ /**
416
+ * Closes the leader election and releases all resources.
417
+ * After closing, the instance cannot be reused.
418
+ */
339
419
@ Override
340
420
public synchronized void close () {
341
- // TODO: Учесть все стейты
342
- Preconditions .checkState (state .compareAndSet (State .STARTED , State .CLOSED ), "Already closed" );
421
+ stopInternal (State .CLOSED );
422
+ }
423
+
424
+ /**
425
+ * Internal method to stop the election with the specified termination state.
426
+ *
427
+ * @param terminationState the state to transition to (FAILED or CLOSED)
428
+ * @return true if the state was changed, false if already terminated
429
+ */
430
+ private synchronized boolean stopInternal (State terminationState ) {
431
+ State localState = state .get ();
432
+ if (localState == State .FAILED || localState == State .CLOSED ) {
433
+ logger .warn ("Already stopped leader election {} with status: {}" , electionName , localState );
434
+ return false ;
435
+ }
436
+ logger .debug ("Transitioning leader election {} from {} to {}" , electionName , localState , terminationState );
437
+
438
+ // change state
439
+ state .set (terminationState );
343
440
441
+ // unblock starting latch if not yet
442
+ startingLatch .countDown ();
443
+
444
+ // stop tasks
445
+ Future <Status > localInitializingTask = initializingTask .get ();
446
+ if (localInitializingTask != null ) {
447
+ localInitializingTask .cancel (true );
448
+ initializingTask .set (null );
449
+ }
344
450
Future <Void > localTask = electionTask ;
345
451
if (localTask != null ) {
346
452
localTask .cancel (true );
347
453
electionTask = null ;
348
454
}
349
455
350
- blockingExecutor .shutdown ();
351
- semaphoreObserver .close ();
456
+ // Clean up resources
457
+ try {
458
+ semaphoreObserver .close ();
459
+ } catch (Exception e ) {
460
+ logger .warn ("Error closing semaphore observer for {}: {}" , electionName , e .getMessage ());
461
+ }
462
+
463
+ try {
464
+ blockingExecutor .shutdown ();
465
+ } catch (Exception e ) {
466
+ logger .warn ("Error shutting down executor for {}: {}" , electionName , e .getMessage ());
467
+ }
468
+ return true ;
352
469
}
353
470
}
0 commit comments