Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ var dependencies: [Package.Dependency] = [

// ~~~ Swift libraries ~~~
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0-beta"),
.package(url: "https://github.com/apple/swift-collections", from: "1.0.5"),
.package(url: "https://github.com/apple/swift-collections", from: "1.1.0"),

// ~~~ Observability ~~~
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ extension ClusterSystem {
}

// Spawn a behavior actor for it:
let behavior = InvocationBehavior.behavior(instance: Weak(actor))
let behavior = InvocationBehavior.behavior(instance: WeakLocalRef(actor))
let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id)

// Store references
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2024 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActorsConcurrencyHelpers
import NIOConcurrencyHelpers

/// A checked continuation that offers easier APIs for working with cancellation,
/// as well as has its unique identity.
internal final class ClusterCancellableCheckedContinuation<Success>: Hashable, @unchecked Sendable where Success: Sendable {
private struct _State: Sendable {
var cancelled: Bool = false
var onCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
var continuation: CheckedContinuation<Success, any Error>?
}

private let state: NIOLockedValueBox<_State> = .init(_State())

fileprivate init() {}

func setContinuation(_ continuation: CheckedContinuation<Success, any Error>) -> Bool {
var alreadyCancelled = false
self.state.withLockedValue { state in
if state.cancelled {
alreadyCancelled = true
} else {
state.continuation = continuation
}
}
if alreadyCancelled {
continuation.resume(throwing: CancellationError())
}
return !alreadyCancelled
}

/// Register a cancellation handler, or call it immediately if the continuation was already cancelled.
@Sendable
func onCancel(handler: @Sendable @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void) {
var alreadyCancelled: Bool = self.state.withLockedValue { state in
if state.cancelled {
return true
}

state.onCancel = handler
return false
}
if alreadyCancelled {
handler(self)
}
}

private func withContinuation(cancelled: Bool = false, _ operation: (CheckedContinuation<Success, any Error>) -> Void) {
var safeContinuation: CheckedContinuation<Success, any Error>?
var safeOnCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
self.state.withLockedValue { (state: inout _State) -> Void in
state.cancelled = state.cancelled || cancelled
safeContinuation = state.continuation
safeOnCancel = state.onCancel
state.continuation = nil
state.onCancel = nil
}
if let safeContinuation {
operation(safeContinuation)
}
if cancelled {
safeOnCancel?(self)
}
}

func resume(returning value: Success) {
self.withContinuation {
$0.resume(returning: value)
}
}

func resume(throwing error: any Error) {
self.withContinuation {
$0.resume(throwing: error)
}
}

var isCancelled: Bool {
self.state.withLockedValue { $0.cancelled }
}

func cancel() {
self.withContinuation(cancelled: true) {
$0.resume(throwing: CancellationError())
}
}
}

extension ClusterCancellableCheckedContinuation where Success == Void {
func resume() {
self.resume(returning: ())
}
}

extension ClusterCancellableCheckedContinuation {
static func == (lhs: ClusterCancellableCheckedContinuation, rhs: ClusterCancellableCheckedContinuation) -> Bool {
return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}

func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}

func _withClusterCancellableCheckedContinuation<Success>(
of successType: Success.Type = Success.self,
_ body: @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void,
function: String = #function
) async throws -> Success where Success: Sendable {
let cccc = ClusterCancellableCheckedContinuation<Success>()
return try await withTaskCancellationHandler {
return try await withCheckedThrowingContinuation(function: function) { continuation in
if cccc.setContinuation(continuation) {
body(cccc)
}
}
} onCancel: {
cccc.cancel()
}
}
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/InvocationBehavior.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {

// FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957)
enum InvocationBehavior {
static func behavior(instance weakInstance: Weak<some DistributedActor>) -> _Behavior<InvocationMessage> {
static func behavior(instance weakInstance: WeakLocalRef<some DistributedActor>) -> _Behavior<InvocationMessage> {
return _Behavior.setup { context in
return ._receiveMessageAsync { (message) async throws -> _Behavior<InvocationMessage> in
guard let _ = weakInstance.actor else {
Expand Down
Loading