-
Notifications
You must be signed in to change notification settings - Fork 219
Implement nexus-based activity cancels #2917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
56fac19
75577e2
6463541
356493b
9fb3b05
e0cb83d
1ac2aa9
c5ab593
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| package io.temporal.activity; | ||
|
|
||
| import io.temporal.client.ActivityCanceledException; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| /** Token that allows an Activity implementation to observe cancellation requests. */ | ||
| @Experimental | ||
| public interface ActivityCancellationToken { | ||
|
|
||
| ActivityCancellationToken NONE = | ||
| new ActivityCancellationToken() { | ||
| @Override | ||
| public boolean isCancellationRequested() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public void throwIfCancellationRequested() throws ActivityCanceledException {} | ||
|
|
||
| @Override | ||
| public CompletableFuture<Void> getCancellationFuture() { | ||
| return new CompletableFuture<>(); | ||
| } | ||
| }; | ||
|
|
||
| /** | ||
| * Returns true after cancellation has been requested for this Activity Execution. | ||
| * | ||
| * <p>If this method returns true, the Activity implementation should stop its work and usually | ||
| * call {@link #throwIfCancellationRequested()} to report successful cancellation to Temporal. | ||
| */ | ||
| boolean isCancellationRequested(); | ||
|
|
||
| /** | ||
| * Throws {@link ActivityCanceledException} if cancellation has been requested for this Activity | ||
| * Execution. | ||
| * | ||
| * <p>Rethrowing this exception from Activity code reports successful cancellation to Temporal. | ||
| */ | ||
| void throwIfCancellationRequested() throws ActivityCanceledException; | ||
|
|
||
| /** | ||
| * Future that completes exceptionally with {@link ActivityCanceledException} when cancellation | ||
| * has been requested for this Activity Execution. | ||
| * | ||
| * <p>Activity code should still call {@link #throwIfCancellationRequested()} or otherwise report | ||
| * cancellation if it wants the Activity Execution to complete as canceled. | ||
| */ | ||
| CompletableFuture<Void> getCancellationFuture(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,51 @@ | ||||||||||||||||||||||||
| package io.temporal.internal.activity; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import io.temporal.activity.ActivityCancellationToken; | ||||||||||||||||||||||||
| import io.temporal.client.ActivityCanceledException; | ||||||||||||||||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||||||||||||||||
| import java.util.concurrent.CompletionException; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| final class ActivityCancellationTokenImpl implements ActivityCancellationToken { | ||||||||||||||||||||||||
| private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>(); | ||||||||||||||||||||||||
| private volatile ActivityCanceledException cancellationException; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||
| public boolean isCancellationRequested() { | ||||||||||||||||||||||||
| return cancellationException != null; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||
| public void throwIfCancellationRequested() throws ActivityCanceledException { | ||||||||||||||||||||||||
| ActivityCanceledException exception = cancellationException; | ||||||||||||||||||||||||
| if (exception != null) { | ||||||||||||||||||||||||
| throw exception; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||
| public CompletableFuture<Void> getCancellationFuture() { | ||||||||||||||||||||||||
| CompletableFuture<Void> result = new CompletableFuture<>(); | ||||||||||||||||||||||||
| cancellationFuture.whenComplete( | ||||||||||||||||||||||||
| (ignored, exception) -> { | ||||||||||||||||||||||||
| if (exception == null) { | ||||||||||||||||||||||||
| result.complete(null); | ||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||
| result.completeExceptionally(unwrapCompletionException(exception)); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||
|
Comment on lines
+27
to
+36
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: this can be simplified:
Suggested change
This will return a new dependent future on each call, so calling
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't actually work since the unwrapped exception is checked unfortunately. End up needing to change the unwrapper to be just as long as this more or less. |
||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| synchronized void requestCancel(ActivityCanceledException exception) { | ||||||||||||||||||||||||
| if (cancellationException == null) { | ||||||||||||||||||||||||
| cancellationException = exception; | ||||||||||||||||||||||||
| cancellationFuture.completeExceptionally(exception); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| private static Throwable unwrapCompletionException(Throwable exception) { | ||||||||||||||||||||||||
| return exception instanceof CompletionException && exception.getCause() != null | ||||||||||||||||||||||||
| ? exception.getCause() | ||||||||||||||||||||||||
| : exception; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.