diff --git a/Package.swift b/Package.swift index 3e8a9389..04121ef6 100644 --- a/Package.swift +++ b/Package.swift @@ -14,7 +14,7 @@ let package = Package( .library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.4"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), ], targets: [ diff --git a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift index 5c99d3d7..d6008ad9 100644 --- a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift +++ b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift @@ -16,16 +16,19 @@ struct BoundedBufferStateMachine { typealias SuspendedProducer = UnsafeContinuation typealias SuspendedConsumer = UnsafeContinuation?, Never> + // We are using UnsafeTransfer here since we have to get the elements from the task + // into the consumer task. This is a transfer but we cannot prove this to the compiler at this point + // since next is not marked as transferring the return value. fileprivate enum State { case initial(base: Base) case buffering( task: Task, - buffer: Deque>, + buffer: Deque, Error>>, suspendedProducer: SuspendedProducer?, suspendedConsumer: SuspendedConsumer? ) case modifying - case finished(buffer: Deque>) + case finished(buffer: Deque, Error>>) } private var state: State @@ -139,7 +142,7 @@ struct BoundedBufferStateMachine { // we have to stack the new element or suspend the producer if the buffer is full precondition(buffer.count < limit, "Invalid state. The buffer should be available for stacking a new element.") self.state = .modifying - buffer.append(.success(element)) + buffer.append(.success(.init(element))) self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) return .none @@ -218,7 +221,7 @@ struct BoundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) - return .returnResult(producerContinuation: suspendedProducer, result: result) + return .returnResult(producerContinuation: suspendedProducer, result: result.map { $0.wrapped }) case .buffering(_, _, _, .some): preconditionFailure("Invalid states. There is already a suspended consumer.") @@ -233,7 +236,7 @@ struct BoundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .finished(buffer: buffer) - return .returnResult(producerContinuation: nil, result: result) + return .returnResult(producerContinuation: nil, result: result.map { $0.wrapped }) } } @@ -257,7 +260,7 @@ struct BoundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil) - return .returnResult(producerContinuation: suspendedProducer, result: result) + return .returnResult(producerContinuation: suspendedProducer, result: result.map { $0.wrapped }) case .buffering(_, _, _, .some): preconditionFailure("Invalid states. There is already a suspended consumer.") @@ -272,7 +275,7 @@ struct BoundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .finished(buffer: buffer) - return .returnResult(producerContinuation: nil, result: result) + return .returnResult(producerContinuation: nil, result: result.map { $0.wrapped }) } } diff --git a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift index de5d37ae..be19b58b 100644 --- a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift +++ b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift @@ -21,15 +21,18 @@ struct UnboundedBufferStateMachine { case bufferingOldest(Int) } + // We are using UnsafeTransfer here since we have to get the elements from the task + // into the consumer task. This is a transfer but we cannot prove this to the compiler at this point + // since next is not marked as transferring the return value. fileprivate enum State { case initial(base: Base) case buffering( task: Task, - buffer: Deque>, + buffer: Deque, Error>>, suspendedConsumer: SuspendedConsumer? ) case modifying - case finished(buffer: Deque>) + case finished(buffer: Deque, Error>>) } private var state: State @@ -84,15 +87,15 @@ struct UnboundedBufferStateMachine { self.state = .modifying switch self.policy { case .unlimited: - buffer.append(.success(element)) + buffer.append(.success(.init(element))) case .bufferingNewest(let limit): if buffer.count >= limit { _ = buffer.popFirst() } - buffer.append(.success(element)) + buffer.append(.success(.init(element))) case .bufferingOldest(let limit): if buffer.count < limit { - buffer.append(.success(element)) + buffer.append(.success(.init(element))) } } self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) @@ -170,7 +173,7 @@ struct UnboundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) - return .returnResult(result) + return .returnResult(result.map { $0.wrapped }) case .modifying: preconditionFailure("Invalid state.") @@ -182,7 +185,7 @@ struct UnboundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .finished(buffer: buffer) - return .returnResult(result) + return .returnResult(result.map { $0.wrapped }) } } @@ -208,7 +211,7 @@ struct UnboundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil) - return .resumeConsumer(result) + return .resumeConsumer(result.map { $0.wrapped }) case .modifying: preconditionFailure("Invalid state.") @@ -220,7 +223,7 @@ struct UnboundedBufferStateMachine { self.state = .modifying let result = buffer.popFirst()! self.state = .finished(buffer: buffer) - return .resumeConsumer(result) + return .resumeConsumer(result.map { $0.wrapped }) } } @@ -251,3 +254,5 @@ struct UnboundedBufferStateMachine { extension UnboundedBufferStateMachine: Sendable where Base: Sendable { } extension UnboundedBufferStateMachine.State: Sendable where Base: Sendable { } + + diff --git a/Sources/AsyncAlgorithms/Channels/ChannelStateMachine.swift b/Sources/AsyncAlgorithms/Channels/ChannelStateMachine.swift index 2c8b1b92..920f6056 100644 --- a/Sources/AsyncAlgorithms/Channels/ChannelStateMachine.swift +++ b/Sources/AsyncAlgorithms/Channels/ChannelStateMachine.swift @@ -10,9 +10,6 @@ //===----------------------------------------------------------------------===// import OrderedCollections -// NOTE: this is only marked as unchecked since the swift-collections tag is before auditing for Sendable -extension OrderedSet: @unchecked Sendable where Element: Sendable { } - struct ChannelStateMachine: Sendable { private struct SuspendedProducer: Hashable, Sendable { let id: UInt64 diff --git a/Sources/AsyncAlgorithms/UnsafeTransfer.swift b/Sources/AsyncAlgorithms/UnsafeTransfer.swift new file mode 100644 index 00000000..c8bfca12 --- /dev/null +++ b/Sources/AsyncAlgorithms/UnsafeTransfer.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// A wrapper struct to unconditionally to transfer an non-Sendable value. +struct UnsafeTransfer: @unchecked Sendable { + let wrapped: Element + + init(_ wrapped: Element) { + self.wrapped = wrapped + } +}