Skip to content

Commit 336316c

Browse files
authored
Merge pull request #41 from mesmerizeBy/master
Fix consistency hash bug
2 parents 91cd884 + c4e912c commit 336316c

File tree

10 files changed

+61
-19
lines changed

10 files changed

+61
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package github.javaguide.loadbalance;
22

3+
import github.javaguide.remoting.dto.RpcRequest;
4+
35
import java.util.List;
46

57
/**
@@ -10,16 +12,16 @@
1012
*/
1113
public abstract class AbstractLoadBalance implements LoadBalance {
1214
@Override
13-
public String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName) {
15+
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
1416
if (serviceAddresses == null || serviceAddresses.size() == 0) {
1517
return null;
1618
}
1719
if (serviceAddresses.size() == 1) {
1820
return serviceAddresses.get(0);
1921
}
20-
return doSelect(serviceAddresses, rpcServiceName);
22+
return doSelect(serviceAddresses, rpcRequest);
2123
}
2224

23-
protected abstract String doSelect(List<String> serviceAddresses, String rpcServiceName);
25+
protected abstract String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest);
2426

2527
}

rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package github.javaguide.loadbalance;
22

33
import github.javaguide.extension.SPI;
4+
import github.javaguide.remoting.dto.RpcRequest;
45

56
import java.util.List;
67

@@ -18,5 +19,5 @@ public interface LoadBalance {
1819
* @param serviceAddresses Service address list
1920
* @return target service address
2021
*/
21-
String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName);
22+
String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
2223
}

rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package github.javaguide.loadbalance.loadbalancer;
22

33
import github.javaguide.loadbalance.AbstractLoadBalance;
4+
import github.javaguide.remoting.dto.RpcRequest;
45
import lombok.extern.slf4j.Slf4j;
56

67
import java.nio.charset.StandardCharsets;
78
import java.security.MessageDigest;
89
import java.security.NoSuchAlgorithmException;
10+
import java.util.Arrays;
911
import java.util.List;
1012
import java.util.Map;
1113
import java.util.TreeMap;
@@ -22,8 +24,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
2224
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
2325

2426
@Override
25-
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
27+
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
2628
int identityHashCode = System.identityHashCode(serviceAddresses);
29+
// build rpc service name by rpcRequest
30+
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
2731

2832
ConsistentHashSelector selector = selectors.get(rpcServiceName);
2933

@@ -33,7 +37,7 @@ protected String doSelect(List<String> serviceAddresses, String rpcServiceName)
3337
selector = selectors.get(rpcServiceName);
3438
}
3539

36-
return selector.select(rpcServiceName);
40+
return selector.select(rpcServiceName+ Arrays.stream(rpcRequest.getParameters()));
3741
}
3842

3943
static class ConsistentHashSelector {
@@ -73,8 +77,8 @@ static long hash(byte[] digest, int idx) {
7377
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
7478
}
7579

76-
public String select(String rpcServiceName) {
77-
byte[] digest = md5(rpcServiceName);
80+
public String select(String rpcServiceKey) {
81+
byte[] digest = md5(rpcServiceKey);
7882
return selectForKey(hash(digest, 0));
7983
}
8084

rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package github.javaguide.loadbalance.loadbalancer;
22

33
import github.javaguide.loadbalance.AbstractLoadBalance;
4+
import github.javaguide.remoting.dto.RpcRequest;
45

56
import java.util.List;
67
import java.util.Random;
@@ -13,7 +14,7 @@
1314
*/
1415
public class RandomLoadBalance extends AbstractLoadBalance {
1516
@Override
16-
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
17+
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
1718
Random random = new Random();
1819
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
1920
}

rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package github.javaguide.registry;
22

33
import github.javaguide.extension.SPI;
4+
import github.javaguide.remoting.dto.RpcRequest;
45

56
import java.net.InetSocketAddress;
67

@@ -15,8 +16,8 @@ public interface ServiceDiscovery {
1516
/**
1617
* lookup service by rpcServiceName
1718
*
18-
* @param rpcServiceName rpc service name
19+
* @param rpcRequest rpc service pojo
1920
* @return service address
2021
*/
21-
InetSocketAddress lookupService(String rpcServiceName);
22+
InetSocketAddress lookupService(RpcRequest rpcRequest);
2223
}

rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import github.javaguide.loadbalance.LoadBalance;
77
import github.javaguide.registry.ServiceDiscovery;
88
import github.javaguide.registry.zk.util.CuratorUtils;
9+
import github.javaguide.remoting.dto.RpcRequest;
910
import lombok.extern.slf4j.Slf4j;
1011
import org.apache.curator.framework.CuratorFramework;
1112

@@ -27,14 +28,15 @@ public ZkServiceDiscovery() {
2728
}
2829

2930
@Override
30-
public InetSocketAddress lookupService(String rpcServiceName) {
31+
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
32+
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
3133
CuratorFramework zkClient = CuratorUtils.getZkClient();
3234
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
3335
if (serviceUrlList == null || serviceUrlList.size() == 0) {
3436
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
3537
}
3638
// load balancing
37-
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName);
39+
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
3840
log.info("Successfully found the service address:[{}]", targetServiceUrl);
3941
String[] socketAddressArray = targetServiceUrl.split(":");
4042
String host = socketAddressArray[0];

rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public Object sendRpcRequest(RpcRequest rpcRequest) {
9898
// build return value
9999
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
100100
// build rpc service name by rpcRequest
101-
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
101+
// String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
102102
// get server address
103-
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
103+
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
104104
// get server address related channel
105105
Channel channel = getChannel(inetSocketAddress);
106106
if (channel.isActive()) {

rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public Object sendRpcRequest(RpcRequest rpcRequest) {
3535
// build rpc service name by rpcRequest
3636
String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName())
3737
.group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName();
38-
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
38+
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
3939
try (Socket socket = new Socket()) {
4040
socket.connect(inetSocketAddress);
4141
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
import github.javaguide.extension.ExtensionLoader;
44
import github.javaguide.loadbalance.LoadBalance;
5+
import github.javaguide.remoting.dto.RpcRequest;
56
import org.junit.jupiter.api.Test;
67

78
import java.util.ArrayList;
89
import java.util.Arrays;
910
import java.util.List;
11+
import java.util.UUID;
1012

1113
import static org.junit.Assert.assertEquals;
1214

@@ -17,10 +19,29 @@ void TestConsistentHashLoadBalance() {
1719
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
1820
List<String> serviceUrlList = new ArrayList<>(Arrays.asList("127.0.0.1:9997", "127.0.0.1:9998", "127.0.0.1:9999"));
1921
String userRpcServiceName = "github.javaguide.UserServicetest1version1";
20-
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, userRpcServiceName);
22+
//build rpcCall
23+
RpcRequest rpcRequest = RpcRequest.builder()
24+
// .parameters(args)
25+
.interfaceName(userRpcServiceName)
26+
// .paramTypes(method.getParameterTypes())
27+
.requestId(UUID.randomUUID().toString())
28+
.group("test2")
29+
.version("version2")
30+
.build();
31+
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
2132
assertEquals("127.0.0.1:9999",userServiceAddress);
33+
34+
2235
String schoolRpcServiceName = "github.javaguide.SchoolServicetest1version1";
23-
String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, schoolRpcServiceName);
36+
rpcRequest = RpcRequest.builder()
37+
// .parameters(args)
38+
.interfaceName(userRpcServiceName)
39+
// .paramTypes(method.getParameterTypes())
40+
.requestId(UUID.randomUUID().toString())
41+
.group("test2")
42+
.version("version2")
43+
.build();
44+
String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
2445
assertEquals("127.0.0.1:9997",schoolServiceAddress);
2546
}
2647
}

rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import github.javaguide.registry.zk.ZkServiceDiscovery;
44
import github.javaguide.registry.zk.ZkServiceRegistry;
5+
import github.javaguide.remoting.dto.RpcRequest;
56
import org.junit.jupiter.api.Test;
67

78
import java.net.InetSocketAddress;
9+
import java.util.UUID;
810

911
import static org.junit.jupiter.api.Assertions.assertEquals;
1012

@@ -21,7 +23,15 @@ void should_register_service_successful_and_lookup_service_by_service_name() {
2123
InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333);
2224
zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress);
2325
ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery();
24-
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.zk.ZkServiceRegistry");
26+
RpcRequest rpcRequest = RpcRequest.builder()
27+
// .parameters(args)
28+
.interfaceName("github.javaguide.registry.zk.ZkServiceRegistry")
29+
// .paramTypes(method.getParameterTypes())
30+
.requestId(UUID.randomUUID().toString())
31+
.group("test2")
32+
.version("version2")
33+
.build();
34+
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService(rpcRequest);
2535
assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString());
2636
}
2737
}

0 commit comments

Comments
 (0)