Skip to content

Commit 1beb8f1

Browse files
committed
zookeeper curator leadership test
1 parent 8d5b934 commit 1beb8f1

File tree

9 files changed

+317
-94
lines changed

9 files changed

+317
-94
lines changed

springboot-curator-demo/README.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
> Basic usage
55
6-
<a href="src/test/demo/curator.CuratorBasicUsageTest.java">See</a>
6+
<a href="src/test/java/demo/curator.CuratorBasicUsageTest.java">See</a>
77

88
```
99
usage_retryPolicies() / usage_retryPoliciesAsync()
@@ -17,3 +17,8 @@ usage_sharedLock() / usage_counter()
1717
- <a href="src/main/java/demo/lock">Server side (receive tasks)</a>
1818
- <a href="src/test/java/demo/LockTaskPushTest.java">Client side (push
1919
tasks) </a>
20+
21+
> Master slave demo
22+
23+
- working..
24+
~~~~
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package demo;
22

3+
import java.util.Arrays;
34
import java.util.stream.IntStream;
5+
import lombok.extern.slf4j.Slf4j;
46
import org.springframework.boot.SpringApplication;
57
import org.springframework.boot.autoconfigure.SpringBootApplication;
8+
import org.springframework.context.ConfigurableApplicationContext;
69

10+
@Slf4j(topic = "[MAIN]")
711
@SpringBootApplication
812
public class DemoApplication {
913

1014
public static void main(String[] args) {
11-
SpringApplication.run(DemoApplication.class, args);
12-
IntStream.range(0, 30).forEach(i-> System.out.println());
15+
ConfigurableApplicationContext ctx = SpringApplication.run(DemoApplication.class, args);
16+
IntStream.range(0, 30).forEach(i -> System.out.println());
17+
18+
log.info("## Active profiles : {}"
19+
, Arrays.toString(ctx.getEnvironment().getActiveProfiles())
20+
);
1321
}
1422
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package demo;
2+
3+
import javax.annotation.PostConstruct;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.apache.curator.RetryPolicy;
6+
import org.apache.curator.framework.CuratorFramework;
7+
import org.apache.curator.framework.CuratorFrameworkFactory;
8+
import org.apache.curator.retry.RetryNTimes;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
13+
/**
14+
* @author zacconding
15+
* @Date 2018-12-14
16+
* @GitHub : https://github.com/zacscoding
17+
*/
18+
@Slf4j(topic = "[ZOOKEEPER-CONFIG]")
19+
@Configuration
20+
public class ZookeeperConfiguration {
21+
22+
private ZookeeperProperties zookeeperProperties;
23+
24+
@Autowired
25+
public ZookeeperConfiguration(ZookeeperProperties zookeeperProperties) {
26+
this.zookeeperProperties = zookeeperProperties;
27+
}
28+
29+
30+
@PostConstruct
31+
public void setUp() {
32+
log.info(
33+
"\n// =======================================================\n"
34+
+ "clientId : " + zookeeperProperties.getClientId() + "\n"
35+
+ "address : " + zookeeperProperties.getAddress() + "\n"
36+
+ "sleepMsBetweenRetries : " + zookeeperProperties.getSleepMsBetweenRetries() + "\n"
37+
+ "maxRetries : " + zookeeperProperties.getMaxRetries() + "\n"
38+
+ "======================================================= //\n"
39+
);
40+
}
41+
42+
@Bean
43+
public CuratorFramework curatorFramework() {
44+
RetryPolicy retryPolicy = new RetryNTimes(
45+
zookeeperProperties.getMaxRetries(),
46+
zookeeperProperties.getSleepMsBetweenRetries());
47+
48+
CuratorFramework client = CuratorFrameworkFactory.newClient(
49+
zookeeperProperties.getAddress(), retryPolicy);
50+
51+
client.start();
52+
return client;
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package demo;
2+
3+
import org.springframework.beans.factory.annotation.Value;
4+
import org.springframework.stereotype.Component;
5+
6+
/**
7+
* @author zacconding
8+
* @Date 2018-12-14
9+
* @GitHub : https://github.com/zacscoding
10+
*/
11+
@Component
12+
public class ZookeeperProperties {
13+
14+
@Value("${lock.client.id}")
15+
private String clientId;
16+
@Value("${cluster.zookeeper.address}")
17+
private String address;
18+
@Value("${cluster.zookeeper.maxRetries}")
19+
private int maxRetries;
20+
@Value("${cluster.zookeeper.sleepMsBetweenRetries}")
21+
private int sleepMsBetweenRetries;
22+
23+
public String getClientId() {
24+
return clientId;
25+
}
26+
27+
public void setClientId(String clientId) {
28+
this.clientId = clientId;
29+
}
30+
31+
public String getAddress() {
32+
return address;
33+
}
34+
35+
public void setAddress(String address) {
36+
this.address = address;
37+
}
38+
39+
public int getMaxRetries() {
40+
return maxRetries;
41+
}
42+
43+
public void setMaxRetries(int maxRetries) {
44+
this.maxRetries = maxRetries;
45+
}
46+
47+
public int getSleepMsBetweenRetries() {
48+
return sleepMsBetweenRetries;
49+
}
50+
51+
public void setSleepMsBetweenRetries(int sleepMsBetweenRetries) {
52+
this.sleepMsBetweenRetries = sleepMsBetweenRetries;
53+
}
54+
}

springboot-curator-demo/src/main/java/demo/lock/LockConfiguration.java

-57
This file was deleted.

springboot-curator-demo/src/main/java/demo/lock/LockTaskController.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package demo.lock;
22

3+
import demo.ZookeeperProperties;
34
import java.util.Random;
45
import java.util.concurrent.TimeUnit;
56
import javax.annotation.PostConstruct;
67
import lombok.extern.slf4j.Slf4j;
78
import org.apache.curator.framework.CuratorFramework;
89
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
910
import org.apache.curator.framework.recipes.shared.SharedCount;
10-
import org.apache.curator.framework.recipes.shared.VersionedValue;
1111
import org.springframework.beans.factory.annotation.Autowired;
1212
import org.springframework.context.annotation.Profile;
1313
import org.springframework.http.ResponseEntity;
14-
import org.springframework.scheduling.annotation.Async;
1514
import org.springframework.web.bind.annotation.GetMapping;
1615
import org.springframework.web.bind.annotation.PathVariable;
1716
import org.springframework.web.bind.annotation.RestController;
@@ -36,9 +35,10 @@ public class LockTaskController {
3635
private SharedCount sharedCount;
3736

3837
@Autowired
39-
public LockTaskController(CuratorFramework curatorFramework, LockConfiguration lockConfiguration) {
38+
public LockTaskController(CuratorFramework curatorFramework,
39+
ZookeeperProperties zookeeperProperties) {
4040
this.curatorFramework = curatorFramework;
41-
this.clientId = lockConfiguration.getClientId();
41+
this.clientId = zookeeperProperties.getClientId();
4242
}
4343

4444
@PostConstruct
@@ -75,17 +75,23 @@ private void doTask(long taskNumber) throws Exception {
7575
log.info("## Receive task : {} at {}", taskNumber, clientId);
7676
log.info("## > Acquire lock. client : {}, taskNumber : {}", clientId, taskNumber);
7777
if (sharedCount.getCount() >= taskNumber) {
78-
log.info("## >> Skip task. sharedCount : {} / taskNumber : {}", sharedCount.getCount(), taskNumber);
78+
log.info("## >> Skip task. sharedCount : {} / taskNumber : {}"
79+
, sharedCount.getCount(), taskNumber);
80+
7981
return;
8082
}
8183

8284
int sleep = random.nextInt(3) + 1;
83-
log.info("## >> do something. task number : {} / sleep : {} / client : {}", taskNumber, sleep, clientId);
85+
log.info("## >> do something. task number : {} / sleep : {} / client : {}"
86+
, taskNumber, sleep, clientId);
87+
8488
TimeUnit.SECONDS.sleep(sleep);
8589

8690
sharedCount.setCount((int) taskNumber);
87-
log.info("## >>> After set count : {} / task number : {}", sharedCount.getCount(), taskNumber);
88-
log.info("## >>> Complete task. client : {} / task number : {} / sharedCount : {}", clientId,
89-
taskNumber, sharedCount.getCount());
91+
log.info("## >>> After set count : {} / task number : {}"
92+
, sharedCount.getCount(), taskNumber);
93+
94+
log.info("## >>> Complete task. client : {} / task number : {} / sharedCount : {}"
95+
, clientId, taskNumber, sharedCount.getCount());
9096
}
9197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package demo.master;
2+
3+
import demo.ZookeeperProperties;
4+
import java.util.Arrays;
5+
import java.util.Random;
6+
import java.util.concurrent.TimeUnit;
7+
import javax.annotation.PostConstruct;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.apache.curator.framework.CuratorFramework;
10+
import org.apache.curator.framework.recipes.leader.LeaderSelector;
11+
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
12+
import org.apache.curator.framework.state.ConnectionState;
13+
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.context.annotation.Profile;
15+
import org.springframework.http.ResponseEntity;
16+
import org.springframework.web.bind.annotation.GetMapping;
17+
import org.springframework.web.bind.annotation.PathVariable;
18+
import org.springframework.web.bind.annotation.RestController;
19+
20+
/**
21+
* @author zacconding
22+
* @Date 2018-12-14
23+
* @GitHub : https://github.com/zacscoding
24+
*/
25+
@Slf4j(topic = "[MASTER-SLAVE]")
26+
@Profile("master-slave")
27+
@RestController
28+
public class MasterSlaveTaskController {
29+
30+
private ZookeeperProperties zookeeperProperties;
31+
private CuratorFramework curatorFramework;
32+
private String mutexPath = "/mutex/select/leader/job/";
33+
private String fixedTaskMutexPath = "/mutex/select/leader/job/A";
34+
private LeaderSelector leaderSelector;
35+
36+
@Autowired
37+
public MasterSlaveTaskController(CuratorFramework curatorFramework,
38+
ZookeeperProperties zookeeperProperties) {
39+
40+
this.curatorFramework = curatorFramework;
41+
this.zookeeperProperties = zookeeperProperties;
42+
}
43+
44+
@PostConstruct
45+
private void setUp() {
46+
leaderSelector = new LeaderSelector(
47+
curatorFramework,
48+
fixedTaskMutexPath,
49+
new LeaderSelectorListener() {
50+
@Override
51+
public void takeLeadership(CuratorFramework internalCuratorFramework)
52+
throws Exception {
53+
54+
log.info("this app`s client : {}", curatorFramework);
55+
log.info("[{}] task leadership.... client : {}"
56+
, zookeeperProperties.getClientId(), internalCuratorFramework);
57+
58+
TimeUnit.SECONDS.sleep(2L);
59+
60+
System.exit(-1);
61+
}
62+
63+
@Override
64+
public void stateChanged(CuratorFramework curatorFramework,
65+
ConnectionState connectionState) {
66+
67+
log.info("this app`s client : {}", curatorFramework);
68+
log.info("[{}] state changed...... >> {}"
69+
, zookeeperProperties.getClientId(), connectionState);
70+
}
71+
}
72+
);
73+
leaderSelector.start();
74+
leaderSelector.autoRequeue();
75+
76+
Thread leaderChecker = new Thread(() -> {
77+
try {
78+
while (!Thread.currentThread().isInterrupted()) {
79+
log.info("[Check leader - {}] id : {}, leader id : {}, isLeader : {}\n{}"
80+
, zookeeperProperties.getClientId(),
81+
leaderSelector.getId(),
82+
leaderSelector.getLeader().getId(),
83+
leaderSelector.getLeader().isLeader(),
84+
Arrays.toString(leaderSelector.getParticipants().toArray())
85+
);
86+
87+
TimeUnit.SECONDS.sleep(1L);
88+
}
89+
} catch (Exception e) {
90+
Thread.currentThread().interrupt();
91+
}
92+
});
93+
leaderChecker.start();
94+
}
95+
96+
97+
@GetMapping("/master-slave/push/task/{taskNumber}")
98+
public ResponseEntity<Boolean> pushTask(@PathVariable("taskNumber") long taskNumber) {
99+
log.info("## [Push task. client id : {} | task number : {}\n{}",
100+
zookeeperProperties.getClientId(), taskNumber, curatorFramework);
101+
102+
new LeaderSelector(curatorFramework, mutexPath + taskNumber,
103+
new LeaderSelectorListener() {
104+
@Override
105+
public void takeLeadership(CuratorFramework internalCuratorFramework)
106+
throws Exception {
107+
log.info("## [{}] - taskLeadership at client : {} - {}"
108+
, zookeeperProperties.getClientId(), taskNumber, internalCuratorFramework);
109+
110+
int sleep = new Random().nextInt(3) + 1;
111+
TimeUnit.SECONDS.sleep(sleep);
112+
113+
log.info("## [{}] - complete task : {}"
114+
, zookeeperProperties.getClientId(), taskNumber);
115+
}
116+
117+
@Override
118+
public void stateChanged(CuratorFramework curatorFramework,
119+
ConnectionState connectionState) {
120+
log.info("## [{}] stateChanged at client >> {}"
121+
, zookeeperProperties.getClientId(), connectionState);
122+
}
123+
}).start();
124+
125+
return ResponseEntity.ok(Boolean.TRUE);
126+
}
127+
}

0 commit comments

Comments
 (0)