Skip to content

Commit b3b7806

Browse files
authored
Worker Versioning Annotations & Options (#2463)
1 parent 75f5d1a commit b3b7806

File tree

44 files changed

+1829
-70
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1829
-70
lines changed

.github/workflows/ci.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ jobs:
101101
--dynamic-config-value matching.useNewMatcher=true \
102102
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
103103
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
104-
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
104+
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
105+
--dynamic-config-value system.enableDeploymentVersions=true &
105106
sleep 10s
106107
107108
- name: Run unit tests
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.common;
22+
23+
import io.temporal.worker.WorkerDeploymentOptions;
24+
25+
/** Specifies when a workflow might move from a worker of one Build Id to another. */
26+
@Experimental
27+
public enum VersioningBehavior {
28+
/**
29+
* An unspecified versioning behavior. By default, workers opting into worker versioning will be
30+
* required to specify a behavior. See {@link
31+
* io.temporal.worker.WorkerOptions.Builder#setDeploymentOptions(WorkerDeploymentOptions)}.
32+
*/
33+
UNSPECIFIED,
34+
/** The workflow will be pinned to the current Build ID unless manually moved. */
35+
PINNED,
36+
/**
37+
* The workflow will automatically move to the latest version (default Build ID of the task queue)
38+
* when the next task is dispatched.
39+
*/
40+
AUTO_UPGRADE
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.common;
22+
23+
import java.util.Objects;
24+
import javax.annotation.Nonnull;
25+
import javax.annotation.Nullable;
26+
27+
/** Represents the version of a specific worker deployment. */
28+
@Experimental
29+
public class WorkerDeploymentVersion {
30+
private final String deploymentName;
31+
private final String buildId;
32+
33+
/** Build a worker deployment version from an explicit deployment name and build ID. */
34+
public WorkerDeploymentVersion(@Nonnull String deploymentName, @Nonnull String buildId) {
35+
this.deploymentName = deploymentName;
36+
this.buildId = buildId;
37+
}
38+
39+
/**
40+
* Build a worker deployment version from a canonical string representation.
41+
*
42+
* @param canonicalString The canonical string representation of the worker deployment version,
43+
* formatted as "deploymentName.buildId". Deployment name must not have a "." in it.
44+
* @return A new instance of {@link WorkerDeploymentVersion}.
45+
* @throws IllegalArgumentException if the input string is not in the expected format.
46+
*/
47+
public static WorkerDeploymentVersion fromCanonicalString(String canonicalString) {
48+
String[] parts = canonicalString.split("\\.", 2);
49+
if (parts.length != 2) {
50+
throw new IllegalArgumentException(
51+
"Invalid canonical string format. Expected 'deploymentName.buildId'");
52+
}
53+
return new WorkerDeploymentVersion(parts[0], parts[1]);
54+
}
55+
56+
/**
57+
* @return The canonical string representation of this worker deployment version.
58+
*/
59+
public String toCanonicalString() {
60+
return deploymentName + "." + buildId;
61+
}
62+
63+
/**
64+
* @return The name of the deployment.
65+
*/
66+
@Nullable // Marked nullable for future compatibility with custom strings
67+
public String getDeploymentName() {
68+
return deploymentName;
69+
}
70+
71+
/**
72+
* @return The Build ID of this version.
73+
*/
74+
@Nullable // Marked nullable for future compatibility with custom strings
75+
public String getBuildId() {
76+
return buildId;
77+
}
78+
79+
@Override
80+
public boolean equals(Object o) {
81+
if (o == null || getClass() != o.getClass()) return false;
82+
WorkerDeploymentVersion that = (WorkerDeploymentVersion) o;
83+
return Objects.equals(deploymentName, that.deploymentName)
84+
&& Objects.equals(buildId, that.buildId);
85+
}
86+
87+
@Override
88+
public int hashCode() {
89+
return Objects.hash(deploymentName, buildId);
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return "WorkerDeploymentVersion{"
95+
+ "deploymentName='"
96+
+ deploymentName
97+
+ '\''
98+
+ ", buildId='"
99+
+ buildId
100+
+ '\''
101+
+ '}';
102+
}
103+
}

temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowImplMetadata.java

+36
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222

2323
import com.google.common.collect.ImmutableList;
2424
import io.temporal.common.Experimental;
25+
import io.temporal.common.VersioningBehavior;
2526
import io.temporal.internal.common.InternalUtils;
2627
import io.temporal.internal.common.env.ReflectionUtils;
28+
import io.temporal.workflow.WorkflowVersioningBehavior;
2729
import java.lang.reflect.Constructor;
30+
import java.lang.reflect.Method;
2831
import java.util.*;
2932
import java.util.stream.Collectors;
3033
import javax.annotation.Nullable;
@@ -68,6 +71,7 @@ public int hashCode() {
6871
}
6972
}
7073

74+
private final Class<?> implementationClass;
7175
private final List<POJOWorkflowInterfaceMetadata> workflowInterfaces;
7276
private final List<POJOWorkflowMethodMetadata> workflowMethods;
7377
private final List<POJOWorkflowMethodMetadata> signalMethods;
@@ -111,6 +115,7 @@ private POJOWorkflowImplMetadata(
111115
throw new IllegalArgumentException("concrete class expected: " + implClass);
112116
}
113117

118+
implementationClass = implClass;
114119
List<POJOWorkflowInterfaceMetadata> workflowInterfaces = new ArrayList<>();
115120
Map<String, POJOWorkflowMethodMetadata> workflowMethods = new HashMap<>();
116121
Map<String, POJOWorkflowMethodMetadata> queryMethods = new HashMap<>();
@@ -238,4 +243,35 @@ public List<POJOWorkflowMethodMetadata> getUpdateValidatorMethods() {
238243
public @Nullable Constructor<?> getWorkflowInit() {
239244
return workflowInit;
240245
}
246+
247+
/**
248+
* @return The {@link VersioningBehavior} for the workflow method on the implementation class. If
249+
* the method is annotated with {@link WorkflowVersioningBehavior}.
250+
* @throws RuntimeException if the method is not found on the implementation class or is not a
251+
* workflow method.
252+
*/
253+
@Experimental
254+
@Nullable
255+
public static VersioningBehavior getVersioningBehaviorForMethod(
256+
Class<?> implementationClass, POJOWorkflowMethodMetadata workflowMethod) {
257+
Method method = workflowMethod.getWorkflowMethod();
258+
// Find the same method on the implementation class
259+
Method implMethod;
260+
try {
261+
implMethod = implementationClass.getMethod(method.getName(), method.getParameterTypes());
262+
} catch (NoSuchMethodException e) {
263+
throw new RuntimeException(
264+
"Unable to find workflow method "
265+
+ workflowMethod.getName()
266+
+ " in implementation class "
267+
+ implementationClass.getName(),
268+
e);
269+
}
270+
if (implMethod.isAnnotationPresent(WorkflowVersioningBehavior.class)) {
271+
WorkflowVersioningBehavior vb = implMethod.getAnnotation(WorkflowVersioningBehavior.class);
272+
return vb.value();
273+
} else {
274+
return null;
275+
}
276+
}
241277
}

temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowInterfaceMetadata.java

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.common.metadata;
2222

2323
import io.temporal.workflow.*;
24+
import io.temporal.workflow.WorkflowVersioningBehavior;
2425
import java.lang.reflect.Method;
2526
import java.lang.reflect.Modifier;
2627
import java.util.*;
@@ -426,6 +427,14 @@ private static boolean validateAndQualifiedForWorkflowMethod(POJOWorkflowMethod
426427
}
427428
}
428429

430+
if (method.getAnnotation(WorkflowVersioningBehavior.class) != null) {
431+
// This annotation is only allowed in implementation classes, not interfaces
432+
throw new IllegalArgumentException(
433+
"@WorkflowVersioningBehavior annotation is not allowed on interface methods, only on"
434+
+ " implementation methods: "
435+
+ method);
436+
}
437+
429438
if (isAnnotatedWorkflowMethod) {
430439
// all methods explicitly marked with one of workflow method qualifiers
431440
return true;

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ public WorkflowTaskResult handleWorkflowTask(
196196
if (workflowStateMachines.sdkVersionToWrite() != null) {
197197
result.setWriteSdkVersion(workflowStateMachines.sdkVersionToWrite());
198198
}
199+
if (workflow.getWorkflowContext() != null) {
200+
result.setVersioningBehavior(workflow.getWorkflowContext().getVersioningBehavior());
201+
}
199202
return result.build();
200203
} finally {
201204
lock.unlock();

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ private Result createCompletedWFTRequest(
231231
result.getNonfirstLocalActivityAttempts())
232232
.build())
233233
.setReturnNewWorkflowTask(result.isForceWorkflowTask())
234+
.setVersioningBehavior(
235+
WorkerVersioningProtoUtils.behaviorToProto(result.getVersioningBehavior()))
234236
.setCapabilities(
235237
RespondWorkflowTaskCompletedRequest.Capabilities.newBuilder()
236238
.setDiscardSpeculativeWorkflowTaskWithEvents(true)

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.internal.replay;
2222

2323
import io.temporal.api.failure.v1.Failure;
24+
import io.temporal.common.VersioningBehavior;
2425
import io.temporal.common.context.ContextPropagator;
2526
import io.temporal.internal.sync.SignalHandlerInfo;
2627
import io.temporal.internal.sync.UpdateHandlerInfo;
@@ -72,4 +73,6 @@ public interface WorkflowContext {
7273
Map<Long, SignalHandlerInfo> getRunningSignalHandlers();
7374

7475
Map<String, UpdateHandlerInfo> getRunningUpdateHandlers();
76+
77+
VersioningBehavior getVersioningBehavior();
7578
}

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.api.command.v1.Command;
2424
import io.temporal.api.protocol.v1.Message;
2525
import io.temporal.api.query.v1.WorkflowQueryResult;
26+
import io.temporal.common.VersioningBehavior;
2627
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Map;
@@ -43,6 +44,7 @@ public static final class Builder {
4344
private List<Integer> sdkFlags;
4445
private String writeSdkName;
4546
private String writeSdkVersion;
47+
private VersioningBehavior versioningBehavior;
4648

4749
public Builder setCommands(List<Command> commands) {
4850
this.commands = commands;
@@ -89,6 +91,11 @@ public Builder setWriteSdkVersion(String writeSdkVersion) {
8991
return this;
9092
}
9193

94+
public Builder setVersioningBehavior(VersioningBehavior versioningBehavior) {
95+
this.versioningBehavior = versioningBehavior;
96+
return this;
97+
}
98+
9299
public WorkflowTaskResult build() {
93100
return new WorkflowTaskResult(
94101
commands == null ? Collections.emptyList() : commands,
@@ -99,7 +106,8 @@ public WorkflowTaskResult build() {
99106
nonfirstLocalActivityAttempts,
100107
sdkFlags == null ? Collections.emptyList() : sdkFlags,
101108
writeSdkName,
102-
writeSdkVersion);
109+
writeSdkVersion,
110+
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior);
103111
}
104112
}
105113

@@ -112,6 +120,7 @@ public WorkflowTaskResult build() {
112120
private final List<Integer> sdkFlags;
113121
private final String writeSdkName;
114122
private final String writeSdkVersion;
123+
private final VersioningBehavior versioningBehavior;
115124

116125
private WorkflowTaskResult(
117126
List<Command> commands,
@@ -122,7 +131,8 @@ private WorkflowTaskResult(
122131
int nonfirstLocalActivityAttempts,
123132
List<Integer> sdkFlags,
124133
String writeSdkName,
125-
String writeSdkVersion) {
134+
String writeSdkVersion,
135+
VersioningBehavior versioningBehavior) {
126136
this.commands = commands;
127137
this.messages = messages;
128138
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
@@ -135,6 +145,7 @@ private WorkflowTaskResult(
135145
this.sdkFlags = sdkFlags;
136146
this.writeSdkName = writeSdkName;
137147
this.writeSdkVersion = writeSdkVersion;
148+
this.versioningBehavior = versioningBehavior;
138149
}
139150

140151
public List<Command> getCommands() {
@@ -173,4 +184,8 @@ public String getWriteSdkName() {
173184
public String getWriteSdkVersion() {
174185
return writeSdkVersion;
175186
}
187+
188+
public VersioningBehavior getVersioningBehavior() {
189+
return versioningBehavior;
190+
}
176191
}

temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.internal.sync;
2222

2323
import io.temporal.api.common.v1.Payloads;
24+
import io.temporal.common.VersioningBehavior;
2425
import io.temporal.common.converter.DataConverter;
2526
import io.temporal.common.converter.EncodedValues;
2627
import io.temporal.common.converter.Values;
@@ -57,6 +58,7 @@ public void initialize(Optional<Payloads> input) {
5758
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
5859
RootWorkflowInboundCallsInterceptor rootWorkflowInvoker =
5960
new RootWorkflowInboundCallsInterceptor(workflowContext, input);
61+
this.rootWorkflowInvoker = rootWorkflowInvoker;
6062
workflowInvoker = rootWorkflowInvoker;
6163
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
6264
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
@@ -81,6 +83,14 @@ public Object getInstance() {
8183
return rootWorkflowInvoker.getInstance();
8284
}
8385

86+
@Override
87+
public VersioningBehavior getVersioningBehavior() {
88+
if (rootWorkflowInvoker == null || rootWorkflowInvoker.workflow == null) {
89+
return VersioningBehavior.UNSPECIFIED;
90+
}
91+
return rootWorkflowInvoker.workflow.getVersioningBehavior();
92+
}
93+
8494
class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
8595
private DynamicWorkflow workflow;
8696
private Optional<Payloads> input;

0 commit comments

Comments
 (0)