Skip to content

Commit 6abced9

Browse files
authored
added ExecutorWrapper in WorkerFactoryOptions (#988)
What changed? added new option in WorkerFactoryOptions to expose user provided ExecutorWrapper added a ExecutorWrapper interface apply ExecutorWrapper to executors in heartbeat, pollers (both poller executor and poller task executor), SyncActivityWorker and SyncWorkflowWorker. Why? By wrapping the ExecutorWrapper, it is now possible to propagate "root" context in background polling and task executions. Uber internally built a threadlocal storage to propagate context inside executors. How did you test it? Unit Test WIP internal test samples on context propagation Potential risks No risk due to it's a new fiel with fallback on original behavior
1 parent 0ed373a commit 6abced9

13 files changed

+136
-30
lines changed

src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ public class SyncActivityWorker implements SuspendableWorker {
3131

3232
private final ActivityWorker worker;
3333
private final POJOActivityTaskHandler taskHandler;
34-
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
34+
private final ScheduledExecutorService heartbeatExecutor;
3535

3636
public SyncActivityWorker(
3737
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
38+
heartbeatExecutor = options.getExecutorWrapper().wrap(Executors.newScheduledThreadPool(4));
3839
taskHandler =
3940
new POJOActivityTaskHandler(service, domain, options.getDataConverter(), heartbeatExecutor);
4041
worker = new ActivityWorker(service, domain, taskList, options, taskHandler);

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.uber.cadence.internal.worker.SuspendableWorker;
3333
import com.uber.cadence.internal.worker.WorkflowWorker;
3434
import com.uber.cadence.serviceclient.IWorkflowService;
35+
import com.uber.cadence.worker.ExecutorWrapper;
3536
import com.uber.cadence.worker.WorkflowImplementationOptions;
3637
import com.uber.cadence.workflow.Functions.Func;
3738
import com.uber.cadence.workflow.WorkflowInterceptor;
@@ -51,8 +52,8 @@ public class SyncWorkflowWorker
5152
private final POJOWorkflowImplementationFactory factory;
5253
private final DataConverter dataConverter;
5354
private final POJOActivityTaskHandler laTaskHandler;
54-
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
55-
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
55+
private final ScheduledExecutorService heartbeatExecutor;
56+
private final ScheduledExecutorService ldaHeartbeatExecutor;
5657
private SuspendableWorker ldaWorker;
5758
private POJOActivityTaskHandler ldaTaskHandler;
5859
private final IWorkflowService service;
@@ -73,6 +74,11 @@ public SyncWorkflowWorker(
7374
this.dataConverter = workflowOptions.getDataConverter();
7475
this.service = service;
7576

77+
// heartbeat executors
78+
ExecutorWrapper executorWrapper = localActivityOptions.getExecutorWrapper();
79+
heartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4));
80+
ldaHeartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4));
81+
7682
factory =
7783
new POJOWorkflowImplementationFactory(
7884
workflowOptions.getDataConverter(),

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public void start() {
100100
getOrCreateActivityPollTask(),
101101
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
102102
options.getPollerOptions(),
103-
options.getMetricsScope());
103+
options.getMetricsScope(),
104+
options.getExecutorWrapper());
104105
poller.start();
105106
setPoller(poller);
106107
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void start() {
8383
laPollTask,
8484
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
8585
options.getPollerOptions(),
86-
options.getMetricsScope());
86+
options.getMetricsScope(),
87+
options.getExecutorWrapper());
8788
poller.start();
8889
setPoller(poller);
8990
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,15 @@ public interface TaskHandler<TT> {
4848

4949
this.options = options;
5050
taskExecutor =
51-
new ThreadPoolExecutor(
52-
0,
53-
options.getTaskExecutorThreadPoolSize(),
54-
1,
55-
TimeUnit.SECONDS,
56-
new SynchronousQueue<>());
51+
options
52+
.getExecutorWrapper()
53+
.wrap(
54+
new ThreadPoolExecutor(
55+
0,
56+
options.getTaskExecutorThreadPoolSize(),
57+
1,
58+
TimeUnit.SECONDS,
59+
new SynchronousQueue<>()));
5760
taskExecutor.setThreadFactory(
5861
new ExecutorThreadFactory(
5962
options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"),

src/main/java/com/uber/cadence/internal/worker/Poller.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.internal.metrics.MetricsType;
2323
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
2424
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
25+
import com.uber.cadence.worker.ExecutorWrapper;
2526
import com.uber.m3.tally.Scope;
2627
import java.util.Objects;
2728
import java.util.concurrent.ArrayBlockingQueue;
@@ -72,24 +73,29 @@ interface ThrowingRunnable {
7273

7374
private final AutoScaler pollerAutoScaler;
7475

76+
private final ExecutorWrapper executorWrapper;
77+
7578
public Poller(
7679
String identity,
7780
PollTask<T> pollTask,
7881
ShutdownableTaskExecutor<T> taskExecutor,
7982
PollerOptions pollerOptions,
80-
Scope metricsScope) {
83+
Scope metricsScope,
84+
ExecutorWrapper executorWrapper) {
8185
Objects.requireNonNull(identity, "identity cannot be null");
8286
Objects.requireNonNull(pollTask, "poll service should not be null");
8387
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
8488
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
8589
Objects.requireNonNull(metricsScope, "metricsScope should not be null");
90+
Objects.requireNonNull(metricsScope, "executorWrapper should not be null");
8691

8792
this.identity = identity;
8893
this.pollTask = pollTask;
8994
this.taskExecutor = taskExecutor;
9095
this.pollerOptions = pollerOptions;
9196
this.metricsScope = metricsScope;
9297
this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
98+
this.executorWrapper = executorWrapper;
9399
}
94100

95101
@Override
@@ -109,12 +115,13 @@ public void start() {
109115
// As task enqueues next task the buffering is needed to queue task until the previous one
110116
// releases a thread.
111117
pollExecutor =
112-
new ThreadPoolExecutor(
113-
pollerOptions.getPollThreadCount(),
114-
pollerOptions.getPollThreadCount(),
115-
1,
116-
TimeUnit.SECONDS,
117-
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
118+
executorWrapper.wrap(
119+
new ThreadPoolExecutor(
120+
pollerOptions.getPollThreadCount(),
121+
pollerOptions.getPollThreadCount(),
122+
1,
123+
TimeUnit.SECONDS,
124+
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount())));
118125
pollExecutor.setThreadFactory(
119126
new ExecutorThreadFactory(
120127
pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));

src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.uber.cadence.converter.DataConverter;
2222
import com.uber.cadence.converter.JsonDataConverter;
2323
import com.uber.cadence.internal.metrics.NoopScope;
24+
import com.uber.cadence.worker.ExecutorWrapper;
2425
import com.uber.m3.tally.Scope;
2526
import io.opentracing.Tracer;
2627
import java.time.Duration;
@@ -46,6 +47,7 @@ public static final class Builder {
4647
private boolean enableLoggingInReplay;
4748
private List<ContextPropagator> contextPropagators;
4849
private Tracer tracer;
50+
private ExecutorWrapper executorWrapper;
4951

5052
private Builder() {}
5153

@@ -59,6 +61,7 @@ public Builder(SingleWorkerOptions options) {
5961
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
6062
this.contextPropagators = options.getContextPropagators();
6163
this.tracer = options.getTracer();
64+
this.executorWrapper = options.getExecutorWrapper();
6265
}
6366

6467
public Builder setIdentity(String identity) {
@@ -107,6 +110,11 @@ public Builder setTracer(Tracer tracer) {
107110
return this;
108111
}
109112

113+
public Builder setExecutorWrapper(ExecutorWrapper executorWrapper) {
114+
this.executorWrapper = executorWrapper;
115+
return this;
116+
}
117+
110118
public SingleWorkerOptions build() {
111119
if (pollerOptions == null) {
112120
pollerOptions =
@@ -134,7 +142,8 @@ public SingleWorkerOptions build() {
134142
metricsScope,
135143
enableLoggingInReplay,
136144
contextPropagators,
137-
tracer);
145+
tracer,
146+
executorWrapper);
138147
}
139148
}
140149

@@ -147,6 +156,7 @@ public SingleWorkerOptions build() {
147156
private final boolean enableLoggingInReplay;
148157
private List<ContextPropagator> contextPropagators;
149158
private final Tracer tracer;
159+
private final ExecutorWrapper executorWrapper;
150160

151161
private SingleWorkerOptions(
152162
String identity,
@@ -157,7 +167,8 @@ private SingleWorkerOptions(
157167
Scope metricsScope,
158168
boolean enableLoggingInReplay,
159169
List<ContextPropagator> contextPropagators,
160-
Tracer tracer) {
170+
Tracer tracer,
171+
ExecutorWrapper executorWrapper) {
161172
this.identity = identity;
162173
this.dataConverter = dataConverter;
163174
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
@@ -167,6 +178,7 @@ private SingleWorkerOptions(
167178
this.enableLoggingInReplay = enableLoggingInReplay;
168179
this.contextPropagators = contextPropagators;
169180
this.tracer = tracer;
181+
this.executorWrapper = executorWrapper;
170182
}
171183

172184
public String getIdentity() {
@@ -204,4 +216,8 @@ public List<ContextPropagator> getContextPropagators() {
204216
public Tracer getTracer() {
205217
return tracer;
206218
}
219+
220+
public ExecutorWrapper getExecutorWrapper() {
221+
return executorWrapper;
222+
}
207223
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ public void start() {
112112
options.getIdentity()),
113113
pollTaskExecutor,
114114
options.getPollerOptions(),
115-
options.getMetricsScope());
115+
options.getMetricsScope(),
116+
options.getExecutorWrapper());
116117
poller.start();
117118
setPoller(poller);
118119
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
package com.uber.cadence.worker;
18+
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.ScheduledExecutorService;
21+
import java.util.concurrent.ThreadPoolExecutor;
22+
23+
public interface ExecutorWrapper {
24+
ExecutorService wrap(ExecutorService delegate);
25+
26+
ThreadPoolExecutor wrap(ThreadPoolExecutor delegate);
27+
28+
ScheduledExecutorService wrap(ScheduledExecutorService delegate);
29+
30+
static ExecutorWrapper newDefaultInstance() {
31+
return new ExecutorWrapper() {
32+
@Override
33+
public ExecutorService wrap(ExecutorService delegate) {
34+
return delegate;
35+
}
36+
37+
@Override
38+
public ThreadPoolExecutor wrap(ThreadPoolExecutor delegate) {
39+
return delegate;
40+
}
41+
42+
@Override
43+
public ScheduledExecutorService wrap(ScheduledExecutorService delegate) {
44+
return delegate;
45+
}
46+
};
47+
}
48+
}

src/main/java/com/uber/cadence/worker/ShadowingWorker.java

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public ShadowingWorker(
113113
.setTaskListActivitiesPerSecond(options.getTaskListActivitiesPerSecond())
114114
.setPollerOptions(options.getActivityPollerOptions())
115115
.setMetricsScope(metricsScope)
116+
.setExecutorWrapper(testOptions.getWorkerFactoryOptions().getExecutorWrapper())
116117
.build();
117118
activityWorker =
118119
new SyncActivityWorker(

src/main/java/com/uber/cadence/worker/Worker.java

+3
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public final class Worker implements Suspendable {
102102
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
103103
.setContextPropagators(contextPropagators)
104104
.setTracer(options.getTracer())
105+
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
105106
.build();
106107
activityWorker =
107108
new SyncActivityWorker(
@@ -117,6 +118,7 @@ public final class Worker implements Suspendable {
117118
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
118119
.setContextPropagators(contextPropagators)
119120
.setTracer(options.getTracer())
121+
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
120122
.build();
121123
SingleWorkerOptions localActivityOptions =
122124
SingleWorkerOptions.newBuilder()
@@ -128,6 +130,7 @@ public final class Worker implements Suspendable {
128130
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
129131
.setContextPropagators(contextPropagators)
130132
.setTracer(options.getTracer())
133+
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
131134
.build();
132135
workflowWorker =
133136
new SyncWorkflowWorker(

src/main/java/com/uber/cadence/worker/WorkerFactory.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,15 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
9595
MoreObjects.firstNonNull(factoryOptions, WorkerFactoryOptions.defaultInstance());
9696

9797
workflowThreadPool =
98-
new ThreadPoolExecutor(
99-
0,
100-
this.factoryOptions.getMaxWorkflowThreadCount(),
101-
1,
102-
TimeUnit.SECONDS,
103-
new SynchronousQueue<>());
98+
factoryOptions
99+
.getExecutorWrapper()
100+
.wrap(
101+
new ThreadPoolExecutor(
102+
0,
103+
this.factoryOptions.getMaxWorkflowThreadCount(),
104+
1,
105+
TimeUnit.SECONDS,
106+
new SynchronousQueue<>()));
104107
workflowThreadPool.setThreadFactory(
105108
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
106109

@@ -140,7 +143,8 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
140143
.setPollThreadNamePrefix(POLL_THREAD_NAME)
141144
.setPollThreadCount(this.factoryOptions.getStickyPollerCount())
142145
.build(),
143-
stickyScope);
146+
stickyScope,
147+
factoryOptions.getExecutorWrapper());
144148
}
145149

146150
/**

0 commit comments

Comments
 (0)