diff --git a/Package.swift b/Package.swift
index c4b916f06..dd92df84e 100644
--- a/Package.swift
+++ b/Package.swift
@@ -193,7 +193,7 @@ var dependencies: [Package.Dependency] = [

     // ~~~ Swift libraries ~~~
     .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0-beta"),
-    .package(url: "https://github.com/apple/swift-collections", from: "1.0.5"),
+    .package(url: "https://github.com/apple/swift-collections", from: "1.1.0"),

     // ~~~ Observability ~~~
     .package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
diff --git a/Sources/DistributedCluster/ClusterSystem.swift b/Sources/DistributedCluster/ClusterSystem.swift
index dbdd9ee15..0b853e039 100644
--- a/Sources/DistributedCluster/ClusterSystem.swift
+++ b/Sources/DistributedCluster/ClusterSystem.swift
@@ -1070,7 +1070,7 @@ extension ClusterSystem {
         }

         // Spawn a behavior actor for it:
-        let behavior = InvocationBehavior.behavior(instance: Weak(actor))
+        let behavior = InvocationBehavior.behavior(instance: WeakLocalRef(actor))
         let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id)

         // Store references
diff --git a/Sources/DistributedCluster/Concurrency/_ClusterCancellableCheckedContinuation.swift b/Sources/DistributedCluster/Concurrency/_ClusterCancellableCheckedContinuation.swift
new file mode 100644
index 000000000..0b13ce770
--- /dev/null
+++ b/Sources/DistributedCluster/Concurrency/_ClusterCancellableCheckedContinuation.swift
@@ -0,0 +1,134 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Distributed Actors open source project
+//
+// Copyright (c) 2024 Apple Inc. and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import DistributedActorsConcurrencyHelpers
+import NIOConcurrencyHelpers
+
+/// A checked continuation that offers easier APIs for working with cancellation,
+/// as well as having its own unique identity.
+internal final class ClusterCancellableCheckedContinuation<Success>: Hashable, @unchecked Sendable where Success: Sendable {
+    private struct _State: Sendable {
+        var cancelled: Bool = false
+        var onCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
+        var continuation: CheckedContinuation<Success, any Error>?
+    }
+
+    private let state: NIOLockedValueBox<_State> = .init(_State())
+
+    fileprivate init() {}
+
+    func setContinuation(_ continuation: CheckedContinuation<Success, any Error>) -> Bool {
+        var alreadyCancelled = false
+        self.state.withLockedValue { state in
+            if state.cancelled {
+                alreadyCancelled = true
+            } else {
+                state.continuation = continuation
+            }
+        }
+        if alreadyCancelled {
+            continuation.resume(throwing: CancellationError())
+        }
+        return !alreadyCancelled
+    }
+
+    /// Register a cancellation handler, or call it immediately if the continuation was already cancelled.
+    @Sendable
+    func onCancel(handler: @Sendable @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void) {
+        var alreadyCancelled: Bool = self.state.withLockedValue { state in
+            if state.cancelled {
+                return true
+            }
+
+            state.onCancel = handler
+            return false
+        }
+        if alreadyCancelled {
+            handler(self)
+        }
+    }
+
+    private func withContinuation(cancelled: Bool = false, _ operation: (CheckedContinuation<Success, any Error>) -> Void) {
+        var safeContinuation: CheckedContinuation<Success, any Error>?
+        var safeOnCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
+        self.state.withLockedValue { (state: inout _State) -> Void in
+            state.cancelled = state.cancelled || cancelled
+            safeContinuation = state.continuation
+            safeOnCancel = state.onCancel
+            state.continuation = nil
+            state.onCancel = nil
+        }
+        if let safeContinuation {
+            operation(safeContinuation)
+        }
+        if cancelled {
+            safeOnCancel?(self)
+        }
+    }
+
+    func resume(returning value: Success) {
+        self.withContinuation {
+            $0.resume(returning: value)
+        }
+    }
+
+    func resume(throwing error: any Error) {
+        self.withContinuation {
+            $0.resume(throwing: error)
+        }
+    }
+
+    var isCancelled: Bool {
+        self.state.withLockedValue { $0.cancelled }
+    }
+
+    func cancel() {
+        self.withContinuation(cancelled: true) {
+            $0.resume(throwing: CancellationError())
+        }
+    }
+}
+
+extension ClusterCancellableCheckedContinuation where Success == Void {
+    func resume() {
+        self.resume(returning: ())
+    }
+}
+
+extension ClusterCancellableCheckedContinuation {
+    static func == (lhs: ClusterCancellableCheckedContinuation, rhs: ClusterCancellableCheckedContinuation) -> Bool {
+        ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
+    }
+
+    func hash(into hasher: inout Hasher) {
+        hasher.combine(ObjectIdentifier(self))
+    }
+}
+
+func _withClusterCancellableCheckedContinuation<Success>(
+    of successType: Success.Type = Success.self,
+    _ body: @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void,
+    function: String = #function
+) async throws -> Success where Success: Sendable {
+    let cccc = ClusterCancellableCheckedContinuation<Success>()
+    return try await withTaskCancellationHandler {
+        try await withCheckedThrowingContinuation(function: function) { continuation in
+            if cccc.setContinuation(continuation) {
+                body(cccc)
+            }
+        }
+    } onCancel: {
+        cccc.cancel()
+    }
+}
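// ---------------------------------------------------------------------------------------------------------
// Illustrative usage sketch (not part of the diff above): how the `_withClusterCancellableCheckedContinuation`
// helper introduced in this new file is meant to be called. `SignalGate` and its `pending` storage are
// hypothetical names invented for this example; only the continuation APIs come from the file above, and
// the sketch elides the actor/lock-based synchronization a real caller would need.
final class SignalGate {
    var pending: [ClusterCancellableCheckedContinuation<Void>] = []

    // Suspends until `signal()` is called, or throws CancellationError if the surrounding Task is cancelled.
    func waitForSignal() async throws {
        try await _withClusterCancellableCheckedContinuation(of: Void.self) { cccc in
            self.pending.append(cccc)
            cccc.onCancel { cancelled in
                // Cancellation has already resumed the continuation with CancellationError;
                // here we only clean up our own bookkeeping (identity-based Equatable makes this cheap).
                self.pending.removeAll { $0 == cancelled }
            }
        }
    }

    // Resumes every waiter exactly once; resuming an already-cancelled continuation is a safe no-op,
    // since the underlying CheckedContinuation has already been consumed.
    func signal() {
        let waiters = self.pending
        self.pending.removeAll()
        for waiter in waiters {
            waiter.resume()
        }
    }
}
// ---------------------------------------------------------------------------------------------------------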
diff --git a/Sources/DistributedCluster/InvocationBehavior.swift b/Sources/DistributedCluster/InvocationBehavior.swift
index 94e700ac6..5246a0f1b 100644
--- a/Sources/DistributedCluster/InvocationBehavior.swift
+++ b/Sources/DistributedCluster/InvocationBehavior.swift
@@ -36,7 +36,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {

 // FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957)
 enum InvocationBehavior {
-    static func behavior<Act: DistributedActor>(instance weakInstance: Weak<Act>) -> _Behavior<InvocationMessage> {
+    static func behavior<Act: DistributedActor>(instance weakInstance: WeakLocalRef<Act>) -> _Behavior<InvocationMessage> {
         _Behavior<InvocationMessage>.setup { context in
             ._receiveMessageAsync { (message) async throws -> _Behavior<InvocationMessage> in
                 guard let _ = weakInstance.actor else {
diff --git a/Sources/DistributedCluster/Pattern/WorkerPool.swift b/Sources/DistributedCluster/Pattern/WorkerPool.swift
index 8d3cfb077..0ec8f39f3 100644
--- a/Sources/DistributedCluster/Pattern/WorkerPool.swift
+++ b/Sources/DistributedCluster/Pattern/WorkerPool.swift
@@ -14,13 +14,15 @@
 import Distributed
 import Logging
+import OrderedCollections

 // ==== ----------------------------------------------------------------------------------------------------------------
 // MARK: Worker

-public protocol DistributedWorker: DistributedActor {
-    associatedtype WorkItem: Codable
-    associatedtype WorkResult: Codable
+/// Protocol to be implemented by workers participating in a simple ``WorkerPool``.
+public protocol DistributedWorker: DistributedActor where ActorSystem == ClusterSystem {
+    associatedtype WorkItem: Codable & Sendable
+    associatedtype WorkResult: Codable & Sendable

     distributed func submit(work: WorkItem) async throws -> WorkResult
 }
@@ -30,21 +32,30 @@ public protocol DistributedWorker: DistributedActor {

 /// A `WorkerPool` represents a pool of actors that are all equally qualified to handle incoming work items.
 ///
-/// Pool members may be local or remote, // TODO: and there should be ways to say "prefer local or something".
-///
 /// A pool can populate its member list using the `Receptionist` mechanism, and thus allows members to join and leave
 /// dynamically, e.g. if a node joins or removes itself from the cluster.
-///
-// TODO: A pool can be configured to terminate itself when any of its workers terminate or attempt to spawn replacements.
 public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorker, LifecycleWatch, CustomStringConvertible where Worker.ActorSystem == ClusterSystem {
     public typealias ActorSystem = ClusterSystem
     public typealias WorkItem = Worker.WorkItem
     public typealias WorkResult = Worker.WorkResult

+    lazy var log = Logger(actor: self)
+
     // Don't store `WorkerPoolSettings` or `Selector` because it would cause `WorkerPool`
     // to hold on to `Worker` references and prevent them from getting terminated.
-    private let whenAllWorkersTerminated: AllWorkersTerminatedDirective
-    private let logLevel: Logger.Level
+    private var whenAllWorkersTerminated: AllWorkersTerminatedDirective {
+        self.settings.whenAllWorkersTerminated
+    }
+
+    private var logLevel: Logger.Level {
+        self.settings.logLevel
+    }
+
+    private var strategy: WorkerPool.Strategy {
+        self.settings.strategy
+    }
+
+    private let settings: WorkerPoolSettings<Worker>

     /// `Task` for subscribing to receptionist listing in case of `Selector.dynamic` mode.
     private var newWorkersSubscribeTask: Task<Void, Never>?
@@ -53,13 +64,13 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
     // MARK: WorkerPool state

     /// The worker pool. Worker will be selected round-robin.
-    private var workers: [Worker.ID: Weak<Worker>] = [:]
+    private var workers: OrderedSet<WeakLocalRef<Worker>> = []
     private var roundRobinPos = 0

     /// Boolean flag to help determine if pool becomes empty because at least one worker has terminated.
     private var hasTerminatedWorkers = false

     /// Control for waiting and getting notified for new worker.
-    private var newWorkerContinuations: [CheckedContinuation<Void, Never>] = []
+    private var newWorkerContinuations: OrderedSet<ClusterCancellableCheckedContinuation<Void>> = []

     public init(selector: Selector, actorSystem: ActorSystem) async throws {
         try await self.init(settings: .init(selector: selector), actorSystem: actorSystem)
@@ -69,16 +80,23 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
         try settings.validate()

         self.actorSystem = system
-        self.whenAllWorkersTerminated = settings.whenAllWorkersTerminated
-        self.logLevel = settings.logLevel
+        self.settings = settings

         switch settings.selector.underlying {
         case .dynamic(let key):
             self.newWorkersSubscribeTask = Task {
                 for await worker in await self.actorSystem.receptionist.listing(of: key) {
-                    self.actorSystem.log.log(level: self.logLevel, "Got listing member for \(key): \(worker)")
-                    self.workers[worker.id] = Weak(worker)
+                    self.workers.append(WeakLocalRef(worker))
+                    // Notify those waiting for new worker
+                    log.log(
+                        level: self.logLevel,
+                        "Updated workers for \(key)",
+                        metadata: [
+                            "workers": "\(self.workers)",
+                            "newWorkerContinuations": "\(self.newWorkerContinuations.count)",
+                        ]
+                    )
                     for (i, continuation) in self.newWorkerContinuations.enumerated().reversed() {
                         continuation.resume()
                         self.newWorkerContinuations.remove(at: i)
@@ -87,9 +105,10 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
                     }
                 }
         case .static(let workers):
-            for worker in workers {
-                self.workers[worker.id] = Weak(worker)
-                watchTermination(of: worker)
+            self.workers.reserveCapacity(workers.count)
+            self.workers = .init(workers)
+            for actor in workers.compactMap(\.actor) {
+                watchTermination(of: actor)
             }
         }
     }
@@ -99,55 +118,101 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
     }

     public distributed func submit(work: WorkItem) async throws -> WorkResult {
-        let worker = try await self.selectWorker()
-        self.actorSystem.log.log(level: self.logLevel, "Submitting [\(work)] to [\(worker)]")
+        self.log.log(
+            level: self.logLevel,
+            "Incoming work, selecting worker",
+            metadata: [
+                "workers/count": "\(self.size())",
+                "worker/item": "\(work)",
+            ]
+        )
+        let worker = try await self.selectWorker(for: work)
+        self.log.log(
+            level: self.logLevel,
+            "Selected worker, submitting [\(work)] to [\(worker)]",
+            metadata: [
+                "worker": "\(worker.id)",
+                "workers/count": "\(self.size())",
+            ]
+        )
         return try await worker.submit(work: work)
     }

-    // FIXME: make this a computed property instead when https://github.com/apple/swift/pull/42321 is in
-    internal distributed func size() async throws -> Int {
+    // FIXME: there is an issue in latest Swift (6.2 nightlies) with Cxx interop, change back to computed properties when fixed.
+    internal distributed func size() -> Int {
         self.workers.count
     }

-    private func selectWorker() async throws -> Worker {
+    private func selectWorker(for work: WorkItem) async throws -> Worker {
         // Wait if we haven't received the initial workers listing yet.
         // Otherwise, the pool has become empty because all workers have been terminated,
         // in which case we either wait for new worker or throw error.
         if self.workers.isEmpty {
             switch (self.hasTerminatedWorkers, self.whenAllWorkersTerminated) {
-            case (false, _), (true, .awaitNewWorkers):
-                self.actorSystem.log.log(level: self.logLevel, "Worker pool is empty, waiting for new worker.")
-                await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
-                    self.newWorkerContinuations.append(continuation)
-                }
+            case (false, _), // if we never received any workers yet, wait for some to show up.
+                 (true, .awaitNewWorkers):
+                self.log.log(level: self.logLevel, "Worker pool is empty, waiting for new worker.")
+
+                try await _withClusterCancellableCheckedContinuation(of: Void.self) { cccc in
+                    self.newWorkerContinuations.append(cccc)
+                    let log = self.log
+                    cccc.onCancel { cccc in
+                        log.debug("Member selection was cancelled, call probably timed out, schedule removal of continuation")
+                        cccc.resume(throwing: CancellationError())
+                        Task {
+                            await self.removeWorkerWaitContinuation(cccc)
+                        }
+                    }
+                }
             case (true, .throw(let error)):
                 throw error
             }
         }

-        let selectedWorkerID = self.nextWorkerID()
-        if let worker = self.workers[selectedWorkerID]?.actor {
-            return worker
-        } else {
-            // Worker terminated; clean up and try again
-            self.terminated(actor: selectedWorkerID)
-            return try await self.selectWorker()
+        guard let selected = nextWorker() else {
+            switch self.whenAllWorkersTerminated {
+            case .awaitNewWorkers:
+                // try again
+                return try await self.selectWorker(for: work)
+            case .throw(let error):
+                throw error
+            }
+        }
+
+        guard let selectedWorker = selected.actor else {
+            self.log.debug("Selected actor has deallocated: \(selected.id)!")
+            // remove this actor from the pool
+            self.terminated(actor: selected.id)
+            // and, try again
+            return try await self.selectWorker(for: work)
         }
+
+        return selectedWorker
     }

-    private func nextWorkerID() -> Worker.ID {
-        var ids = Array(self.workers.keys)
-        ids.sort { l, r in l.description < r.description }
+    private func removeWorkerWaitContinuation(_ cccc: ClusterCancellableCheckedContinuation<Void>) {
+        self.newWorkerContinuations.remove(cccc)
+    }

-        let selected = ids[self.roundRobinPos]
-        self.roundRobinPos = (self.roundRobinPos + 1) % ids.count
-        return selected
+    private func nextWorker() -> WeakLocalRef<Worker>? {
+        switch self.strategy.underlying {
+        case .random:
+            return self.workers.shuffled().first
+        case .simpleRoundRobin:
+            if self.roundRobinPos >= self.size() {
+                self.roundRobinPos = 0 // loop around from zero
+            }
+            let selected = self.workers[self.roundRobinPos]
+            self.roundRobinPos = self.workers.index(after: self.roundRobinPos) % self.size()
+            return selected
+        }
     }

     public func terminated(actor id: Worker.ID) {
-        self.workers.removeValue(forKey: id)
+        self.log.debug("Worker terminated: \(id)", metadata: ["worker": "\(id)"])
+        self.workers.remove(WeakLocalRef(forRemoval: id))
         self.hasTerminatedWorkers = true
-        self.roundRobinPos = 0
+        self.roundRobinPos = 0 // FIXME: naively reset the round robin counter; we should do better than that
     }

     public nonisolated var description: String {
@@ -173,9 +238,9 @@ extension WorkerPool {
     /// of members to be statically provided etc.
     public struct Selector {
         enum _Selector {
-            // TODO: let awaitAtLeast: Int // before starting to direct traffic
             case dynamic(DistributedReception.Key<Worker>)
-            case `static`([Worker])
+            /// Stored as `WeakLocalRef`s so the selector does not create strong references to local actors.
+            case `static`([WeakLocalRef<Worker>])
         }

         let underlying: _Selector
@@ -196,7 +261,32 @@ extension WorkerPool {
         /// You may death-watch the worker pool in order to react to this situation, e.g. by spawning a replacement pool,
         /// or gracefully shutting down your application.
         public static func `static`(_ workers: [Worker]) -> Selector {
-            .init(underlying: .static(workers))
+            .init(underlying: .static(workers.map(WeakLocalRef.init)))
+        }
+    }
+
+    public struct Strategy {
+        enum _Strategy {
+            case random
+            case simpleRoundRobin
+        }
+
+        let underlying: _Strategy
+
+        /// Simple random selection on every target worker selection.
+        public static var random: Strategy {
+            .init(underlying: .random)
+        }
+
+        /// Round-robin strategy which attempts to go "around" known workers one-by-one,
+        /// giving them equal amounts of work. This strategy is NOT strict, and when new
+        /// workers arrive at the pool it may result in submitting work to previously notified
+        /// workers as the round-robin strategy "resets".
+        ///
+        /// We could consider implementing a strict round-robin strategy which remains strict even
+        /// as new workers arrive in the pool.
+        public static var simpleRoundRobin: Strategy {
+            .init(underlying: .simpleRoundRobin)
         }
     }
 }
@@ -249,16 +339,24 @@ public struct WorkerPoolSettings<Worker: DistributedWorker> where Worker.ActorSy
     /// Configures how to select / discover actors for the pool.
     var selector: WorkerPool<Worker>.Selector

+    /// Configures how the "next" worker is determined for submitting a work request.
+    /// Generally random strategies or a form of round robin are preferred, but we
+    /// could implement more sophisticated workload balancing/estimating strategies as well.
+    ///
+    /// Defaults to the random strategy.
+    var strategy: WorkerPool<Worker>.Strategy
+
     /// Determine what action should be taken once the number of alive workers in the pool reaches zero (after being positive for at least a moment).
     ///
     /// The default value depends on the `selector` and is:
     /// - `.crash` for the `.static` selector,
     /// - `.awaitNewWorkers` for the `.dynamic` selector, as it is assumed that replacement workers will likely be spawned
-    // in place of terminated workers. Messages sent to the pool while no workers are available will be buffered (up to `noWorkersAvailableBufferSize` messages).
+    ///   in place of terminated workers. Messages sent to the pool while no workers are available will be buffered (up to `noWorkersAvailableBufferSize` messages).
     var whenAllWorkersTerminated: WorkerPool<Worker>.AllWorkersTerminatedDirective

-    public init(selector: WorkerPool<Worker>.Selector) {
+    public init(selector: WorkerPool<Worker>.Selector, strategy: WorkerPool<Worker>.Strategy = .random) {
         self.selector = selector
+        self.strategy = strategy

         switch selector.underlying {
         case .dynamic:
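// ---------------------------------------------------------------------------------------------------------
// Illustrative usage sketch (not part of the diff above): configuring a pool with the new `strategy` setting.
// `MyWorker`, `system`, `workerA`, `workerB` and `someWorkItem` are hypothetical; `WorkerPool`,
// `WorkerPoolSettings`, `Selector` and `Strategy` are the APIs changed above.

// Dynamic pool: members check in with the receptionist under `key`; workers are picked round-robin.
let key = DistributedReception.Key(MyWorker.self, id: "my-workers")
let dynamicPool = try await WorkerPool(
    settings: .init(selector: .dynamic(key), strategy: .simpleRoundRobin),
    actorSystem: system
)

// Static pool: the selector keeps the workers as WeakLocalRefs, so the pool itself does not keep
// local workers alive; omitting `strategy:` picks the `.random` default.
let staticPool = try await WorkerPool(
    settings: .init(selector: .static([workerA, workerB])),
    actorSystem: system
)

let result = try await dynamicPool.submit(work: someWorkItem)
// ---------------------------------------------------------------------------------------------------------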
diff --git a/Sources/DistributedCluster/WeakActorDictionary.swift b/Sources/DistributedCluster/WeakActorDictionary.swift
index 08c86ae9b..b2d8abc54 100644
--- a/Sources/DistributedCluster/WeakActorDictionary.swift
+++ b/Sources/DistributedCluster/WeakActorDictionary.swift
@@ -15,21 +15,9 @@
 import Distributed

 /// A dictionary which only weakly retains the
-public struct WeakActorDictionary<Act: DistributedActor>: ExpressibleByDictionaryLiteral
+public struct WeakLocalRefDictionary<Act: DistributedActor>: ExpressibleByDictionaryLiteral
 where Act.ID == ClusterSystem.ActorID {
-    var underlying: [ClusterSystem.ActorID: WeakContainer]
-
-    final class WeakContainer {
-        weak var actor: Act?
-
-        init(_ actor: Act) {
-            self.actor = actor
-        }
-
-        // init(idForRemoval id: ClusterSystem.ActorID) {
-        //     self.actor = nil
-        // }
-    }
+    var underlying: [ClusterSystem.ActorID: WeakLocalRef<Act>]

     /// Initialize an empty dictionary.
     public init() {
@@ -49,10 +37,8 @@ where Act.ID == ClusterSystem.ActorID {
     /// Note that the dictionary only holds the actor weakly,
     /// so if no other strong references to the actor remain this dictionary
     /// will not contain the actor anymore.
-    ///
-    /// - Parameter actor:
     public mutating func insert(_ actor: Act) {
-        self.underlying[actor.id] = WeakContainer(actor)
+        self.underlying[actor.id] = WeakLocalRef(actor)
     }

     public mutating func getActor(identifiedBy id: ClusterSystem.ActorID) -> Act? {
@@ -84,10 +70,6 @@ public struct WeakAnyDistributedActorDictionary {
         init(_ actor: Act) where Act.ID == ClusterSystem.ActorID {
             self.actor = actor
         }
-
-        init(idForRemoval id: ClusterSystem.ActorID) {
-            self.actor = nil
-        }
     }

     public init() {
@@ -118,14 +100,73 @@ public struct WeakAnyDistributedActorDictionary {
     }
 }

-final class Weak<Act: DistributedActor> {
-    weak var actor: Act?
+/// Distributed actor reference helper which avoids strongly retaining local actors,
+/// in order not to accidentally extend their lifetimes. Specifically very useful
+/// when designing library code which should NOT keep user-actors alive, and should
+/// work with remote and local actors in the same way.
+///
+/// The reference is *weak* when the actor is **local**,
+/// in order to not prevent the actor from being deallocated when all **other**
+/// references to it are released.
+///
+/// The reference is *strong* when referencing a **remote** distributed actor.
+/// A strong reference to a remote actor does not necessarily keep it alive,
+/// however it allows keeping `weak var` references to remote distributed actors
+/// without them being immediately released if they were obtained from a `resolve`
+/// call for example -- as by design, no-one else will be retaining them, there
+/// is a risk of always observing an immediately released reference.
+///
+/// Rather than relying on reference counting for remote references, utilize the
+/// `LifecycleWatch/watchTermination(of:)` lifecycle monitoring method. This
+/// mechanism will invoke an actor's ``LifecycleWatch/actorTerminated(_:)`` when
+/// the remote actor has terminated and we should clean it up locally.
+///
+/// Generally, the pattern should be to store actor references local-weakly
+/// when we "don't want to keep them alive" on behalf of the user, and at the
+/// same time always use ``LifecycleMonitoring`` for handling their lifecycle -
+/// regardless of whether the actor is local or remote, lifecycle monitoring behaves
+/// in the expected way.
+final class WeakLocalRef<Act: DistributedActor>: Hashable where Act.ID == ClusterSystem.ActorID {
+    let id: Act.ID
+
+    private weak var weakLocalRef: Act?
+    private let strongRemoteRef: Act?
+
+    var actor: Act? {
+        self.strongRemoteRef ?? self.weakLocalRef
+    }

     init(_ actor: Act) {
-        self.actor = actor
+        if isDistributedKnownRemote(actor) {
+            self.weakLocalRef = nil
+            self.strongRemoteRef = actor
+        } else {
+            self.weakLocalRef = actor
+            self.strongRemoteRef = nil
+        }
+        self.id = actor.id
     }

-    init(idForRemoval id: ClusterSystem.ActorID) {
-        self.actor = nil
+    init(forRemoval id: ClusterSystem.ActorID) {
+        self.weakLocalRef = nil
+        self.strongRemoteRef = nil
+        self.id = id
+    }
+
+    func hash(into hasher: inout Hasher) {
+        hasher.combine(self.id)
+    }
+
+    static func == (lhs: WeakLocalRef, rhs: WeakLocalRef) -> Bool {
+        if lhs === rhs {
+            return true
+        }
+        if lhs.id != rhs.id {
+            return false
+        }
+        return true
     }
 }
+
+@_silgen_name("swift_distributed_actor_is_remote")
+internal func isDistributedKnownRemote(_ actor: AnyObject) -> Bool
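// ---------------------------------------------------------------------------------------------------------
// Illustrative sketch (not part of the diff above) of the retention semantics WeakLocalRef is meant to have.
// `Greeter`, `system` and `remoteID` are hypothetical stand-ins; `resolve(id:using:)` is the standard
// DistributedActor API.
var greeter: Greeter? = Greeter(actorSystem: system)
let localRef = WeakLocalRef(greeter!)
greeter = nil
// Local actors are referenced weakly: once the last strong reference is gone, `actor` becomes nil,
// while the stable `id` remains usable, e.g. as a removal key via WeakLocalRef(forRemoval:).
precondition(localRef.actor == nil)

let remote = try Greeter.resolve(id: remoteID, using: system)
let remoteRef = WeakLocalRef(remote)
// Remote references are held strongly (a strong reference cannot keep the remote instance alive anyway),
// so the reference stays usable until `remoteRef` itself is dropped; actual termination should be observed
// via LifecycleWatch's watchTermination(of:) rather than via reference counting.
precondition(remoteRef.actor != nil)
// ---------------------------------------------------------------------------------------------------------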
diff --git a/Sources/DistributedCluster/_ActorNaming.swift b/Sources/DistributedCluster/_ActorNaming.swift
index c2105b77a..7d414e747 100644
--- a/Sources/DistributedCluster/_ActorNaming.swift
+++ b/Sources/DistributedCluster/_ActorNaming.swift
@@ -65,9 +65,9 @@ extension _ActorNaming {
 }

 /// Used while spawning actors to identify how its name should be created.
-public struct _ActorNaming: ExpressibleByStringLiteral, ExpressibleByStringInterpolation {
+public struct _ActorNaming: ExpressibleByStringLiteral, ExpressibleByStringInterpolation, Hashable {
     // We keep an internal enum, but do not expose it as we may want to add more naming strategies in the future?
-    internal enum _Naming {
+    internal enum _Naming: Hashable {
         case unique(String)
         // case uniqueNumeric(NumberingScheme)
         case prefixed(prefix: String, suffixScheme: SuffixScheme)
diff --git a/Tests/DistributedClusterTests/Pattern/WorkerPoolTests.swift b/Tests/DistributedClusterTests/Pattern/WorkerPoolTests.swift
index 4458a4ca9..9e46b6681 100644
--- a/Tests/DistributedClusterTests/Pattern/WorkerPoolTests.swift
+++ b/Tests/DistributedClusterTests/Pattern/WorkerPoolTests.swift
@@ -24,7 +24,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
     func test_workerPool_registerNewlyStartedActors() async throws {
         let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers")

-        let settings = WorkerPoolSettings(selector: .dynamic(workerKey))
+        let settings = WorkerPoolSettings(selector: .dynamic(workerKey), strategy: .simpleRoundRobin)
         let workers = try await WorkerPool(settings: settings, actorSystem: system)

         let pA: ActorTestProbe<String> = self.testKit.makeTestProbe("pA")
@@ -40,8 +40,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
             workerB.id: pB,
             workerC.id: pC,
         ]
-        // Workers are sorted by id then selected round-robin
-        let sortedWorkerIDs = Array(workerProbes.keys).sorted()
+        let workerIDs = [workerA.id, workerB.id, workerC.id]

         // Wait for all workers to be registered with the receptionist
         let finished = expectation(description: "all workers available")
@@ -61,7 +60,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
             _ = try await workers.submit(work: "\(i)")

             // We are submitting more work than there are workers
-            let workerID = sortedWorkerIDs[i % workerProbes.count]
+            let workerID = workerIDs[i % workerIDs.count]
             guard let probe = workerProbes[workerID] else {
                 throw testKit.fail("Missing test probe for worker \(workerID)")
             }
@@ -70,11 +69,9 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
     }

     func test_workerPool_dynamic_removeDeadActors() async throws {
-        throw XCTSkip("!!! Skipping test \(#function) !!!") // FIXME(distributed): Pending fix for #831 to be able to terminate worker by setting it to nil
-
         let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers")
-        let workers = try await WorkerPool(selector: .dynamic(workerKey), actorSystem: system)
+        let workers = try await WorkerPool(settings: .init(selector: .dynamic(workerKey), strategy: .simpleRoundRobin), actorSystem: system)

         let pA: ActorTestProbe<String> = self.testKit.makeTestProbe("pA")
         let pB: ActorTestProbe<String> = self.testKit.makeTestProbe("pB")
@@ -90,8 +87,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
             workerB!.id: pB,
             workerC!.id: pC,
         ]
-        // Workers are sorted by id then selected round-robin
-        var sortedWorkerIDs = Array(workerProbes.keys).sorted()
+        var workerIDs = [workerA!.id, workerB!.id, workerC!.id]

         // Wait for all workers to be registered with the receptionist
         let finished = expectation(description: "all workers available")
@@ -110,7 +106,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
         for i in 0...2 {
             _ = try await workers.submit(work: "all-available-\(i)")

-            let workerID = sortedWorkerIDs[i]
+            let workerID = workerIDs[i % workerIDs.count]
             guard let probe = workerProbes[workerID] else {
                 throw testKit.fail("Missing test probe for worker \(workerID)")
             }
@@ -118,7 +114,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
         }

         // Terminate workerA
-        sortedWorkerIDs.removeAll { $0 == workerA!.id }
+        workerIDs.removeAll { $0 == workerA!.id }
         workerA = nil
         try pA.expectMessage("Greeter deinit")
@@ -128,7 +124,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
             // We cannot be certain how round-robin position gets reset after A's termination,
             // so we don't enforce index check here.
-            let maybeGotItResults = try sortedWorkerIDs.compactMap {
+            let maybeGotItResults = try workerIDs.compactMap {
                 guard let probe = workerProbes[$0] else {
                     throw testKit.fail("Missing test probe for worker \($0)")
                 }
@@ -164,22 +160,29 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
         var workerB: Greeter? = Greeter(probe: pB, actorSystem: self.system)
         var workerC: Greeter? = Greeter(probe: pC, actorSystem: self.system)

+        var workers = [workerA!, workerB!, workerC!]
+        let workerIDs = workers.map(\.id)
+
         // !-safe since we initialize workers above
-        let workers = try await WorkerPool(settings: .init(selector: .static([workerA!, workerB!, workerC!])), actorSystem: system)
+        let workerPool = try await WorkerPool(
+            settings: .init(
+                selector: .static(workers),
+                strategy: .simpleRoundRobin
+            ),
+            actorSystem: system
+        )

         let workerProbes: [ClusterSystem.ActorID: ActorTestProbe<String>] = [
             workerA!.id: pA,
             workerB!.id: pB,
             workerC!.id: pC,
         ]
-        // Workers are sorted by id then selected round-robin
-        var sortedWorkerIDs = Array(workerProbes.keys).sorted()

         // Submit work with all workers available
         for i in 0...2 {
-            _ = try await workers.submit(work: "all-available-\(i)")
+            _ = try await workerPool.submit(work: "all-available-\(i)")

-            let workerID = sortedWorkerIDs[i]
+            let workerID = workerIDs[i % workerIDs.count]
             guard let probe = workerProbes[workerID] else {
                 throw testKit.fail("Missing test probe for worker \(workerID)")
             }
@@ -187,17 +190,17 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
         }

         // Terminate workerA
-        sortedWorkerIDs.removeAll { $0 == workerA!.id }
+        workers.removeFirst()
         workerA = nil
         try pA.expectMessage("Greeter deinit")

         // The remaining workers should take over
         for i in 0...2 {
-            _ = try await workers.submit(work: "after-A-dead-\(i)")
+            _ = try await workerPool.submit(work: "after-A-dead-\(i)")

             // We cannot be certain how round-robin position gets reset after A's termination,
             // so we don't enforce index check here.
-            let maybeGotItResults = try sortedWorkerIDs.compactMap {
+            let maybeGotItResults = try workerIDs.compactMap {
                 guard let probe = workerProbes[$0] else {
                     throw testKit.fail("Missing test probe for worker \($0)")
                 }
@@ -210,14 +213,16 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase {
         }

         // Terminate the rest of the workers
+        workers.removeFirst()
         workerB = nil
         try pB.expectMessage("Greeter deinit")
+        workers.removeFirst()
         workerC = nil
         try pC.expectMessage("Greeter deinit")

         // WorkerPool now throws error on new work submission
         let error = try await shouldThrow {
-            _ = try await workers.submit(work: "after-all-dead")
+            _ = try await workerPool.submit(work: "after-all-dead")
         }

         guard let workerPoolError = error as? WorkerPoolError, case .staticPoolExhausted(let errorMessage) = workerPoolError.underlying.error else {
@@ -236,34 +241,93 @@
         }
         errorMessage.shouldContain("Illegal empty collection passed to `.static` worker pool")
     }
-}

-private distributed actor Greeter: DistributedWorker {
-    typealias ID = ClusterSystem.ActorID
-    typealias ActorSystem = ClusterSystem
-    typealias WorkItem = String
-    typealias WorkResult = String
+    func test_workerPool_testRemoteActorReferencesAreHandledProperly() async throws {
+        let (local, remote) = await self.setUpPair {
+            $0.enabled = true
+        }
+        let testKit = ActorTestKit(local)
+        try await self.joinNodes(node: local, with: remote)
+
+        let workerKey = DistributedReception.Key<Greeter>(id: "request-workers")
+        let workers = try await WorkerPool(
+            settings: WorkerPoolSettings(
+                selector: .dynamic(workerKey),
+                strategy: .simpleRoundRobin
+            ),
+            actorSystem: local
+        )
+        let pA: ActorTestProbe<String> = testKit.makeTestProbe("pA")
+        let pB: ActorTestProbe<String> = testKit.makeTestProbe("pB")
+        let pC: ActorTestProbe<String> = testKit.makeTestProbe("pC")
+
+        let workerA = await Greeter(probe: pA, actorSystem: remote, key: workerKey)
+        let workerB = await Greeter(probe: pB, actorSystem: remote, key: workerKey)
+        let workerC = await Greeter(probe: pC, actorSystem: remote, key: workerKey)

-    let probe: ActorTestProbe<String>
+        let workerProbes: [ClusterSystem.ActorID: ActorTestProbe<String>] = [
+            workerA.id: pA,
+            workerB.id: pB,
+            workerC.id: pC,
+        ]
+        let workerIDs = [workerA.id, workerB.id, workerC.id]

-    init(probe: ActorTestProbe<String>, actorSystem: ActorSystem) {
-        self.actorSystem = actorSystem
-        self.probe = probe
-    }
+        // Wait for all workers to be registered with the receptionist
+        let finished = expectation(description: "all workers available")
+        Task {
+            while true {
+                if try await workers.size() == workerProbes.count {
+                    break
+                }
+                try await Task.sleep(nanoseconds: 100_000_000)
+            }
+            finished.fulfill()
+        }
+        await fulfillment(of: [finished], timeout: 3.0)

-    init(probe: ActorTestProbe<String>, actorSystem: ActorSystem, key: DistributedReception.Key<Greeter>) async {
-        self.actorSystem = actorSystem
-        self.probe = probe
-        await self.actorSystem.receptionist.checkIn(self, with: key)
-    }
+        // Submit work with all workers available
+        for i in 0...7 {
+            _ = try await workers.submit(work: "\(i)")

-    deinit {
-        self.probe.tell("Greeter deinit")
+            // We are submitting more work than there are workers
+            let workerID = workerIDs[i % workerIDs.count]
+            guard let probe = workerProbes[workerID] else {
+                throw testKit.fail("Missing test probe for worker \(workerID)")
+            }
+            try probe.expectMessage("work:\(i) at \(workerID)")
+        }
     }
+}
+
+extension WorkerPoolTests {
+    /// Distributed actors for receptionist should be non-private, otherwise `SerializationError(.unableToSummonTypeFromManifest)` will kick in for remote calls.
+    distributed actor Greeter: DistributedWorker {
+        typealias ID = ClusterSystem.ActorID
+        typealias ActorSystem = ClusterSystem
+        typealias WorkItem = String
+        typealias WorkResult = String
+
+        let probe: ActorTestProbe<String>
+
+        init(probe: ActorTestProbe<String>, actorSystem: ActorSystem) {
+            self.actorSystem = actorSystem
+            self.probe = probe
+        }

-    distributed func submit(work: WorkItem) async throws -> WorkResult {
-        self.probe.tell("work:\(work) at \(self.id)")
-        return "hello \(work)"
+        init(probe: ActorTestProbe<String>, actorSystem: ActorSystem, key: DistributedReception.Key<Greeter>) async {
+            self.actorSystem = actorSystem
+            self.probe = probe
+            await self.actorSystem.receptionist.checkIn(self, with: key)
+        }
+
+        deinit {
+            self.probe.tell("Greeter deinit")
+        }
+
+        distributed public func submit(work: WorkItem) async throws -> WorkResult {
+            self.probe.tell("work:\(work) at \(self.id)")
+            return "hello \(work)"
+        }
     }
 }
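// ---------------------------------------------------------------------------------------------------------
// Illustrative sketch (not part of the diff above): a worker with custom Codable & Sendable work types, as
// required by the tightened DistributedWorker protocol (which now also pins ActorSystem to ClusterSystem).
// `ResizeRequest`, `ResizeResult` and `ImageResizer` are hypothetical names used only for this example.
struct ResizeRequest: Codable, Sendable {
    let url: String
    let maxPixels: Int
}

struct ResizeResult: Codable, Sendable {
    let url: String
    let bytes: Int
}

distributed actor ImageResizer: DistributedWorker {
    typealias ActorSystem = ClusterSystem
    typealias WorkItem = ResizeRequest
    typealias WorkResult = ResizeResult

    init(actorSystem: ClusterSystem, key: DistributedReception.Key<ImageResizer>) async {
        self.actorSystem = actorSystem
        // Check in so a `.dynamic(key)` WorkerPool can discover this worker.
        await actorSystem.receptionist.checkIn(self, with: key)
    }

    distributed func submit(work: ResizeRequest) async throws -> ResizeResult {
        ResizeResult(url: work.url, bytes: work.maxPixels / 2) // placeholder "work"
    }
}
// ---------------------------------------------------------------------------------------------------------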