From 1b31a8b0c058c9dc7c8b5dd6989eb60c6cb6999c Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 10 Oct 2025 14:09:31 +0200 Subject: [PATCH] Use separate sequence numbers for each workflow operation type ### Motivation Temporal's workflow state machine previously used a single shared sequence number counter for all operation types (timers, activities, conditions, child workflows, and external signals). This could potentially lead to confusion when tracking different types of operations, as each operation type needs its own independent sequence numbering to properly identify activation jobs. ### Modifications - Split the single `nextSequenceNumber` into five operation-specific counters: - `nextTimerSequenceNumber` for sleep operations - `nextActivitySequenceNumber` for activity executions (local and remote) - `nextConditionSequenceNumber` for wait conditions - `nextChildWorkflowSequenceNumber` for child workflow executions - `nextExternalSignalSequenceNumber` for signaling external workflows - Updated all sequence number generation methods to use the appropriate counter - Updated `WorkflowStateMachineStorage` to call the correct sequence number method for each operation type - Enhanced documentation to clarify that each operation type maintains independent sequence counters ### Result Each workflow operation type now maintains its own independent sequence counter, making the sequence numbering system more robust and easier to reason about. This ensures that activation jobs can be correctly matched to their originating commands without any ambiguity across different operation types. --- .../Workflow/WorkflowStateMachine.swift | 100 ++++++++++++++---- .../WorkflowStateMachineStorage.swift | 12 +-- 2 files changed, 86 insertions(+), 26 deletions(-) diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift index 52788ad..449a6bc 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift @@ -20,14 +20,30 @@ struct WorkflowStateMachine: ~Copyable { enum State: ~Copyable { /// The state when the workflow is actively running. struct Active: ~Copyable { - /// Temporal requires each command to have a unique sequence number. + /// Temporal requires each command to have a unique sequence number within its operation type. /// - /// This number is then used to identify which activation job belongs to which command. + /// Each operation type (timers, activities, conditions, child workflows, external signals) maintains + /// its own independent sequence counter. These numbers are used to identify which activation job + /// belongs to which command. /// - /// As an example, let's say the workflow scheduling a timer which we will assign sequence number 1 to. - /// Later we will get an activation job of type fireTimer with sequence number 1. So we know that we have to - /// resume the continuation belonging to sequence number 1. - var nextSequenceNumber: UInt32 + /// For example, if a workflow schedules a timer which we assign sequence number 1 to, + /// later we will get an activation job of type fireTimer with sequence number 1. This allows us + /// to resume the correct continuation belonging to that timer. + + /// Next sequence number for timer operations (sleep). + var nextTimerSequenceNumber: UInt32 + + /// Next sequence number for activity executions (both local and remote). + var nextActivitySequenceNumber: UInt32 + + /// Next sequence number for wait conditions. + var nextConditionSequenceNumber: UInt32 + + /// Next sequence number for child workflow executions. + var nextChildWorkflowSequenceNumber: UInt32 + + /// Next sequence number for signaling external workflows (including child workflows). + var nextExternalSignalSequenceNumber: UInt32 /// Number of current running and unfinished handlers. var numberOfActiveHandlers: Int @@ -107,7 +123,11 @@ struct WorkflowStateMachine: ~Copyable { private var state: State = .active( .init( - nextSequenceNumber: 0, + nextTimerSequenceNumber: 0, + nextActivitySequenceNumber: 0, + nextConditionSequenceNumber: 0, + nextChildWorkflowSequenceNumber: 0, + nextExternalSignalSequenceNumber: 0, numberOfActiveHandlers: 0, isReplaying: false, now: .now, @@ -137,11 +157,51 @@ struct WorkflowStateMachine: ~Copyable { self.state = state } - mutating func nextSequenceNumber() -> UInt32 { + mutating func nextTimerSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextTimerSequenceNumber + active.nextTimerSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextActivitySequenceNumber() -> UInt32 { switch consume self.state { case .active(var active): - let sequenceNumber = active.nextSequenceNumber - active.nextSequenceNumber += 1 + let sequenceNumber = active.nextActivitySequenceNumber + active.nextActivitySequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextConditionSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextConditionSequenceNumber + active.nextConditionSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextChildWorkflowSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextChildWorkflowSequenceNumber + active.nextChildWorkflowSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextExternalSignalSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextExternalSignalSequenceNumber + active.nextExternalSignalSequenceNumber += 1 self = .init(state: .active(active)) return sequenceNumber } @@ -341,8 +401,8 @@ struct WorkflowStateMachine: ~Copyable { ) -> UInt32 { switch consume self.state { case .active(var active): - let id = active.nextSequenceNumber - active.nextSequenceNumber += 1 + let id = active.nextConditionSequenceNumber + active.nextConditionSequenceNumber += 1 active.waitConditionContinuations[id] = (condition, nil) self = .init(state: .active(active)) return id @@ -378,7 +438,7 @@ struct WorkflowStateMachine: ~Copyable { } mutating func scheduleActivityExecution( - id: UInt32, + sequenceNumber: UInt32, activityType: String, options: ActivityExecutionOptions, workflowTaskQueue: String, @@ -388,13 +448,13 @@ struct WorkflowStateMachine: ~Copyable { ) { switch consume self.state { case .active(var active): - active.activityContinuations[id] = continuation + active.activityContinuations[sequenceNumber] = continuation active.commands.append( .with { switch options { case let .remote(activityOptions): $0.scheduleActivity = .init( - id: id, + id: sequenceNumber, activityType: activityType, workflowTaskQueue: workflowTaskQueue, headers: headers, @@ -403,7 +463,7 @@ struct WorkflowStateMachine: ~Copyable { ) case let .local(localActivityOptions, attempt, originalScheduleTime): $0.scheduleLocalActivity = .init( - id: id, + id: sequenceNumber, activityType: activityType, headers: headers, input: input, @@ -418,10 +478,10 @@ struct WorkflowStateMachine: ~Copyable { } } - mutating func cancelActivity(id: UInt32, isLocal: Bool) { + mutating func cancelActivity(sequenceNumber: UInt32, isLocal: Bool) { switch consume self.state { case .active(var active): - guard active.activityContinuations[id] != nil else { + guard active.activityContinuations[sequenceNumber] != nil else { // we know this is not called before the activity continuation is persisted, therefore, activity was reported as completed // in the same activation as the workflow was cancelled. self = .init(state: .active(active)) @@ -432,11 +492,11 @@ struct WorkflowStateMachine: ~Copyable { .with { if isLocal { $0.requestCancelLocalActivity = .with { - $0.seq = id + $0.seq = sequenceNumber } } else { $0.requestCancelActivity = .with { - $0.seq = id + $0.seq = sequenceNumber } } } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift index fc7b3af..6d4329b 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift @@ -87,7 +87,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { let convertedSummary = try summary.flatMap { try self.payloadConverter.convertValue($0) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextTimerSequenceNumber() try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.sleep( @@ -187,11 +187,11 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { var options = options let isLocal = options.isLocal while true { - let id = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextActivitySequenceNumber() let result = try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.scheduleActivityExecution( - id: id, + sequenceNumber: sequenceNumber, activityType: activityType, options: options, workflowTaskQueue: workflowTaskQueue, @@ -203,7 +203,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { } onCancel: { // No need to worry about this being called before the actual operation as we check earlier in the method and there are no possible // suspension points. - self.stateMachine.cancelActivity(id: id, isLocal: isLocal) + self.stateMachine.cancelActivity(sequenceNumber: sequenceNumber, isLocal: isLocal) } switch result.status { @@ -283,7 +283,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { ) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextChildWorkflowSequenceNumber() let state = UntypedChildWorkflowHandle.State(resolutionState: .unresolved(sequenceNumber: sequenceNumber)) let (workflowID, firstExecutionRunID) = try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in @@ -379,7 +379,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { ) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextExternalSignalSequenceNumber() try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.signalChildWorkflow(