From 31e35cd9aef71b656522e3ffe94c5d3d8b0fb3e6 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 3 Feb 2026 23:07:27 -0800 Subject: [PATCH 1/9] Store worker_instance_key in ActivityInfo --- api/persistence/v1/executions.pb.go | 18 +++++++++++---- go.mod | 1 + go.sum | 8 ++----- .../api/persistence/v1/executions.proto | 3 +++ .../api/recordactivitytaskstarted/api.go | 1 + .../api/respondactivitytaskcompleted/api.go | 23 ++++++++++--------- .../workflow_task_completed_handler.go | 1 + service/history/history_engine_test.go | 1 + service/history/interfaces/mutable_state.go | 1 + .../history/workflow/mutable_state_impl.go | 4 ++++ ...utable_state_impl_restart_activity_test.go | 1 + .../workflow/mutable_state_impl_test.go | 2 ++ 12 files changed, 43 insertions(+), 21 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 2000a2538d..da2dbced4e 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -2521,8 +2521,10 @@ type ActivityInfo struct { // set to true if reset heartbeat flag was set with an activity reset ResetHeartbeats bool `protobuf:"varint,48,opt,name=reset_heartbeats,json=resetHeartbeats,proto3" json:"reset_heartbeats,omitempty"` StartVersion int64 `protobuf:"varint,50,opt,name=start_version,json=startVersion,proto3" json:"start_version,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Worker instance key of the worker executing this activity. + WorkerInstanceKey string `protobuf:"bytes,51,opt,name=worker_instance_key,json=workerInstanceKey,proto3" json:"worker_instance_key,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityInfo) Reset() { @@ -2895,6 +2897,13 @@ func (x *ActivityInfo) GetStartVersion() int64 { return 0 } +func (x *ActivityInfo) GetWorkerInstanceKey() string { + if x != nil { + return x.WorkerInstanceKey + } + return "" +} + type isActivityInfo_BuildIdInfo interface { isActivityInfo_BuildIdInfo() } @@ -4840,7 +4849,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x17NexusInvocationTaskInfo\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattempt\"4\n" + "\x18NexusCancelationTaskInfo\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"\x9e\x1b\n" + + "\aattempt\x18\x01 \x01(\x05R\aattempt\"\xce\x1b\n" + "\fActivityInfo\x12\x18\n" + "\aversion\x18\x01 \x01(\x03R\aversion\x127\n" + "\x18scheduled_event_batch_id\x18\x02 \x01(\x03R\x15scheduledEventBatchId\x12A\n" + @@ -4893,7 +4902,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "pause_info\x18. \x01(\v2:.temporal.server.api.persistence.v1.ActivityInfo.PauseInfoR\tpauseInfo\x12%\n" + "\x0eactivity_reset\x18/ \x01(\bR\ractivityReset\x12)\n" + "\x10reset_heartbeats\x180 \x01(\bR\x0fresetHeartbeats\x12#\n" + - "\rstart_version\x182 \x01(\x03R\fstartVersion\x1ay\n" + + "\rstart_version\x182 \x01(\x03R\fstartVersion\x12.\n" + + "\x13worker_instance_key\x183 \x01(\tR\x11workerInstanceKey\x1ay\n" + "\x16UseWorkflowBuildIdInfo\x12+\n" + "\x12last_used_build_id\x18\x01 \x01(\tR\x0flastUsedBuildId\x122\n" + "\x15last_redirect_counter\x18\x02 \x01(\x03R\x13lastRedirectCounter\x1a\x89\x02\n" + diff --git a/go.mod b/go.mod index 60ad1aa45a..d9bc2d26b6 100644 --- a/go.mod +++ b/go.mod @@ -173,3 +173,4 @@ require ( modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) + diff --git a/go.sum b/go.sum index 2cd1258aaf..a4c698ef6a 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5 h1:Van9KGGs8lcDgxzSNFbDhEMNeJ80TbBxwZ45f9iBk9U= -github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= +github.com/nexus-rpc/sdk-go v0.5.1 h1:UFYYfoHlQc+Pn9gQpmn9QE7xluewAn2AO1OSkAh7YFU= +github.com/nexus-rpc/sdk-go v0.5.1/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -317,8 +317,6 @@ github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb/go.mod h1:143 github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938 h1:sEJGhmDo+0FaPWM6f0v8Tjia0H5pR6/Baj6+kS78B+M= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938/go.mod h1:ezRQRwu9KQXy8Wuuv1aaFFxoCNz5CeNbVOOkh3xctbY= -github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= -github.com/tidwall/btree v1.8.1/go.mod h1:jBbTdUWhSZClZWoDg54VnvV7/54modSOzDN7VXftj1A= github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber-common/bark v1.0.0/go.mod h1:g0ZuPcD7XiExKHynr93Q742G/sbrdVQkghrqLGOoFuY= @@ -375,8 +373,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= -go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 0a025649ae..bbf99aa918 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -631,6 +631,9 @@ message ActivityInfo { bool reset_heartbeats = 48; int64 start_version = 50; + + // Worker instance key of the worker executing this activity. + string worker_instance_key = 51; } // timer_map column diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 05c65076b7..381cbf94b5 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -242,6 +242,7 @@ func recordActivityTaskStarted( if _, err := mutableState.AddActivityTaskStartedEvent( ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(), versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(), + request.PollRequest.GetWorkerInstanceKey(), ); err != nil { return nil, rejectCodeUndefined, err } diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index 69764f9fd3..2186d0c2ba 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -88,17 +88,18 @@ func Invoke( // we need to force complete an activity fabricateStartedEvent = ai.StartedEventId == common.EmptyEventID if fabricateStartedEvent { - _, err := mutableState.AddActivityTaskStartedEvent( - ai, - scheduledEventID, - "", - req.GetCompleteRequest().GetIdentity(), - nil, - nil, - // TODO (shahab): do we need to do anything with wf redirect in this case or any - // other case where an activity starts? - nil, - ) + _, err := mutableState.AddActivityTaskStartedEvent( + ai, + scheduledEventID, + "", + req.GetCompleteRequest().GetIdentity(), + nil, + nil, + // TODO (shahab): do we need to do anything with wf redirect in this case or any + // other case where an activity starts? + nil, + "", // workerInstanceKey not available for force complete + ) if err != nil { return nil, err } diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index 0b526b4c7f..615f0ee092 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -559,6 +559,7 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi stamp, nil, nil, + "", // workerInstanceKey not available for eager dispatch ); err != nil { return nil, err } diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 8b888ca4e1..165782b3ef 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -6679,6 +6679,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6 nil, nil, nil, + "", ) return event } diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index 68aedfd2ac..ae6c59d276 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -57,6 +57,7 @@ type ( *commonpb.WorkerVersionStamp, *deploymentpb.Deployment, *taskqueuespb.BuildIdRedirectInfo, + string, // workerInstanceKey ) (*historypb.HistoryEvent, error) AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error) AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e0b21c8342..6d49ad34b6 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4057,6 +4057,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, deployment *deploymentpb.Deployment, redirectInfo *taskqueuespb.BuildIdRedirectInfo, + workerInstanceKey string, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskStarted err := ms.checkMutability(opTag) @@ -4085,6 +4086,8 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( ai.LastDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment) } + ai.WorkerInstanceKey = workerInstanceKey + if !ai.HasRetryPolicy { event := ms.hBuilder.AddActivityTaskStartedEvent( scheduledEventID, @@ -4116,6 +4119,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( activityInfo.RequestId = requestID activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now()) activityInfo.StartedIdentity = identity + activityInfo.WorkerInstanceKey = workerInstanceKey return nil }); err != nil { return nil, err diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 9337a98fff..3cb91b7fbd 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -419,6 +419,7 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp nil, nil, nil, + "", ) s.NoError(err) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 2b302fd7da..f5a205e715 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2878,6 +2878,7 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() { nil, nil, nil, + "", ) s.NoError(err) @@ -2943,6 +2944,7 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() { nil, nil, nil, + "", ) s.NoError(err) From 2874f4e17e45069745fb490cf9b7d873e9d15f70 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 5 Feb 2026 09:53:22 -0800 Subject: [PATCH 2/9] Add unit test --- api/persistence/v1/executions.pb.go | 2 +- go.mod | 1 - go.sum | 2 + .../api/persistence/v1/executions.proto | 2 +- .../history/interfaces/mutable_state_mock.go | 8 +- .../workflow/mutable_state_impl_test.go | 73 +++++++++++++++++++ 6 files changed, 81 insertions(+), 7 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index da2dbced4e..2866fc4f98 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -2521,7 +2521,7 @@ type ActivityInfo struct { // set to true if reset heartbeat flag was set with an activity reset ResetHeartbeats bool `protobuf:"varint,48,opt,name=reset_heartbeats,json=resetHeartbeats,proto3" json:"reset_heartbeats,omitempty"` StartVersion int64 `protobuf:"varint,50,opt,name=start_version,json=startVersion,proto3" json:"start_version,omitempty"` - // Worker instance key of the worker executing this activity. + // Unique identifier of the worker that is this activity. WorkerInstanceKey string `protobuf:"bytes,51,opt,name=worker_instance_key,json=workerInstanceKey,proto3" json:"worker_instance_key,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache diff --git a/go.mod b/go.mod index d9bc2d26b6..60ad1aa45a 100644 --- a/go.mod +++ b/go.mod @@ -173,4 +173,3 @@ require ( modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) - diff --git a/go.sum b/go.sum index a4c698ef6a..e8cf371965 100644 --- a/go.sum +++ b/go.sum @@ -373,6 +373,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index bbf99aa918..7912db4a1a 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -632,7 +632,7 @@ message ActivityInfo { int64 start_version = 50; - // Worker instance key of the worker executing this activity. + // Unique identifier of the worker that is this activity. string worker_instance_key = 51; } diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 103a53840e..9b89e38359 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -148,18 +148,18 @@ func (mr *MockMutableStateMockRecorder) AddActivityTaskScheduledEvent(arg0, arg1 } // AddActivityTaskStartedEvent mocks base method. -func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo) (*history.HistoryEvent, error) { +func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7 string) (*history.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddActivityTaskStartedEvent indicates an expected call of AddActivityTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // AddActivityTaskTimedOutEvent mocks base method. diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index f5a205e715..18d77f0a41 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -6152,3 +6152,76 @@ func (s *mutableStateSuite) TestSetContextMetadata() { s.True(ok) s.Equal(taskQueue, tq) } + +func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceKey() { + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + + // Setup workflow execution + _, err := s.mutableState.AddWorkflowExecutionStartedEvent( + &commonpb.WorkflowExecution{WorkflowId: tests.WorkflowID, RunId: tests.RunID}, + &historyservice.StartWorkflowExecutionRequest{ + NamespaceId: tests.NamespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: "workflow-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"}, + WorkflowRunTimeout: durationpb.New(200 * time.Second), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + di, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + _, _, err = s.mutableState.AddWorkflowTaskStartedEvent( + di.ScheduledEventID, + di.RequestID, + di.TaskQueue, + "identity", + nil, + nil, + nil, + false, + nil, + ) + s.NoError(err) + _, err = s.mutableState.AddWorkflowTaskCompletedEvent( + di, + &workflowservice.RespondWorkflowTaskCompletedRequest{Identity: "identity"}, + workflowTaskCompletionLimits, + ) + s.NoError(err) + + // Schedule activity + workflowTaskCompletedEventID := int64(4) + _, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent( + workflowTaskCompletedEventID, + &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "test-activity-1", + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + }, + false, + ) + s.NoError(err) + s.Empty(activityInfo.WorkerInstanceKey, "WorkerInstanceKey should be empty before activity starts") + + // Start activity with workerInstanceKey + expectedWorkerInstanceKey := "test-worker-instance-key-12345" + _, err = s.mutableState.AddActivityTaskStartedEvent( + activityInfo, + activityInfo.ScheduledEventId, + uuid.NewString(), + "worker-identity", + nil, + nil, + nil, + expectedWorkerInstanceKey, + ) + s.NoError(err) + + // Verify workerInstanceKey is stored + updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) + s.True(ok) + s.Equal(expectedWorkerInstanceKey, updatedActivityInfo.WorkerInstanceKey) +} From ad8c4809150c73fe46d3026e35b01b1ce580b8e3 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 5 Feb 2026 13:16:54 -0800 Subject: [PATCH 3/9] Fix lint --- .../api/respondactivitytaskcompleted/api.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index 2186d0c2ba..d9c5a8d665 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -88,18 +88,18 @@ func Invoke( // we need to force complete an activity fabricateStartedEvent = ai.StartedEventId == common.EmptyEventID if fabricateStartedEvent { - _, err := mutableState.AddActivityTaskStartedEvent( - ai, - scheduledEventID, - "", - req.GetCompleteRequest().GetIdentity(), - nil, - nil, - // TODO (shahab): do we need to do anything with wf redirect in this case or any - // other case where an activity starts? - nil, - "", // workerInstanceKey not available for force complete - ) + _, err := mutableState.AddActivityTaskStartedEvent( + ai, + scheduledEventID, + "", + req.GetCompleteRequest().GetIdentity(), + nil, + nil, + // TODO (shahab): do we need to do anything with wf redirect in this case or any + // other case where an activity starts? + nil, + "", // workerInstanceKey not available for force complete + ) if err != nil { return nil, err } From 986e9a140d607ae770d8066a478589a8bed2e145 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 9 Feb 2026 11:46:16 -0800 Subject: [PATCH 4/9] Remove redundant WorkerInstanceKey assignment in UpdateActivity callback --- service/history/workflow/mutable_state_impl.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 6d49ad34b6..7e2fb318ff 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4119,7 +4119,6 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( activityInfo.RequestId = requestID activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now()) activityInfo.StartedIdentity = identity - activityInfo.WorkerInstanceKey = workerInstanceKey return nil }); err != nil { return nil, err From fb3066c0c968967d47b79bee9827039262ba9208 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 11 Feb 2026 11:37:02 -0800 Subject: [PATCH 5/9] Store worker_control_task_queue in ActivityInfo Add workerControlTaskQueue parameter to AddActivityTaskStartedEvent and persist it in ActivityInfo when an activity starts. This enables routing activity cancellation requests to the correct worker's control queue via Nexus. Changes: - Add worker_control_task_queue field to ActivityInfo proto - Update MutableState interface and implementation - Pass workerControlTaskQueue from poll request for regular activities - Pass from RespondWorkflowTaskCompleted request for eager activities - Update all test call sites --- api/persistence/v1/executions.pb.go | 18 ++++++++++++++---- .../server/api/persistence/v1/executions.proto | 3 +++ .../api/recordactivitytaskstarted/api.go | 1 + .../api/respondactivitytaskcompleted/api.go | 1 + .../api/respondworkflowtaskcompleted/api.go | 2 ++ .../workflow_task_completed_handler.go | 11 +++++++++-- .../workflow_task_completed_handler_test.go | 2 ++ service/history/history_engine_test.go | 1 + service/history/interfaces/mutable_state.go | 1 + .../history/interfaces/mutable_state_mock.go | 8 ++++---- service/history/workflow/mutable_state_impl.go | 2 ++ ...mutable_state_impl_restart_activity_test.go | 1 + .../workflow/mutable_state_impl_test.go | 9 +++++++-- 13 files changed, 48 insertions(+), 12 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 2866fc4f98..1fc0ca3b79 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -2523,8 +2523,10 @@ type ActivityInfo struct { StartVersion int64 `protobuf:"varint,50,opt,name=start_version,json=startVersion,proto3" json:"start_version,omitempty"` // Unique identifier of the worker that is this activity. WorkerInstanceKey string `protobuf:"bytes,51,opt,name=worker_instance_key,json=workerInstanceKey,proto3" json:"worker_instance_key,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // The task queue on which the server will send control tasks to the worker running this activity. + WorkerControlTaskQueue string `protobuf:"bytes,52,opt,name=worker_control_task_queue,json=workerControlTaskQueue,proto3" json:"worker_control_task_queue,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityInfo) Reset() { @@ -2904,6 +2906,13 @@ func (x *ActivityInfo) GetWorkerInstanceKey() string { return "" } +func (x *ActivityInfo) GetWorkerControlTaskQueue() string { + if x != nil { + return x.WorkerControlTaskQueue + } + return "" +} + type isActivityInfo_BuildIdInfo interface { isActivityInfo_BuildIdInfo() } @@ -4849,7 +4858,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x17NexusInvocationTaskInfo\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattempt\"4\n" + "\x18NexusCancelationTaskInfo\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"\xce\x1b\n" + + "\aattempt\x18\x01 \x01(\x05R\aattempt\"\x89\x1c\n" + "\fActivityInfo\x12\x18\n" + "\aversion\x18\x01 \x01(\x03R\aversion\x127\n" + "\x18scheduled_event_batch_id\x18\x02 \x01(\x03R\x15scheduledEventBatchId\x12A\n" + @@ -4903,7 +4912,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x0eactivity_reset\x18/ \x01(\bR\ractivityReset\x12)\n" + "\x10reset_heartbeats\x180 \x01(\bR\x0fresetHeartbeats\x12#\n" + "\rstart_version\x182 \x01(\x03R\fstartVersion\x12.\n" + - "\x13worker_instance_key\x183 \x01(\tR\x11workerInstanceKey\x1ay\n" + + "\x13worker_instance_key\x183 \x01(\tR\x11workerInstanceKey\x129\n" + + "\x19worker_control_task_queue\x184 \x01(\tR\x16workerControlTaskQueue\x1ay\n" + "\x16UseWorkflowBuildIdInfo\x12+\n" + "\x12last_used_build_id\x18\x01 \x01(\tR\x0flastUsedBuildId\x122\n" + "\x15last_redirect_counter\x18\x02 \x01(\x03R\x13lastRedirectCounter\x1a\x89\x02\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 7912db4a1a..8401ceb858 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -634,6 +634,9 @@ message ActivityInfo { // Unique identifier of the worker that is this activity. string worker_instance_key = 51; + + // The task queue on which the server will send control tasks to the worker running this activity. + string worker_control_task_queue = 52; } // timer_map column diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 381cbf94b5..976c5e11a0 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -243,6 +243,7 @@ func recordActivityTaskStarted( ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(), versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(), request.PollRequest.GetWorkerInstanceKey(), + request.PollRequest.GetWorkerControlTaskQueue(), ); err != nil { return nil, rejectCodeUndefined, err } diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index d9c5a8d665..a641f09edc 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -99,6 +99,7 @@ func Invoke( // other case where an activity starts? nil, "", // workerInstanceKey not available for force complete + "", // workerControlTaskQueue not available for force complete ) if err != nil { return nil, err diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 310dc4c1ae..0fd22aa980 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -389,6 +389,8 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( workflowTaskHandler := newWorkflowTaskCompletedHandler( request.GetIdentity(), + request.GetWorkerInstanceKey(), + request.GetWorkerControlTaskQueue(), completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler. ms, updateRegistry, diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index 615f0ee092..8685aae95d 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -53,6 +53,8 @@ type ( workflowTaskCompletedHandler struct { identity string + workerInstanceKey string + workerControlTaskQueue string workflowTaskCompletedID int64 // internal state @@ -104,6 +106,8 @@ type ( func newWorkflowTaskCompletedHandler( identity string, + workerInstanceKey string, + workerControlTaskQueue string, workflowTaskCompletedID int64, mutableState historyi.MutableState, updateRegistry update.Registry, @@ -122,7 +126,9 @@ func newWorkflowTaskCompletedHandler( versionMembershipCache worker_versioning.VersionMembershipCache, ) *workflowTaskCompletedHandler { return &workflowTaskCompletedHandler{ - identity: identity, + identity: identity, + workerInstanceKey: workerInstanceKey, + workerControlTaskQueue: workerControlTaskQueue, workflowTaskCompletedID: workflowTaskCompletedID, // internal state @@ -559,7 +565,8 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi stamp, nil, nil, - "", // workerInstanceKey not available for eager dispatch + handler.workerInstanceKey, + handler.workerControlTaskQueue, ); err != nil { return nil, err } diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go index 0018795df3..4cdaf1f708 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go @@ -84,6 +84,8 @@ func TestCommandProtocolMessage(t *testing.T) { ) out.handler = newWorkflowTaskCompletedHandler( // 😲 t.Name(), // identity + "", // workerInstanceKey + "", // workerControlTaskQueue 123, // workflowTaskCompletedID out.ms, out.updates, diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 165782b3ef..76a9821f5d 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -6680,6 +6680,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6 nil, nil, "", + "", ) return event } diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index ae6c59d276..e1f7b48cd2 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -58,6 +58,7 @@ type ( *deploymentpb.Deployment, *taskqueuespb.BuildIdRedirectInfo, string, // workerInstanceKey + string, // workerControlTaskQueue ) (*historypb.HistoryEvent, error) AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error) AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 9b89e38359..9c249569a3 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -148,18 +148,18 @@ func (mr *MockMutableStateMockRecorder) AddActivityTaskScheduledEvent(arg0, arg1 } // AddActivityTaskStartedEvent mocks base method. -func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7 string) (*history.HistoryEvent, error) { +func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7, arg8 string) (*history.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) + ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddActivityTaskStartedEvent indicates an expected call of AddActivityTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) } // AddActivityTaskTimedOutEvent mocks base method. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 7e2fb318ff..a944703e8c 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4058,6 +4058,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( deployment *deploymentpb.Deployment, redirectInfo *taskqueuespb.BuildIdRedirectInfo, workerInstanceKey string, + workerControlTaskQueue string, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskStarted err := ms.checkMutability(opTag) @@ -4087,6 +4088,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( } ai.WorkerInstanceKey = workerInstanceKey + ai.WorkerControlTaskQueue = workerControlTaskQueue if !ai.HasRetryPolicy { event := ms.hBuilder.AddActivityTaskStartedEvent( diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 3cb91b7fbd..a47174e731 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -420,6 +420,7 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp nil, nil, "", + "", ) s.NoError(err) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 18d77f0a41..1392f8ac5e 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2879,6 +2879,7 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() { nil, nil, "", + "", ) s.NoError(err) @@ -2945,6 +2946,7 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() { nil, nil, "", + "", ) s.NoError(err) @@ -6206,8 +6208,9 @@ func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceK s.NoError(err) s.Empty(activityInfo.WorkerInstanceKey, "WorkerInstanceKey should be empty before activity starts") - // Start activity with workerInstanceKey + // Start activity with workerInstanceKey and workerControlTaskQueue expectedWorkerInstanceKey := "test-worker-instance-key-12345" + expectedWorkerControlTaskQueue := "test-control-queue" _, err = s.mutableState.AddActivityTaskStartedEvent( activityInfo, activityInfo.ScheduledEventId, @@ -6217,11 +6220,13 @@ func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceK nil, nil, expectedWorkerInstanceKey, + expectedWorkerControlTaskQueue, ) s.NoError(err) - // Verify workerInstanceKey is stored + // Verify workerInstanceKey and workerControlTaskQueue are stored updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) s.True(ok) s.Equal(expectedWorkerInstanceKey, updatedActivityInfo.WorkerInstanceKey) + s.Equal(expectedWorkerControlTaskQueue, updatedActivityInfo.WorkerControlTaskQueue) } From 6ef264022a060ac4e43c01e232689b8f90e45a0b Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 11 Feb 2026 13:04:06 -0800 Subject: [PATCH 6/9] Update go.temporal.io/api to include worker_instance_key and worker_control_task_queue fields --- go.mod | 2 ++ go.sum | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 60ad1aa45a..72ac5d39f8 100644 --- a/go.mod +++ b/go.mod @@ -173,3 +173,5 @@ require ( modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) + +replace go.temporal.io/api => /Users/krajah/Code/api-go diff --git a/go.sum b/go.sum index e8cf371965..2c188359f1 100644 --- a/go.sum +++ b/go.sum @@ -317,6 +317,8 @@ github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb/go.mod h1:143 github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938 h1:sEJGhmDo+0FaPWM6f0v8Tjia0H5pR6/Baj6+kS78B+M= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938/go.mod h1:ezRQRwu9KQXy8Wuuv1aaFFxoCNz5CeNbVOOkh3xctbY= +github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= +github.com/tidwall/btree v1.8.1/go.mod h1:jBbTdUWhSZClZWoDg54VnvV7/54modSOzDN7VXftj1A= github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber-common/bark v1.0.0/go.mod h1:g0ZuPcD7XiExKHynr93Q742G/sbrdVQkghrqLGOoFuY= @@ -373,8 +375,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74= -go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2-0.20260211210638-ce51fa5547d4 h1:4IFoNlk70wmZ7TtOxNOW+n8YTxrsp3UaCnxKnP+AqmU= +go.temporal.io/api v1.62.2-0.20260211210638-ce51fa5547d4/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= From eab94b026fb04c067ba203be10b576048e42363d Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 11 Feb 2026 13:14:21 -0800 Subject: [PATCH 7/9] Fix lint --- go.sum | 4 ++-- .../workflow_task_completed_handler.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.sum b/go.sum index 2c188359f1..8c5dd481b9 100644 --- a/go.sum +++ b/go.sum @@ -375,8 +375,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.2-0.20260211210638-ce51fa5547d4 h1:4IFoNlk70wmZ7TtOxNOW+n8YTxrsp3UaCnxKnP+AqmU= -go.temporal.io/api v1.62.2-0.20260211210638-ce51fa5547d4/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2-0.20260211212044-12e31472fd35 h1:5Ig1D6s0oQVnj5blVPwoMqG/jvVaHcWWHXO9Y4B9n/M= +go.temporal.io/api v1.62.2-0.20260211212044-12e31472fd35/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index 8685aae95d..945a452275 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -126,9 +126,9 @@ func newWorkflowTaskCompletedHandler( versionMembershipCache worker_versioning.VersionMembershipCache, ) *workflowTaskCompletedHandler { return &workflowTaskCompletedHandler{ - identity: identity, - workerInstanceKey: workerInstanceKey, - workerControlTaskQueue: workerControlTaskQueue, + identity: identity, + workerInstanceKey: workerInstanceKey, + workerControlTaskQueue: workerControlTaskQueue, workflowTaskCompletedID: workflowTaskCompletedID, // internal state From 0f8e48e73f6cb2a93bbb38470ac368c656ee6eea Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 11 Feb 2026 16:40:46 -0800 Subject: [PATCH 8/9] Remove worker_instance_key as it is not needed --- api/persistence/v1/executions.pb.go | 18 ++++-------------- .../server/api/persistence/v1/executions.proto | 5 +---- .../api/recordactivitytaskstarted/api.go | 1 - .../api/respondactivitytaskcompleted/api.go | 1 - .../api/respondworkflowtaskcompleted/api.go | 1 - .../workflow_task_completed_handler.go | 4 ---- .../workflow_task_completed_handler_test.go | 1 - service/history/history_engine_test.go | 1 - service/history/interfaces/mutable_state.go | 1 - .../history/interfaces/mutable_state_mock.go | 8 ++++---- service/history/workflow/mutable_state_impl.go | 2 -- ...mutable_state_impl_restart_activity_test.go | 1 - .../workflow/mutable_state_impl_test.go | 13 ++++--------- 13 files changed, 13 insertions(+), 44 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 1fc0ca3b79..d14e49bf16 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -2521,10 +2521,8 @@ type ActivityInfo struct { // set to true if reset heartbeat flag was set with an activity reset ResetHeartbeats bool `protobuf:"varint,48,opt,name=reset_heartbeats,json=resetHeartbeats,proto3" json:"reset_heartbeats,omitempty"` StartVersion int64 `protobuf:"varint,50,opt,name=start_version,json=startVersion,proto3" json:"start_version,omitempty"` - // Unique identifier of the worker that is this activity. - WorkerInstanceKey string `protobuf:"bytes,51,opt,name=worker_instance_key,json=workerInstanceKey,proto3" json:"worker_instance_key,omitempty"` // The task queue on which the server will send control tasks to the worker running this activity. - WorkerControlTaskQueue string `protobuf:"bytes,52,opt,name=worker_control_task_queue,json=workerControlTaskQueue,proto3" json:"worker_control_task_queue,omitempty"` + WorkerControlTaskQueue string `protobuf:"bytes,51,opt,name=worker_control_task_queue,json=workerControlTaskQueue,proto3" json:"worker_control_task_queue,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2899,13 +2897,6 @@ func (x *ActivityInfo) GetStartVersion() int64 { return 0 } -func (x *ActivityInfo) GetWorkerInstanceKey() string { - if x != nil { - return x.WorkerInstanceKey - } - return "" -} - func (x *ActivityInfo) GetWorkerControlTaskQueue() string { if x != nil { return x.WorkerControlTaskQueue @@ -4858,7 +4849,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x17NexusInvocationTaskInfo\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattempt\"4\n" + "\x18NexusCancelationTaskInfo\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"\x89\x1c\n" + + "\aattempt\x18\x01 \x01(\x05R\aattempt\"\xd9\x1b\n" + "\fActivityInfo\x12\x18\n" + "\aversion\x18\x01 \x01(\x03R\aversion\x127\n" + "\x18scheduled_event_batch_id\x18\x02 \x01(\x03R\x15scheduledEventBatchId\x12A\n" + @@ -4911,9 +4902,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "pause_info\x18. \x01(\v2:.temporal.server.api.persistence.v1.ActivityInfo.PauseInfoR\tpauseInfo\x12%\n" + "\x0eactivity_reset\x18/ \x01(\bR\ractivityReset\x12)\n" + "\x10reset_heartbeats\x180 \x01(\bR\x0fresetHeartbeats\x12#\n" + - "\rstart_version\x182 \x01(\x03R\fstartVersion\x12.\n" + - "\x13worker_instance_key\x183 \x01(\tR\x11workerInstanceKey\x129\n" + - "\x19worker_control_task_queue\x184 \x01(\tR\x16workerControlTaskQueue\x1ay\n" + + "\rstart_version\x182 \x01(\x03R\fstartVersion\x129\n" + + "\x19worker_control_task_queue\x183 \x01(\tR\x16workerControlTaskQueue\x1ay\n" + "\x16UseWorkflowBuildIdInfo\x12+\n" + "\x12last_used_build_id\x18\x01 \x01(\tR\x0flastUsedBuildId\x122\n" + "\x15last_redirect_counter\x18\x02 \x01(\x03R\x13lastRedirectCounter\x1a\x89\x02\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 8401ceb858..fc3cf5b220 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -632,11 +632,8 @@ message ActivityInfo { int64 start_version = 50; - // Unique identifier of the worker that is this activity. - string worker_instance_key = 51; - // The task queue on which the server will send control tasks to the worker running this activity. - string worker_control_task_queue = 52; + string worker_control_task_queue = 51; } // timer_map column diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 976c5e11a0..bb8fd4c56c 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -242,7 +242,6 @@ func recordActivityTaskStarted( if _, err := mutableState.AddActivityTaskStartedEvent( ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(), versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(), - request.PollRequest.GetWorkerInstanceKey(), request.PollRequest.GetWorkerControlTaskQueue(), ); err != nil { return nil, rejectCodeUndefined, err diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index a641f09edc..99675b340a 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -98,7 +98,6 @@ func Invoke( // TODO (shahab): do we need to do anything with wf redirect in this case or any // other case where an activity starts? nil, - "", // workerInstanceKey not available for force complete "", // workerControlTaskQueue not available for force complete ) if err != nil { diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 0fd22aa980..39e7987745 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -389,7 +389,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( workflowTaskHandler := newWorkflowTaskCompletedHandler( request.GetIdentity(), - request.GetWorkerInstanceKey(), request.GetWorkerControlTaskQueue(), completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler. ms, diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index 945a452275..ee23a76893 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -53,7 +53,6 @@ type ( workflowTaskCompletedHandler struct { identity string - workerInstanceKey string workerControlTaskQueue string workflowTaskCompletedID int64 @@ -106,7 +105,6 @@ type ( func newWorkflowTaskCompletedHandler( identity string, - workerInstanceKey string, workerControlTaskQueue string, workflowTaskCompletedID int64, mutableState historyi.MutableState, @@ -127,7 +125,6 @@ func newWorkflowTaskCompletedHandler( ) *workflowTaskCompletedHandler { return &workflowTaskCompletedHandler{ identity: identity, - workerInstanceKey: workerInstanceKey, workerControlTaskQueue: workerControlTaskQueue, workflowTaskCompletedID: workflowTaskCompletedID, @@ -565,7 +562,6 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi stamp, nil, nil, - handler.workerInstanceKey, handler.workerControlTaskQueue, ); err != nil { return nil, err diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go index 4cdaf1f708..c68b8d1478 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go @@ -84,7 +84,6 @@ func TestCommandProtocolMessage(t *testing.T) { ) out.handler = newWorkflowTaskCompletedHandler( // 😲 t.Name(), // identity - "", // workerInstanceKey "", // workerControlTaskQueue 123, // workflowTaskCompletedID out.ms, diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 76a9821f5d..165782b3ef 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -6680,7 +6680,6 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6 nil, nil, "", - "", ) return event } diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index e1f7b48cd2..cd07f9ac69 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -57,7 +57,6 @@ type ( *commonpb.WorkerVersionStamp, *deploymentpb.Deployment, *taskqueuespb.BuildIdRedirectInfo, - string, // workerInstanceKey string, // workerControlTaskQueue ) (*historypb.HistoryEvent, error) AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 9c249569a3..9b89e38359 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -148,18 +148,18 @@ func (mr *MockMutableStateMockRecorder) AddActivityTaskScheduledEvent(arg0, arg1 } // AddActivityTaskStartedEvent mocks base method. -func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7, arg8 string) (*history.HistoryEvent, error) { +func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7 string) (*history.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) + ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddActivityTaskStartedEvent indicates an expected call of AddActivityTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // AddActivityTaskTimedOutEvent mocks base method. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index a944703e8c..0b92eb7ddc 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4057,7 +4057,6 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, deployment *deploymentpb.Deployment, redirectInfo *taskqueuespb.BuildIdRedirectInfo, - workerInstanceKey string, workerControlTaskQueue string, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskStarted @@ -4087,7 +4086,6 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( ai.LastDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment) } - ai.WorkerInstanceKey = workerInstanceKey ai.WorkerControlTaskQueue = workerControlTaskQueue if !ai.HasRetryPolicy { diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index a47174e731..3cb91b7fbd 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -420,7 +420,6 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp nil, nil, "", - "", ) s.NoError(err) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 1392f8ac5e..62e48a43ed 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2879,7 +2879,6 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() { nil, nil, "", - "", ) s.NoError(err) @@ -2946,7 +2945,6 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() { nil, nil, "", - "", ) s.NoError(err) @@ -6155,7 +6153,7 @@ func (s *mutableStateSuite) TestSetContextMetadata() { s.Equal(taskQueue, tq) } -func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceKey() { +func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerControlTaskQueue() { s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() // Setup workflow execution @@ -6206,10 +6204,9 @@ func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceK false, ) s.NoError(err) - s.Empty(activityInfo.WorkerInstanceKey, "WorkerInstanceKey should be empty before activity starts") + s.Empty(activityInfo.WorkerControlTaskQueue, "WorkerControlTaskQueue should be empty before activity starts") - // Start activity with workerInstanceKey and workerControlTaskQueue - expectedWorkerInstanceKey := "test-worker-instance-key-12345" + // Start activity with workerControlTaskQueue expectedWorkerControlTaskQueue := "test-control-queue" _, err = s.mutableState.AddActivityTaskStartedEvent( activityInfo, @@ -6219,14 +6216,12 @@ func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceK nil, nil, nil, - expectedWorkerInstanceKey, expectedWorkerControlTaskQueue, ) s.NoError(err) - // Verify workerInstanceKey and workerControlTaskQueue are stored + // Verify workerControlTaskQueue is stored updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) s.True(ok) - s.Equal(expectedWorkerInstanceKey, updatedActivityInfo.WorkerInstanceKey) s.Equal(expectedWorkerControlTaskQueue, updatedActivityInfo.WorkerControlTaskQueue) } From 539075959119c94776004ca3ae3e09c9d5902af9 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 17 Feb 2026 18:47:51 -0800 Subject: [PATCH 9/9] Forward WorkerControlTaskQueue through matching service partitions --- go.mod | 2 +- go.sum | 8 ++++---- service/matching/forwarder.go | 2 ++ service/matching/matching_engine.go | 3 +++ service/matching/pri_forwarder.go | 2 ++ 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 72ac5d39f8..92b96de1dd 100644 --- a/go.mod +++ b/go.mod @@ -174,4 +174,4 @@ require ( modernc.org/memory v1.11.0 // indirect ) -replace go.temporal.io/api => /Users/krajah/Code/api-go +replace go.temporal.io/api => github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9 diff --git a/go.sum b/go.sum index 8c5dd481b9..204f76e6e4 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/nexus-rpc/sdk-go v0.5.1 h1:UFYYfoHlQc+Pn9gQpmn9QE7xluewAn2AO1OSkAh7YFU= -github.com/nexus-rpc/sdk-go v0.5.1/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= +github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5 h1:Van9KGGs8lcDgxzSNFbDhEMNeJ80TbBxwZ45f9iBk9U= +github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -310,6 +310,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9 h1:nv1DjGfsfM/ITt5ehoHodVzCtTxv+mM+sMqM9Zgx0Tc= +github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9/go.mod h1:oewVgOWEx67DlpbXkEJl5PlcpDPXjR8h9+raDfl0fpo= github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7 h1:lEebX/hZss+TSH3EBwhztnBavJVj7pWGJOH8UgKHS0w= github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4= github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb h1:YzHH/U/dN7vMP+glybzcXRTczTrgfdRisNTzAj7La04= @@ -375,8 +377,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.2-0.20260211212044-12e31472fd35 h1:5Ig1D6s0oQVnj5blVPwoMqG/jvVaHcWWHXO9Y4B9n/M= -go.temporal.io/api v1.62.2-0.20260211212044-12e31472fd35/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index 018aeffca1..6b893db2aa 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -249,6 +249,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: pollMetadata.conditions, @@ -273,6 +274,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: pollMetadata.conditions, diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index b09c10338c..0195923910 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -95,6 +95,7 @@ type ( forwardedFrom string localPollStartTime time.Time workerInstanceKey string + workerControlTaskQueue string } userDataUpdate struct { @@ -679,6 +680,7 @@ pollLoop: forwardedFrom: req.ForwardedSource, conditions: req.Conditions, workerInstanceKey: request.WorkerInstanceKey, + workerControlTaskQueue: request.WorkerControlTaskQueue, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { @@ -984,6 +986,7 @@ pollLoop: forwardedFrom: req.ForwardedSource, conditions: req.Conditions, workerInstanceKey: request.WorkerInstanceKey, + workerControlTaskQueue: request.WorkerControlTaskQueue, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { diff --git a/service/matching/pri_forwarder.go b/service/matching/pri_forwarder.go index ca1e1f6f5c..dd2ab8c8c3 100644 --- a/service/matching/pri_forwarder.go +++ b/service/matching/pri_forwarder.go @@ -219,6 +219,7 @@ func ForwardPollWithTarget( WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions, @@ -243,6 +244,7 @@ func ForwardPollWithTarget( WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions,