diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift index 52788ad..449a6bc 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift @@ -20,14 +20,30 @@ struct WorkflowStateMachine: ~Copyable { enum State: ~Copyable { /// The state when the workflow is actively running. struct Active: ~Copyable { - /// Temporal requires each command to have a unique sequence number. + /// Temporal requires each command to have a unique sequence number within its operation type. /// - /// This number is then used to identify which activation job belongs to which command. + /// Each operation type (timers, activities, conditions, child workflows, external signals) maintains + /// its own independent sequence counter. These numbers are used to identify which activation job + /// belongs to which command. /// - /// As an example, let's say the workflow scheduling a timer which we will assign sequence number 1 to. - /// Later we will get an activation job of type fireTimer with sequence number 1. So we know that we have to - /// resume the continuation belonging to sequence number 1. - var nextSequenceNumber: UInt32 + /// For example, if a workflow schedules a timer which we assign sequence number 1 to, + /// later we will get an activation job of type fireTimer with sequence number 1. This allows us + /// to resume the correct continuation belonging to that timer. + + /// Next sequence number for timer operations (sleep). + var nextTimerSequenceNumber: UInt32 + + /// Next sequence number for activity executions (both local and remote). + var nextActivitySequenceNumber: UInt32 + + /// Next sequence number for wait conditions. + var nextConditionSequenceNumber: UInt32 + + /// Next sequence number for child workflow executions. + var nextChildWorkflowSequenceNumber: UInt32 + + /// Next sequence number for signaling external workflows (including child workflows). + var nextExternalSignalSequenceNumber: UInt32 /// Number of current running and unfinished handlers. var numberOfActiveHandlers: Int @@ -107,7 +123,11 @@ struct WorkflowStateMachine: ~Copyable { private var state: State = .active( .init( - nextSequenceNumber: 0, + nextTimerSequenceNumber: 0, + nextActivitySequenceNumber: 0, + nextConditionSequenceNumber: 0, + nextChildWorkflowSequenceNumber: 0, + nextExternalSignalSequenceNumber: 0, numberOfActiveHandlers: 0, isReplaying: false, now: .now, @@ -137,11 +157,51 @@ struct WorkflowStateMachine: ~Copyable { self.state = state } - mutating func nextSequenceNumber() -> UInt32 { + mutating func nextTimerSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextTimerSequenceNumber + active.nextTimerSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextActivitySequenceNumber() -> UInt32 { switch consume self.state { case .active(var active): - let sequenceNumber = active.nextSequenceNumber - active.nextSequenceNumber += 1 + let sequenceNumber = active.nextActivitySequenceNumber + active.nextActivitySequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextConditionSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextConditionSequenceNumber + active.nextConditionSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextChildWorkflowSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextChildWorkflowSequenceNumber + active.nextChildWorkflowSequenceNumber += 1 + self = .init(state: .active(active)) + return sequenceNumber + } + } + + mutating func nextExternalSignalSequenceNumber() -> UInt32 { + switch consume self.state { + case .active(var active): + let sequenceNumber = active.nextExternalSignalSequenceNumber + active.nextExternalSignalSequenceNumber += 1 self = .init(state: .active(active)) return sequenceNumber } @@ -341,8 +401,8 @@ struct WorkflowStateMachine: ~Copyable { ) -> UInt32 { switch consume self.state { case .active(var active): - let id = active.nextSequenceNumber - active.nextSequenceNumber += 1 + let id = active.nextConditionSequenceNumber + active.nextConditionSequenceNumber += 1 active.waitConditionContinuations[id] = (condition, nil) self = .init(state: .active(active)) return id @@ -378,7 +438,7 @@ struct WorkflowStateMachine: ~Copyable { } mutating func scheduleActivityExecution( - id: UInt32, + sequenceNumber: UInt32, activityType: String, options: ActivityExecutionOptions, workflowTaskQueue: String, @@ -388,13 +448,13 @@ struct WorkflowStateMachine: ~Copyable { ) { switch consume self.state { case .active(var active): - active.activityContinuations[id] = continuation + active.activityContinuations[sequenceNumber] = continuation active.commands.append( .with { switch options { case let .remote(activityOptions): $0.scheduleActivity = .init( - id: id, + id: sequenceNumber, activityType: activityType, workflowTaskQueue: workflowTaskQueue, headers: headers, @@ -403,7 +463,7 @@ struct WorkflowStateMachine: ~Copyable { ) case let .local(localActivityOptions, attempt, originalScheduleTime): $0.scheduleLocalActivity = .init( - id: id, + id: sequenceNumber, activityType: activityType, headers: headers, input: input, @@ -418,10 +478,10 @@ struct WorkflowStateMachine: ~Copyable { } } - mutating func cancelActivity(id: UInt32, isLocal: Bool) { + mutating func cancelActivity(sequenceNumber: UInt32, isLocal: Bool) { switch consume self.state { case .active(var active): - guard active.activityContinuations[id] != nil else { + guard active.activityContinuations[sequenceNumber] != nil else { // we know this is not called before the activity continuation is persisted, therefore, activity was reported as completed // in the same activation as the workflow was cancelled. self = .init(state: .active(active)) @@ -432,11 +492,11 @@ struct WorkflowStateMachine: ~Copyable { .with { if isLocal { $0.requestCancelLocalActivity = .with { - $0.seq = id + $0.seq = sequenceNumber } } else { $0.requestCancelActivity = .with { - $0.seq = id + $0.seq = sequenceNumber } } } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift index fc7b3af..6d4329b 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift @@ -87,7 +87,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { let convertedSummary = try summary.flatMap { try self.payloadConverter.convertValue($0) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextTimerSequenceNumber() try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.sleep( @@ -187,11 +187,11 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { var options = options let isLocal = options.isLocal while true { - let id = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextActivitySequenceNumber() let result = try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.scheduleActivityExecution( - id: id, + sequenceNumber: sequenceNumber, activityType: activityType, options: options, workflowTaskQueue: workflowTaskQueue, @@ -203,7 +203,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { } onCancel: { // No need to worry about this being called before the actual operation as we check earlier in the method and there are no possible // suspension points. - self.stateMachine.cancelActivity(id: id, isLocal: isLocal) + self.stateMachine.cancelActivity(sequenceNumber: sequenceNumber, isLocal: isLocal) } switch result.status { @@ -283,7 +283,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { ) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextChildWorkflowSequenceNumber() let state = UntypedChildWorkflowHandle.State(resolutionState: .unresolved(sequenceNumber: sequenceNumber)) let (workflowID, firstExecutionRunID) = try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in @@ -379,7 +379,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { ) } - let sequenceNumber = self.stateMachine.nextSequenceNumber() + let sequenceNumber = self.stateMachine.nextExternalSignalSequenceNumber() try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in self.stateMachine.signalChildWorkflow(