Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ public enum Property {
"The number of threads used to run fault-tolerant executions (FATE)."
+ " These are primarily table operations like merge.",
"1.4.3"),
MANAGER_TSERVER_HALT_DURATION("manager.tservers.halt.grace.period", "0",
PropertyType.TIMEDURATION,
"Allows the manager to force tserver halting by setting the max duration of time spent attempting to halt a tserver "
+ " requests before deleting the tserver's zlock. A value of zero (default) disables this feature.",
"2.1.5"),
@Deprecated(since = "2.1.0")
MANAGER_REPLICATION_SCAN_INTERVAL("manager.replication.status.scan.interval", "30s",
PropertyType.TIMEDURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ public long getSessionId() throws KeeperException, InterruptedException {
/**
* This method will delete all server locks for a given path according the predicate conditions.
*
* @param zk zookeeper instance
* @param zPath can be a path directly to a host or a general path like @{link
* org.apache.accumulo.core.Constants.ZTSERVERS} or a resource group
* @param hostPortPredicate conditional predicate for determining if the lock should be removed.
* @param messageOutput function for setting where the output from the lockPath goes
* @param dryRun allows lock format validation and the messageOutput to be sent without actually
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.manager.metrics.BalancerMetrics;
Expand Down Expand Up @@ -195,6 +196,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
final AuditedSecurityOperation security;
final Map<TServerInstance,AtomicInteger> badServers =
Collections.synchronizedMap(new HashMap<>());
final Map<TServerInstance,GracefulHaltTimer> tserverHaltRpcAttempts =
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
final EventCoordinator nextEvent = new EventCoordinator();
Expand Down Expand Up @@ -1141,6 +1144,30 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,

}

/**
* This class tracks details about the haltRPCs used
*/
private static class GracefulHaltTimer {

Duration maxHaltGraceDuration;
Timer timer;

public GracefulHaltTimer(AccumuloConfiguration config) {
timer = null;
maxHaltGraceDuration =
Duration.ofMillis(config.getTimeInMillis(Property.MANAGER_TSERVER_HALT_DURATION));
}

public void startTimer() {
timer = Timer.startNew();
}

public boolean shouldForceHalt() {
return maxHaltGraceDuration.toMillis() != 0 && timer != null
&& timer.hasElapsed(maxHaltGraceDuration);
}
}

private SortedMap<TServerInstance,TabletServerStatus>
gatherTableInformation(Set<TServerInstance> currentServers) {
final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
Expand All @@ -1150,6 +1177,9 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
long start = System.currentTimeMillis();
final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>();
final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
final int maxTserverRpcHaltAttempts =
getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_DURATION);
final boolean forceHaltingEnabled = maxTserverRpcHaltAttempts != 0;
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
if (threads == 0) {
Expand Down Expand Up @@ -1190,15 +1220,35 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
> MAX_BAD_STATUS_COUNT) {
if (shutdownServerRateLimiter.tryAcquire()) {
log.warn("attempting to stop {}", server);
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
var gracefulHaltTimer = tserverHaltRpcAttempts.computeIfAbsent(server,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the intent to guard this new code with:
if (forceHaltingEnabled) {
?

s -> new GracefulHaltTimer(getConfiguration()));
if (gracefulHaltTimer.shouldForceHalt()) {
log.warn("tserver {} is not responding to halt requests, deleting zlock", server);
var zk = getContext().getZooReaderWriter();
var iid = getContext().getInstanceID();
String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
try {
ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals,
log::info, false);
tserverHaltRpcAttempts.remove(server);
badServers.remove(server);
} catch (KeeperException | InterruptedException e) {
log.error("Failed to delete zlock for server {}", server, e);
}
} else {
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
}
} catch (TTransportException e1) {
// ignore: it's probably down so log the exception at trace
log.trace("error attempting to halt tablet server {}", server, e1);
} catch (Exception e2) {
log.info("error talking to troublesome tablet server {}", server, e2);
} finally {
gracefulHaltTimer.startTimer();
}
} catch (TTransportException e1) {
// ignore: it's probably down
} catch (Exception e2) {
log.info("error talking to troublesome tablet server", e2);
}
} else {
log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server,
Expand All @@ -1225,6 +1275,12 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
badServers.keySet().retainAll(currentServers);
badServers.keySet().removeAll(info.keySet());
}

synchronized (tserverHaltRpcAttempts) {
tserverHaltRpcAttempts.keySet().retainAll(currentServers);
tserverHaltRpcAttempts.keySet().removeAll(info.keySet());
}

log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds",
info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));

Expand Down Expand Up @@ -1727,14 +1783,17 @@ public void update(LiveTServerSet current, Set<TServerInstance> deleted,
}
serversToShutdown.removeAll(deleted);
badServers.keySet().removeAll(deleted);
tserverHaltRpcAttempts.keySet().removeAll(deleted);
// clear out any bad server with the same host/port as a new server
synchronized (badServers) {
cleanListByHostAndPort(badServers.keySet(), deleted, added);
}
synchronized (serversToShutdown) {
cleanListByHostAndPort(serversToShutdown, deleted, added);
}

synchronized (tserverHaltRpcAttempts) {
cleanListByHostAndPort(tserverHaltRpcAttempts.keySet(), deleted, added);
}
migrations.removeServers(deleted);
nextEvent.event("There are now %d tablet servers", current.size());
}
Expand Down
Loading