Skip to content

Commit d3e566a

Browse files
committed
feat: add barrier and block flags for TaskQueue
1 parent 8e07add commit d3e566a

21 files changed

+1313
-990
lines changed

AsyncObjects.xcodeproj/project.pbxproj

Lines changed: 426 additions & 422 deletions
Large diffs are not rendered by default.

Sources/AsyncObjects/AsyncCountdownEvent.swift

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@ import OrderedCollections
1717
/// only one low priority usage allowed at one time.
1818
public actor AsyncCountdownEvent: AsyncObject {
1919
/// The suspended tasks continuation type.
20-
private typealias Continuation = GlobalContinuation<Void, Error>
20+
@usableFromInline
21+
typealias Continuation = GlobalContinuation<Void, Error>
2122
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
22-
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
23-
/// The lower limit for the countdown event to trigger.
23+
@usableFromInline
24+
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
25+
/// The limit up to which the countdown counts and triggers event.
2426
///
2527
/// By default this is set to zero and can be changed during initialization.
2628
public let limit: UInt
2729
/// Current count of the countdown.
2830
///
2931
/// If the current count becomes less or equal to limit, queued tasks
3032
/// are resumed from suspension until current count exceeds limit.
31-
public private(set) var currentCount: UInt
33+
public var currentCount: UInt
3234
/// Initial count of the countdown when count started.
3335
///
3436
/// Can be changed after initialization
@@ -44,8 +46,8 @@ public actor AsyncCountdownEvent: AsyncObject {
4446
/// - Parameters:
4547
/// - continuation: The `continuation` to add.
4648
/// - key: The key in the map.
47-
@inline(__always)
48-
private func addContinuation(
49+
@inlinable
50+
func addContinuation(
4951
_ continuation: Continuation,
5052
withKey key: UUID
5153
) {
@@ -56,24 +58,24 @@ public actor AsyncCountdownEvent: AsyncObject {
5658
/// from `continuations` map and resumes with `CancellationError`.
5759
///
5860
/// - Parameter key: The key in the map.
59-
@inline(__always)
60-
private func removeContinuation(withKey key: UUID) {
61+
@inlinable
62+
func removeContinuation(withKey key: UUID) {
6163
let continuation = continuations.removeValue(forKey: key)
6264
continuation?.cancel()
6365
}
6466

6567
/// Decrements countdown count by the provided number.
6668
///
6769
/// - Parameter number: The number to decrement count by.
68-
@inline(__always)
69-
private func decrementCount(by number: UInt = 1) {
70+
@inlinable
71+
func decrementCount(by number: UInt = 1) {
7072
guard currentCount > 0 else { return }
7173
currentCount -= number
7274
}
7375

7476
/// Resume previously waiting continuations for countdown event.
75-
@inline(__always)
76-
private func resumeContinuations() {
77+
@inlinable
78+
func resumeContinuations() {
7779
while !continuations.isEmpty && isSet {
7880
let (_, continuation) = continuations.removeFirst()
7981
continuation.resume()
@@ -89,8 +91,8 @@ public actor AsyncCountdownEvent: AsyncObject {
8991
/// Continuation can be resumed with error and some cleanup code can be run here.
9092
///
9193
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
92-
@inline(__always)
93-
private func withPromisedContinuation() async throws {
94+
@inlinable
95+
func withPromisedContinuation() async throws {
9496
let key = UUID()
9597
try await withTaskCancellationHandler { [weak self] in
9698
Task { [weak self] in

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import Foundation
88
/// Wait for event signal by calling ``wait()`` method or its timeout variation ``wait(forNanoseconds:)``.
99
public actor AsyncEvent: AsyncObject {
1010
/// The suspended tasks continuation type.
11-
private typealias Continuation = GlobalContinuation<Void, Error>
11+
@usableFromInline
12+
typealias Continuation = GlobalContinuation<Void, Error>
1213
/// The continuations stored with an associated key for all the suspended task that are waiting for event signal.
13-
private var continuations: [UUID: Continuation] = [:]
14+
@usableFromInline
15+
private(set) var continuations: [UUID: Continuation] = [:]
1416
/// Indicates whether current state of event is signalled.
1517
private var signalled: Bool
1618

@@ -19,8 +21,8 @@ public actor AsyncEvent: AsyncObject {
1921
/// - Parameters:
2022
/// - continuation: The `continuation` to add.
2123
/// - key: The key in the map.
22-
@inline(__always)
23-
private func addContinuation(
24+
@inlinable
25+
func addContinuation(
2426
_ continuation: Continuation,
2527
withKey key: UUID
2628
) {
@@ -31,8 +33,8 @@ public actor AsyncEvent: AsyncObject {
3133
/// from `continuations` map and resumes with `CancellationError`.
3234
///
3335
/// - Parameter key: The key in the map.
34-
@inline(__always)
35-
private func removeContinuation(withKey key: UUID) {
36+
@inlinable
37+
func removeContinuation(withKey key: UUID) {
3638
let continuation = continuations.removeValue(forKey: key)
3739
continuation?.cancel()
3840
}
@@ -45,8 +47,8 @@ public actor AsyncEvent: AsyncObject {
4547
/// Continuation can be resumed with error and some cleanup code can be run here.
4648
///
4749
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
48-
@inline(__always)
49-
private func withPromisedContinuation() async throws {
50+
@inlinable
51+
func withPromisedContinuation() async throws {
5052
let key = UUID()
5153
try await withTaskCancellationHandler { [weak self] in
5254
Task { [weak self] in

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,27 @@ import OrderedCollections
1111
/// or its timeout variation ``wait(forNanoseconds:)``.
1212
public actor AsyncSemaphore: AsyncObject {
1313
/// The suspended tasks continuation type.
14-
private typealias Continuation = GlobalContinuation<Void, Error>
14+
@usableFromInline
15+
typealias Continuation = GlobalContinuation<Void, Error>
1516
/// The continuations stored with an associated key for all the suspended task that are waiting for access to resource.
16-
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
17+
@usableFromInline
18+
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
1719
/// Pool size for concurrent resource access.
1820
/// Has value provided during initialization incremented by one.
19-
private var limit: UInt
21+
@usableFromInline
22+
private(set) var limit: UInt
2023
/// Current count of semaphore.
2124
/// Can have maximum value up to `limit`.
22-
private var count: Int
25+
@usableFromInline
26+
private(set) var count: Int
2327

2428
/// Add continuation with the provided key in `continuations` map.
2529
///
2630
/// - Parameters:
2731
/// - continuation: The `continuation` to add.
2832
/// - key: The key in the map.
29-
@inline(__always)
30-
private func addContinuation(
33+
@inlinable
34+
func addContinuation(
3135
_ continuation: Continuation,
3236
withKey key: UUID
3337
) {
@@ -38,16 +42,16 @@ public actor AsyncSemaphore: AsyncObject {
3842
/// from `continuations` map and resumes with `CancellationError`.
3943
///
4044
/// - Parameter key: The key in the map.
41-
@inline(__always)
42-
private func removeContinuation(withKey key: UUID) {
45+
@inlinable
46+
func removeContinuation(withKey key: UUID) {
4347
let continuation = continuations.removeValue(forKey: key)
4448
continuation?.cancel()
4549
incrementCount()
4650
}
4751

4852
/// Increments semaphore count within limit provided.
49-
@inline(__always)
50-
private func incrementCount() {
53+
@inlinable
54+
func incrementCount() {
5155
guard count < limit else { return }
5256
count += 1
5357
}
@@ -60,8 +64,8 @@ public actor AsyncSemaphore: AsyncObject {
6064
/// Continuation can be resumed with error and some cleanup code can be run here.
6165
///
6266
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
63-
@inline(__always)
64-
private func withPromisedContinuation() async throws {
67+
@inlinable
68+
func withPromisedContinuation() async throws {
6569
let key = UUID()
6670
try await withTaskCancellationHandler { [weak self] in
6771
Task { [weak self] in

Sources/AsyncObjects/CancellationSource.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,37 +14,39 @@ import Foundation
1414
/// In case of circular dependency between cancellation sources, app will go into infinite recursion.
1515
public actor CancellationSource {
1616
/// All the registered tasks for cooperative cancellation.
17-
private var registeredTasks: [AnyHashable: () -> Void] = [:]
17+
@usableFromInline
18+
private(set) var registeredTasks: [AnyHashable: () -> Void] = [:]
1819
/// All the linked cancellation sources that cancellation event will be propagated.
1920
///
2021
/// - TODO: Store weak reference for cancellation sources.
2122
/// ```swift
2223
/// private var linkedSources: NSHashTable<CancellationSource> = .weakObjects()
2324
/// ```
24-
private var linkedSources: [CancellationSource] = []
25+
@usableFromInline
26+
private(set) var linkedSources: [CancellationSource] = []
2527

2628
/// Add task to registered cooperative cancellation tasks list.
2729
///
2830
/// - Parameter task: The task to register.
29-
@inline(__always)
30-
private func add<Success, Failure>(task: Task<Success, Failure>) {
31+
@inlinable
32+
func add<Success, Failure>(task: Task<Success, Failure>) {
3133
guard !task.isCancelled else { return }
3234
registeredTasks[task] = { task.cancel() }
3335
}
3436

3537
/// Remove task from registered cooperative cancellation tasks list.
3638
///
3739
/// - Parameter task: The task to remove.
38-
@inline(__always)
39-
private func remove<Success, Failure>(task: Task<Success, Failure>) {
40+
@inlinable
41+
func remove<Success, Failure>(task: Task<Success, Failure>) {
4042
registeredTasks.removeValue(forKey: task)
4143
}
4244

4345
/// Add cancellation source to linked cancellation sources list to propagate cancellation event.
4446
///
4547
/// - Parameter task: The source to link.
46-
@inline(__always)
47-
private func addSource(_ source: CancellationSource) {
48+
@inlinable
49+
func addSource(_ source: CancellationSource) {
4850
linkedSources.append(source)
4951
}
5052

Sources/AsyncObjects/Continuable.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/// A type that allows to interface between synchronous and asynchronous code,
22
/// by representing task state and allowing task resuming with some value or error.
3+
@usableFromInline
34
protocol Continuable: Sendable {
45
/// The type of value to resume the continuation with in case of success.
56
associatedtype Success
@@ -26,6 +27,7 @@ extension Continuable where Failure == Error {
2627
func cancel() { self.resume(throwing: CancellationError()) }
2728
}
2829

30+
@usableFromInline
2931
protocol ThrowingContinuable: Continuable {
3032
/// The type of error to resume the continuation with in case of failure.
3133
associatedtype Failure = Error
@@ -44,6 +46,7 @@ protocol ThrowingContinuable: Continuable {
4446
static func with(_ fn: (Self) -> Void) async throws -> Success
4547
}
4648

49+
@usableFromInline
4750
protocol NonThrowingContinuable: Continuable {
4851
/// The type of error to resume the continuation with in case of failure.
4952
associatedtype Failure = Never
@@ -64,6 +67,7 @@ protocol NonThrowingContinuable: Continuable {
6467
#if DEBUG || ASYNCOBJECTS_USE_CHECKEDCONTINUATION
6568
/// The continuation type used in package in `DEBUG` mode
6669
/// or if `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag turned on.
70+
@usableFromInline
6771
typealias GlobalContinuation<T, E: Error> = CheckedContinuation<T, E>
6872

6973
extension CheckedContinuation: Continuable {}
@@ -111,6 +115,7 @@ extension CheckedContinuation: NonThrowingContinuable where E == Never {
111115
#else
112116
/// The continuation type used in package in `RELEASE` mode
113117
///and in absence of `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag.
118+
@usableFromInline
114119
typealias GlobalContinuation<T, E: Error> = UnsafeContinuation<T, E>
115120

116121
extension UnsafeContinuation: Continuable {}

Sources/AsyncObjects/Future.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ public actor Future<Output, Failure: Error> {
1919
/// A type that represents the result in the future, when an element or error is available.
2020
public typealias FutureResult = Result<Output, Failure>
2121
/// The suspended tasks continuation type.
22-
private typealias Continuation = GlobalContinuation<Output, Failure>
22+
@usableFromInline
23+
typealias Continuation = GlobalContinuation<Output, Failure>
2324
/// The continuations stored with an associated key for all the suspended task
2425
/// that are waiting for future to be fulfilled.
25-
private var continuations: [UUID: Continuation] = [:]
26+
@usableFromInline
27+
private(set) var continuations: [UUID: Continuation] = [:]
2628
/// The underlying `Result` that indicates either future fulfilled or rejected.
2729
///
2830
/// If future isn't fulfilled or rejected, the value is `nil`.
@@ -33,8 +35,8 @@ public actor Future<Output, Failure: Error> {
3335
/// - Parameters:
3436
/// - continuation: The `continuation` to add.
3537
/// - key: The key in the map.
36-
@inline(__always)
37-
private func addContinuation(
38+
@inlinable
39+
func addContinuation(
3840
_ continuation: Continuation,
3941
withKey key: UUID = .init()
4042
) {
@@ -297,8 +299,8 @@ extension Future where Failure == Error {
297299
/// from `continuations` map and resumes with `CancellationError`.
298300
///
299301
/// - Parameter key: The key in the map.
300-
@inline(__always)
301-
private func removeContinuation(withKey key: UUID) {
302+
@inlinable
303+
func removeContinuation(withKey key: UUID) {
302304
let continuation = continuations.removeValue(forKey: key)
303305
continuation?.cancel()
304306
}
@@ -313,8 +315,8 @@ extension Future where Failure == Error {
313315
/// - Returns: The value continuation is resumed with.
314316
///
315317
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
316-
@inline(__always)
317-
private func withPromisedContinuation() async throws -> Output {
318+
@inlinable
319+
func withPromisedContinuation() async throws -> Output {
318320
let key = UUID()
319321
let value = try await withTaskCancellationHandler { [weak self] in
320322
Task { [weak self] in
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
public extension TaskGroup {
2+
/// Adds a child task to the group and starts the task.
3+
///
4+
/// This method adds child task to the group and returns only after the child task is started.
5+
///
6+
/// - Parameters:
7+
/// - priority: The priority of the operation task. Omit this parameter or
8+
/// pass `nil` to set the child task’s priority to the priority of the group.
9+
/// - operation: The operation to execute as part of the task group.
10+
@inlinable
11+
mutating func addTaskAndStart(
12+
priority: TaskPriority? = nil,
13+
operation: @escaping @Sendable () async -> ChildTaskResult
14+
) async {
15+
typealias C = UnsafeContinuation<Void, Never>
16+
await withUnsafeContinuation { (continuation: C) in
17+
self.addTask {
18+
continuation.resume()
19+
return await operation()
20+
}
21+
}
22+
}
23+
}
24+
25+
public extension ThrowingTaskGroup {
26+
/// Adds a child task to the group and starts the task.
27+
///
28+
/// This method adds child task to the group and returns only after the child task is started.
29+
/// This method doesn’t throw an error, even if the child task does. Instead,
30+
/// the corresponding call to `ThrowingTaskGroup.next()` rethrows that error.
31+
///
32+
/// - Parameters:
33+
/// - priority: The priority of the operation task. Omit this parameter or
34+
/// pass `nil` to set the child task’s priority to the priority of the group.
35+
/// - operation: The operation to execute as part of the task group.
36+
@inlinable
37+
mutating func addTaskAndStart(
38+
priority: TaskPriority? = nil,
39+
operation: @escaping @Sendable () async throws -> ChildTaskResult
40+
) async {
41+
typealias C = UnsafeContinuation<Void, Never>
42+
await withUnsafeContinuation { (continuation: C) in
43+
self.addTask {
44+
continuation.resume()
45+
return try await operation()
46+
}
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)