Skip to content

Commit 65484bc

Browse files
committed
Keepalive OpensergoClient.
force-pushed after main-branch was force-pushed by Eric Zhao. Co-authored-by: Eric Zhao <[email protected]> Co-authored-by: Jiangnan Jia <[email protected]> Signed-off-by: Jiangnan Jia <[email protected]>
1 parent 6ca87a2 commit 65484bc

6 files changed

+173
-25
lines changed

src/main/java/io/opensergo/ConfigKind.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ public enum ConfigKind {
2626
FAULT_TOLERANCE_RULE("fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule", "FaultToleranceRule"),
2727
RATE_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy", "RateLimitStrategy"),
2828
THROTTLING_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ThrottlingStrategy", "ThrottlingStrategy"),
29-
CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy",
30-
"ConcurrencyLimitStrategy"),
29+
CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", "ConcurrencyLimitStrategy"),
3130
CIRCUIT_BREAKER_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/CircuitBreakerStrategy", "CircuitBreakerStrategy");
3231

3332
private final String kindName;

src/main/java/io/opensergo/OpenSergoClient.java

+63-12
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.opensergo.util.AssertUtils;
3131
import io.opensergo.util.IdentifierUtils;
3232

33+
import java.util.Optional;
34+
import java.util.concurrent.TimeUnit;
3335
import java.util.concurrent.atomic.AtomicInteger;
3436

3537
/**
@@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable {
4648
private final SubscribeRegistry subscribeRegistry;
4749

4850
private AtomicInteger reqId;
51+
protected volatile OpensergoClientStatus status;
4952

5053
public OpenSergoClient(String host, int port) {
5154
this.channel = ManagedChannelBuilder.forAddress(host, port)
@@ -56,17 +59,68 @@ public OpenSergoClient(String host, int port) {
5659
this.configCache = new SubscribedConfigCache();
5760
this.subscribeRegistry = new SubscribeRegistry();
5861
this.reqId = new AtomicInteger(0);
62+
status = OpensergoClientStatus.INITIAL;
63+
}
64+
65+
public void registerSubscribeInfo(OpensergoClientSubscribeInfo subscribeInfo) {
66+
// Register subscriber to local.
67+
if (Optional.of(subscribeInfo.getSubscriberList()).isPresent() && subscribeInfo.getSubscriberList().size() > 0) {
68+
subscribeInfo.getSubscriberList().forEach(subscriber -> {
69+
this.subscribeRegistry.registerSubscriber(subscribeInfo.getSubscribeKey(), subscriber);
70+
OpenSergoLogger.info("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}", subscribeInfo.getSubscribeKey(), subscriber);
71+
72+
if (requestAndResponseWriter != null && this.status == OpensergoClientStatus.STARTED) {
73+
this.subscribeConfig(subscribeInfo.getSubscribeKey());
74+
}
75+
});
76+
}
5977
}
6078

6179
public void start() throws Exception {
80+
OpenSergoLogger.info("OpensergoClient is starting...");
81+
82+
if (status == OpensergoClientStatus.INITIAL) {
83+
OpenSergoLogger.info("open keepavlive thread");
84+
new Thread(this::keepAlive).start();
85+
}
86+
87+
status = OpensergoClientStatus.STARTING;
88+
6289
this.requestAndResponseWriter = transportGrpcStub.withWaitForReady()
63-
.subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry));
90+
.subscribeConfig(new OpenSergoSubscribeClientObserver(this));
91+
92+
OpenSergoLogger.info("begin to subscribe config-data...");
93+
this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> {
94+
this.subscribeConfig(subscribeKey);
95+
});
96+
97+
OpenSergoLogger.info("openSergoClient is started");
98+
status = OpensergoClientStatus.STARTED;
99+
}
100+
101+
private void keepAlive() {
102+
try {
103+
if (status != OpensergoClientStatus.STARTING
104+
&& status != OpensergoClientStatus.STARTED
105+
&& status != OpensergoClientStatus.SHUTDOWN) {
106+
OpenSergoLogger.info("try to restart openSergoClient...");
107+
this.start();
108+
}
109+
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
110+
if( status != OpensergoClientStatus.SHUTDOWN) {
111+
keepAlive();
112+
}
113+
} catch (Exception e) {
114+
e.printStackTrace();
115+
}
64116
}
65117

66118
@Override
67119
public void close() throws Exception {
68120
requestAndResponseWriter.onCompleted();
69121

122+
status = OpensergoClientStatus.SHUTDOWN;
123+
70124
// gracefully drain the requests, then close the connection
71125
channel.shutdown();
72126
}
@@ -77,8 +131,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77131
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");
78132

79133
if (requestAndResponseWriter == null) {
80-
// TODO: return status that indicates not ready
81-
throw new IllegalStateException("gRPC stream is not ready");
134+
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
135+
status = OpensergoClientStatus.INTERRUPTED;
82136
}
83137
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
84138
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
@@ -106,8 +160,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
106160
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");
107161

108162
if (requestAndResponseWriter == null) {
109-
// TODO: return status that indicates not ready
110-
throw new IllegalStateException("gRPC stream is not ready");
163+
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
164+
status = OpensergoClientStatus.INTERRUPTED;
111165
}
112166
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
113167
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
@@ -121,18 +175,15 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121175
// Send SubscribeRequest
122176
requestAndResponseWriter.onNext(request);
123177

124-
// Register subscriber to local.
125-
if (subscriber != null) {
126-
subscribeRegistry.registerSubscriber(subscribeKey, subscriber);
127-
OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}",
128-
subscribeKey, subscriber);
129-
}
130-
131178
return true;
132179
}
133180

134181
public SubscribedConfigCache getConfigCache() {
135182
return configCache;
136183
}
137184

185+
public SubscribeRegistry getSubscribeRegistry() {
186+
return subscribeRegistry;
187+
}
188+
138189
}

src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import io.opensergo.proto.transport.v1.SubscribeResponse;
3131
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
3232
import io.opensergo.subscribe.SubscribeKey;
33-
import io.opensergo.subscribe.SubscribeRegistry;
34-
import io.opensergo.subscribe.SubscribedConfigCache;
3533
import io.opensergo.util.StringUtils;
3634

3735
/**
@@ -41,13 +39,10 @@ public class OpenSergoSubscribeClientObserver implements ClientResponseObserver<
4139

4240
private ClientCallStreamObserver<SubscribeRequest> requestStream;
4341

44-
private final SubscribedConfigCache configCache;
45-
private final SubscribeRegistry subscribeRegistry;
42+
private OpenSergoClient openSergoClient;
4643

47-
public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache,
48-
SubscribeRegistry subscribeRegistry) {
49-
this.configCache = configCache;
50-
this.subscribeRegistry = subscribeRegistry;
44+
public OpenSergoSubscribeClientObserver(OpenSergoClient openSergoClient) {
45+
this.openSergoClient = openSergoClient;
5146
}
5247

5348
@Override
@@ -58,7 +53,7 @@ public void beforeStart(ClientCallStreamObserver<SubscribeRequest> requestStream
5853
private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion)
5954
throws Exception {
6055
long receivedVersion = dataWithVersion.getVersion();
61-
SubscribedData cachedData = configCache.getDataFor(subscribeKey);
56+
SubscribedData cachedData = this.openSergoClient.getConfigCache().getDataFor(subscribeKey);
6257
if (cachedData != null && cachedData.getVersion() > receivedVersion) {
6358
// The upcoming data is out-dated, so we'll not resolve the push request.
6459
return new LocalDataNotifyResult().setCode(OpenSergoTransportConstants.CODE_ERROR_VERSION_OUTDATED);
@@ -67,9 +62,9 @@ private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWi
6762
// Decode actual data from the raw "Any" data.
6863
List<Object> dataList = decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList());
6964
// Update to local config cache.
70-
configCache.updateData(subscribeKey, dataList, receivedVersion);
65+
this.openSergoClient.getConfigCache().updateData(subscribeKey, dataList, receivedVersion);
7166

72-
List<OpenSergoConfigSubscriber> subscribers = subscribeRegistry.getSubscribersOf(subscribeKey);
67+
List<OpenSergoConfigSubscriber> subscribers = this.openSergoClient.getSubscribeRegistry().getSubscribersOf(subscribeKey);
7368
if (subscribers == null || subscribers.isEmpty()) {
7469
// no-subscriber is acceptable (just for cache-and-pull mode)
7570
return LocalDataNotifyResult.withSuccess(dataList);
@@ -178,6 +173,11 @@ private List<Object> decodeActualData(String kind, List<Any> rawList) throws Exc
178173

179174
@Override
180175
public void onError(Throwable t) {
176+
// TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver<SubscribeRequest>
177+
io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode();
178+
if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) {
179+
this.openSergoClient.status = OpensergoClientStatus.INTERRUPTED;
180+
}
181181
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t);
182182
}
183183

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2022, OpenSergo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.opensergo;
17+
18+
/**
19+
* @author Jiangnan Jia
20+
**/
21+
public enum OpensergoClientStatus {
22+
23+
/* initial*/
24+
INITIAL,
25+
STARTING,
26+
STARTED,
27+
INTERRUPTED,
28+
SHUTDOWN
29+
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2022, OpenSergo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.opensergo;
17+
18+
import com.google.common.collect.Lists;
19+
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
20+
import io.opensergo.subscribe.SubscribeKey;
21+
22+
import java.util.List;
23+
24+
/**
25+
* @author Jiangnan Jia
26+
**/
27+
public class OpensergoClientSubscribeInfo {
28+
29+
private SubscribeKey subscribeKey;
30+
private List<OpenSergoConfigSubscriber> subscriberList;
31+
32+
public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey) {
33+
this.subscribeKey = subscribeKey;
34+
this.subscriberList = Lists.newArrayList();
35+
}
36+
public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey, List<OpenSergoConfigSubscriber> subscriberList) {
37+
this.subscribeKey = subscribeKey;
38+
this.subscriberList = subscriberList;
39+
}
40+
41+
public OpensergoClientSubscribeInfo addSubscriber(OpenSergoConfigSubscriber subscriber) {
42+
// TODO distinct the same OpenSergoConfigSubscriber
43+
this.subscriberList.add(subscriber);
44+
return this;
45+
}
46+
47+
public SubscribeKey getSubscribeKey() {
48+
return subscribeKey;
49+
}
50+
51+
public List<OpenSergoConfigSubscriber> getSubscriberList() {
52+
return subscriberList;
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return "OpensergoClientSubscribeInfo{" +
58+
"subscribeKey=" + subscribeKey +
59+
", subscriberList=" + subscriberList +
60+
'}';
61+
}
62+
}

src/main/java/io/opensergo/subscribe/SubscribeRegistry.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.opensergo.subscribe;
1717

1818
import java.util.List;
19+
import java.util.Set;
1920
import java.util.concurrent.ConcurrentHashMap;
2021
import java.util.concurrent.ConcurrentMap;
2122
import java.util.concurrent.CopyOnWriteArrayList;
@@ -33,9 +34,14 @@ public void registerSubscriber(SubscribeKey key, OpenSergoConfigSubscriber subsc
3334
AssertUtils.assertNotNull(key, "subscribeKey cannot be null");
3435
AssertUtils.assertNotNull(subscriber, "subscriber cannot be null");
3536
List<OpenSergoConfigSubscriber> list = subscriberMap.computeIfAbsent(key, v -> new CopyOnWriteArrayList<>());
37+
// TODO distinct the same OpenSergoConfigSubscriber
3638
list.add(subscriber);
3739
}
3840

41+
public Set<SubscribeKey> getSubscriberKeysAll() {
42+
return subscriberMap.keySet();
43+
}
44+
3945
public List<OpenSergoConfigSubscriber> getSubscribersOf(SubscribeKey key) {
4046
if (key == null) {
4147
return null;

0 commit comments

Comments
 (0)