From e3b35ae339b75b41bee3ae72c86439aab747ea7d Mon Sep 17 00:00:00 2001 From: Saad Mohsin Khan Date: Mon, 9 Feb 2026 14:26:37 +0500 Subject: [PATCH] Add schedule_to_start_timeout to WorkflowTaskScheduledEvent --- .../history/historybuilder/event_factory.go | 15 +++++--- .../history/historybuilder/history_builder.go | 3 +- .../history_builder_categorization_test.go | 2 +- .../historybuilder/history_builder_test.go | 1 + .../workflow/workflow_task_state_machine.go | 38 ++++++++++++++++--- 5 files changed, 47 insertions(+), 12 deletions(-) diff --git a/service/history/historybuilder/event_factory.go b/service/history/historybuilder/event_factory.go index 33627bbe62..f5c4aa1f79 100644 --- a/service/history/historybuilder/event_factory.go +++ b/service/history/historybuilder/event_factory.go @@ -102,14 +102,19 @@ func (b *EventFactory) CreateWorkflowTaskScheduledEvent( startToCloseTimeout *durationpb.Duration, attempt int32, scheduleTime time.Time, + scheduleToStartTimeout *durationpb.Duration, ) *historypb.HistoryEvent { event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, scheduleTime) + attrs := &historypb.WorkflowTaskScheduledEventAttributes{ + TaskQueue: taskQueue, + StartToCloseTimeout: startToCloseTimeout, + Attempt: attempt, + } + if scheduleToStartTimeout != nil { + attrs.ScheduleToStartTimeout = scheduleToStartTimeout + } event.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ - WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{ - TaskQueue: taskQueue, - StartToCloseTimeout: startToCloseTimeout, - Attempt: attempt, - }, + WorkflowTaskScheduledEventAttributes: attrs, } return event diff --git a/service/history/historybuilder/history_builder.go b/service/history/historybuilder/history_builder.go index 024078fd09..e90877960e 100644 --- a/service/history/historybuilder/history_builder.go +++ b/service/history/historybuilder/history_builder.go @@ -182,8 +182,9 @@ func (b *HistoryBuilder) AddWorkflowTaskScheduledEvent( startToCloseTimeout *durationpb.Duration, attempt int32, scheduleTime time.Time, + scheduleToStartTimeout *durationpb.Duration, ) *historypb.HistoryEvent { - event := b.EventFactory.CreateWorkflowTaskScheduledEvent(taskQueue, startToCloseTimeout, attempt, scheduleTime) + event := b.EventFactory.CreateWorkflowTaskScheduledEvent(taskQueue, startToCloseTimeout, attempt, scheduleTime, scheduleToStartTimeout) event, _ = b.EventStore.add(event) return event } diff --git a/service/history/historybuilder/history_builder_categorization_test.go b/service/history/historybuilder/history_builder_categorization_test.go index 4ea2920673..43381cf0a2 100644 --- a/service/history/historybuilder/history_builder_categorization_test.go +++ b/service/history/historybuilder/history_builder_categorization_test.go @@ -1294,7 +1294,7 @@ func (s *sutTestingAdapter) AddWorkflowTaskTimedOutEvent(_ ...eventConfig) *hist } func (s *sutTestingAdapter) AddWorkflowTaskScheduledEvent(_ ...eventConfig) *historypb.HistoryEvent { - return s.HistoryBuilder.AddWorkflowTaskScheduledEvent(nil, nil, 1, s.today) + return s.HistoryBuilder.AddWorkflowTaskScheduledEvent(nil, nil, 1, s.today, nil) } func (s *sutTestingAdapter) AddActivityTaskStartedEvent(optionalConfig ...eventConfig) *historypb.HistoryEvent { diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go index 34ebdcb9ef..c939cf1d80 100644 --- a/service/history/historybuilder/history_builder_test.go +++ b/service/history/historybuilder/history_builder_test.go @@ -628,6 +628,7 @@ func (s *historyBuilderSuite) TestWorkflowTaskScheduled() { durationpb.New(startToCloseTimeout), attempt, s.now, + nil, ) s.Equal(event, s.flush()) s.Equal(&historypb.HistoryEvent{ diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index e5a3e5f322..52689ef5a0 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -281,11 +281,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduleToStartTimeoutEvent( m.ms.RemoveSpeculativeWorkflowTaskTimeoutTask() // Create corresponding WorkflowTaskScheduled event for speculative WT. + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( m.ms.CurrentTaskQueue(), durationpb.New(workflowTask.WorkflowTaskTimeout), workflowTask.Attempt, workflowTask.ScheduledTime.UTC(), + scheduleToStartTimeout, ) workflowTask.ScheduledEventID = scheduledEvent.GetEventId() } @@ -361,11 +363,16 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( var scheduledEventID int64 if createWorkflowTaskScheduledEvent { + var scheduleToStartTimeout *durationpb.Duration + if taskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY || workflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + _, scheduleToStartTimeout = m.ms.GetWorkflowTaskScheduleToStartTimeout() + } scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent( taskQueue, startToCloseTimeout, attempt, scheduleTime, + scheduleToStartTimeout, ) scheduledEventID = scheduledEvent.GetEventId() } else { @@ -528,6 +535,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( if !workflowTaskScheduledEventCreated && (workflowTask.ScheduledEventID != m.ms.GetNextEventID() || workflowTask.Version != m.ms.GetCurrentVersion()) { + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() workflowTask.Attempt = 1 workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL workflowTaskScheduledEventCreated = true @@ -538,6 +546,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( durationpb.New(workflowTask.WorkflowTaskTimeout), workflowTask.Attempt, startTime, + scheduleToStartTimeout, ) scheduledEventID = scheduledEvent.GetEventId() } @@ -618,6 +627,7 @@ func (m *workflowTaskStateMachine) processBuildIdRedirectInfo( if m.ms.IsTransientWorkflowTask() && m.ms.GetExecutionInfo().GetWorkflowTaskBuildId() != buildId { // we're retrying a workflow task and this attempt is on a different build ID, converting the transient wf task // to a normal wf task by creating a scheduled event for it and setting its attempt to 1. + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( m.ms.CurrentTaskQueue(), durationpb.New(workflowTask.WorkflowTaskTimeout), @@ -625,6 +635,7 @@ func (m *workflowTaskStateMachine) processBuildIdRedirectInfo( // build ID + 1 (because it's being reset to 1 for the next build ID. See bellow.) workflowTask.Attempt, workflowTask.ScheduledTime, + scheduleToStartTimeout, ) newWorkflowTask = m.getWorkflowTaskInfo() newWorkflowTask.ScheduledEventID = scheduledEvent.GetEventId() @@ -731,11 +742,16 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( if !workflowTaskScheduledStartedEventsCreated { // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for transient/speculative workflow tasks. + var scheduleToStartTimeout *durationpb.Duration + if workflowTask.TaskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY || workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + _, scheduleToStartTimeout = m.ms.GetWorkflowTaskScheduleToStartTimeout() + } scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( m.ms.CurrentTaskQueue(), durationpb.New(workflowTask.WorkflowTaskTimeout), workflowTask.Attempt, workflowTask.ScheduledTime.UTC(), + scheduleToStartTimeout, ) workflowTask.ScheduledEventID = scheduledEvent.GetEventId() @@ -822,11 +838,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( m.ms.RemoveSpeculativeWorkflowTaskTimeoutTask() // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT. + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( m.ms.CurrentTaskQueue(), durationpb.New(workflowTask.WorkflowTaskTimeout), workflowTask.Attempt, workflowTask.ScheduledTime.UTC(), + scheduleToStartTimeout, ) workflowTask.ScheduledEventID = scheduledEvent.GetEventId() startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent( @@ -894,11 +912,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( m.ms.RemoveSpeculativeWorkflowTaskTimeoutTask() // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT. + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( m.ms.CurrentTaskQueue(), durationpb.New(workflowTask.WorkflowTaskTimeout), workflowTask.Attempt, workflowTask.ScheduledTime.UTC(), + scheduleToStartTimeout, ) workflowTask.ScheduledEventID = scheduledEvent.GetEventId() startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent( @@ -1145,17 +1165,23 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( ) *historyspb.TransientWorkflowTaskInfo { // Create scheduled and started events which are not written to the history yet. + attrs := &historypb.WorkflowTaskScheduledEventAttributes{ + TaskQueue: m.ms.CurrentTaskQueue(), + StartToCloseTimeout: durationpb.New(workflowTask.WorkflowTaskTimeout), + Attempt: workflowTask.Attempt, + } + if workflowTask.TaskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY || workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + if _, sst := m.ms.GetWorkflowTaskScheduleToStartTimeout(); sst != nil { + attrs.ScheduleToStartTimeout = sst + } + } scheduledEvent := &historypb.HistoryEvent{ EventId: workflowTask.ScheduledEventID, EventTime: timestamppb.New(workflowTask.ScheduledTime), EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, Version: m.ms.currentVersion, Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ - WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{ - TaskQueue: m.ms.CurrentTaskQueue(), - StartToCloseTimeout: durationpb.New(workflowTask.WorkflowTaskTimeout), - Attempt: workflowTask.Attempt, - }, + WorkflowTaskScheduledEventAttributes: attrs, }, } @@ -1425,6 +1451,7 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro m.ms.workflowTaskUpdated = true } + _, scheduleToStartTimeout := m.ms.GetWorkflowTaskScheduleToStartTimeout() m.ms.executionInfo.WorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1, metrics.ReasonTag("close_transaction")) @@ -1436,6 +1463,7 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro durationpb.New(wt.WorkflowTaskTimeout), wt.Attempt, wt.ScheduledTime, + scheduleToStartTimeout, ) if scheduledEvent.EventId != wt.ScheduledEventID {