diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index 5457cb110..098892575 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -1011,7 +1011,7 @@ extension ClusterSystem { } // Spawn a behavior actor for it: - let behavior = InvocationBehavior.behavior(instance: Weak(actor)) + let behavior = InvocationBehavior.behavior(instance: DistributedActorRef.Weak(actor)) let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id) // Store references @@ -1077,6 +1077,36 @@ extension ClusterSystem { } } +extension ClusterSystem { + func _isAssigned(id: ActorID) -> Bool { + self.namingLock.withLock { + // Are we in the middle of initializing an actor with this ActorID? + if self._reservedNames.contains(id) { + return true + } + + // Do we have a known, managed, distributed actor with this ActorID? + if self._managedDistributedActors.get(identifiedBy: id) != nil { + return true + } + + // Maybe it is a well-known actor? Those we store separately (and with strong ref)? + if let wellKnownName = id.metadata.wellKnown { + if self._managedWellKnownDistributedActors[wellKnownName] != nil { + return true + } + } + + // well, maybe it's an ActorRef after all? 
+ if self._managedRefs[id] != nil { + return true + } + + return false + } + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Intercepting calls diff --git a/Sources/DistributedActors/WeakActorDictionary.swift b/Sources/DistributedActors/DistributedActorRefs.swift similarity index 53% rename from Sources/DistributedActors/WeakActorDictionary.swift rename to Sources/DistributedActors/DistributedActorRefs.swift index f32e608fe..a9ec9c460 100644 --- a/Sources/DistributedActors/WeakActorDictionary.swift +++ b/Sources/DistributedActors/DistributedActorRefs.swift @@ -18,19 +18,7 @@ import Distributed public struct WeakActorDictionary: ExpressibleByDictionaryLiteral where Act.ID == ClusterSystem.ActorID { - var underlying: [ClusterSystem.ActorID: WeakContainer] - - final class WeakContainer { - weak var actor: Act? - - init(_ actor: Act) { - self.actor = actor - } - -// init(idForRemoval id: ClusterSystem.ActorID) { -// self.actor = nil -// } - } + var underlying: [ClusterSystem.ActorID: DistributedActorRef.Weak] /// Initialize an empty dictionary. public init() { @@ -38,11 +26,12 @@ public struct WeakActorDictionary: ExpressibleByDictionar } public init(dictionaryLiteral elements: (Act.ID, Act)...) { - self.underlying = [:] - self.underlying.reserveCapacity(elements.count) + var dict: [ClusterSystem.ActorID: DistributedActorRef.Weak] = [:] + dict.reserveCapacity(elements.count) for (id, actor) in elements { - self.underlying[id] = .init(actor) + dict[id] = .init(actor) } + self.underlying = dict } /// Insert the passed in actor into the dictionary. @@ -53,16 +42,16 @@ public struct WeakActorDictionary: ExpressibleByDictionar /// /// - Parameter actor: public mutating func insert(_ actor: Act) { - self.underlying[actor.id] = WeakContainer(actor) + self.underlying[actor.id] = DistributedActorRef.Weak(actor) } public mutating func getActor(identifiedBy id: ClusterSystem.ActorID) -> Act? 
{ - guard let container = underlying[id] else { + guard let ref = underlying[id] else { // unknown id return nil } - guard let knownActor = container.actor else { + guard let knownActor = ref.actor else { // the actor was released -- let's remove the container while we're here _ = self.removeActor(identifiedBy: id) return nil @@ -118,15 +107,69 @@ public struct WeakAnyDistributedActorDictionary { return knownActor } } +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Weak distributed actor reference wrapper helpers -final class Weak { - weak var actor: Act? +enum DistributedActorRef { + + /// Wrapper class for weak `distributed actor` references. + /// + /// Allows for weak storage of distributed actor references inside collections, + /// although those collections need to be manually cleared from dead references. + /// + public final class Weak: CustomStringConvertible { + private weak var weakRef: Act? + + public init(_ actor: Act) { + self.weakRef = actor + } - init(_ actor: Act) { - self.actor = actor + public var actor: Act? { + self.weakRef + } + + public var description: String { + let isLocalStr: String + if let actor = self.actor { + isLocalStr = "\(__isLocalActor(actor))" + } else { + isLocalStr = "unknown/released" + } + + return "DistributedActorRef.Weak(\(self.actor, orElse: "nil"), isLocal: \(isLocalStr))" + } } - init(idForRemoval id: ClusterSystem.ActorID) { - self.actor = nil + /// Wrapper class for weak `distributed actor` references. + /// + /// Allows for weak storage of distributed actor references inside collections, + /// although those collections need to be manually cleared from dead references. + /// + public final class WeakWhenLocal: CustomStringConvertible { + private weak var weakLocalRef: Act? + private let remoteRef: Act? 
+ + public init(_ actor: Act) { + if __isRemoteActor(actor) { + self.remoteRef = actor + self.weakLocalRef = nil + } else { + self.remoteRef = nil + self.weakLocalRef = actor + } + + assert((self.remoteRef == nil && self.weakLocalRef != nil) || + (self.remoteRef != nil && self.weakLocalRef == nil), + "Only a single var may hold the actor: remote: \(self.remoteRef, orElse: "nil"), \(self.weakLocalRef, orElse: "nil")") + } + + public var actor: Act? { + remoteRef ?? weakLocalRef + } + + public var description: String { + "DistributedActorRef.WeakWhenLocal(\(self.actor, orElse: "nil"), isLocal: \(self.remoteRef == nil))" + } } -} + +} \ No newline at end of file diff --git a/Sources/DistributedActors/InvocationBehavior.swift b/Sources/DistributedActors/InvocationBehavior.swift index f641f65d2..687535d03 100644 --- a/Sources/DistributedActors/InvocationBehavior.swift +++ b/Sources/DistributedActors/InvocationBehavior.swift @@ -34,7 +34,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { // FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957) enum InvocationBehavior { - static func behavior(instance weakInstance: Weak) -> _Behavior { + static func behavior(instance weakInstance: DistributedActorRef.Weak) -> _Behavior { return _Behavior.setup { context in return ._receiveMessageAsync { (message) async throws -> _Behavior in guard let instance = weakInstance.actor else { diff --git a/Sources/DistributedActors/Pattern/WorkerPool.swift b/Sources/DistributedActors/Pattern/WorkerPool.swift index 166d6a5e0..e6974d449 100644 --- a/Sources/DistributedActors/Pattern/WorkerPool.swift +++ b/Sources/DistributedActors/Pattern/WorkerPool.swift @@ -53,7 +53,7 @@ public distributed actor WorkerPool: DistributedWorke // MARK: WorkerPool state /// The worker pool. Worker will be selected round-robin. 
- private var workers: [Worker.ID: Weak] = [:] + private var workers: [Worker.ID: DistributedActorRef.Weak] = [:] private var roundRobinPos = 0 /// Boolean flag to help determine if pool becomes empty because at least one worker has terminated. @@ -78,7 +78,7 @@ public distributed actor WorkerPool: DistributedWorke self.newWorkersSubscribeTask = Task { for await worker in await self.actorSystem.receptionist.listing(of: key) { self.actorSystem.log.log(level: self.logLevel, "Got listing member for \(key): \(worker)") - self.workers[worker.id] = Weak(worker) + self.workers[worker.id] = DistributedActorRef.Weak(worker) // Notify those waiting for new worker for (i, continuation) in self.newWorkerContinuations.enumerated().reversed() { continuation.resume() @@ -89,7 +89,7 @@ public distributed actor WorkerPool: DistributedWorke } case .static(let workers): workers.forEach { worker in - self.workers[worker.id] = Weak(worker) + self.workers[worker.id] = DistributedActorRef.Weak(worker) watchTermination(of: worker) } } diff --git a/Sources/DistributedActorsTestKit/Cluster/Assert+ClusterSystem.swift b/Sources/DistributedActorsTestKit/Cluster/Assert+ClusterSystem.swift new file mode 100644 index 000000000..c33d08688 --- /dev/null +++ b/Sources/DistributedActorsTestKit/Cluster/Assert+ClusterSystem.swift @@ -0,0 +1,243 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import DistributedActors +import DistributedActorsConcurrencyHelpers +import Foundation +import XCTest + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Assertions + +extension ActorTestKit { + /// Assert that a given `ActorID` is not used by this system. E.g. it has been resigned already. + public func assertIDAvailable( + _ id: ClusterSystem.ActorID, + file: StaticString = #fileID, line: UInt = #line, column: UInt = #column) throws { + if self.system._isAssigned(id: id) { + self.fail("ActorID [\(id)] is assigned to some actor in \(self.system)!", + file: file, line: line, column: column) + } + } + + public func assertIDAssigned( + _ id: ClusterSystem.ActorID, + file: StaticString = #fileID, line: UInt = #line, column: UInt = #column) throws { + if !self.system._isAssigned(id: id) { + self.fail("ActorID [\(id)] was not assigned to any actor in \(self.system)!", + file: file, line: line, column: column) + } + } +} + +extension ClusteredActorSystemsXCTestCase { + public func assertAssociated( + _ system: ClusterSystem, withAtLeast node: UniqueNode, + timeout: Duration? = nil, interval: Duration? = nil, + verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column + ) throws { + try self.assertAssociated( + system, withAtLeast: [node], timeout: timeout, interval: interval, + verbose: verbose, file: file, line: line, column: column + ) + } + + public func assertAssociated( + _ system: ClusterSystem, withExactly node: UniqueNode, + timeout: Duration? = nil, interval: Duration? 
= nil, + verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column + ) throws { + try self.assertAssociated( + system, withExactly: [node], timeout: timeout, interval: interval, + verbose: verbose, file: file, line: line, column: column + ) + } + + /// Query associated state of `system` for at-most `timeout` amount of time, and verify it contains exactly the passed in `nodes`. + /// + /// - Parameters: + /// - withExactly: specific set of nodes that must exactly match the associated nodes on `system`; i.e. no extra associated nodes are allowed + /// - withAtLeast: sub-set of nodes that must be associated + public func assertAssociated( + _ system: ClusterSystem, + withExactly exactlyNodes: [UniqueNode] = [], + withAtLeast atLeastNodes: [UniqueNode] = [], + timeout: Duration? = nil, interval: Duration? = nil, + verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column + ) throws { + // FIXME: this is a weak workaround around not having "extensions" (unique object per actor system) + // FIXME: this can be removed once issue #458 lands + + let testKit = self.testKit(system) + + let probe = testKit.makeTestProbe(.prefixed(with: "probe-assertAssociated"), expecting: Set.self, file: file, line: line) + defer { probe.stop() } + + try testKit.eventually(within: timeout ?? 
.seconds(8), file: file, line: line, column: column) { + system.cluster.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here + let associatedNodes = try probe.expectMessage(file: file, line: line) + + if verbose { + pprint(" Self: \(String(reflecting: system.settings.uniqueBindNode))") + pprint(" Associated nodes: \(associatedNodes.map { String(reflecting: $0) })") + pprint(" Expected exact nodes: \(String(reflecting: exactlyNodes))") + pprint("Expected at least nodes: \(String(reflecting: atLeastNodes))") + } + + if !atLeastNodes.isEmpty { + let notYetAssociated = Set(atLeastNodes).subtracting(Set(associatedNodes)) // atLeast set is a sub set of the right one + if notYetAssociated.count > 0 { + throw TestError(""" + [\(system)] still did not associate \(notYetAssociated). \ + Associated nodes: \(reflecting: associatedNodes), expected nodes: \(reflecting: atLeastNodes). + """) + } + } + + if !exactlyNodes.isEmpty { + var diff = Set(associatedNodes) + diff.formSymmetricDifference(exactlyNodes) + guard diff.isEmpty else { + throw TestError( + """ + [\(system)] did not associate the expected nodes: [\(exactlyNodes)]. + Associated nodes: \(reflecting: associatedNodes), expected nodes: \(reflecting: exactlyNodes), + diff: \(reflecting: diff). + """) + } + } + } + } + + public func assertNotAssociated( + system: ClusterSystem, node: UniqueNode, + timeout: Duration? = nil, interval: Duration? = nil, + verbose: Bool = false + ) throws { + let testKit: ActorTestKit = self.testKit(system) + + let probe = testKit.makeTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set.self) + defer { probe.stop() } + try testKit.assertHolds(for: timeout ?? 
.seconds(1)) { + system.cluster.ref.tell(.query(.associatedNodes(probe.ref))) + let associatedNodes = try probe.expectMessage() // TODO: use interval here + if verbose { + pprint(" Self: \(String(reflecting: system.settings.uniqueBindNode))") + pprint(" Associated nodes: \(associatedNodes.map { String(reflecting: $0) })") + pprint(" Not expected node: \(String(reflecting: node))") + } + + if associatedNodes.contains(node) { + throw TestError("[\(system)] unexpectedly associated with node: [\(node)]") + } + } + } + + /// Asserts the given member node has the expected `status` within the duration. + public func assertMemberStatus( + on system: ClusterSystem, node: UniqueNode, + is expectedStatus: Cluster.MemberStatus, + within: Duration, + file: StaticString = #filePath, line: UInt = #line + ) async throws { + let testKit = self.testKit(system) + + do { + _ = try await system.cluster.waitFor(node, expectedStatus, within: within) + } catch let error as Cluster.MembershipError { + switch error.underlying.error { + case .notFound: + throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) + case .statusRequirementNotMet(_, let foundMember): + throw testKit.error( + """ + Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ + to be seen as: [\(expectedStatus)], but was [\(foundMember.status)] + """, + file: file, + line: line + ) + default: + throw testKit.error(error.description, file: file, line: line) + } + } + } + + public func assertMemberStatus( + on system: ClusterSystem, node: UniqueNode, + atLeast expectedAtLeastStatus: Cluster.MemberStatus, + within: Duration, + file: StaticString = #filePath, line: UInt = #line + ) async throws { + let testKit = self.testKit(system) + + do { + _ = try await system.cluster.waitFor(node, atLeast: expectedAtLeastStatus, within: within) + } catch let error as Cluster.MembershipError { + switch error.underlying.error { + case 
.notFound: + throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) + case .atLeastStatusRequirementNotMet(_, let foundMember): + throw testKit.error( + """ + Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ + to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)] + """, + file: file, + line: line + ) + default: + throw testKit.error(error.description, file: file, line: line) + } + } + } + + /// Assert based on the event stream of ``Cluster/Event`` that the given `node` was downed or removed. + public func assertMemberDown(_ eventStreamProbe: ActorTestProbe, node: UniqueNode) throws { + let events = try eventStreamProbe.fishFor(Cluster.Event.self, within: .seconds(5)) { + switch $0 { + case .membershipChange(let change) + where change.node == node && change.status.isAtLeast(.down): + return .catchComplete($0) + default: + return .ignore + } + } + + guard events.first != nil else { + throw self._testKits.first!.fail("Expected to capture cluster event about \(node) being down or removed, yet none captured!") + } + } + + /// Asserts the given node is the leader. + /// + /// An error is thrown but NOT failing the test; use in pair with `testKit.eventually` to achieve the expected behavior. 
+ public func assertLeaderNode( + on system: ClusterSystem, is expectedNode: UniqueNode?, + file: StaticString = #filePath, line: UInt = #line + ) throws { + let testKit = self.testKit(system) + let p = testKit.makeTestProbe(expecting: Cluster.Membership.self) + defer { + p.stop() + } + system.cluster.ref.tell(.query(.currentMembership(p.ref))) + + let membership = try p.expectMessage() + let leaderNode = membership.leader?.uniqueNode + if leaderNode != expectedNode { + throw testKit.error("Expected \(reflecting: expectedNode) to be leader node on \(reflecting: system.cluster.uniqueNode) but was [\(reflecting: leaderNode)]") + } + } +} diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift index 0e13cb67c..dbe4eefa5 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift @@ -317,210 +317,6 @@ extension ClusteredActorSystemsXCTestCase { } } -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Assertions - -extension ClusteredActorSystemsXCTestCase { - public func assertAssociated( - _ system: ClusterSystem, withAtLeast node: UniqueNode, - timeout: Duration? = nil, interval: Duration? = nil, - verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column - ) throws { - try self.assertAssociated( - system, withAtLeast: [node], timeout: timeout, interval: interval, - verbose: verbose, file: file, line: line, column: column - ) - } - - public func assertAssociated( - _ system: ClusterSystem, withExactly node: UniqueNode, - timeout: Duration? = nil, interval: Duration? 
= nil, - verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column - ) throws { - try self.assertAssociated( - system, withExactly: [node], timeout: timeout, interval: interval, - verbose: verbose, file: file, line: line, column: column - ) - } - - /// Query associated state of `system` for at-most `timeout` amount of time, and verify it contains exactly the passed in `nodes`. - /// - /// - Parameters: - /// - withExactly: specific set of nodes that must exactly match the associated nodes on `system`; i.e. no extra associated nodes are allowed - /// - withAtLeast: sub-set of nodes that must be associated - public func assertAssociated( - _ system: ClusterSystem, - withExactly exactlyNodes: [UniqueNode] = [], - withAtLeast atLeastNodes: [UniqueNode] = [], - timeout: Duration? = nil, interval: Duration? = nil, - verbose: Bool = false, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column - ) throws { - // FIXME: this is a weak workaround around not having "extensions" (unique object per actor system) - // FIXME: this can be removed once issue #458 lands - - let testKit = self.testKit(system) - - let probe = testKit.makeTestProbe(.prefixed(with: "probe-assertAssociated"), expecting: Set.self, file: file, line: line) - defer { probe.stop() } - - try testKit.eventually(within: timeout ?? 
.seconds(8), file: file, line: line, column: column) { - system.cluster.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here - let associatedNodes = try probe.expectMessage(file: file, line: line) - - if verbose { - pprint(" Self: \(String(reflecting: system.settings.uniqueBindNode))") - pprint(" Associated nodes: \(associatedNodes.map { String(reflecting: $0) })") - pprint(" Expected exact nodes: \(String(reflecting: exactlyNodes))") - pprint("Expected at least nodes: \(String(reflecting: atLeastNodes))") - } - - if !atLeastNodes.isEmpty { - let notYetAssociated = Set(atLeastNodes).subtracting(Set(associatedNodes)) // atLeast set is a sub set of the right one - if notYetAssociated.count > 0 { - throw TestError(""" - [\(system)] still did not associate \(notYetAssociated). \ - Associated nodes: \(reflecting: associatedNodes), expected nodes: \(reflecting: atLeastNodes). - """) - } - } - - if !exactlyNodes.isEmpty { - var diff = Set(associatedNodes) - diff.formSymmetricDifference(exactlyNodes) - guard diff.isEmpty else { - throw TestError( - """ - [\(system)] did not associate the expected nodes: [\(exactlyNodes)]. - Associated nodes: \(reflecting: associatedNodes), expected nodes: \(reflecting: exactlyNodes), - diff: \(reflecting: diff). - """) - } - } - } - } - - public func assertNotAssociated( - system: ClusterSystem, node: UniqueNode, - timeout: Duration? = nil, interval: Duration? = nil, - verbose: Bool = false - ) throws { - let testKit: ActorTestKit = self.testKit(system) - - let probe = testKit.makeTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set.self) - defer { probe.stop() } - try testKit.assertHolds(for: timeout ?? 
.seconds(1)) { - system.cluster.ref.tell(.query(.associatedNodes(probe.ref))) - let associatedNodes = try probe.expectMessage() // TODO: use interval here - if verbose { - pprint(" Self: \(String(reflecting: system.settings.uniqueBindNode))") - pprint(" Associated nodes: \(associatedNodes.map { String(reflecting: $0) })") - pprint(" Not expected node: \(String(reflecting: node))") - } - - if associatedNodes.contains(node) { - throw TestError("[\(system)] unexpectedly associated with node: [\(node)]") - } - } - } - - /// Asserts the given member node has the expected `status` within the duration. - public func assertMemberStatus( - on system: ClusterSystem, node: UniqueNode, - is expectedStatus: Cluster.MemberStatus, - within: Duration, - file: StaticString = #filePath, line: UInt = #line - ) async throws { - let testKit = self.testKit(system) - - do { - _ = try await system.cluster.waitFor(node, expectedStatus, within: within) - } catch let error as Cluster.MembershipError { - switch error.underlying.error { - case .notFound: - throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) - case .statusRequirementNotMet(_, let foundMember): - throw testKit.error( - """ - Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ - to be seen as: [\(expectedStatus)], but was [\(foundMember.status)] - """, - file: file, - line: line - ) - default: - throw testKit.error(error.description, file: file, line: line) - } - } - } - - public func assertMemberStatus( - on system: ClusterSystem, node: UniqueNode, - atLeast expectedAtLeastStatus: Cluster.MemberStatus, - within: Duration, - file: StaticString = #filePath, line: UInt = #line - ) async throws { - let testKit = self.testKit(system) - - do { - _ = try await system.cluster.waitFor(node, atLeast: expectedAtLeastStatus, within: within) - } catch let error as Cluster.MembershipError { - switch error.underlying.error { - case 
.notFound: - throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) - case .atLeastStatusRequirementNotMet(_, let foundMember): - throw testKit.error( - """ - Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ - to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)] - """, - file: file, - line: line - ) - default: - throw testKit.error(error.description, file: file, line: line) - } - } - } - - /// Assert based on the event stream of ``Cluster/Event`` that the given `node` was downed or removed. - public func assertMemberDown(_ eventStreamProbe: ActorTestProbe, node: UniqueNode) throws { - let events = try eventStreamProbe.fishFor(Cluster.Event.self, within: .seconds(5)) { - switch $0 { - case .membershipChange(let change) - where change.node == node && change.status.isAtLeast(.down): - return .catchComplete($0) - default: - return .ignore - } - } - - guard events.first != nil else { - throw self._testKits.first!.fail("Expected to capture cluster event about \(node) being down or removed, yet none captured!") - } - } - - /// Asserts the given node is the leader. - /// - /// An error is thrown but NOT failing the test; use in pair with `testKit.eventually` to achieve the expected behavior. 
- public func assertLeaderNode( - on system: ClusterSystem, is expectedNode: UniqueNode?, - file: StaticString = #filePath, line: UInt = #line - ) throws { - let testKit = self.testKit(system) - let p = testKit.makeTestProbe(expecting: Cluster.Membership.self) - defer { - p.stop() - } - system.cluster.ref.tell(.query(.currentMembership(p.ref))) - - let membership = try p.expectMessage() - let leaderNode = membership.leader?.uniqueNode - if leaderNode != expectedNode { - throw testKit.error("Expected \(reflecting: expectedNode) to be leader node on \(reflecting: system.cluster.uniqueNode) but was [\(reflecting: leaderNode)]") - } - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Resolve utilities, for resolving remote refs "on" a specific system diff --git a/Tests/DistributedActorsTests/DeadLetterTests.swift b/Tests/DistributedActorsTests/DeadLetterTests.swift index 67d491538..34db3ae97 100644 --- a/Tests/DistributedActorsTests/DeadLetterTests.swift +++ b/Tests/DistributedActorsTests/DeadLetterTests.swift @@ -133,10 +133,3 @@ final class DeadLetterTests: SingleClusterSystemXCTestCase { } } -private distributed actor Greeter { - typealias ActorSystem = ClusterSystem - - distributed func greet(name: String) -> String { - "hello \(name)!" 
- } -} diff --git a/Tests/DistributedActorsTests/InterceptorTests.swift b/Tests/DistributedActorsTests/InterceptorTests.swift index 5fbd73c67..76f6007f0 100644 --- a/Tests/DistributedActorsTests/InterceptorTests.swift +++ b/Tests/DistributedActorsTests/InterceptorTests.swift @@ -66,9 +66,9 @@ final class InterceptorTests: SingleClusterSystemXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let otherGreeter = Greeter(actorSystem: local, greeting: "HI!!!") - let localGreeter: Greeter = try system.interceptCalls( - to: Greeter.self, + let otherGreeter = InterceptMe(actorSystem: local, greeting: "HI!!!") + let localGreeter: InterceptMe = try system.interceptCalls( + to: InterceptMe.self, metadata: ActorMetadata(), interceptor: GreeterRemoteCallInterceptor(system: local, greeter: otherGreeter) ) @@ -88,10 +88,10 @@ final class InterceptorTests: SingleClusterSystemXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let otherGreeter = Greeter(actorSystem: local, greeting: "HI!!!") - let localGreeter: Greeter = try shouldNotThrow { + let otherGreeter = InterceptMe(actorSystem: local, greeting: "HI!!!") + let localGreeter: InterceptMe = try shouldNotThrow { try system.interceptCalls( - to: Greeter.self, + to: InterceptMe.self, metadata: ActorMetadata(), interceptor: GreeterRemoteCallInterceptor(system: local, greeter: otherGreeter) ) @@ -238,7 +238,7 @@ final class InterceptorTests: SingleClusterSystemXCTestCase { } } -private distributed actor Greeter { +private distributed actor InterceptMe { typealias ID = ClusterSystem.ActorID typealias ActorSystem = ClusterSystem @@ -286,9 +286,9 @@ private distributed actor Greeter { private struct GreeterRemoteCallInterceptor: RemoteCallInterceptor { let system: ClusterSystem - let greeter: Greeter + let greeter: InterceptMe - init(system: ClusterSystem, greeter: Greeter) { + init(system: ClusterSystem, greeter: InterceptMe) { self.system = system self.greeter = greeter } diff --git 
a/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift b/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift index 25d5e2d0c..072ff0d50 100644 --- a/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift +++ b/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift @@ -21,7 +21,7 @@ import XCTest // TODO: "ActorGroup" perhaps could be better name? final class WorkerPoolTests: SingleClusterSystemXCTestCase { func test_workerPool_registerNewlyStartedActors() async throws { - let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers") + let workerKey = DistributedReception.Key(TestWorker.self, id: "request-workers") let settings = WorkerPoolSettings(selector: .dynamic(workerKey)) let workers = try await WorkerPool(settings: settings, actorSystem: system) @@ -30,9 +30,9 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { let pB: ActorTestProbe = self.testKit.makeTestProbe("pB") let pC: ActorTestProbe = self.testKit.makeTestProbe("pC") - let workerA = await Greeter(probe: pA, actorSystem: self.system, key: workerKey) - let workerB = await Greeter(probe: pB, actorSystem: self.system, key: workerKey) - let workerC = await Greeter(probe: pC, actorSystem: self.system, key: workerKey) + let workerA = await TestWorker(probe: pA, actorSystem: self.system, key: workerKey) + let workerB = await TestWorker(probe: pB, actorSystem: self.system, key: workerKey) + let workerC = await TestWorker(probe: pC, actorSystem: self.system, key: workerKey) let workerProbes: [ClusterSystem.ActorID: ActorTestProbe] = [ workerA.id: pA, @@ -71,7 +71,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { func test_workerPool_dynamic_removeDeadActors() async throws { throw XCTSkip("!!! 
Skipping test \(#function) !!!") // FIXME(distributed): Pending fix for #831 to be able to terminate worker by setting it to nil - let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers") + let workerKey = DistributedReception.Key(TestWorker.self, id: "request-workers") let workers = try await WorkerPool(selector: .dynamic(workerKey), actorSystem: system) @@ -79,9 +79,9 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { let pB: ActorTestProbe = self.testKit.makeTestProbe("pB") let pC: ActorTestProbe = self.testKit.makeTestProbe("pC") - var workerA: Greeter? = await Greeter(probe: pA, actorSystem: self.system, key: workerKey) - var workerB: Greeter? = await Greeter(probe: pB, actorSystem: self.system, key: workerKey) - var workerC: Greeter? = await Greeter(probe: pC, actorSystem: self.system, key: workerKey) + var workerA: TestWorker? = await TestWorker(probe: pA, actorSystem: self.system, key: workerKey) + var workerB: TestWorker? = await TestWorker(probe: pB, actorSystem: self.system, key: workerKey) + var workerC: TestWorker? = await TestWorker(probe: pC, actorSystem: self.system, key: workerKey) // !-safe since we initialize workers above let workerProbes: [ClusterSystem.ActorID: ActorTestProbe] = [ @@ -119,7 +119,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { // Terminate workerA sortedWorkerIDs.removeAll { $0 == workerA!.id } workerA = nil - try pA.expectMessage("Greeter deinit") + try pA.expectMessage("TestWorker deinit") // The remaining workers should take over for i in 0 ... 
2 { @@ -141,13 +141,13 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { // Terminate the rest of the workers workerB = nil - try pB.expectMessage("Greeter deinit") + try pB.expectMessage("TestWorker deinit") workerC = nil - try pC.expectMessage("Greeter deinit") + try pC.expectMessage("TestWorker deinit") // Register new worker let pD: ActorTestProbe = self.testKit.makeTestProbe("pD") - let workerD = await Greeter(probe: pD, actorSystem: self.system, key: workerKey) + let workerD = await TestWorker(probe: pD, actorSystem: self.system, key: workerKey) // WorkerPool should wait for D to join then assign work to it _ = try await workers.submit(work: "D-only") @@ -159,9 +159,9 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { let pB: ActorTestProbe = self.testKit.makeTestProbe("pB") let pC: ActorTestProbe = self.testKit.makeTestProbe("pC") - var workerA: Greeter? = Greeter(probe: pA, actorSystem: self.system) - var workerB: Greeter? = Greeter(probe: pB, actorSystem: self.system) - var workerC: Greeter? = Greeter(probe: pC, actorSystem: self.system) + var workerA: TestWorker? = TestWorker(probe: pA, actorSystem: self.system) + var workerB: TestWorker? = TestWorker(probe: pB, actorSystem: self.system) + var workerC: TestWorker? = TestWorker(probe: pC, actorSystem: self.system) // !-safe since we initialize workers above let workers = try await WorkerPool(settings: .init(selector: .static([workerA!, workerB!, workerC!])), actorSystem: system) @@ -188,7 +188,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { // Terminate workerA sortedWorkerIDs.removeAll { $0 == workerA!.id } workerA = nil - try pA.expectMessage("Greeter deinit") + try pA.expectMessage("TestWorker deinit") // The remaining workers should take over for i in 0 ... 
2 { @@ -210,9 +210,9 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { // Terminate the rest of the workers workerB = nil - try pB.expectMessage("Greeter deinit") + try pB.expectMessage("TestWorker deinit") workerC = nil - try pC.expectMessage("Greeter deinit") + try pC.expectMessage("TestWorker deinit") // WorkerPool now throws error on new work submission let error = try await shouldThrow { @@ -227,7 +227,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { func test_workerPool_static_throwOnEmptyInitialSet() async throws { let error = try await shouldThrow { - let _: WorkerPool = try await WorkerPool(selector: .static([]), actorSystem: system) + let _: WorkerPool = try await WorkerPool(selector: .static([]), actorSystem: system) } guard let workerPoolError = error as? WorkerPoolError, case .emptyStaticWorkerPool(let errorMessage) = workerPoolError.underlying.error else { @@ -237,7 +237,7 @@ final class WorkerPoolTests: SingleClusterSystemXCTestCase { } } -private distributed actor Greeter: DistributedWorker { +private distributed actor TestWorker: DistributedWorker { typealias ID = ClusterSystem.ActorID typealias ActorSystem = ClusterSystem typealias WorkItem = String @@ -250,14 +250,14 @@ private distributed actor Greeter: DistributedWorker { self.probe = probe } - init(probe: ActorTestProbe, actorSystem: ActorSystem, key: DistributedReception.Key) async { + init(probe: ActorTestProbe, actorSystem: ActorSystem, key: DistributedReception.Key) async { self.actorSystem = actorSystem self.probe = probe await self.actorSystem.receptionist.checkIn(self, with: key) } deinit { - self.probe.tell("Greeter deinit") + self.probe.tell("TestWorker deinit") } distributed func submit(work: WorkItem) async throws -> WorkResult { diff --git a/Tests/DistributedActorsTests/RemoteCallTests.swift b/Tests/DistributedActorsTests/RemoteCallTests.swift index 918ebff19..5675c3af8 100644 --- a/Tests/DistributedActorsTests/RemoteCallTests.swift +++ 
b/Tests/DistributedActorsTests/RemoteCallTests.swift @@ -27,8 +27,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let value = try await shouldNotThrow { try await remoteGreeterRef.hello() @@ -45,8 +45,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { _ = try await remoteGreeterRef.helloThrow(codable: true) @@ -65,8 +65,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { _ = try await remoteGreeterRef.helloThrow(codable: false) @@ -84,8 +84,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) try await shouldNotThrow { try await remoteGreeterRef.muted() @@ -99,8 
+99,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } try await self.joinNodes(node: local, with: remote) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { try await remoteGreeterRef.mutedThrow(codable: true) @@ -118,8 +118,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } try await self.joinNodes(node: local, with: remote) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { try await remoteGreeterRef.mutedThrow(codable: false) @@ -135,8 +135,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { let remote = await setUpNode("remote") try await self.joinNodes(node: local, with: remote) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { try await RemoteCall.with(timeout: .milliseconds(200)) { @@ -157,8 +157,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { let remote = await setUpNode("remote") local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { try 
await RemoteCall.with(timeout: .milliseconds(200)) { @@ -179,8 +179,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { let remote = await setUpNode("remote") local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let message: String = "hello" let value = try await shouldNotThrow { @@ -202,8 +202,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { _ = try await remoteGreeterRef.helloThrow(codable: true) @@ -226,8 +226,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { _ = try await remoteGreeterRef.helloThrow(codable: true) @@ -251,8 +251,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow 
{ _ = try await remoteGreeterRef.helloThrow(codable: true) @@ -271,8 +271,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { try await RemoteCall.with(timeout: .milliseconds(200)) { @@ -297,8 +297,8 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } local.cluster.join(node: remote.cluster.uniqueNode) - let greeter = Greeter(actorSystem: local) - let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote) + let greeter = RemoteCallTestGreeter(actorSystem: local) + let remoteGreeterRef = try RemoteCallTestGreeter.resolve(id: greeter.id, using: remote) let error = try await shouldThrow { _ = try await remoteGreeterRef.helloThrow(codable: true) @@ -310,7 +310,7 @@ final class RemoteCallTests: ClusteredActorSystemsXCTestCase { } } -private distributed actor Greeter { +private distributed actor RemoteCallTestGreeter { typealias ActorSystem = ClusterSystem distributed func hello() async throws -> String { diff --git a/Tests/DistributedActorsTests/TestActors.swift b/Tests/DistributedActorsTests/TestActors.swift new file mode 100644 index 000000000..fbdb1b665 --- /dev/null +++ b/Tests/DistributedActorsTests/TestActors.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedActors + +internal distributed actor Greeter: CustomStringConvertible { + typealias ActorSystem = ClusterSystem + + distributed func greet(name: String) -> String { + "Hello, \(name)!" + } + + nonisolated var description: String { + "\(Self.self)(\(self.id))" + } +} diff --git a/Tests/DistributedActorsTests/WeakReferencesTests.swift b/Tests/DistributedActorsTests/WeakReferencesTests.swift new file mode 100644 index 000000000..463029be0 --- /dev/null +++ b/Tests/DistributedActorsTests/WeakReferencesTests.swift @@ -0,0 +1,52 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import DistributedActors +import DistributedActorsConcurrencyHelpers +import DistributedActorsTestKit +import Foundation +import NIO +import XCTest + +final class WeakReferencesTests: SingleClusterSystemXCTestCase { + + func test_weakWhenLocal_notKeepLocalActorAlive_local() throws { + var greeter: Greeter? = Greeter(actorSystem: system) + let id = greeter!.id + + let ref = DistributedActorRef.WeakWhenLocal(greeter!) 
+ "\(ref)".shouldEqual("DistributedActorRef.WeakWhenLocal(Greeter(/user/Greeter-y), isLocal: true)") + greeter = nil + + try testKit.assertIDAvailable(id) + ref.actor.shouldBeNil() + } + + func test_weakWhenLocal_alwaysKeepTheRemoteRef() async throws { + let second = await setUpNode("second") + + let greeter: Greeter? = Greeter(actorSystem: system) + let id = greeter!.id + + var remoteRef: Greeter? = try Greeter.resolve(id: id, using: second) + + let ref = DistributedActorRef.WeakWhenLocal(remoteRef!) + "\(ref)".shouldEqual("DistributedActorRef.WeakWhenLocal(Greeter(/user/Greeter-y), isLocal: false)") + remoteRef = nil // doesn't do anything, was just a remote reference + + try testKit(system).assertIDAssigned(id) + try testKit(second).assertIDAvailable(id) + ref.actor.shouldNotBeNil() // keeps the remote reference, until we notice it terminated and reap the `ref` + } +} \ No newline at end of file