Skip to content

Commit ce56376

Browse files
committed
feat(otel): add tracing for startWithUpdate. fixes #2620.
1 parent 8d8ca1b commit ce56376

4 files changed

Lines changed: 154 additions & 0 deletions

File tree

contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
public enum SpanOperationType {
44
START_WORKFLOW("StartWorkflow"),
55
SIGNAL_WITH_START_WORKFLOW("SignalWithStartWorkflow"),
6+
UPDATE_WITH_START_WORKFLOW("UpdateWithStartWorkflow"),
67
RUN_WORKFLOW("RunWorkflow"),
78
START_CHILD_WORKFLOW("StartChildWorkflow"),
89
START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"),

contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
5555
switch (operationType) {
5656
case START_WORKFLOW:
5757
case SIGNAL_WITH_START_WORKFLOW:
58+
case UPDATE_WITH_START_WORKFLOW:
5859
return ImmutableMap.of(StandardTagNames.WORKFLOW_ID, context.getWorkflowId());
5960
case START_CHILD_WORKFLOW:
6061
return ImmutableMap.of(

contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,28 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
7878
}
7979
}
8080

81+
@Override
82+
public <R> WorkflowUpdateWithStartOutput<R> updateWithStart(
83+
WorkflowUpdateWithStartInput<R> input) {
84+
WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
85+
StartUpdateInput<R> startUpdateInput = input.getStartUpdateInput();
86+
Span workflowStartSpan =
87+
contextAccessor.writeSpanContextToHeader(
88+
() ->
89+
createWorkflowStartSpanBuilder(
90+
workflowStartInput, SpanOperationType.UPDATE_WITH_START_WORKFLOW)
91+
.start(),
92+
workflowStartInput.getHeader(),
93+
tracer);
94+
contextAccessor.writeSpanContextToHeader(
95+
workflowStartSpan.context(), startUpdateInput.getHeader(), tracer);
96+
try (Scope ignored = tracer.scopeManager().activate(workflowStartSpan)) {
97+
return super.updateWithStart(input);
98+
} finally {
99+
workflowStartSpan.finish();
100+
}
101+
}
102+
81103
@Override
82104
public <R> QueryOutput<R> query(QueryInput<R> input) {
83105
Span workflowQuerySpan =
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package io.temporal.opentracing;
2+
3+
import static org.junit.Assert.*;
4+
5+
import io.opentracing.Scope;
6+
import io.opentracing.Span;
7+
import io.opentracing.mock.MockSpan;
8+
import io.opentracing.mock.MockTracer;
9+
import io.opentracing.util.ThreadLocalScopeManager;
10+
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
11+
import io.temporal.client.*;
12+
import io.temporal.testing.internal.SDKTestWorkflowRule;
13+
import io.temporal.worker.WorkerFactoryOptions;
14+
import io.temporal.workflow.CompletablePromise;
15+
import io.temporal.workflow.UpdateMethod;
16+
import io.temporal.workflow.Workflow;
17+
import io.temporal.workflow.WorkflowInterface;
18+
import io.temporal.workflow.WorkflowMethod;
19+
import java.util.Arrays;
20+
import java.util.HashSet;
21+
import java.util.List;
22+
import java.util.Set;
23+
import java.util.stream.Collectors;
24+
import org.junit.After;
25+
import org.junit.Rule;
26+
import org.junit.Test;
27+
28+
public class UpdateWithStartTest {
29+
30+
private static final MockTracer mockTracer =
31+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
32+
33+
private final OpenTracingOptions OT_OPTIONS =
34+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
35+
36+
@Rule
37+
public SDKTestWorkflowRule testWorkflowRule =
38+
SDKTestWorkflowRule.newBuilder()
39+
.setWorkflowClientOptions(
40+
WorkflowClientOptions.newBuilder()
41+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
42+
.validateAndBuildWithDefaults())
43+
.setWorkerFactoryOptions(
44+
WorkerFactoryOptions.newBuilder()
45+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
46+
.validateAndBuildWithDefaults())
47+
.setWorkflowTypes(WorkflowImpl.class)
48+
.build();
49+
50+
@After
51+
public void tearDown() {
52+
mockTracer.reset();
53+
}
54+
55+
@WorkflowInterface
56+
public interface TestWorkflow {
57+
@WorkflowMethod
58+
String workflow(String input);
59+
60+
@UpdateMethod
61+
String update(String value);
62+
}
63+
64+
public static class WorkflowImpl implements TestWorkflow {
65+
66+
private final CompletablePromise<Void> promise = Workflow.newPromise();
67+
private String value;
68+
69+
@Override
70+
public String workflow(String input) {
71+
promise.get();
72+
return value;
73+
}
74+
75+
@Override
76+
public String update(String value) {
77+
this.value = value;
78+
promise.complete(null);
79+
return value;
80+
}
81+
}
82+
83+
@Test
84+
public void updateWithStart() {
85+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
86+
TestWorkflow workflow =
87+
client.newWorkflowStub(
88+
TestWorkflow.class,
89+
WorkflowOptions.newBuilder()
90+
.setTaskQueue(testWorkflowRule.getTaskQueue())
91+
.setWorkflowIdConflictPolicy(
92+
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL)
93+
.validateBuildWithDefaults());
94+
95+
Span span = mockTracer.buildSpan("ClientFunction").start();
96+
97+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
98+
WithStartWorkflowOperation<String> startOp =
99+
new WithStartWorkflowOperation<>(workflow::workflow, "input");
100+
WorkflowClient.executeUpdateWithStart(
101+
workflow::update,
102+
"update",
103+
UpdateOptions.<String>newBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(),
104+
startOp);
105+
} finally {
106+
span.finish();
107+
}
108+
109+
WorkflowStub.fromTyped(workflow).getResult(String.class);
110+
OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans());
111+
MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction");
112+
MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0);
113+
114+
assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId());
115+
assertEquals("UpdateWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName());
116+
117+
// updateWithStart propagates the start span context into both the StartWorkflow and
118+
// UpdateWorkflow operation headers
119+
List<MockSpan> workflowSpans = spansHelper.getByParentSpan(workflowStartSpan);
120+
assertEquals(2, workflowSpans.size());
121+
for (MockSpan workflowSpan : workflowSpans) {
122+
assertEquals(workflowStartSpan.context().spanId(), workflowSpan.parentId());
123+
}
124+
Set<String> operationNames =
125+
workflowSpans.stream().map(MockSpan::operationName).collect(Collectors.toSet());
126+
assertEquals(
127+
new HashSet<>(Arrays.asList("HandleUpdate:update", "RunWorkflow:TestWorkflow")),
128+
operationNames);
129+
}
130+
}

0 commit comments

Comments
 (0)