Skip to content

Commit a4c9a5a

Browse files
committed
Implement nexus-based activity cancels
1 parent 3c2d938 commit a4c9a5a

31 files changed

Lines changed: 831 additions & 51 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484

8585
- name: Start CLI server
8686
env:
87-
TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations
87+
TEMPORAL_CLI_VERSION: 1.7.2-standalone-nexus-operations
8888
run: |
8989
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
9090
tar -xzf temporal_cli.tar.gz
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.temporal.activity;
2+
3+
import io.temporal.client.ActivityCanceledException;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
/** Token that allows an Activity implementation to observe cancellation requests. */
7+
public interface ActivityCancellationToken {
8+
9+
ActivityCancellationToken NONE =
10+
new ActivityCancellationToken() {
11+
@Override
12+
public boolean isCancellationRequested() {
13+
return false;
14+
}
15+
16+
@Override
17+
public void throwIfCancellationRequested() throws ActivityCanceledException {}
18+
19+
@Override
20+
public CompletableFuture<Void> getCancellationRequest() {
21+
return new CompletableFuture<>();
22+
}
23+
};
24+
25+
/**
26+
* Returns true after cancellation has been requested for this Activity Execution.
27+
*
28+
* <p>If this method returns true, the Activity implementation should stop its work and usually
29+
* call {@link #throwIfCancellationRequested()} to report successful cancellation to Temporal.
30+
*/
31+
boolean isCancellationRequested();
32+
33+
/**
34+
* Throws {@link ActivityCanceledException} if cancellation has been requested for this Activity
35+
* Execution.
36+
*
37+
* <p>Rethrowing this exception from Activity code reports successful cancellation to Temporal.
38+
*/
39+
void throwIfCancellationRequested() throws ActivityCanceledException;
40+
41+
/**
42+
* Future that completes when cancellation has been requested for this Activity Execution.
43+
*
44+
* <p>The future completes normally. Activity code should still call {@link
45+
* #throwIfCancellationRequested()} or otherwise report cancellation if it wants the Activity
46+
* Execution to complete as canceled.
47+
*/
48+
CompletableFuture<Void> getCancellationRequest();
49+
}

temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ public interface ActivityExecutionContext {
8989
*/
9090
byte[] getTaskToken();
9191

92+
/**
93+
* Returns a token that can be used by Activity code to observe cancellation requests without
94+
* recording Heartbeats.
95+
*/
96+
default ActivityCancellationToken getCancellationToken() {
97+
return ActivityCancellationToken.NONE;
98+
}
99+
92100
/**
93101
* If this method is called during an Activity Execution then the Activity Execution is not going
94102
* to complete when it's method returns. It is expected to be completed asynchronously using
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.temporal.internal.activity;
2+
3+
import io.temporal.activity.ActivityCancellationToken;
4+
import io.temporal.client.ActivityCanceledException;
5+
import java.util.concurrent.CompletableFuture;
6+
7+
final class ActivityCancellationTokenImpl implements ActivityCancellationToken {
8+
private final CompletableFuture<Void> cancellationRequest = new CompletableFuture<>();
9+
private volatile ActivityCanceledException cancellationException;
10+
11+
@Override
12+
public boolean isCancellationRequested() {
13+
return cancellationException != null;
14+
}
15+
16+
@Override
17+
public void throwIfCancellationRequested() throws ActivityCanceledException {
18+
ActivityCanceledException exception = cancellationException;
19+
if (exception != null) {
20+
throw exception;
21+
}
22+
}
23+
24+
@Override
25+
public CompletableFuture<Void> getCancellationRequest() {
26+
return cancellationRequest;
27+
}
28+
29+
void requestCancel(ActivityCanceledException exception) {
30+
cancellationException = exception;
31+
cancellationRequest.complete(null);
32+
}
33+
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,13 @@
55
public interface ActivityExecutionContextFactory {
66
InternalActivityExecutionContext createContext(
77
ActivityInfoInternal info, Object activity, Scope metricsScope);
8+
9+
/**
10+
* Requests cancellation for a currently running activity identified by task token.
11+
*
12+
* @return true if the activity was found and marked canceled.
13+
*/
14+
default boolean requestCancel(byte[] taskToken) {
15+
return false;
16+
}
817
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import io.temporal.client.WorkflowClient;
55
import io.temporal.common.converter.DataConverter;
66
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
7+
import java.nio.ByteBuffer;
78
import java.time.Duration;
9+
import java.util.Arrays;
810
import java.util.Objects;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.ConcurrentMap;
913
import java.util.concurrent.ScheduledExecutorService;
1014

1115
public class ActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {
@@ -17,6 +21,8 @@ public class ActivityExecutionContextFactoryImpl implements ActivityExecutionCon
1721
private final DataConverter dataConverter;
1822
private final ScheduledExecutorService heartbeatExecutor;
1923
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
24+
private final ConcurrentMap<ByteBuffer, ActivityExecutionContextImpl> activeContexts =
25+
new ConcurrentHashMap<>();
2026

2127
public ActivityExecutionContextFactoryImpl(
2228
WorkflowClient client,
@@ -42,18 +48,37 @@ public ActivityExecutionContextFactoryImpl(
4248
@Override
4349
public InternalActivityExecutionContext createContext(
4450
ActivityInfoInternal info, Object activity, Scope metricsScope) {
45-
return new ActivityExecutionContextImpl(
46-
client,
47-
namespace,
48-
activity,
49-
info,
50-
dataConverter,
51-
heartbeatExecutor,
52-
manualCompletionClientFactory,
53-
info.getCompletionHandle(),
54-
metricsScope,
55-
identity,
56-
maxHeartbeatThrottleInterval,
57-
defaultHeartbeatThrottleInterval);
51+
ByteBuffer taskToken = taskTokenKey(info.getTaskToken());
52+
ActivityExecutionContextImpl context =
53+
new ActivityExecutionContextImpl(
54+
client,
55+
namespace,
56+
activity,
57+
info,
58+
dataConverter,
59+
heartbeatExecutor,
60+
manualCompletionClientFactory,
61+
info.getCompletionHandle(),
62+
metricsScope,
63+
identity,
64+
maxHeartbeatThrottleInterval,
65+
defaultHeartbeatThrottleInterval,
66+
() -> activeContexts.remove(taskToken));
67+
activeContexts.put(taskToken, context);
68+
return context;
69+
}
70+
71+
@Override
72+
public boolean requestCancel(byte[] taskToken) {
73+
ActivityExecutionContextImpl context = activeContexts.get(taskTokenKey(taskToken));
74+
if (context == null) {
75+
return false;
76+
}
77+
context.cancelFromWorkerCommand();
78+
return true;
79+
}
80+
81+
private static ByteBuffer taskTokenKey(byte[] taskToken) {
82+
return ByteBuffer.wrap(Arrays.copyOf(taskToken, taskToken.length)).asReadOnlyBuffer();
5883
}
5984
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.temporal.internal.activity;
22

33
import com.uber.m3.tally.Scope;
4+
import io.temporal.activity.ActivityCancellationToken;
45
import io.temporal.activity.ActivityExecutionContext;
56
import io.temporal.activity.ActivityInfo;
67
import io.temporal.activity.ManualActivityCompletionClient;
@@ -32,6 +33,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
3233
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
3334
private final Functions.Proc completionHandle;
3435
private final HeartbeatContext heartbeatContext;
36+
private final Functions.Proc closeCallback;
3537

3638
private final Scope metricsScope;
3739
private final ActivityInfo info;
@@ -51,12 +53,14 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
5153
Scope metricsScope,
5254
String identity,
5355
Duration maxHeartbeatThrottleInterval,
54-
Duration defaultHeartbeatThrottleInterval) {
56+
Duration defaultHeartbeatThrottleInterval,
57+
Functions.Proc closeCallback) {
5558
this.client = client;
5659
this.activity = activity;
5760
this.metricsScope = metricsScope;
5861
this.info = info;
5962
this.completionHandle = completionHandle;
63+
this.closeCallback = closeCallback;
6064
this.manualCompletionClientFactory = manualCompletionClientFactory;
6165
this.heartbeatContext =
6266
new HeartbeatContextImpl(
@@ -105,6 +109,11 @@ public byte[] getTaskToken() {
105109
return info.getTaskToken();
106110
}
107111

112+
@Override
113+
public ActivityCancellationToken getCancellationToken() {
114+
return heartbeatContext.getCancellationToken();
115+
}
116+
108117
@Override
109118
public void doNotCompleteOnReturn() {
110119
lock.lock();
@@ -170,6 +179,11 @@ public Object getLastHeartbeatValue() {
170179
@Override
171180
public void cancelOutstandingHeartbeat() {
172181
heartbeatContext.cancelOutstandingHeartbeat();
182+
closeCallback.apply();
183+
}
184+
185+
void cancelFromWorkerCommand() {
186+
heartbeatContext.cancelFromWorkerCommand();
173187
}
174188

175189
@Override

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public boolean isTypeSupported(String type) {
8383
return activities.get(type) != null || dynamicActivity != null;
8484
}
8585

86+
public boolean requestCancel(byte[] taskToken) {
87+
return executionContextFactory.requestCancel(taskToken);
88+
}
89+
8690
public void registerActivityImplementations(Object[] activitiesImplementation) {
8791
for (Object activity : activitiesImplementation) {
8892
registerActivityImplementation(activity);

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.internal.activity;
22

3+
import io.temporal.activity.ActivityCancellationToken;
34
import io.temporal.client.ActivityCompletionException;
45
import java.lang.reflect.Type;
56
import java.util.Optional;
@@ -23,6 +24,11 @@ interface HeartbeatContext {
2324

2425
Object getLatestHeartbeatDetails();
2526

27+
ActivityCancellationToken getCancellationToken();
28+
29+
/** Mark this activity as canceled by an external worker command. */
30+
void cancelFromWorkerCommand();
31+
2632
/** Cancel any pending heartbeat and discard cached heartbeat details. */
2733
void cancelOutstandingHeartbeat();
2834
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.uber.m3.tally.Scope;
44
import io.grpc.Status;
55
import io.grpc.StatusRuntimeException;
6+
import io.temporal.activity.ActivityCancellationToken;
67
import io.temporal.activity.ActivityExecutionContext;
78
import io.temporal.activity.ActivityInfo;
89
import io.temporal.api.common.v1.Payloads;
@@ -74,6 +75,8 @@ static long getLocalHeartbeatTimeoutBufferMillis() {
7475
private boolean heartbeatTimedOut;
7576

7677
private ActivityCompletionException lastException;
78+
private final ActivityCancellationTokenImpl cancellationToken =
79+
new ActivityCancellationTokenImpl();
7780

7881
public HeartbeatContextImpl(
7982
WorkflowServiceStubs service,
@@ -151,6 +154,7 @@ public <V> void heartbeat(V details) throws ActivityCompletionException {
151154
checkHeartbeatTimeoutDeadlineLocked();
152155
receivedAHeartbeat = true;
153156
lastDetails = details;
157+
cancellationToken.throwIfCancellationRequested();
154158
hasOutstandingHeartbeat = true;
155159
// Only do sync heartbeat if there is no such call scheduled.
156160
if (scheduledHeartbeat == null) {
@@ -228,6 +232,21 @@ public void cancelOutstandingHeartbeat() {
228232
}
229233
}
230234

235+
@Override
236+
public void cancelFromWorkerCommand() {
237+
lock.lock();
238+
try {
239+
requestCancelLocked();
240+
} finally {
241+
lock.unlock();
242+
}
243+
}
244+
245+
@Override
246+
public ActivityCancellationToken getCancellationToken() {
247+
return cancellationToken;
248+
}
249+
231250
private void doHeartBeatLocked(Object details) {
232251
long nextHeartbeatDelay;
233252
try {
@@ -307,7 +326,7 @@ private void sendHeartbeatRequest(Object details) {
307326
dataConverterWithActivityContext.toPayloads(details),
308327
metricsScope);
309328
if (status.getCancelRequested()) {
310-
lastException = new ActivityCanceledException(info);
329+
requestCancelLocked();
311330
} else if (status.getActivityReset()) {
312331
lastException = new ActivityResetException(info);
313332
} else if (status.getActivityPaused()) {
@@ -327,6 +346,12 @@ private void sendHeartbeatRequest(Object details) {
327346
}
328347
}
329348

349+
private void requestCancelLocked() {
350+
ActivityCanceledException exception = new ActivityCanceledException(info);
351+
lastException = exception;
352+
cancellationToken.requestCancel(exception);
353+
}
354+
330355
private static long getHeartbeatIntervalMs(
331356
Duration activityHeartbeatTimeout,
332357
Duration maxHeartbeatThrottleInterval,

0 commit comments

Comments
 (0)