24
24
import java .util .concurrent .ConcurrentHashMap ;
25
25
26
26
import com .google .common .base .Preconditions ;
27
+ import com .google .common .util .concurrent .Uninterruptibles ;
27
28
import org .slf4j .Logger ;
28
29
import org .slf4j .LoggerFactory ;
29
30
40
41
import org .apache .cassandra .service .paxos .PaxosRepair ;
41
42
import org .apache .cassandra .service .paxos .PaxosState ;
42
43
import org .apache .cassandra .service .paxos .uncommitted .UncommittedPaxosKey ;
44
+ import org .apache .cassandra .utils .Clock ;
43
45
import org .apache .cassandra .utils .CloseableIterator ;
44
46
import org .apache .cassandra .utils .concurrent .AsyncFuture ;
45
47
48
+ import static java .util .concurrent .TimeUnit .MICROSECONDS ;
49
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
50
+ import static java .util .concurrent .TimeUnit .SECONDS ;
51
+ import static org .apache .cassandra .config .DatabaseDescriptor .getCasContentionTimeout ;
52
+ import static org .apache .cassandra .config .DatabaseDescriptor .getWriteRpcTimeout ;
46
53
import static org .apache .cassandra .service .paxos .cleanup .PaxosCleanupSession .TIMEOUT_NANOS ;
47
54
48
55
public class PaxosCleanupLocalCoordinator extends AsyncFuture <PaxosCleanupResponse >
@@ -126,16 +133,18 @@ private void scheduleKeyRepairsOrFinish()
126
133
return ;
127
134
}
128
135
136
+ long txnTimeoutMicros = Math .max (getCasContentionTimeout (MICROSECONDS ), getWriteRpcTimeout (MICROSECONDS ));
137
+ boolean waitForCoordinator = DatabaseDescriptor .getPaxosRepairRaceWait ();
129
138
while (inflight .size () < parallelism && uncommittedIter .hasNext ())
130
- repairKey (uncommittedIter .next ());
139
+ repairKey (uncommittedIter .next (), txnTimeoutMicros , waitForCoordinator );
131
140
132
141
}
133
142
134
143
if (inflight .isEmpty ())
135
144
finish ();
136
145
}
137
146
138
- private boolean repairKey (UncommittedPaxosKey uncommitted )
147
+ private boolean repairKey (UncommittedPaxosKey uncommitted , long txnTimeoutMicros , boolean waitForCoordinator )
139
148
{
140
149
logger .trace ("repairing {}" , uncommitted );
141
150
Preconditions .checkState (!inflight .containsKey (uncommitted .getKey ()));
@@ -146,6 +155,9 @@ private boolean repairKey(UncommittedPaxosKey uncommitted)
146
155
if (consistency == null )
147
156
return false ;
148
157
158
+ if (waitForCoordinator )
159
+ maybeWaitForOriginalCoordinator (uncommitted , txnTimeoutMicros );
160
+
149
161
inflight .put (uncommitted .getKey (), tableRepairs .startOrGetOrQueue (uncommitted .getKey (), uncommitted .ballot (), uncommitted .getConsistencyLevel (), table , result -> {
150
162
if (result .wasSuccessful ())
151
163
onKeyFinish (uncommitted .getKey ());
@@ -155,6 +167,24 @@ private boolean repairKey(UncommittedPaxosKey uncommitted)
155
167
return true ;
156
168
}
157
169
170
+ /**
171
+ * Wait to repair things that are still potentially executing at the original coordinator to avoid
172
+ * causing timeouts. This should only have to happen at most a few times when the repair starts
173
+ */
174
+ private static void maybeWaitForOriginalCoordinator (UncommittedPaxosKey uncommitted , long txnTimeoutMicros )
175
+ {
176
+ long nowMicros = MILLISECONDS .toMicros (Clock .Global .currentTimeMillis ());
177
+ long ballotElapsedMicros = nowMicros - uncommitted .ballot ().unixMicros ();
178
+ if (ballotElapsedMicros < 0 && Math .abs (ballotElapsedMicros ) > SECONDS .toMicros (1 ))
179
+ logger .warn ("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}" , uncommitted .ballot ());
180
+ if (ballotElapsedMicros < txnTimeoutMicros )
181
+ {
182
+ long sleepMicros = txnTimeoutMicros - ballotElapsedMicros ;
183
+ logger .info ("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish" , sleepMicros );
184
+ Uninterruptibles .sleepUninterruptibly (sleepMicros , MICROSECONDS );
185
+ }
186
+ }
187
+
158
188
private synchronized void onKeyFinish (DecoratedKey key )
159
189
{
160
190
if (!inflight .containsKey (key ))
0 commit comments