Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion Sources/DistributedActors/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ extension ClusterSystem {
}

// Spawn a behavior actor for it:
let behavior = InvocationBehavior.behavior(instance: Weak(actor))
let behavior = InvocationBehavior.behavior(instance: DistributedActorRef.Weak(actor))
let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id)

// Store references
Expand Down Expand Up @@ -1077,6 +1077,36 @@ extension ClusterSystem {
}
}

extension ClusterSystem {
func _isAssigned(id: ActorID) -> Bool {
self.namingLock.withLock {
// Are we in the middle of initializing an actor with this ActorID?
if self._reservedNames.contains(id) {
return true
}

// Do we have a known, managed, distributed actor with this ActorID?
if self._managedDistributedActors.get(identifiedBy: id) != nil {
return true
}

// Maybe it is a well-known actor? Those we store separately (and with strong ref)?
if let wellKnownName = id.metadata.wellKnown {
if self._managedWellKnownDistributedActors[wellKnownName] != nil {
return true
}
}

// well, maybe it's an ActorRef after all?
if self._managedRefs[id] != nil {
return true
}

return false
}
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Intercepting calls

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,20 @@ import Distributed
public struct WeakActorDictionary<Act: DistributedActor>: ExpressibleByDictionaryLiteral
where Act.ID == ClusterSystem.ActorID
{
var underlying: [ClusterSystem.ActorID: WeakContainer]

final class WeakContainer {
weak var actor: Act?

init(_ actor: Act) {
self.actor = actor
}

// init(idForRemoval id: ClusterSystem.ActorID) {
// self.actor = nil
// }
}
var underlying: [ClusterSystem.ActorID: DistributedActorRef.Weak<Act>]

/// Initialize an empty dictionary.
public init() {
self.underlying = [:]
}

public init(dictionaryLiteral elements: (Act.ID, Act)...) {
self.underlying = [:]
self.underlying.reserveCapacity(elements.count)
var dict: [ClusterSystem.ActorID: DistributedActorRef.Weak<Act>] = [:]
dict.reserveCapacity(elements.count)
for (id, actor) in elements {
self.underlying[id] = .init(actor)
dict[id] = .init(actor)
}
self.underlying = dict
}

/// Insert the passed in actor into the dictionary.
Expand All @@ -53,16 +42,16 @@ public struct WeakActorDictionary<Act: DistributedActor>: ExpressibleByDictionar
///
/// - Parameter actor:
public mutating func insert(_ actor: Act) {
self.underlying[actor.id] = WeakContainer(actor)
self.underlying[actor.id] = DistributedActorRef.Weak(actor)
}

public mutating func getActor(identifiedBy id: ClusterSystem.ActorID) -> Act? {
guard let container = underlying[id] else {
guard let ref = underlying[id] else {
// unknown id
return nil
}

guard let knownActor = container.actor else {
guard let knownActor = ref.actor else {
// the actor was released -- let's remove the container while we're here
_ = self.removeActor(identifiedBy: id)
return nil
Expand Down Expand Up @@ -118,15 +107,69 @@ public struct WeakAnyDistributedActorDictionary {
return knownActor
}
}
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Weak distributed actor reference wrapper helpers

final class Weak<Act: DistributedActor> {
weak var actor: Act?
enum DistributedActorRef {

/// Wrapper class for weak `distributed actor` references.
///
/// Allows for weak storage of distributed actor references inside collections,
/// although those collections need to be manually cleared from dead references.
///
public final class Weak<Act: DistributedActor>: CustomStringConvertible {
private weak var weakRef: Act?

public init(_ actor: Act) {
self.weakRef = actor
}

init(_ actor: Act) {
self.actor = actor
public var actor: Act? {
self.weakRef
}

public var description: String {
let isLocalStr: String
if let actor = self.actor {
isLocalStr = "\(__isLocalActor(actor))"
} else {
isLocalStr = "unknown/released"
}

return "DistributedActorRef.Weak(\(self.actor, orElse: "nil"), isLocal: \(isLocalStr))"
}
}

init(idForRemoval id: ClusterSystem.ActorID) {
self.actor = nil
/// Wrapper class for weak `distributed actor` references.
///
/// Allows for weak storage of distributed actor references inside collections,
/// although those collections need to be manually cleared from dead references.
///
public final class WeakWhenLocal<Act: DistributedActor>: CustomStringConvertible {
private weak var weakLocalRef: Act?
private let remoteRef: Act?

public init(_ actor: Act) {
if __isRemoteActor(actor) {
self.remoteRef = actor
self.weakLocalRef = nil
} else {
self.remoteRef = nil
self.weakLocalRef = actor
}

assert((self.remoteRef == nil && self.weakLocalRef != nil) ||
(self.remoteRef != nil && self.weakLocalRef == nil),
"Only a single var may hold the actor: remote: \(self.remoteRef, orElse: "nil"), \(self.weakLocalRef, orElse: "nil")")
}

public var actor: Act? {
remoteRef ?? weakLocalRef
}

public var description: String {
"DistributedActorRef.WeakWhenLocal(\(self.actor, orElse: "nil"), isLocal: \(self.remoteRef == nil))"
}
}
}

}
2 changes: 1 addition & 1 deletion Sources/DistributedActors/InvocationBehavior.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {

// FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957)
enum InvocationBehavior {
static func behavior(instance weakInstance: Weak<some DistributedActor>) -> _Behavior<InvocationMessage> {
static func behavior(instance weakInstance: DistributedActorRef.Weak<some DistributedActor>) -> _Behavior<InvocationMessage> {
return _Behavior.setup { context in
return ._receiveMessageAsync { (message) async throws -> _Behavior<InvocationMessage> in
guard let instance = weakInstance.actor else {
Expand Down
6 changes: 3 additions & 3 deletions Sources/DistributedActors/Pattern/WorkerPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
// MARK: WorkerPool state

/// The worker pool. Worker will be selected round-robin.
private var workers: [Worker.ID: Weak<Worker>] = [:]
private var workers: [Worker.ID: DistributedActorRef.Weak<Worker>] = [:]
private var roundRobinPos = 0

/// Boolean flag to help determine if pool becomes empty because at least one worker has terminated.
Expand All @@ -78,7 +78,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
self.newWorkersSubscribeTask = Task {
for await worker in await self.actorSystem.receptionist.listing(of: key) {
self.actorSystem.log.log(level: self.logLevel, "Got listing member for \(key): \(worker)")
self.workers[worker.id] = Weak(worker)
self.workers[worker.id] = DistributedActorRef.Weak(worker)
// Notify those waiting for new worker
for (i, continuation) in self.newWorkerContinuations.enumerated().reversed() {
continuation.resume()
Expand All @@ -89,7 +89,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
}
case .static(let workers):
workers.forEach { worker in
self.workers[worker.id] = Weak(worker)
self.workers[worker.id] = DistributedActorRef.Weak(worker)
watchTermination(of: worker)
}
}
Expand Down
Loading