Skip to content

Commit 31fe263

Browse files
committedMar 26, 2025·
added ThreadFactoryWrapper in WorkerFactoryOptions
1 parent b0f0a7e commit 31fe263

13 files changed

+118
-18
lines changed
 

‎src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@ public class SyncActivityWorker implements SuspendableWorker {
3131

3232
private final ActivityWorker worker;
3333
private final POJOActivityTaskHandler taskHandler;
34-
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
34+
private final ScheduledExecutorService heartbeatExecutor;
3535

3636
public SyncActivityWorker(
3737
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
38+
heartbeatExecutor =
39+
Executors.newScheduledThreadPool(
40+
4, options.getThreadFactoryWrapper().wrap(Executors.privilegedThreadFactory()));
3841
taskHandler =
3942
new POJOActivityTaskHandler(service, domain, options.getDataConverter(), heartbeatExecutor);
4043
worker = new ActivityWorker(service, domain, taskList, options, taskHandler);

‎src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public class SyncWorkflowWorker
5151
private final POJOWorkflowImplementationFactory factory;
5252
private final DataConverter dataConverter;
5353
private final POJOActivityTaskHandler laTaskHandler;
54-
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
55-
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
54+
private final ScheduledExecutorService heartbeatExecutor;
55+
private final ScheduledExecutorService ldaHeartbeatExecutor;
5656
private SuspendableWorker ldaWorker;
5757
private POJOActivityTaskHandler ldaTaskHandler;
5858
private final IWorkflowService service;
@@ -73,6 +73,16 @@ public SyncWorkflowWorker(
7373
this.dataConverter = workflowOptions.getDataConverter();
7474
this.service = service;
7575

76+
// heartbeat executors
77+
heartbeatExecutor =
78+
Executors.newScheduledThreadPool(
79+
4,
80+
localActivityOptions.getThreadFactoryWrapper().wrap(Executors.defaultThreadFactory()));
81+
ldaHeartbeatExecutor =
82+
Executors.newScheduledThreadPool(
83+
4,
84+
localActivityOptions.getThreadFactoryWrapper().wrap(Executors.defaultThreadFactory()));
85+
7686
factory =
7787
new POJOWorkflowImplementationFactory(
7888
workflowOptions.getDataConverter(),

‎src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public void start() {
100100
getOrCreateActivityPollTask(),
101101
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
102102
options.getPollerOptions(),
103-
options.getMetricsScope());
103+
options.getMetricsScope(),
104+
options.getThreadFactoryWrapper());
104105
poller.start();
105106
setPoller(poller);
106107
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

‎src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void start() {
8383
laPollTask,
8484
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
8585
options.getPollerOptions(),
86-
options.getMetricsScope());
86+
options.getMetricsScope(),
87+
options.getThreadFactoryWrapper());
8788
poller.start();
8889
setPoller(poller);
8990
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

‎src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,15 @@ public interface TaskHandler<TT> {
5555
TimeUnit.SECONDS,
5656
new SynchronousQueue<>());
5757
taskExecutor.setThreadFactory(
58-
new ExecutorThreadFactory(
59-
options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"),
60-
options.getPollerOptions().getUncaughtExceptionHandler()));
58+
options
59+
.getThreadFactoryWrapper()
60+
.wrap(
61+
new ExecutorThreadFactory(
62+
options
63+
.getPollerOptions()
64+
.getPollThreadNamePrefix()
65+
.replaceFirst("Poller", "Executor"),
66+
options.getPollerOptions().getUncaughtExceptionHandler())));
6167
taskExecutor.setRejectedExecutionHandler(new BlockCallerPolicy());
6268
}
6369

‎src/main/java/com/uber/cadence/internal/worker/Poller.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.internal.metrics.MetricsType;
2323
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
2424
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
25+
import com.uber.cadence.worker.ThreadFactoryWrapper;
2526
import com.uber.m3.tally.Scope;
2627
import java.util.Objects;
2728
import java.util.concurrent.ArrayBlockingQueue;
@@ -72,24 +73,29 @@ interface ThrowingRunnable {
7273

7374
private final AutoScaler pollerAutoScaler;
7475

76+
private final ThreadFactoryWrapper threadFactoryWrapper;
77+
7578
public Poller(
7679
String identity,
7780
PollTask<T> pollTask,
7881
ShutdownableTaskExecutor<T> taskExecutor,
7982
PollerOptions pollerOptions,
80-
Scope metricsScope) {
83+
Scope metricsScope,
84+
ThreadFactoryWrapper threadFactoryWrapper) {
8185
Objects.requireNonNull(identity, "identity cannot be null");
8286
Objects.requireNonNull(pollTask, "poll service should not be null");
8387
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
8488
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
8589
Objects.requireNonNull(metricsScope, "metricsScope should not be null");
90+
Objects.requireNonNull(metricsScope, "threadFactoryWrapper should not be null");
8691

8792
this.identity = identity;
8893
this.pollTask = pollTask;
8994
this.taskExecutor = taskExecutor;
9095
this.pollerOptions = pollerOptions;
9196
this.metricsScope = metricsScope;
9297
this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
98+
this.threadFactoryWrapper = threadFactoryWrapper;
9399
}
94100

95101
@Override
@@ -116,8 +122,10 @@ public void start() {
116122
TimeUnit.SECONDS,
117123
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
118124
pollExecutor.setThreadFactory(
119-
new ExecutorThreadFactory(
120-
pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
125+
threadFactoryWrapper.wrap(
126+
new ExecutorThreadFactory(
127+
pollerOptions.getPollThreadNamePrefix(),
128+
pollerOptions.getUncaughtExceptionHandler())));
121129

122130
pollBackoffThrottler =
123131
new BackoffThrottler(

‎src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.uber.cadence.converter.DataConverter;
2222
import com.uber.cadence.converter.JsonDataConverter;
2323
import com.uber.cadence.internal.metrics.NoopScope;
24+
import com.uber.cadence.worker.ThreadFactoryWrapper;
2425
import com.uber.m3.tally.Scope;
2526
import io.opentracing.Tracer;
2627
import java.time.Duration;
@@ -46,6 +47,7 @@ public static final class Builder {
4647
private boolean enableLoggingInReplay;
4748
private List<ContextPropagator> contextPropagators;
4849
private Tracer tracer;
50+
private ThreadFactoryWrapper threadFactoryWrapper;
4951

5052
private Builder() {}
5153

@@ -59,6 +61,7 @@ public Builder(SingleWorkerOptions options) {
5961
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
6062
this.contextPropagators = options.getContextPropagators();
6163
this.tracer = options.getTracer();
64+
this.threadFactoryWrapper = options.getThreadFactoryWrapper();
6265
}
6366

6467
public Builder setIdentity(String identity) {
@@ -107,6 +110,11 @@ public Builder setTracer(Tracer tracer) {
107110
return this;
108111
}
109112

113+
public Builder setThreadFactoryWrapper(ThreadFactoryWrapper threadFactoryWrapper) {
114+
this.threadFactoryWrapper = threadFactoryWrapper;
115+
return this;
116+
}
117+
110118
public SingleWorkerOptions build() {
111119
if (pollerOptions == null) {
112120
pollerOptions =
@@ -134,7 +142,8 @@ public SingleWorkerOptions build() {
134142
metricsScope,
135143
enableLoggingInReplay,
136144
contextPropagators,
137-
tracer);
145+
tracer,
146+
threadFactoryWrapper);
138147
}
139148
}
140149

@@ -147,6 +156,7 @@ public SingleWorkerOptions build() {
147156
private final boolean enableLoggingInReplay;
148157
private List<ContextPropagator> contextPropagators;
149158
private final Tracer tracer;
159+
private final ThreadFactoryWrapper threadFactoryWrapper;
150160

151161
private SingleWorkerOptions(
152162
String identity,
@@ -157,7 +167,8 @@ private SingleWorkerOptions(
157167
Scope metricsScope,
158168
boolean enableLoggingInReplay,
159169
List<ContextPropagator> contextPropagators,
160-
Tracer tracer) {
170+
Tracer tracer,
171+
ThreadFactoryWrapper threadFactoryWrapper) {
161172
this.identity = identity;
162173
this.dataConverter = dataConverter;
163174
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
@@ -167,6 +178,7 @@ private SingleWorkerOptions(
167178
this.enableLoggingInReplay = enableLoggingInReplay;
168179
this.contextPropagators = contextPropagators;
169180
this.tracer = tracer;
181+
this.threadFactoryWrapper = threadFactoryWrapper;
170182
}
171183

172184
public String getIdentity() {
@@ -204,4 +216,8 @@ public List<ContextPropagator> getContextPropagators() {
204216
public Tracer getTracer() {
205217
return tracer;
206218
}
219+
220+
public ThreadFactoryWrapper getThreadFactoryWrapper() {
221+
return threadFactoryWrapper;
222+
}
207223
}

‎src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ public void start() {
112112
options.getIdentity()),
113113
pollTaskExecutor,
114114
options.getPollerOptions(),
115-
options.getMetricsScope());
115+
options.getMetricsScope(),
116+
options.getThreadFactoryWrapper());
116117
poller.start();
117118
setPoller(poller);
118119
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

‎src/main/java/com/uber/cadence/worker/ShadowingWorker.java

+2
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public ShadowingWorker(
113113
.setTaskListActivitiesPerSecond(options.getTaskListActivitiesPerSecond())
114114
.setPollerOptions(options.getActivityPollerOptions())
115115
.setMetricsScope(metricsScope)
116+
.setThreadFactoryWrapper(
117+
testOptions.getWorkerFactoryOptions().getThreadFactoryWrapper())
116118
.build();
117119
activityWorker =
118120
new SyncActivityWorker(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.worker;
17+
18+
import java.util.concurrent.ThreadFactory;
19+
20+
public interface ThreadFactoryWrapper {
21+
ThreadFactory wrap(ThreadFactory delegate);
22+
23+
static ThreadFactoryWrapper newDefaultInstance() {
24+
return new ThreadFactoryWrapper() {
25+
@Override
26+
public ThreadFactory wrap(ThreadFactory delegate) {
27+
return delegate;
28+
}
29+
};
30+
}
31+
}

‎src/main/java/com/uber/cadence/worker/Worker.java

+3
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public final class Worker implements Suspendable {
102102
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
103103
.setContextPropagators(contextPropagators)
104104
.setTracer(options.getTracer())
105+
.setThreadFactoryWrapper(factoryOptions.getThreadFactoryWrapper())
105106
.build();
106107
activityWorker =
107108
new SyncActivityWorker(
@@ -117,6 +118,7 @@ public final class Worker implements Suspendable {
117118
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
118119
.setContextPropagators(contextPropagators)
119120
.setTracer(options.getTracer())
121+
.setThreadFactoryWrapper(factoryOptions.getThreadFactoryWrapper())
120122
.build();
121123
SingleWorkerOptions localActivityOptions =
122124
SingleWorkerOptions.newBuilder()
@@ -128,6 +130,7 @@ public final class Worker implements Suspendable {
128130
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
129131
.setContextPropagators(contextPropagators)
130132
.setTracer(options.getTracer())
133+
.setThreadFactoryWrapper(factoryOptions.getThreadFactoryWrapper())
131134
.build();
132135
workflowWorker =
133136
new SyncWorkflowWorker(

‎src/main/java/com/uber/cadence/worker/WorkerFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
101101
TimeUnit.SECONDS,
102102
new SynchronousQueue<>());
103103
workflowThreadPool.setThreadFactory(
104-
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
104+
factoryOptions
105+
.getThreadFactoryWrapper()
106+
.wrap(
107+
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet())));
105108

106109
if (this.factoryOptions.isDisableStickyExecution()) {
107110
return;
@@ -136,7 +139,8 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
136139
.setPollThreadNamePrefix(POLL_THREAD_NAME)
137140
.setPollThreadCount(this.factoryOptions.getStickyPollerCount())
138141
.build(),
139-
stickyScope);
142+
stickyScope,
143+
factoryOptions.getThreadFactoryWrapper());
140144
}
141145

142146
/**

‎src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static class Builder {
4848
private int maxWorkflowThreadCount = DEFAULT_MAX_WORKFLOW_THREAD_COUNT;
4949
private boolean enableLoggingInReplay;
5050
private int stickyPollerCount = DEFAULT_STICKY_POLLER_COUNT;
51+
private ThreadFactoryWrapper threadFactoryWrapper = ThreadFactoryWrapper.newDefaultInstance();
5152

5253
private Builder() {}
5354

@@ -106,14 +107,20 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
106107
return this;
107108
}
108109

110+
public Builder setThreadFactoryWrapper(ThreadFactoryWrapper executorServiceWrapper) {
111+
this.threadFactoryWrapper = executorServiceWrapper;
112+
return this;
113+
}
114+
109115
public WorkerFactoryOptions build() {
110116
return new WorkerFactoryOptions(
111117
disableStickyExecution,
112118
stickyCacheSize,
113119
maxWorkflowThreadCount,
114120
stickyTaskScheduleToStartTimeout,
115121
stickyPollerCount,
116-
enableLoggingInReplay);
122+
enableLoggingInReplay,
123+
threadFactoryWrapper);
117124
}
118125
}
119126

@@ -123,14 +130,16 @@ public WorkerFactoryOptions build() {
123130
private Duration stickyTaskScheduleToStartTimeout;
124131
private boolean enableLoggingInReplay;
125132
private int stickyPollerCount;
133+
private ThreadFactoryWrapper threadFactoryWrapper;
126134

127135
private WorkerFactoryOptions(
128136
boolean disableStickyExecution,
129137
int cacheMaximumSize,
130138
int maxWorkflowThreadCount,
131139
Duration stickyTaskScheduleToStartTimeout,
132140
int stickyPollerCount,
133-
boolean enableLoggingInReplay) {
141+
boolean enableLoggingInReplay,
142+
ThreadFactoryWrapper threadServiceWrapper) {
134143
Preconditions.checkArgument(cacheMaximumSize > 0, "cacheMaximumSize should be greater than 0");
135144
Preconditions.checkArgument(
136145
maxWorkflowThreadCount > 0, "maxWorkflowThreadCount should be greater than 0");
@@ -141,6 +150,7 @@ private WorkerFactoryOptions(
141150
this.stickyPollerCount = stickyPollerCount;
142151
this.enableLoggingInReplay = enableLoggingInReplay;
143152
this.stickyTaskScheduleToStartTimeout = stickyTaskScheduleToStartTimeout;
153+
this.threadFactoryWrapper = threadServiceWrapper;
144154
}
145155

146156
public int getMaxWorkflowThreadCount() {
@@ -166,4 +176,8 @@ public int getStickyPollerCount() {
166176
public Duration getStickyTaskScheduleToStartTimeout() {
167177
return stickyTaskScheduleToStartTimeout;
168178
}
179+
180+
public ThreadFactoryWrapper getThreadFactoryWrapper() {
181+
return threadFactoryWrapper;
182+
}
169183
}

0 commit comments

Comments
 (0)
Please sign in to comment.