Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -716,6 +720,48 @@ public long getSessionId() throws KeeperException, InterruptedException {
}
}

/**
* This method will delete all server locks for a given path according the predicate conditions.
*
* @param hostPortPredicate conditional predicate for determining if the lock should be removed.
* @param messageOutput function for setting where the output from the lockPath goes
* @param dryRun allows lock format validation and the messageOutput to be sent without actually
* deleting the lock
*
*/
public static void deleteLocks(ZooReaderWriter zk, String zPath,
Predicate<HostAndPort> hostPortPredicate, Consumer<String> messageOutput, Boolean dryRun)
throws KeeperException, InterruptedException {

Objects.requireNonNull(zPath, "Lock path cannot be null");
if (!zk.exists(zPath)) {
throw new IllegalStateException("Path " + zPath + " does not exist");
}

List<String> servers = zk.getChildren(zPath);
if (servers.isEmpty()) {
throw new IllegalStateException("No server locks are held at " + zPath);
}

for (String server : servers) {
if (hostPortPredicate.test(HostAndPort.fromString(server))) {
messageOutput.accept("Deleting " + zPath + "/" + server + " from zookeeper");
if (!dryRun) {
LOG.debug("Deleting all locks at path {} due to lock deletion", zPath);
zk.recursiveDelete(zPath + "/" + server, NodeMissingPolicy.SKIP);
}
}
}
}

/**
* This method will delete the top server lock for a given lock path
*
* @param zk zookeeper client
* @param path path for lock deletion only the top child lock will be removed
*
*/

public static void deleteLock(ZooReaderWriter zk, ServiceLockPath path)
throws InterruptedException, KeeperException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public void zap(SiteConfiguration siteConf, String... args) {
zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
}
} else {
removeLocks(zoo, tserversPath, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, tserversPath, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting tserver locks", e);
Expand Down Expand Up @@ -269,7 +270,8 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
List<String> groups = zoo.getChildren(path);
for (String group : groups) {
if (groupPredicate.test(group)) {
removeLocks(zoo, path + "/" + group, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, path + "/" + group, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
}
}
Expand All @@ -278,19 +280,7 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
static void removeLocks(ZooReaderWriter zoo, String path,
Predicate<HostAndPort> hostPortPredicate, Opts opts)
throws KeeperException, InterruptedException {
if (zoo.exists(path)) {
List<String> children = zoo.getChildren(path);
for (String child : children) {
if (hostPortPredicate.test(HostAndPort.fromString(child))) {
message("Deleting " + path + "/" + child + " from zookeeper", opts);
if (!opts.dryRun) {
// TODO not sure this is the correct way to delete this lock.. the code was deleting
// locks in multiple different ways for diff servers types.
zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
}
}
}
}
ServiceLock.deleteLocks(zoo, path, hostPortPredicate, m -> message(m, opts), opts.dryRun);
}

static void removeSingletonLock(ZooReaderWriter zoo, String path,
Expand Down