Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,5 @@ require (
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
)

replace go.temporal.io/api => github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9 h1:nv1DjGfsfM/ITt5ehoHodVzCtTxv+mM+sMqM9Zgx0Tc=
github.com/temporalio/api-go v1.62.2-0.20260217225453-e6a9241288b9/go.mod h1:oewVgOWEx67DlpbXkEJl5PlcpDPXjR8h9+raDfl0fpo=
github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7 h1:lEebX/hZss+TSH3EBwhztnBavJVj7pWGJOH8UgKHS0w=
github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4=
github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb h1:YzHH/U/dN7vMP+glybzcXRTczTrgfdRisNTzAj7La04=
Expand Down Expand Up @@ -375,8 +377,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo=
go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4=
go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ message ActivityInfo {
bool reset_heartbeats = 48;

int64 start_version = 50;

// The task queue on which the server will send control tasks to the worker running this activity.
string worker_control_task_queue = 51;
}

// timer_map column
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func recordActivityTaskStarted(
if _, err := mutableState.AddActivityTaskStartedEvent(
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(),
request.PollRequest.GetWorkerControlTaskQueue(),
); err != nil {
return nil, rejectCodeUndefined, err
}
Expand Down
1 change: 1 addition & 0 deletions service/history/api/respondactivitytaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func Invoke(
// TODO (shahab): do we need to do anything with wf redirect in this case or any
// other case where an activity starts?
nil,
"", // workerControlTaskQueue not available for force complete
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(

workflowTaskHandler := newWorkflowTaskCompletedHandler(
request.GetIdentity(),
request.GetWorkerControlTaskQueue(),
completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler.
ms,
updateRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (

workflowTaskCompletedHandler struct {
identity string
workerControlTaskQueue string
workflowTaskCompletedID int64

// internal state
Expand Down Expand Up @@ -104,6 +105,7 @@ type (

func newWorkflowTaskCompletedHandler(
identity string,
workerControlTaskQueue string,
workflowTaskCompletedID int64,
mutableState historyi.MutableState,
updateRegistry update.Registry,
Expand All @@ -123,6 +125,7 @@ func newWorkflowTaskCompletedHandler(
) *workflowTaskCompletedHandler {
return &workflowTaskCompletedHandler{
identity: identity,
workerControlTaskQueue: workerControlTaskQueue,
workflowTaskCompletedID: workflowTaskCompletedID,

// internal state
Expand Down Expand Up @@ -559,6 +562,7 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi
stamp,
nil,
nil,
handler.workerControlTaskQueue,
); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestCommandProtocolMessage(t *testing.T) {
)
out.handler = newWorkflowTaskCompletedHandler( // 😲
t.Name(), // identity
"", // workerControlTaskQueue
123, // workflowTaskCompletedID
out.ms,
out.updates,
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6679,6 +6679,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6
nil,
nil,
nil,
"",
)
return event
}
Expand Down
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
*commonpb.WorkerVersionStamp,
*deploymentpb.Deployment,
*taskqueuespb.BuildIdRedirectInfo,
string, // workerControlTaskQueue
) (*historypb.HistoryEvent, error)
AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error)
AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error)
Expand Down
8 changes: 4 additions & 4 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4057,6 +4057,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
versioningStamp *commonpb.WorkerVersionStamp,
deployment *deploymentpb.Deployment,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
workerControlTaskQueue string,
) (*historypb.HistoryEvent, error) {
opTag := tag.WorkflowActionActivityTaskStarted
err := ms.checkMutability(opTag)
Expand Down Expand Up @@ -4085,6 +4086,8 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
ai.LastDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment)
}

ai.WorkerControlTaskQueue = workerControlTaskQueue

if !ai.HasRetryPolicy {
event := ms.hBuilder.AddActivityTaskStartedEvent(
scheduledEventID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down
75 changes: 75 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,7 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down Expand Up @@ -2943,6 +2944,7 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down Expand Up @@ -6150,3 +6152,76 @@ func (s *mutableStateSuite) TestSetContextMetadata() {
s.True(ok)
s.Equal(taskQueue, tq)
}

func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerControlTaskQueue() {
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

// Setup workflow execution
_, err := s.mutableState.AddWorkflowExecutionStartedEvent(
&commonpb.WorkflowExecution{WorkflowId: tests.WorkflowID, RunId: tests.RunID},
&historyservice.StartWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: "workflow-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
WorkflowRunTimeout: durationpb.New(200 * time.Second),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
},
},
)
s.NoError(err)

di, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
s.NoError(err)
_, _, err = s.mutableState.AddWorkflowTaskStartedEvent(
di.ScheduledEventID,
di.RequestID,
di.TaskQueue,
"identity",
nil,
nil,
nil,
false,
nil,
)
s.NoError(err)
_, err = s.mutableState.AddWorkflowTaskCompletedEvent(
di,
&workflowservice.RespondWorkflowTaskCompletedRequest{Identity: "identity"},
workflowTaskCompletionLimits,
)
s.NoError(err)

// Schedule activity
workflowTaskCompletedEventID := int64(4)
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID,
&commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "test-activity-1",
ActivityType: &commonpb.ActivityType{Name: "test-activity-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"},
},
false,
)
s.NoError(err)
s.Empty(activityInfo.WorkerControlTaskQueue, "WorkerControlTaskQueue should be empty before activity starts")

// Start activity with workerControlTaskQueue
expectedWorkerControlTaskQueue := "test-control-queue"
_, err = s.mutableState.AddActivityTaskStartedEvent(
activityInfo,
activityInfo.ScheduledEventId,
uuid.NewString(),
"worker-identity",
nil,
nil,
nil,
expectedWorkerControlTaskQueue,
)
s.NoError(err)

// Verify workerControlTaskQueue is stored
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
s.True(ok)
s.Equal(expectedWorkerControlTaskQueue, updatedActivityInfo.WorkerControlTaskQueue)
}
2 changes: 2 additions & 0 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: fwdr.partition.RpcName(),
Conditions: pollMetadata.conditions,
Expand All @@ -273,6 +274,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: fwdr.partition.RpcName(),
Conditions: pollMetadata.conditions,
Expand Down
3 changes: 3 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type (
forwardedFrom string
localPollStartTime time.Time
workerInstanceKey string
workerControlTaskQueue string
}

userDataUpdate struct {
Expand Down Expand Up @@ -679,6 +680,7 @@ pollLoop:
forwardedFrom: req.ForwardedSource,
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
workerControlTaskQueue: request.WorkerControlTaskQueue,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
Expand Down Expand Up @@ -984,6 +986,7 @@ pollLoop:
forwardedFrom: req.ForwardedSource,
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
workerControlTaskQueue: request.WorkerControlTaskQueue,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/matching/pri_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func ForwardPollWithTarget(
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: source.RpcName(),
Conditions: pollMetadata.conditions,
Expand All @@ -243,6 +244,7 @@ func ForwardPollWithTarget(
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: source.RpcName(),
Conditions: pollMetadata.conditions,
Expand Down
Loading