Skip to content

Commit df5e205

Browse files
HDFS-17713. [ARR] Throtting asynchronous calls for each nameservice. (#7304). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <[email protected]>
1 parent 1f640aa commit df5e205

File tree

8 files changed

+321
-45
lines changed

8 files changed

+321
-45
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.fairness;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.Set;
27+
28+
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
29+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
30+
DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY;
31+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
32+
DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
33+
34+
/**
35+
* When router async rpc enabled, it is recommended to use this fairness controller.
36+
*/
37+
public class RouterAsyncRpcFairnessPolicyController extends
38+
AbstractRouterRpcFairnessPolicyController {
39+
40+
private static final Logger LOG =
41+
LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class);
42+
43+
public static final String INIT_MSG = "Max async call permits per nameservice: %d";
44+
45+
public RouterAsyncRpcFairnessPolicyController(Configuration conf) {
46+
init(conf);
47+
}
48+
49+
public void init(Configuration conf) throws IllegalArgumentException {
50+
super.init(conf);
51+
52+
int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY,
53+
DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
54+
if (maxAsyncCallPermit <= 0) {
55+
maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
56+
}
57+
LOG.info(String.format(INIT_MSG, maxAsyncCallPermit));
58+
59+
// Get all name services configured.
60+
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
61+
62+
for (String nsId : allConfiguredNS) {
63+
LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId);
64+
insertNameServiceWithPermits(nsId, maxAsyncCallPermit);
65+
logAssignment(nsId, maxAsyncCallPermit);
66+
}
67+
// Avoid NPE when router async rpc disable.
68+
insertNameServiceWithPermits(CONCURRENT_NS, maxAsyncCallPermit);
69+
LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, CONCURRENT_NS);
70+
}
71+
72+
private static void logAssignment(String nsId, int count) {
73+
LOG.info("Assigned {} permits to nsId {} ", count, nsId);
74+
}
75+
76+
@Override
77+
public boolean acquirePermit(String nsId) {
78+
if (nsId.equals(CONCURRENT_NS)) {
79+
return true;
80+
}
81+
return super.acquirePermit(nsId);
82+
}
83+
84+
@Override
85+
public void releasePermit(String nsId) {
86+
if (nsId.equals(CONCURRENT_NS)) {
87+
return;
88+
}
89+
super.releasePermit(nsId);
90+
}
91+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

+3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
8888
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
8989
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
9090
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
91+
public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY =
92+
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit";
93+
public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000;
9194

9295
public static final String DFS_ROUTER_METRICS_ENABLE =
9396
FEDERATION_ROUTER_PREFIX + "metrics.enable";

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
10241024
throws IOException {
10251025
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
10261026
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
1027-
acquirePermit(nsId, ugi, method, controller);
1027+
acquirePermit(nsId, ugi, method.getMethodName(), controller);
10281028
try {
10291029
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
10301030
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
@@ -1199,7 +1199,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
11991199
boolean isObserverRead = isObserverReadEligible(ns, m);
12001200
List<? extends FederationNamenodeContext> namenodes =
12011201
getOrderedNamenodes(ns, isObserverRead);
1202-
acquirePermit(ns, ugi, remoteMethod, controller);
1202+
acquirePermit(ns, ugi, remoteMethod.getMethodName(), controller);
12031203
try {
12041204
Class<?> proto = remoteMethod.getProtocol();
12051205
Object[] params = remoteMethod.getParams(loc);
@@ -1579,7 +1579,7 @@ protected static <T extends RemoteLocationContext, R> Map<T, R> postProcessResul
15791579
return invokeSingle(locations.iterator().next(), method);
15801580
}
15811581
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
1582-
acquirePermit(CONCURRENT_NS, ugi, method, controller);
1582+
acquirePermit(CONCURRENT_NS, ugi, method.getMethodName(), controller);
15831583

15841584
List<T> orderedLocations = new ArrayList<>();
15851585
List<Callable<Object>> callables = new ArrayList<>();
@@ -1758,7 +1758,7 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
17581758
final List<? extends FederationNamenodeContext> namenodes =
17591759
getOrderedNamenodes(ns, isObserverRead);
17601760
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
1761-
acquirePermit(ns, ugi, method, controller);
1761+
acquirePermit(ns, ugi, method.getMethodName(), controller);
17621762
try {
17631763
Class<?> proto = method.getProtocol();
17641764
Object[] paramList = method.getParams(location);
@@ -1829,12 +1829,12 @@ private String getNameserviceForBlockPoolId(final String bpId)
18291829
*
18301830
* @param nsId Identifier of the block pool.
18311831
* @param ugi UserGroupIdentifier associated with the user.
1832-
* @param m Remote method that needs to be invoked.
1832+
* @param methodName The name of remote method that needs to be invoked.
18331833
* @param controller fairness policy controller to acquire permit from
18341834
* @throws IOException If permit could not be acquired for the nsId.
18351835
*/
18361836
protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
1837-
final RemoteMethod m, RouterRpcFairnessPolicyController controller)
1837+
final String methodName, RouterRpcFairnessPolicyController controller)
18381838
throws IOException {
18391839
if (controller != null) {
18401840
if (!controller.acquirePermit(nsId)) {
@@ -1845,7 +1845,7 @@ protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
18451845
}
18461846
incrRejectedPermitForNs(nsId);
18471847
LOG.debug("Permit denied for ugi: {} for method: {}",
1848-
ugi, m.getMethodName());
1848+
ugi, methodName);
18491849
String msg =
18501850
"Router " + router.getRouterId() +
18511851
" is overloaded for NS: " + nsId;
@@ -1880,7 +1880,7 @@ protected void releasePermit(final String nsId, final UserGroupInformation ugi,
18801880
return routerRpcFairnessPolicyController;
18811881
}
18821882

1883-
private void incrRejectedPermitForNs(String ns) {
1883+
protected void incrRejectedPermitForNs(String ns) {
18841884
rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
18851885
}
18861886

@@ -1889,7 +1889,7 @@ public Long getRejectedPermitForNs(String ns) {
18891889
rejectedPermitsPerNs.get(ns).longValue() : 0L;
18901890
}
18911891

1892-
private void incrAcceptedPermitForNs(String ns) {
1892+
protected void incrAcceptedPermitForNs(String ns) {
18931893
acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
18941894
}
18951895

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

+31-32
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.util.concurrent.Callable;
5959
import java.util.concurrent.CompletableFuture;
6060

61-
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
6261
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
6362
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
6463
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
@@ -178,8 +177,14 @@ public Object invokeMethod(
178177
namenodes.toString(), params);
179178
}
180179
threadLocalContext.transfer();
180+
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
181+
acquirePermit(nsid, ugi, method.getName(), controller);
181182
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
182183
useObserver, protocol, method, params);
184+
asyncFinally(object -> {
185+
releasePermit(nsid, ugi, method, controller);
186+
return object;
187+
});
183188
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
184189
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
185190
return null;
@@ -227,7 +232,7 @@ private void invokeMethodAsync(
227232
connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
228233
NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
229234
invoke(namenode, status.isShouldUseObserver(), 0, method,
230-
client.getProxy(), params);
235+
client.getProxy(), params);
231236
asyncApply(res -> {
232237
status.setComplete(true);
233238
postProcessResult(method, status, namenode, nsId, client);
@@ -363,7 +368,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
363368
Class<T> expectedResultClass, Object expectedResultValue)
364369
throws IOException {
365370

366-
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
367371
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
368372
final Method m = remoteMethod.getMethod();
369373
List<IOException> thrownExceptions = new ArrayList<>();
@@ -378,7 +382,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
378382
boolean isObserverRead = isObserverReadEligible(ns, m);
379383
List<? extends FederationNamenodeContext> namenodes =
380384
getOrderedNamenodes(ns, isObserverRead);
381-
acquirePermit(ns, ugi, remoteMethod, controller);
382385
asyncTry(() -> {
383386
Class<?> proto = remoteMethod.getProtocol();
384387
Object[] params = remoteMethod.getParams(loc);
@@ -419,10 +422,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
419422
}
420423
return ret;
421424
}, Exception.class);
422-
asyncFinally(ret -> {
423-
releasePermit(ns, ugi, remoteMethod, controller);
424-
return ret;
425-
});
426425
});
427426
asyncApply(result -> {
428427
if (status.isComplete()) {
@@ -498,7 +497,6 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
498497
protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
499498
RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
500499
List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
501-
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
502500
final Method m = method.getMethod();
503501
final CompletableFuture<Object>[] futures =
504502
new CompletableFuture[callables.size()];
@@ -523,8 +521,6 @@ protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemot
523521
LOG.error("Unexpected error while invoking API: {}", e.getMessage());
524522
throw warpCompletionException(new IOException(
525523
"Unexpected error while invoking API " + e.getMessage(), e));
526-
} finally {
527-
releasePermit(CONCURRENT_NS, ugi, method, controller);
528524
}
529525
}));
530526
return asyncReturn(List.class);
@@ -553,8 +549,6 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
553549
boolean isObserverRead = isObserverReadEligible(ns, m);
554550
final List<? extends FederationNamenodeContext> namenodes =
555551
getOrderedNamenodes(ns, isObserverRead);
556-
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
557-
acquirePermit(ns, ugi, method, controller);
558552
asyncTry(() -> {
559553
Class<?> proto = method.getProtocol();
560554
Object[] paramList = method.getParams(location);
@@ -567,10 +561,6 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
567561
asyncCatch((o, ioe) -> {
568562
throw processException(ioe, location);
569563
}, IOException.class);
570-
asyncFinally(o -> {
571-
releasePermit(ns, ugi, method, controller);
572-
return o;
573-
});
574564
return asyncReturn(List.class);
575565
}
576566

@@ -589,21 +579,13 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
589579
public Object invokeSingle(final String nsId, RemoteMethod method)
590580
throws IOException {
591581
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
592-
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
593-
acquirePermit(nsId, ugi, method, controller);
594-
asyncTry(() -> {
595-
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
596-
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
597-
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
598-
Class<?> proto = method.getProtocol();
599-
Method m = method.getMethod();
600-
Object[] params = method.getParams(loc);
601-
invokeMethod(ugi, nns, isObserverRead, proto, m, params);
602-
});
603-
asyncFinally(o -> {
604-
releasePermit(nsId, ugi, method, controller);
605-
return o;
606-
});
582+
boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
583+
List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
584+
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
585+
Class<?> proto = method.getProtocol();
586+
Method m = method.getMethod();
587+
Object[] params = method.getParams(loc);
588+
invokeMethod(ugi, nns, isObserverRead, proto, m, params);
607589
return null;
608590
}
609591

@@ -627,4 +609,21 @@ public <T> T invokeSingle(
627609
invokeSequential(locations, remoteMethod);
628610
return asyncReturn(clazz);
629611
}
612+
613+
/**
614+
* Release permit for specific nsId after processing against downstream
615+
* nsId is completed.
616+
* @param nsId Identifier of the block pool.
617+
* @param ugi UserGroupIdentifier associated with the user.
618+
* @param m Remote method that needs to be invoked.
619+
* @param controller fairness policy controller to release permit from
620+
*/
621+
protected void releasePermit(final String nsId, final UserGroupInformation ugi,
622+
final Method m, RouterRpcFairnessPolicyController controller) {
623+
if (controller != null) {
624+
controller.releasePermit(nsId);
625+
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
626+
m.getName());
627+
}
628+
}
630629
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

+9
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,15 @@
152152
</description>
153153
</property>
154154

155+
<property>
156+
<name>dfs.federation.router.async.rpc.max.asynccall.permit</name>
157+
<value>20000</value>
158+
<description>
159+
Maximum number of asynchronous RPC requests the Router can send to
160+
one downstream nameservice.
161+
</description>
162+
</property>
163+
155164
<property>
156165
<name>dfs.federation.router.connection.creator.queue-size</name>
157166
<value>100</value>

0 commit comments

Comments
 (0)