Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 80 additions & 20 deletions Sources/Temporal/Worker/Workflow/WorkflowStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,30 @@ struct WorkflowStateMachine: ~Copyable {
enum State: ~Copyable {
/// The state when the workflow is actively running.
struct Active: ~Copyable {
/// Temporal requires each command to have a unique sequence number.
/// Temporal requires each command to have a unique sequence number within its operation type.
///
/// This number is then used to identify which activation job belongs to which command.
/// Each operation type (timers, activities, conditions, child workflows, external signals) maintains
/// its own independent sequence counter. These numbers are used to identify which activation job
/// belongs to which command.
///
/// As an example, let's say the workflow scheduling a timer which we will assign sequence number 1 to.
/// Later we will get an activation job of type fireTimer with sequence number 1. So we know that we have to
/// resume the continuation belonging to sequence number 1.
var nextSequenceNumber: UInt32
/// For example, if a workflow schedules a timer which we assign sequence number 1 to,
/// later we will get an activation job of type fireTimer with sequence number 1. This allows us
/// to resume the correct continuation belonging to that timer.

/// Next sequence number for timer operations (sleep).
var nextTimerSequenceNumber: UInt32

/// Next sequence number for activity executions (both local and remote).
var nextActivitySequenceNumber: UInt32

/// Next sequence number for wait conditions.
var nextConditionSequenceNumber: UInt32

/// Next sequence number for child workflow executions.
var nextChildWorkflowSequenceNumber: UInt32

/// Next sequence number for signaling external workflows (including child workflows).
var nextExternalSignalSequenceNumber: UInt32

/// Number of current running and unfinished handlers.
var numberOfActiveHandlers: Int
Expand Down Expand Up @@ -107,7 +123,11 @@ struct WorkflowStateMachine: ~Copyable {

private var state: State = .active(
.init(
nextSequenceNumber: 0,
nextTimerSequenceNumber: 0,
nextActivitySequenceNumber: 0,
nextConditionSequenceNumber: 0,
nextChildWorkflowSequenceNumber: 0,
nextExternalSignalSequenceNumber: 0,
numberOfActiveHandlers: 0,
isReplaying: false,
now: .now,
Expand Down Expand Up @@ -137,11 +157,51 @@ struct WorkflowStateMachine: ~Copyable {
self.state = state
}

mutating func nextSequenceNumber() -> UInt32 {
mutating func nextTimerSequenceNumber() -> UInt32 {
switch consume self.state {
case .active(var active):
let sequenceNumber = active.nextTimerSequenceNumber
active.nextTimerSequenceNumber += 1
self = .init(state: .active(active))
return sequenceNumber
}
}

mutating func nextActivitySequenceNumber() -> UInt32 {
switch consume self.state {
case .active(var active):
let sequenceNumber = active.nextSequenceNumber
active.nextSequenceNumber += 1
let sequenceNumber = active.nextActivitySequenceNumber
active.nextActivitySequenceNumber += 1
self = .init(state: .active(active))
return sequenceNumber
}
}

mutating func nextConditionSequenceNumber() -> UInt32 {
switch consume self.state {
case .active(var active):
let sequenceNumber = active.nextConditionSequenceNumber
active.nextConditionSequenceNumber += 1
self = .init(state: .active(active))
return sequenceNumber
}
}

mutating func nextChildWorkflowSequenceNumber() -> UInt32 {
switch consume self.state {
case .active(var active):
let sequenceNumber = active.nextChildWorkflowSequenceNumber
active.nextChildWorkflowSequenceNumber += 1
self = .init(state: .active(active))
return sequenceNumber
}
}

mutating func nextExternalSignalSequenceNumber() -> UInt32 {
switch consume self.state {
case .active(var active):
let sequenceNumber = active.nextExternalSignalSequenceNumber
active.nextExternalSignalSequenceNumber += 1
self = .init(state: .active(active))
return sequenceNumber
}
Expand Down Expand Up @@ -341,8 +401,8 @@ struct WorkflowStateMachine: ~Copyable {
) -> UInt32 {
switch consume self.state {
case .active(var active):
let id = active.nextSequenceNumber
active.nextSequenceNumber += 1
let id = active.nextConditionSequenceNumber
active.nextConditionSequenceNumber += 1
active.waitConditionContinuations[id] = (condition, nil)
self = .init(state: .active(active))
return id
Expand Down Expand Up @@ -378,7 +438,7 @@ struct WorkflowStateMachine: ~Copyable {
}

mutating func scheduleActivityExecution(
id: UInt32,
sequenceNumber: UInt32,
activityType: String,
options: ActivityExecutionOptions,
workflowTaskQueue: String,
Expand All @@ -388,13 +448,13 @@ struct WorkflowStateMachine: ~Copyable {
) {
switch consume self.state {
case .active(var active):
active.activityContinuations[id] = continuation
active.activityContinuations[sequenceNumber] = continuation
active.commands.append(
.with {
switch options {
case let .remote(activityOptions):
$0.scheduleActivity = .init(
id: id,
id: sequenceNumber,
activityType: activityType,
workflowTaskQueue: workflowTaskQueue,
headers: headers,
Expand All @@ -403,7 +463,7 @@ struct WorkflowStateMachine: ~Copyable {
)
case let .local(localActivityOptions, attempt, originalScheduleTime):
$0.scheduleLocalActivity = .init(
id: id,
id: sequenceNumber,
activityType: activityType,
headers: headers,
input: input,
Expand All @@ -418,10 +478,10 @@ struct WorkflowStateMachine: ~Copyable {
}
}

mutating func cancelActivity(id: UInt32, isLocal: Bool) {
mutating func cancelActivity(sequenceNumber: UInt32, isLocal: Bool) {
switch consume self.state {
case .active(var active):
guard active.activityContinuations[id] != nil else {
guard active.activityContinuations[sequenceNumber] != nil else {
// we know this is not called before the activity continuation is persisted, therefore, activity was reported as completed
// in the same activation as the workflow was cancelled.
self = .init(state: .active(active))
Expand All @@ -432,11 +492,11 @@ struct WorkflowStateMachine: ~Copyable {
.with {
if isLocal {
$0.requestCancelLocalActivity = .with {
$0.seq = id
$0.seq = sequenceNumber
}
} else {
$0.requestCancelActivity = .with {
$0.seq = id
$0.seq = sequenceNumber
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {

let convertedSummary = try summary.flatMap { try self.payloadConverter.convertValue($0) }

let sequenceNumber = self.stateMachine.nextSequenceNumber()
let sequenceNumber = self.stateMachine.nextTimerSequenceNumber()
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
self.stateMachine.sleep(
Expand Down Expand Up @@ -187,11 +187,11 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {
var options = options
let isLocal = options.isLocal
while true {
let id = self.stateMachine.nextSequenceNumber()
let sequenceNumber = self.stateMachine.nextActivitySequenceNumber()
let result = try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
self.stateMachine.scheduleActivityExecution(
id: id,
sequenceNumber: sequenceNumber,
activityType: activityType,
options: options,
workflowTaskQueue: workflowTaskQueue,
Expand All @@ -203,7 +203,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {
} onCancel: {
// No need to worry about this being called before the actual operation as we check earlier in the method and there are no possible
// suspension points.
self.stateMachine.cancelActivity(id: id, isLocal: isLocal)
self.stateMachine.cancelActivity(sequenceNumber: sequenceNumber, isLocal: isLocal)
}

switch result.status {
Expand Down Expand Up @@ -283,7 +283,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {
)
}

let sequenceNumber = self.stateMachine.nextSequenceNumber()
let sequenceNumber = self.stateMachine.nextChildWorkflowSequenceNumber()
let state = UntypedChildWorkflowHandle.State(resolutionState: .unresolved(sequenceNumber: sequenceNumber))
let (workflowID, firstExecutionRunID) = try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
Expand Down Expand Up @@ -379,7 +379,7 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {
)
}

let sequenceNumber = self.stateMachine.nextSequenceNumber()
let sequenceNumber = self.stateMachine.nextExternalSignalSequenceNumber()
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
self.stateMachine.signalChildWorkflow(
Expand Down
Loading