3
3
import java .io .Closeable ;
4
4
import java .time .Duration ;
5
5
import java .time .Instant ;
6
+ import java .util .Objects ;
6
7
import java .util .concurrent .ExecutionException ;
7
8
import java .util .concurrent .Future ;
8
9
import java .util .concurrent .atomic .AtomicReference ;
24
25
public class InterProcessMutex implements InterProcessLock , Closeable {
25
26
private static final Logger logger = LoggerFactory .getLogger (InterProcessMutex .class );
26
27
27
- private final AtomicReference < State > state = new AtomicReference <>( State . INITIAL ) ;
28
+ private final String lockName ;
28
29
private final CoordinationSession coordinationSession ;
29
30
private final Future <?> sessionConnectionTask ;
30
31
private final LockInternals lockInternals ;
31
32
private final ListenableContainer <CoordinationSession .State > sessionListenable ;
32
33
34
+ private final AtomicReference <State > state = new AtomicReference <>(State .INITIAL );
35
+
33
36
/**
34
37
* Internal state machine states
35
38
*/
@@ -82,15 +85,14 @@ public InterProcessMutex(
82
85
}
83
86
84
87
state .set (State .STARTING );
85
- logger .debug ("Initializing InterProcessMutex for lock '{}'" , lockName );
86
-
88
+ this .lockName = lockName ;
87
89
this .coordinationSession = client .createSession (coordinationNodePath );
88
90
this .sessionListenable = new ListenableContainer <>();
89
91
this .lockInternals = new LockInternals (coordinationSession , lockName );
90
92
91
93
coordinationSession .addStateListener (sessionState -> {
92
94
if (sessionState == CoordinationSession .State .LOST || sessionState == CoordinationSession .State .CLOSED ) {
93
- logger .error ("Coordination session unexpectedly changed to {} state, marking mutex as FAILED" ,
95
+ logger .error ("Coordination session unexpectedly changed to '{}' state, marking mutex as ' FAILED' " ,
94
96
sessionState );
95
97
state .set (State .FAILED );
96
98
}
@@ -109,9 +111,8 @@ public InterProcessMutex(
109
111
110
112
if (settings .isWaitConnection ()) {
111
113
try {
112
- logger .debug ("Waiting for session connection to complete..." );
114
+ logger .debug ("Waiting for session connection to complete for lock '{}'" , lockName );
113
115
sessionConnectionTask .get ();
114
- logger .debug ("Session connection completed" );
115
116
} catch (InterruptedException e ) {
116
117
Thread .currentThread ().interrupt ();
117
118
logger .error ("Interrupted while waiting for session connection for lock '{}'" , lockName , e );
@@ -128,36 +129,38 @@ public InterProcessMutex(
128
129
@ Override
129
130
public void acquire () throws Exception {
130
131
checkState ();
131
- logger .debug ("Attempting to acquire lock..." );
132
+ logger .debug ("Attempting to acquire lock '{}'" , lockName );
132
133
lockInternals .tryAcquire (
133
134
null ,
134
135
true ,
135
136
null
136
137
);
137
- logger .debug ("Lock acquired successfully" );
138
+ logger .debug ("Lock '{}' acquired successfully" , lockName );
138
139
}
139
140
140
141
@ Override
141
142
public boolean acquire (Duration waitDuration ) throws Exception {
143
+ Objects .requireNonNull (waitDuration , "wait duration must not be null" );
144
+
142
145
checkState ();
143
- logger .debug ("Attempting to acquire lock with timeout {}..." , waitDuration );
146
+ logger .debug ("Attempting to acquire lock '{}' with timeout {}" , lockName , waitDuration );
144
147
Instant deadline = Instant .now ().plus (waitDuration );
145
148
boolean acquired = lockInternals .tryAcquire (
146
149
deadline ,
147
150
true ,
148
151
null
149
152
) != null ;
150
- logger .debug ("Lock acquisition {}successful" , acquired ? "" : "un" );
153
+ logger .debug ("Lock '{}' acquisition {}successful" , lockName , acquired ? "" : "un" );
151
154
return acquired ;
152
155
}
153
156
154
157
@ Override
155
158
public boolean release () throws InterruptedException {
156
159
checkState ();
157
- logger .debug ("Attempting to release lock..." );
160
+ logger .debug ("Attempting to release lock '{}'" , lockName );
158
161
boolean released = lockInternals .release ();
159
162
if (released ) {
160
- logger .debug ("Lock released successfully" );
163
+ logger .debug ("Lock '{}' released successfully" , lockName );
161
164
} else {
162
165
logger .debug ("No lock to release" );
163
166
}
@@ -184,29 +187,27 @@ public Listenable<CoordinationSession.State> getSessionListenable() {
184
187
185
188
@ Override
186
189
public void close () {
187
- logger .debug ("Closing InterProcessMutex..." );
190
+ logger .debug ("Closing mutex '{}'" , lockName );
188
191
state .set (State .CLOSED );
189
192
try {
190
193
lockInternals .close ();
191
194
} catch (Exception e ) {
192
- logger .warn ("Error while closing lock internals" , e );
195
+ logger .warn ("Error while closing lock internals '{}'" , lockName , e );
193
196
}
194
- logger .info ("InterProcessMutex closed" );
197
+ logger .info ("Mutex '{}' closed" , lockName );
195
198
}
196
199
197
200
private void checkState () throws LockStateException {
198
201
State currentState = state .get ();
199
202
if (currentState == State .FAILED ) {
200
- throw new LockStateException ("Lock '" + lockInternals .getLockName () + "' is in FAILED state" ,
201
- lockInternals .getLockName ());
203
+ throw new LockStateException ("Lock '" + lockName + "' is in FAILED state" , lockName );
202
204
}
203
205
if (currentState == State .CLOSED ) {
204
- throw new LockStateException ("Lock '" + lockInternals .getLockName () + "' is already closed" ,
205
- lockInternals .getLockName ());
206
+ throw new LockStateException ("Lock '" + lockName + "' is already closed" , lockName );
206
207
}
207
208
if (currentState != State .STARTED ) {
208
- throw new LockStateException ("Lock '" + lockInternals . getLockName () + "' is not ready (current state: "
209
- + currentState + ")" , lockInternals . getLockName () );
209
+ throw new LockStateException ("Lock '" + lockName + "' is not ready (current state: " + currentState + ")" ,
210
+ lockName );
210
211
}
211
212
}
212
213
}
0 commit comments