Skip to content

Commit 9c0220b

Browse files
author
Hernan Gelaf-Romer
committed
Batch calls to overloaded cluster can cause meta hotspotting
1 parent c07bc80 commit 9c0220b

File tree

3 files changed

+20
-14
lines changed

3 files changed

+20
-14
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,13 +219,14 @@ public void run() {
219219
} catch (IOException e) {
220220
// The service itself failed. It may be an error coming from the communication
221221
// layer, but, as well, a functional error raised by the server.
222-
receiveGlobalFailure(multiAction, server, numAttempt, e, true);
222+
223+
receiveGlobalFailure(multiAction, server, numAttempt, e);
223224
return;
224225
} catch (Throwable t) {
225226
// This should not happen. Let's log & retry anyway.
226227
LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
227228
+ " Retrying. Server=" + server + ", tableName=" + tableName, t);
228-
receiveGlobalFailure(multiAction, server, numAttempt, t, true);
229+
receiveGlobalFailure(multiAction, server, numAttempt, t);
229230
return;
230231
}
231232
if (res.type() == AbstractResponse.ResponseType.MULTI) {
@@ -570,7 +571,6 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
570571
*/
571572
void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
572573
List<Action> actionsForReplicaThread, boolean reuseThread) {
573-
boolean clearServerCache = true;
574574
// Run the last item on the same thread if we are already on a send thread.
575575
// We hope most of the time it will be the only item, so we can cut down on threads.
576576
int actionsRemaining = actionsByServer.size();
@@ -606,15 +606,14 @@ void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttemp
606606
LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
607607
+ server.getServerName(), t);
608608
// Do not update cache if exception is from failing to submit action to thread pool
609-
clearServerCache = false;
610609
} else {
611610
// see #HBASE-14359 for more details
612611
LOG.warn("Caught unexpected exception/error: ", t);
613612
}
614613
asyncProcess.decTaskCounters(multiAction.getRegions(), server);
615614
// We're likely to fail again, but this will increment the attempt counter,
616615
// so it will finish.
617-
receiveGlobalFailure(multiAction, server, numAttempt, t, clearServerCache);
616+
receiveGlobalFailure(multiAction, server, numAttempt, t);
618617
}
619618
}
620619
}
@@ -764,13 +763,18 @@ private void failAll(MultiAction actions, ServerName server, int numAttempt,
764763
* @param t the throwable (if any) that caused the resubmit
765764
*/
766765
private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
767-
Throwable t, boolean clearServerCache) {
766+
Throwable t) {
768767
errorsByServer.reportServerError(server);
769768
Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
769+
boolean clearServerCache = ClientExceptionsUtil.isMetaClearingException(t);
770770

771771
// Do not update cache if exception is from failing to submit action to thread pool
772772
if (clearServerCache) {
773773
cleanServerCache(server, t);
774+
775+
if (LOG.isTraceEnabled()) {
776+
LOG.trace("Cleared meta cache for server {} due to global failure {}", server, t);
777+
}
774778
}
775779

776780
int failed = 0;
@@ -779,12 +783,8 @@ private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int
779783
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
780784
byte[] regionName = e.getKey();
781785
byte[] row = e.getValue().get(0).getAction().getRow();
782-
// Do not use the exception for updating cache because it might be coming from
783-
// any of the regions in the MultiAction and do not update cache if exception is
784-
// from failing to submit action to thread pool
785786
if (clearServerCache) {
786-
updateCachedLocations(server, regionName, row,
787-
ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
787+
updateCachedLocations(server, regionName, row, t);
788788
}
789789
for (Action action : e.getValue()) {
790790
Retry retry =

hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.net.SocketTimeoutException;
2727
import java.nio.channels.ClosedChannelException;
2828
import java.util.Set;
29+
import java.util.concurrent.RejectedExecutionException;
2930
import java.util.concurrent.TimeoutException;
3031
import org.apache.hadoop.hbase.CallDroppedException;
3132
import org.apache.hadoop.hbase.CallQueueTooBigException;
@@ -56,8 +57,8 @@ public static boolean isMetaClearingException(Throwable cur) {
5657
if (cur == null) {
5758
return true;
5859
}
59-
return !isSpecialException(cur) || (cur instanceof RegionMovedException)
60-
|| cur instanceof NotServingRegionException;
60+
return (!isExecutorException(cur) && !isSpecialException(cur))
61+
|| (cur instanceof RegionMovedException) || cur instanceof NotServingRegionException;
6162
}
6263

6364
public static boolean isSpecialException(Throwable cur) {
@@ -177,4 +178,8 @@ public static Throwable translatePFFE(Throwable t) throws IOException {
177178
}
178179
return t;
179180
}
181+
182+
private static boolean isExecutorException(Throwable t) {
183+
return RejectedExecutionException.class.isAssignableFrom(t.getClass());
184+
}
180185
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Arrays;
2828
import java.util.List;
2929
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.RejectedExecutionException;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.stream.Collectors;
3233
import java.util.stream.IntStream;
@@ -397,7 +398,7 @@ public static List<Throwable> metaCachePreservingExceptions() {
397398
return Arrays.asList(new RegionOpeningException(" "),
398399
new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
399400
new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
400-
new CallQueueTooBigException());
401+
new CallQueueTooBigException(), new RejectedExecutionException(" "));
401402
}
402403

403404
public static class RegionServerWithFakeRpcServices extends HRegionServer {

0 commit comments

Comments
 (0)