From e5b9d0d49d573306b047edc4ad86e3f5371e6cc7 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 21 Jan 2025 10:25:58 +0800 Subject: [PATCH 01/10] HDFS-17713. [ARR] Throttling asynchronous calls for each nameservice. --- ...outerAsyncRpcFairnessPolicyController.java | 73 ++++++ .../federation/router/RBFConfigKeys.java | 3 + .../federation/router/RouterRpcClient.java | 4 +- .../router/async/RouterAsyncRpcClient.java | 208 ++++++++++++++---- .../src/main/resources/hdfs-rbf-default.xml | 9 + .../router/async/TestRouterAsyncRpc.java | 15 +- 6 files changed, 258 insertions(+), 54 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..439f4f3391267 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + +/** + * When router async rpc is enabled, it is recommended to use this fairness controller. 
+ */ +public class RouterAsyncRpcFairnessPolicyController extends + AbstractRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class); + + public RouterAsyncRpcFairnessPolicyController(Configuration conf) { + init(conf); + } + + public void init(Configuration conf) throws IllegalArgumentException { + super.init(conf); + + int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + if (maxAsyncCallPermit <= 0) { + maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + } + LOG.info("Max async call permits per nameservice: {}", maxAsyncCallPermit); + + // Get all name services configured + Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + + for (String nsId : allConfiguredNS) { + LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId); + insertNameServiceWithPermits(nsId, maxAsyncCallPermit); + logAssignment(nsId, maxAsyncCallPermit); + } + // Avoid NPE when router async rpc is disabled. 
+ insertNameServiceWithPermits(CONCURRENT_NS, maxAsyncCallPermit); + LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, CONCURRENT_NS); + } + + private static void logAssignment(String nsId, int count) { + LOG.info("Assigned {} permits to nsId {} ", count, nsId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 955d98cfc852c..cedcddfdeb862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit"; + public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000; public static final String DFS_ROUTER_METRICS_ENABLE = FEDERATION_ROUTER_PREFIX + "metrics.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33ec7..c20e425aff732 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1880,7 +1880,7 @@ 
protected void releasePermit(final String nsId, final UserGroupInformation ugi, return routerRpcFairnessPolicyController; } - private void incrRejectedPermitForNs(String ns) { + protected void incrRejectedPermitForNs(String ns) { rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } @@ -1889,7 +1889,7 @@ public Long getRejectedPermitForNs(String ns) { rejectedPermitsPerNs.get(ns).longValue() : 0L; } - private void incrAcceptedPermitForNs(String ns) { + protected void incrAcceptedPermitForNs(String ns) { acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index c214adf1f2abb..5353c4c9ef1de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -58,7 +60,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor; @@ -178,7 +179,7 @@ public Object invokeMethod( namenodes.toString(), params); } threadLocalContext.transfer(); - invokeMethodAsync(ugi, (List) namenodes, + invokeMethodAsync(nsid, ugi, (List) namenodes, useObserver, protocol, method, params); }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); @@ -202,11 +203,13 @@ public Object invokeMethod( * @param params The parameters for the method invocation. */ private void invokeMethodAsync( + String nsid, final UserGroupInformation ugi, final List namenodes, boolean useObserver, final Class protocol, final Method method, final Object... params) { + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); addClientInfoToCallerContext(ugi); if (rpcMonitor != null) { rpcMonitor.proxyOp(); @@ -214,46 +217,55 @@ private void invokeMethodAsync( final ExecutionStatus status = new ExecutionStatus(false, useObserver); Map ioes = new LinkedHashMap<>(); final ConnectionContext[] connection = new ConnectionContext[1]; - asyncForEach(namenodes.iterator(), - (foreach, namenode) -> { - if (!status.isShouldUseObserver() - && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { - asyncComplete(null); - return; - } - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); - asyncTry(() -> { - connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); - NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); - invoke(namenode, status.isShouldUseObserver(), 0, method, + asyncTry(() -> { + acquirePermit(nsid, ugi, method, controller); + 
asyncForEach(namenodes.iterator(), + (foreach, namenode) -> { + if (!status.isShouldUseObserver() + && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { + asyncComplete(null); + return; + } + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + asyncTry(() -> { + connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); + NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); + invoke(namenode, status.isShouldUseObserver(), 0, method, client.getProxy(), params); - asyncApply(res -> { - status.setComplete(true); - postProcessResult(method, status, namenode, nsId, client); - foreach.breakNow(); + asyncApply(res -> { + status.setComplete(true); + postProcessResult(method, status, namenode, nsId, client); + foreach.breakNow(); + return res; + }); + }); + asyncCatch((res, ioe) -> { + ioes.put(namenode, ioe); + handleInvokeMethodIOException(namenode, ioe, status, useObserver); + return res; + }, IOException.class); + asyncFinally(res -> { + if (connection[0] != null) { + connection[0].release(); + } return res; }); }); - asyncCatch((res, ioe) -> { - ioes.put(namenode, ioe); - handleInvokeMethodIOException(namenode, ioe, status, useObserver); - return res; - }, IOException.class); - asyncFinally(res -> { - if (connection[0] != null) { - connection[0].release(); - } - return res; - }); - }); - asyncApply(res -> { - if (status.isComplete()) { - return res; - } - return handlerAllNamenodeFail(namenodes, method, ioes, params); + asyncApply(res -> { + if (status.isComplete()) { + return res; + } + return handlerAllNamenodeFail(namenodes, method, ioes, params); + }); + }); + + asyncFinally(res -> { + releasePermit(nsid, ugi, method, controller); + return res; }); + } /** @@ -363,7 +375,6 @@ public RemoteResult invokeSequential( Class expectedResultClass, Object expectedResultValue) throws IOException { - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); final 
UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = remoteMethod.getMethod(); List thrownExceptions = new ArrayList<>(); @@ -378,7 +389,6 @@ public RemoteResult invokeSequential( boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); - acquirePermit(ns, ugi, remoteMethod, controller); asyncTry(() -> { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -420,7 +430,6 @@ public RemoteResult invokeSequential( return ret; }, Exception.class); asyncFinally(ret -> { - releasePermit(ns, ugi, remoteMethod, controller); return ret; }); }); @@ -479,6 +488,76 @@ public Map invokeConcurrent( return asyncReturn(Map.class); } + @SuppressWarnings("unchecked") + public List> invokeConcurrent( + final Collection locations, final RemoteMethod method, + boolean standby, long timeOutMs, + Class clazz) throws IOException { + + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = method.getMethod(); + + if (locations.isEmpty()) { + throw new IOException("No remote locations available"); + } else if (locations.size() == 1 && timeOutMs <= 0) { + // Shortcut, just one call + return invokeSingle(locations.iterator().next(), method); + } + // Don't acquire CONCURRENT_NS permit here. + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + + List orderedLocations = new ArrayList<>(); + List> callables = new ArrayList<>(); + // transfer originCall & callerContext to worker threads of executor. 
+ final Server.Call originCall = Server.getCurCall().get(); + final CallerContext originContext = CallerContext.getCurrent(); + for (final T location : locations) { + String nsId = location.getNameserviceId(); + boolean isObserverRead = isObserverReadEligible(nsId, m); + final List namenodes = + getOrderedNamenodes(nsId, isObserverRead); + final Class proto = method.getProtocol(); + final Object[] paramList = method.getParams(location); + if (standby) { + // Call the objectGetter to all NNs (including standby) + for (final FederationNamenodeContext nn : namenodes) { + String nnId = nn.getNamenodeId(); + final List nnList = + Collections.singletonList(nn); + T nnLocation = location; + if (location instanceof RemoteLocation) { + nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); + } + orderedLocations.add(nnLocation); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod( + ugi, nnList, isObserverRead, proto, m, paramList); + }); + } + } else { + // Call the objectGetter in order of nameservices in the NS list + orderedLocations.add(location); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod( + ugi, namenodes, isObserverRead, proto, m, paramList); + }); + } + } + + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + if (this.router.getRouterClientMetrics() != null) { + this.router.getRouterClientMetrics().incInvokedConcurrent(m); + } + + return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); + } + /** * Invokes multiple concurrent proxy calls to different clients. Returns an * array of results. 
@@ -498,7 +577,6 @@ public Map invokeConcurrent( protected List> getRemoteResults( RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller, List orderedLocations, List> callables) throws IOException { - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = method.getMethod(); final CompletableFuture[] futures = new CompletableFuture[callables.size()]; @@ -523,8 +601,6 @@ protected List> getRemot LOG.error("Unexpected error while invoking API: {}", e.getMessage()); throw warpCompletionException(new IOException( "Unexpected error while invoking API " + e.getMessage(), e)); - } finally { - releasePermit(CONCURRENT_NS, ugi, method, controller); } })); return asyncReturn(List.class); @@ -553,8 +629,6 @@ public List> invokeSingl boolean isObserverRead = isObserverReadEligible(ns, m); final List namenodes = getOrderedNamenodes(ns, isObserverRead); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); asyncTry(() -> { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -568,7 +642,6 @@ public List> invokeSingl throw processException(ioe, location); }, IOException.class); asyncFinally(o -> { - releasePermit(ns, ugi, method, controller); return o; }); return asyncReturn(List.class); @@ -589,8 +662,6 @@ public List> invokeSingl public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); asyncTry(() -> { boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); List nns = getOrderedNamenodes(nsId, isObserverRead); @@ -601,7 +672,6 @@ public Object invokeSingle(final String nsId, RemoteMethod method) invokeMethod(ugi, nns, isObserverRead, proto, m, params); }); 
asyncFinally(o -> { - releasePermit(nsId, ugi, method, controller); return o; }); return null; @@ -627,4 +697,46 @@ public T invokeSingle( invokeSequential(locations, remoteMethod); return asyncReturn(clazz); } + + protected void acquirePermit(final String nsId, final UserGroupInformation ugi, + final Method m, RouterRpcFairnessPolicyController controller) + throws IOException { + if (controller != null) { + if (!controller.acquirePermit(nsId)) { + // Throw StandByException, + // Clients could fail over and try another router. + if (rpcMonitor != null) { + rpcMonitor.proxyOpPermitRejected(nsId); + } + incrRejectedPermitForNs(nsId); + LOG.debug("Permit denied for ugi: {} for method: {}", + ugi, m.getName()); + String msg = + "Router " + router.getRouterId() + + " is overloaded for NS: " + nsId; + throw new StandbyException(msg); + } + if (rpcMonitor != null) { + rpcMonitor.proxyOpPermitAccepted(nsId); + } + incrAcceptedPermitForNs(nsId); + } + } + + /** + * Release permit for specific nsId after processing against downstream + * nsId is completed. + * @param nsId Identifier of the block pool. + * @param ugi UserGroupIdentifier associated with the user. + * @param m Remote method that needs to be invoked. 
+ * @param controller fairness policy controller to release permit from + */ + protected void releasePermit(final String nsId, final UserGroupInformation ugi, + final Method m, RouterRpcFairnessPolicyController controller) { + if (controller != null) { + controller.releasePermit(nsId); + LOG.trace("Permit released for ugi: {} for method: {}", ugi, + m.getName()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 46d273ab522bd..470dc61e8eb41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -152,6 +152,15 @@ + + dfs.federation.router.async.rpc.max.asynccall.permit + 20000 + + Maximum number of asynchronous RPC requests the Router can send to + one downstream nameservice. + + + dfs.federation.router.connection.creator.queue-size 100 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java index 7290c0a0aee81..0e007e6eb729d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -42,16 +45,20 @@ public class TestRouterAsyncRpc extends TestRouterRpc { @BeforeClass public static void globalSetUp() throws Exception { - // Start routers with only an RPC service + // Start routers with only an RPC service. Configuration routerConf = new RouterConfigBuilder() .metrics() .rpc() .build(); - // We decrease the DN cache times to make the test faster + // We decrease the DN cache times to make the test faster. routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - // use async router. + // Use async router. routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + // Use RouterAsyncRpcFairnessPolicyController as the fairness controller. + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); setUp(routerConf); } @@ -59,7 +66,7 @@ public static void globalSetUp() throws Exception { public void testSetup() throws Exception { super.testSetup(); cluster = super.getCluster(); - // Random router for this test + // Random router for this test. rndRouter = cluster.getRandomRouter(); } From e61cfd1dbd275c83ed93670bd1bd08e4ff8eb5c2 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 14 Mar 2025 15:08:35 +0800 Subject: [PATCH 02/10] fix checkstyle. 
--- .../fairness/RouterAsyncRpcFairnessPolicyController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java index 439f4f3391267..ad8445503caea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -54,7 +54,7 @@ public void init(Configuration conf) throws IllegalArgumentException { } LOG.info("Max async call permits per nameservice: {}", maxAsyncCallPermit); - // Get all name services configured + // Get all name services configured. Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); for (String nsId : allConfiguredNS) { From 3065a2f3f757493b5c73e57e71f648d3eabbcfde Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 14 Mar 2025 16:47:08 +0800 Subject: [PATCH 03/10] modify unit test. 
--- .../router/async/TestRouterAsyncRpcMultiDestination.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java index ec1ff0ce97b9f..0a2ee3e03595f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination; import org.apache.hadoop.security.UserGroupInformation; @@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -49,6 +52,10 @@ public static void globalSetUp() throws Exception { RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); // use async router. 
routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + // Use RouterAsyncRpcFairnessPolicyController as the fairness controller. + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); setUp(routerConf); } From 4d59b446a9b73f4817d9d81c15249c1a628e6816 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 17 Mar 2025 16:29:46 +0800 Subject: [PATCH 04/10] fix review --- ...outerAsyncRpcFairnessPolicyController.java | 17 +++ .../federation/router/RouterRpcClient.java | 14 +- .../router/async/RouterAsyncRpcClient.java | 137 ++---------------- 3 files changed, 38 insertions(+), 130 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java index ad8445503caea..d14f9bd94fdbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. 
@@ -70,4 +71,20 @@ public void init(Configuration conf) throws IllegalArgumentException { private static void logAssignment(String nsId, int count) { LOG.info("Assigned {} permits to nsId {} ", count, nsId); } + + @Override + public boolean acquirePermit(String nsId) { + if (nsId.equals(CONCURRENT_NS)) { + return true; + } + return super.acquirePermit(nsId); + } + + @Override + public void releasePermit(String nsId) { + if (nsId.equals(CONCURRENT_NS)) { + return; + } + super.releasePermit(nsId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c20e425aff732..7917f834937fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1024,7 +1024,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); + acquirePermit(nsId, ugi, method.getMethodName(), controller); try { boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); List nns = getOrderedNamenodes(nsId, isObserverRead); @@ -1199,7 +1199,7 @@ public RemoteResult invokeSequential( boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); - acquirePermit(ns, ugi, remoteMethod, controller); + acquirePermit(ns, ugi, remoteMethod.getMethodName(), controller); try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -1579,7 +1579,7 @@ protected static Map 
postProcessResul return invokeSingle(locations.iterator().next(), method); } RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(CONCURRENT_NS, ugi, method, controller); + acquirePermit(CONCURRENT_NS, ugi, method.getMethodName(), controller); List orderedLocations = new ArrayList<>(); List> callables = new ArrayList<>(); @@ -1758,7 +1758,7 @@ public List> invokeSingl final List namenodes = getOrderedNamenodes(ns, isObserverRead); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); + acquirePermit(ns, ugi, method.getMethodName(), controller); try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -1829,12 +1829,12 @@ private String getNameserviceForBlockPoolId(final String bpId) * * @param nsId Identifier of the block pool. * @param ugi UserGroupIdentifier associated with the user. - * @param m Remote method that needs to be invoked. + * @param methodName The name of remote method that needs to be invoked. * @param controller fairness policy controller to acquire permit from * @throws IOException If permit could not be acquired for the nsId. 
*/ protected void acquirePermit(final String nsId, final UserGroupInformation ugi, - final RemoteMethod m, RouterRpcFairnessPolicyController controller) + final String methodName, RouterRpcFairnessPolicyController controller) throws IOException { if (controller != null) { if (!controller.acquirePermit(nsId)) { @@ -1845,7 +1845,7 @@ protected void acquirePermit(final String nsId, final UserGroupInformation ugi, } incrRejectedPermitForNs(nsId); LOG.debug("Permit denied for ugi: {} for method: {}", - ugi, m.getMethodName()); + ugi, methodName); String msg = "Router " + router.getRouterId() + " is overloaded for NS: " + nsId; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 5353c4c9ef1de..e18e5980b1fc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -179,8 +179,14 @@ public Object invokeMethod( namenodes.toString(), params); } threadLocalContext.transfer(); - invokeMethodAsync(nsid, ugi, (List) namenodes, + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(nsid, ugi, method.getName(), controller); + invokeMethodAsync(ugi, (List) namenodes, useObserver, protocol, method, params); + asyncFinally(object -> { + releasePermit(nsid, ugi, method, controller); + return object; + }); }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); return null; @@ -203,13 +209,11 @@ public Object invokeMethod( * @param params The parameters for the method invocation. 
*/ private void invokeMethodAsync( - String nsid, final UserGroupInformation ugi, final List namenodes, boolean useObserver, final Class protocol, final Method method, final Object... params) { - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); addClientInfoToCallerContext(ugi); if (rpcMonitor != null) { rpcMonitor.proxyOp(); @@ -218,7 +222,6 @@ private void invokeMethodAsync( Map ioes = new LinkedHashMap<>(); final ConnectionContext[] connection = new ConnectionContext[1]; asyncTry(() -> { - acquirePermit(nsid, ugi, method, controller); asyncForEach(namenodes.iterator(), (foreach, namenode) -> { if (!status.isShouldUseObserver() @@ -260,12 +263,6 @@ private void invokeMethodAsync( return handlerAllNamenodeFail(namenodes, method, ioes, params); }); }); - - asyncFinally(res -> { - releasePermit(nsid, ugi, method, controller); - return res; - }); - } /** @@ -429,9 +426,6 @@ public RemoteResult invokeSequential( } return ret; }, Exception.class); - asyncFinally(ret -> { - return ret; - }); }); asyncApply(result -> { if (status.isComplete()) { @@ -488,76 +482,6 @@ public Map invokeConcurrent( return asyncReturn(Map.class); } - @SuppressWarnings("unchecked") - public List> invokeConcurrent( - final Collection locations, final RemoteMethod method, - boolean standby, long timeOutMs, - Class clazz) throws IOException { - - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - final Method m = method.getMethod(); - - if (locations.isEmpty()) { - throw new IOException("No remote locations available"); - } else if (locations.size() == 1 && timeOutMs <= 0) { - // Shortcut, just one call - return invokeSingle(locations.iterator().next(), method); - } - // Don't acquire CONCURRENT_NS permit here. 
- RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - - List orderedLocations = new ArrayList<>(); - List> callables = new ArrayList<>(); - // transfer originCall & callerContext to worker threads of executor. - final Server.Call originCall = Server.getCurCall().get(); - final CallerContext originContext = CallerContext.getCurrent(); - for (final T location : locations) { - String nsId = location.getNameserviceId(); - boolean isObserverRead = isObserverReadEligible(nsId, m); - final List namenodes = - getOrderedNamenodes(nsId, isObserverRead); - final Class proto = method.getProtocol(); - final Object[] paramList = method.getParams(location); - if (standby) { - // Call the objectGetter to all NNs (including standby) - for (final FederationNamenodeContext nn : namenodes) { - String nnId = nn.getNamenodeId(); - final List nnList = - Collections.singletonList(nn); - T nnLocation = location; - if (location instanceof RemoteLocation) { - nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); - } - orderedLocations.add(nnLocation); - callables.add( - () -> { - transferThreadLocalContext(originCall, originContext); - return invokeMethod( - ugi, nnList, isObserverRead, proto, m, paramList); - }); - } - } else { - // Call the objectGetter in order of nameservices in the NS list - orderedLocations.add(location); - callables.add( - () -> { - transferThreadLocalContext(originCall, originContext); - return invokeMethod( - ugi, namenodes, isObserverRead, proto, m, paramList); - }); - } - } - - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - if (this.router.getRouterClientMetrics() != null) { - this.router.getRouterClientMetrics().incInvokedConcurrent(m); - } - - return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); - } - /** * Invokes multiple concurrent proxy calls to different clients. Returns an * array of results. 
@@ -641,9 +565,6 @@ public List> invokeSingl asyncCatch((o, ioe) -> { throw processException(ioe, location); }, IOException.class); - asyncFinally(o -> { - return o; - }); return asyncReturn(List.class); } @@ -662,18 +583,13 @@ public List> invokeSingl public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - asyncTry(() -> { - boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); - List nns = getOrderedNamenodes(nsId, isObserverRead); - RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); - Class proto = method.getProtocol(); - Method m = method.getMethod(); - Object[] params = method.getParams(loc); - invokeMethod(ugi, nns, isObserverRead, proto, m, params); - }); - asyncFinally(o -> { - return o; - }); + boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); + List nns = getOrderedNamenodes(nsId, isObserverRead); + RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); + Class proto = method.getProtocol(); + Method m = method.getMethod(); + Object[] params = method.getParams(loc); + invokeMethod(ugi, nns, isObserverRead, proto, m, params); return null; } @@ -698,31 +614,6 @@ public T invokeSingle( return asyncReturn(clazz); } - protected void acquirePermit(final String nsId, final UserGroupInformation ugi, - final Method m, RouterRpcFairnessPolicyController controller) - throws IOException { - if (controller != null) { - if (!controller.acquirePermit(nsId)) { - // Throw StandByException, - // Clients could fail over and try another router. 
- if (rpcMonitor != null) { - rpcMonitor.proxyOpPermitRejected(nsId); - } - incrRejectedPermitForNs(nsId); - LOG.debug("Permit denied for ugi: {} for method: {}", - ugi, m.getName()); - String msg = - "Router " + router.getRouterId() + - " is overloaded for NS: " + nsId; - throw new StandbyException(msg); - } - if (rpcMonitor != null) { - rpcMonitor.proxyOpPermitAccepted(nsId); - } - incrAcceptedPermitForNs(nsId); - } - } - /** * Release permit for specific nsId after processing against downstream * nsId is completed. From bda637688a9034b8b3510df84a1df01a215b0d11 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 17 Mar 2025 16:39:22 +0800 Subject: [PATCH 05/10] remove unused import. --- .../fairness/RouterAsyncRpcFairnessPolicyController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java index d14f9bd94fdbc..f9c65c5338c41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory; import java.util.Set; -import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. From 03b2f8ae4d2108e758ca1049d271d4c252b8d03e Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 17 Mar 2025 16:39:52 +0800 Subject: [PATCH 06/10] fix unused import. 
--- .../server/federation/router/async/RouterAsyncRpcClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index e18e5980b1fc3..9c9381eb9b391 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -37,9 +37,7 @@ import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; From b94b43076218ea30e4f30d045dd0f89b144c45ab Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 17 Mar 2025 20:12:30 +0800 Subject: [PATCH 07/10] trigger yetus. From 4b09e7b7fa0f69b6c6d59e66e3d6cbd9429d9eaa Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 17 Mar 2025 20:22:18 +0800 Subject: [PATCH 08/10] remove unused asyncTry. 
--- .../router/async/RouterAsyncRpcClient.java | 72 +++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 9c9381eb9b391..ea2d3b40ca527 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -219,47 +219,45 @@ private void invokeMethodAsync( final ExecutionStatus status = new ExecutionStatus(false, useObserver); Map ioes = new LinkedHashMap<>(); final ConnectionContext[] connection = new ConnectionContext[1]; - asyncTry(() -> { - asyncForEach(namenodes.iterator(), - (foreach, namenode) -> { - if (!status.isShouldUseObserver() - && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { - asyncComplete(null); - return; - } - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); - asyncTry(() -> { - connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); - NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); - invoke(namenode, status.isShouldUseObserver(), 0, method, - client.getProxy(), params); - asyncApply(res -> { - status.setComplete(true); - postProcessResult(method, status, namenode, nsId, client); - foreach.breakNow(); - return res; - }); - }); - asyncCatch((res, ioe) -> { - ioes.put(namenode, ioe); - handleInvokeMethodIOException(namenode, ioe, status, useObserver); - return res; - }, IOException.class); - asyncFinally(res -> { - if (connection[0] != null) { - connection[0].release(); - } + asyncForEach(namenodes.iterator(), + (foreach, namenode) -> { + if 
(!status.isShouldUseObserver() + && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { + asyncComplete(null); + return; + } + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + asyncTry(() -> { + connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); + NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); + invoke(namenode, status.isShouldUseObserver(), 0, method, + client.getProxy(), params); + asyncApply(res -> { + status.setComplete(true); + postProcessResult(method, status, namenode, nsId, client); + foreach.breakNow(); return res; }); }); + asyncCatch((res, ioe) -> { + ioes.put(namenode, ioe); + handleInvokeMethodIOException(namenode, ioe, status, useObserver); + return res; + }, IOException.class); + asyncFinally(res -> { + if (connection[0] != null) { + connection[0].release(); + } + return res; + }); + }); - asyncApply(res -> { - if (status.isComplete()) { - return res; - } - return handlerAllNamenodeFail(namenodes, method, ioes, params); - }); + asyncApply(res -> { + if (status.isComplete()) { + return res; + } + return handlerAllNamenodeFail(namenodes, method, ioes, params); }); } From 8497ae0f9f97e4f0a18d2d889455e1181574a567 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 18 Mar 2025 23:02:06 +0800 Subject: [PATCH 09/10] add new ut. 
--- ...outerAsyncRpcFairnessPolicyController.java | 4 +- ...outerAsyncRpcFairnessPolicyController.java | 160 ++++++++++++++++++ 2 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java index f9c65c5338c41..fadd0c93671f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -40,6 +40,8 @@ public class RouterAsyncRpcFairnessPolicyController extends private static final Logger LOG = LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class); + public static final String INIT_MSG = "Max async call permits per nameservice: %d"; + public RouterAsyncRpcFairnessPolicyController(Configuration conf) { init(conf); } @@ -52,7 +54,7 @@ public void init(Configuration conf) throws IllegalArgumentException { if (maxAsyncCallPermit <= 0) { maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; } - LOG.info("Max async call permits per nameservice: {}", maxAsyncCallPermit); + LOG.info(String.format(INIT_MSG, maxAsyncCallPermit)); // Get all name services configured. 
Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..936b2288e6ea2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test functionality of {@link RouterAsyncRpcFairnessPolicyController}. 
+ */ +public class TestRouterAsyncRpcFairnessPolicyController { + + private static String nameServices = + "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + private static int perNsPermits = 30; + + @Test + public void testHandlerAllocationEqualAssignment() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(perNsPermits); + verifyHandlerAllocation(routerRpcFairnessPolicyController); + } + + @Test + public void testAcquireTimeout() { + Configuration conf = createConf(perNsPermits); + conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // Ns1 should have number of perNsPermits permits allocated. + for (int i = 0; i < perNsPermits; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + long acquireBeginTimeMs = Time.monotonicNow(); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs; + + // There are some other operations, so acquireTimeMs >= 100ms. 
+ assertTrue(acquireTimeMs >= 100); + } + + @Test + public void testAllocationSuccessfullyWithZeroHandlers() { + Configuration conf = createConf(0); + verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + } + + @Test + public void testAllocationSuccessfullyWithNegativePermits() { + Configuration conf = createConf(-1); + verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + } + + @Test + public void testGetAvailableHandlerOnPerNs() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(perNsPermits); + assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":30}", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + routerRpcFairnessPolicyController.acquirePermit("ns1"); + assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":29}", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + } + + @Test + public void testGetAvailableHandlerOnPerNsForNoFairness() { + Configuration conf = new Configuration(); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + assertEquals("N/A", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + } + + private void verifyInstantiationStatus(Configuration conf, int permits) { + GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer + .captureLogs(LoggerFactory.getLogger( + RouterAsyncRpcFairnessPolicyController.class)); + try { + FederationUtil.newFairnessPolicyController(conf); + } catch (IllegalArgumentException e) { + // Ignore the exception as it is expected here. 
+ } + String infoMsg = String.format( + RouterAsyncRpcFairnessPolicyController.INIT_MSG, permits); + assertTrue("Should contain info message: " + infoMsg, + logs.getOutput().contains(infoMsg)); + } + + private RouterRpcFairnessPolicyController getFairnessPolicyController( + int asyncCallPermits) { + return FederationUtil.newFairnessPolicyController(createConf(asyncCallPermits)); + } + + private void verifyHandlerAllocation( + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) { + for (int i = 0; i < perNsPermits; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + // CONCURRENT_NS doesn't acquire permits. + assertTrue( + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + + routerRpcFairnessPolicyController.releasePermit("ns1"); + routerRpcFairnessPolicyController.releasePermit("ns2"); + routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS); + + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + private Configuration createConf(int asyncCallPermits) { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, asyncCallPermits); + conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices); + conf.setClass( + RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); + return conf; + } +} From 71edeaa271d5153b19a0c979803121d07cf83b35 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Wed, 19 Mar 2025 08:07:55 
+0800 Subject: [PATCH 10/10] fix blanks. --- .../fairness/RouterAsyncRpcFairnessPolicyController.java | 2 +- .../fairness/TestRouterAsyncRpcFairnessPolicyController.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java index fadd0c93671f4..f23568d4c8b9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -41,7 +41,7 @@ public class RouterAsyncRpcFairnessPolicyController extends LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class); public static final String INIT_MSG = "Max async call permits per nameservice: %d"; - + public RouterAsyncRpcFairnessPolicyController(Configuration conf) { init(conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java index 936b2288e6ea2..e0e49636fda4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java @@ -46,7 +46,7 @@ public class TestRouterAsyncRpcFairnessPolicyController { private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, 
ns2.nn2"; private static int perNsPermits = 30; - + @Test public void testHandlerAllocationEqualAssignment() { RouterRpcFairnessPolicyController routerRpcFairnessPolicyController