Skip to content

Commit 9002821

Browse files
authored
Merge branch 'master' into manger-otel-context-prop
2 parents 16e84dd + 63a34bc commit 9002821

File tree

11 files changed

+117
-9
lines changed

11 files changed

+117
-9
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ dependencies {
5353
errorproneJavac('com.google.errorprone:javac:9+181-r4173-1')
5454
errorprone('com.google.errorprone:error_prone_core:2.3.4')
5555

56-
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5'
56+
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30'
5757
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
5858
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
5959
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@
3939
import java.lang.reflect.Type;
4040
import java.time.Duration;
4141
import java.util.Objects;
42-
import java.util.concurrent.Executors;
43-
import java.util.concurrent.ScheduledExecutorService;
44-
import java.util.concurrent.ThreadPoolExecutor;
45-
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.*;
4643
import java.util.function.Consumer;
4744
import java.util.function.Function;
4845

@@ -59,6 +56,7 @@ public class SyncWorkflowWorker
5956
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
6057
private SuspendableWorker ldaWorker;
6158
private POJOActivityTaskHandler ldaTaskHandler;
59+
private final IWorkflowService service;
6260

6361
public SyncWorkflowWorker(
6462
IWorkflowService service,
@@ -74,6 +72,7 @@ public SyncWorkflowWorker(
7472
ThreadPoolExecutor workflowThreadPool) {
7573
Objects.requireNonNull(workflowThreadPool);
7674
this.dataConverter = workflowOptions.getDataConverter();
75+
this.service = service;
7776

7877
factory =
7978
new POJOWorkflowImplementationFactory(
@@ -252,4 +251,8 @@ public <R> R queryWorkflowExecution(
252251
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
253252
workflowWorker.accept(pollForDecisionTaskResponse);
254253
}
254+
255+
public CompletableFuture<Boolean> isHealthy() {
256+
return service.isHealthy();
257+
}
255258
}

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Random;
4747
import java.util.UUID;
4848
import java.util.concurrent.CancellationException;
49+
import java.util.concurrent.CompletableFuture;
4950
import java.util.concurrent.Executors;
5051
import java.util.concurrent.ScheduledExecutorService;
5152
import java.util.concurrent.atomic.AtomicInteger;
@@ -317,6 +318,11 @@ private class WorkflowServiceWrapper implements IWorkflowService {
317318

318319
private final IWorkflowService impl;
319320

321+
@Override
322+
public CompletableFuture<Boolean> isHealthy() {
323+
return impl.isHealthy();
324+
}
325+
320326
private WorkflowServiceWrapper(IWorkflowService impl) {
321327
if (impl == null) {
322328
// Create empty implementation that just ignores all requests.

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+5
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,11 @@ public void GetWorkflowExecutionHistoryWithTimeout(
484484
impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis);
485485
}
486486

487+
@Override
488+
public CompletableFuture<Boolean> isHealthy() {
489+
return impl.isHealthy();
490+
}
491+
487492
@Override
488493
public void PollForDecisionTask(
489494
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler)

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

+7
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,13 @@ public void GetWorkflowExecutionHistoryWithTimeout(
845845
GetWorkflowExecutionHistory(getRequest, resultHandler);
846846
}
847847

848+
@Override
849+
public CompletableFuture<Boolean> isHealthy() {
850+
CompletableFuture<Boolean> rval = new CompletableFuture<>();
851+
rval.complete(Boolean.TRUE);
852+
return rval;
853+
}
854+
848855
@Override
849856
public void PollForDecisionTask(
850857
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException {

src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.StartWorkflowExecutionRequest;
2424
import com.uber.cadence.WorkflowService.AsyncIface;
2525
import com.uber.cadence.WorkflowService.Iface;
26+
import java.util.concurrent.CompletableFuture;
2627
import org.apache.thrift.TException;
2728
import org.apache.thrift.async.AsyncMethodCallback;
2829

@@ -70,6 +71,7 @@ void GetWorkflowExecutionHistoryWithTimeout(
7071
AsyncMethodCallback resultHandler,
7172
Long timeoutInMillis)
7273
throws TException;
74+
7375
/**
7476
* SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with
7577
* timeout
@@ -84,4 +86,10 @@ void SignalWorkflowExecutionWithTimeout(
8486
AsyncMethodCallback resultHandler,
8587
Long timeoutInMillis)
8688
throws TException;
89+
90+
/**
91+
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
92+
* list
93+
*/
94+
CompletableFuture<Boolean> isHealthy();
8795
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

+52-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import com.uber.tchannel.errors.ErrorType;
105105
import com.uber.tchannel.messages.ThriftRequest;
106106
import com.uber.tchannel.messages.ThriftResponse;
107+
import com.uber.tchannel.messages.generated.Meta;
107108
import java.net.InetAddress;
108109
import java.net.InetSocketAddress;
109110
import java.net.UnknownHostException;
@@ -127,7 +128,7 @@ public class WorkflowServiceTChannel implements IWorkflowService {
127128
private final ClientOptions options;
128129
private final Map<String, String> thriftHeaders;
129130
private final TChannel tChannel;
130-
private final SubChannel subChannel;
131+
private SubChannel subChannel;
131132

132133
/**
133134
* Creates Cadence client that connects to the specified host and port using specified options.
@@ -159,6 +160,13 @@ public WorkflowServiceTChannel(ClientOptions options) {
159160
+ Version.FEATURE_VERSION);
160161
}
161162

163+
public void resetSubchannelPeers() throws UnknownHostException {
164+
InetAddress address = InetAddress.getByName(options.getHost());
165+
ArrayList<InetSocketAddress> peers = new ArrayList<>();
166+
peers.add(new InetSocketAddress(address, options.getPort()));
167+
this.subChannel.setPeers(peers);
168+
}
169+
162170
/**
163171
* Creates Cadence client with specified sub channel and options.
164172
*
@@ -207,6 +215,49 @@ private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body) {
207215
return buildThriftRequest(apiName, body, null);
208216
}
209217

218+
/**
219+
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
220+
* list
221+
*/
222+
@Override
223+
public CompletableFuture<Boolean> isHealthy() {
224+
final ThriftRequest<Meta.health_args> req =
225+
new ThriftRequest.Builder<Meta.health_args>(options.getServiceName(), "Meta::health")
226+
.setBody(new Meta.health_args())
227+
.build();
228+
final CompletableFuture<Boolean> result = new CompletableFuture<>();
229+
try {
230+
231+
final TFuture<ThriftResponse<Meta.health_result>> future = this.subChannel.send(req);
232+
future.addCallback(
233+
response -> {
234+
req.releaseQuietly();
235+
if (response.isError()) {
236+
try {
237+
this.resetSubchannelPeers();
238+
} catch (final Exception inner_e) {
239+
}
240+
result.completeExceptionally(new TException("Rpc error:" + response.getError()));
241+
} else {
242+
result.complete(response.getBody(Meta.health_result.class).getSuccess().isOk());
243+
}
244+
try {
245+
response.release();
246+
} catch (final Exception e) {
247+
// ignore
248+
}
249+
});
250+
} catch (final TChannelError e) {
251+
req.releaseQuietly();
252+
try {
253+
this.resetSubchannelPeers();
254+
} catch (final Exception inner_e) {
255+
}
256+
result.complete(Boolean.FALSE);
257+
}
258+
return result;
259+
}
260+
210261
private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) {
211262
String endpoint = getEndpoint(INTERFACE_NAME, apiName);
212263
ThriftRequest.Builder<T> builder =

src/main/java/com/uber/cadence/worker/Worker.java

+8
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,12 @@ public void resumePolling() {
330330
public boolean isSuspended() {
331331
return workflowWorker.isSuspended() && activityWorker.isSuspended();
332332
}
333+
334+
/**
335+
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
336+
* list
337+
*/
338+
public CompletableFuture<Boolean> isHealthy() {
339+
return workflowWorker.isHealthy();
340+
}
333341
}

src/main/java/com/uber/cadence/worker/WorkerFactory.java

+16
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
import java.util.List;
4040
import java.util.Objects;
4141
import java.util.UUID;
42+
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.SynchronousQueue;
4344
import java.util.concurrent.ThreadPoolExecutor;
4445
import java.util.concurrent.TimeUnit;
4546
import java.util.concurrent.atomic.AtomicInteger;
47+
import java.util.stream.Collectors;
4648
import org.slf4j.Logger;
4749
import org.slf4j.LoggerFactory;
4850

@@ -287,6 +289,20 @@ public synchronized void shutdownNow() {
287289
}
288290
}
289291

292+
/**
293+
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
294+
* list
295+
*/
296+
public CompletableFuture<Boolean> isHealthy() {
297+
List<CompletableFuture<Boolean>> healthyList =
298+
workers.stream().map(Worker::isHealthy).collect(Collectors.toList());
299+
CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() -> true);
300+
for (CompletableFuture<Boolean> future : healthyList) {
301+
result = result.thenCombine(future, (current, other) -> current && other);
302+
}
303+
return result;
304+
}
305+
290306
/**
291307
* Blocks until all tasks have completed execution after a shutdown request, or the timeout
292308
* occurs, or the current thread is interrupted, whichever happens first.

src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package com.uber.cadence.workerFactory;
1919

20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertTrue;
20+
import static org.junit.Assert.*;
2221

2322
import com.uber.cadence.client.WorkflowClient;
2423
import com.uber.cadence.serviceclient.ClientOptions;
@@ -65,6 +64,11 @@ public void whenAFactoryIsStartedAllWorkersStart() {
6564

6665
factory.start();
6766
assertTrue(factory.isStarted());
67+
try {
68+
assertTrue(factory.isHealthy().get());
69+
} catch (Exception e) {
70+
assertNull("Failed to check if cluster is health!", e);
71+
}
6872
factory.shutdown();
6973
factory.awaitTermination(1, TimeUnit.SECONDS);
7074
}

src/test/resources/logback-test.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@
2828
<root level="INFO">
2929
<appender-ref ref="STDOUT" />
3030
</root>
31-
</configuration>
31+
</configuration>

0 commit comments

Comments
 (0)