Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17713. [ARR] Throtting asynchronous calls for each nameservice. #7304

Merged
merged 10 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;

/**
* When router async rpc enabled, it is recommended to use this fairness controller.
*/
public class RouterAsyncRpcFairnessPolicyController extends
AbstractRouterRpcFairnessPolicyController {

private static final Logger LOG =
LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class);

public static final String INIT_MSG = "Max async call permits per nameservice: %d";

public RouterAsyncRpcFairnessPolicyController(Configuration conf) {
init(conf);
}

public void init(Configuration conf) throws IllegalArgumentException {
super.init(conf);

int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY,
DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
if (maxAsyncCallPermit <= 0) {
maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
}
LOG.info(String.format(INIT_MSG, maxAsyncCallPermit));

// Get all name services configured.
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);

for (String nsId : allConfiguredNS) {
LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId);
insertNameServiceWithPermits(nsId, maxAsyncCallPermit);
logAssignment(nsId, maxAsyncCallPermit);
}
// Avoid NPE when router async rpc disable.
insertNameServiceWithPermits(CONCURRENT_NS, maxAsyncCallPermit);
LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, CONCURRENT_NS);
}

private static void logAssignment(String nsId, int count) {
LOG.info("Assigned {} permits to nsId {} ", count, nsId);
}

@Override
public boolean acquirePermit(String nsId) {
if (nsId.equals(CONCURRENT_NS)) {
return true;
}
return super.acquirePermit(nsId);
}

@Override
public void releasePermit(String nsId) {
if (nsId.equals(CONCURRENT_NS)) {
return;
}
super.releasePermit(nsId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit";
public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000;

public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsId, ugi, method, controller);
acquirePermit(nsId, ugi, method.getMethodName(), controller);
try {
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
Expand Down Expand Up @@ -1199,7 +1199,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
boolean isObserverRead = isObserverReadEligible(ns, m);
List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(ns, isObserverRead);
acquirePermit(ns, ugi, remoteMethod, controller);
acquirePermit(ns, ugi, remoteMethod.getMethodName(), controller);
try {
Class<?> proto = remoteMethod.getProtocol();
Object[] params = remoteMethod.getParams(loc);
Expand Down Expand Up @@ -1579,7 +1579,7 @@ protected static <T extends RemoteLocationContext, R> Map<T, R> postProcessResul
return invokeSingle(locations.iterator().next(), method);
}
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(CONCURRENT_NS, ugi, method, controller);
acquirePermit(CONCURRENT_NS, ugi, method.getMethodName(), controller);

List<T> orderedLocations = new ArrayList<>();
List<Callable<Object>> callables = new ArrayList<>();
Expand Down Expand Up @@ -1758,7 +1758,7 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
final List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(ns, isObserverRead);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(ns, ugi, method, controller);
acquirePermit(ns, ugi, method.getMethodName(), controller);
try {
Class<?> proto = method.getProtocol();
Object[] paramList = method.getParams(location);
Expand Down Expand Up @@ -1829,12 +1829,12 @@ private String getNameserviceForBlockPoolId(final String bpId)
*
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param methodName The name of remote method that needs to be invoked.
* @param controller fairness policy controller to acquire permit from
* @throws IOException If permit could not be acquired for the nsId.
*/
protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller)
final String methodName, RouterRpcFairnessPolicyController controller)
throws IOException {
if (controller != null) {
if (!controller.acquirePermit(nsId)) {
Expand All @@ -1845,7 +1845,7 @@ protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
}
incrRejectedPermitForNs(nsId);
LOG.debug("Permit denied for ugi: {} for method: {}",
ugi, m.getMethodName());
ugi, methodName);
String msg =
"Router " + router.getRouterId() +
" is overloaded for NS: " + nsId;
Expand Down Expand Up @@ -1880,7 +1880,7 @@ protected void releasePermit(final String nsId, final UserGroupInformation ugi,
return routerRpcFairnessPolicyController;
}

private void incrRejectedPermitForNs(String ns) {
protected void incrRejectedPermitForNs(String ns) {
rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
}

Expand All @@ -1889,7 +1889,7 @@ public Long getRejectedPermitForNs(String ns) {
rejectedPermitsPerNs.get(ns).longValue() : 0L;
}

private void incrAcceptedPermitForNs(String ns) {
protected void incrAcceptedPermitForNs(String ns) {
acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
Expand Down Expand Up @@ -178,8 +177,14 @@ public Object invokeMethod(
namenodes.toString(), params);
}
threadLocalContext.transfer();
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsid, ugi, method.getName(), controller);
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
asyncFinally(object -> {
releasePermit(nsid, ugi, method, controller);
return object;
});
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
return null;
Expand Down Expand Up @@ -227,7 +232,7 @@ private void invokeMethodAsync(
connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
invoke(namenode, status.isShouldUseObserver(), 0, method,
client.getProxy(), params);
client.getProxy(), params);
asyncApply(res -> {
status.setComplete(true);
postProcessResult(method, status, namenode, nsId, client);
Expand Down Expand Up @@ -363,7 +368,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
Class<T> expectedResultClass, Object expectedResultValue)
throws IOException {

RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = remoteMethod.getMethod();
List<IOException> thrownExceptions = new ArrayList<>();
Expand All @@ -378,7 +382,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
boolean isObserverRead = isObserverReadEligible(ns, m);
List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(ns, isObserverRead);
acquirePermit(ns, ugi, remoteMethod, controller);
asyncTry(() -> {
Class<?> proto = remoteMethod.getProtocol();
Object[] params = remoteMethod.getParams(loc);
Expand Down Expand Up @@ -419,10 +422,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
}
return ret;
}, Exception.class);
asyncFinally(ret -> {
releasePermit(ns, ugi, remoteMethod, controller);
return ret;
});
});
asyncApply(result -> {
if (status.isComplete()) {
Expand Down Expand Up @@ -498,7 +497,6 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = method.getMethod();
final CompletableFuture<Object>[] futures =
new CompletableFuture[callables.size()];
Expand All @@ -523,8 +521,6 @@ protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemot
LOG.error("Unexpected error while invoking API: {}", e.getMessage());
throw warpCompletionException(new IOException(
"Unexpected error while invoking API " + e.getMessage(), e));
} finally {
releasePermit(CONCURRENT_NS, ugi, method, controller);
}
}));
return asyncReturn(List.class);
Expand Down Expand Up @@ -553,8 +549,6 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
boolean isObserverRead = isObserverReadEligible(ns, m);
final List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(ns, isObserverRead);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(ns, ugi, method, controller);
asyncTry(() -> {
Class<?> proto = method.getProtocol();
Object[] paramList = method.getParams(location);
Expand All @@ -567,10 +561,6 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
asyncCatch((o, ioe) -> {
throw processException(ioe, location);
}, IOException.class);
asyncFinally(o -> {
releasePermit(ns, ugi, method, controller);
return o;
});
return asyncReturn(List.class);
}

Expand All @@ -589,21 +579,13 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsId, ugi, method, controller);
asyncTry(() -> {
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
invokeMethod(ugi, nns, isObserverRead, proto, m, params);
});
asyncFinally(o -> {
releasePermit(nsId, ugi, method, controller);
return o;
});
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
invokeMethod(ugi, nns, isObserverRead, proto, m, params);
return null;
}

Expand All @@ -627,4 +609,21 @@ public <T> T invokeSingle(
invokeSequential(locations, remoteMethod);
return asyncReturn(clazz);
}

/**
* Release permit for specific nsId after processing against downstream
* nsId is completed.
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to release permit from
*/
protected void releasePermit(final String nsId, final UserGroupInformation ugi,
final Method m, RouterRpcFairnessPolicyController controller) {
if (controller != null) {
controller.releasePermit(nsId);
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
m.getName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@
</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.max.asynccall.permit</name>
<value>20000</value>
<description>
Maximum number of asynchronous RPC requests the Router can send to
one downstream nameservice.
</description>
</property>

<property>
<name>dfs.federation.router.connection.creator.queue-size</name>
<value>100</value>
Expand Down
Loading