diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..8b4c83bb --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true \ No newline at end of file diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 1ed694ad..3c2103e7 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -13,3 +13,4 @@ jobs: uses: swiftlang/github-workflows/.github/workflows/soundness.yml@main with: license_header_check_project_name: "Swift Async Algorithms" + format_check_container_image: "swiftlang/swift:nightly-6.1-noble" # Needed since 6.0.x doesn't support sending keyword diff --git a/Evolution/NNNN-channel.md b/Evolution/0012-channel.md similarity index 100% rename from Evolution/NNNN-channel.md rename to Evolution/0012-channel.md diff --git a/Evolution/NNNN-chunk.md b/Evolution/0013-chunk.md similarity index 100% rename from Evolution/NNNN-chunk.md rename to Evolution/0013-chunk.md diff --git a/Evolution/NNNN-rate-limits.md b/Evolution/0014-rate-limits.md similarity index 100% rename from Evolution/NNNN-rate-limits.md rename to Evolution/0014-rate-limits.md diff --git a/Evolution/NNNN-reductions.md b/Evolution/0015-reductions.md similarity index 100% rename from Evolution/NNNN-reductions.md rename to Evolution/0015-reductions.md diff --git a/Evolution/0016-mutli-producer-single-consumer-channel.md b/Evolution/0016-mutli-producer-single-consumer-channel.md new file mode 100644 index 00000000..20a23eab --- /dev/null +++ b/Evolution/0016-mutli-producer-single-consumer-channel.md @@ -0,0 +1,847 @@ +# MultiProducerSingleConsumerAsyncChannel + +* Proposal: [SAA-0016](0016-multi-producer-single-consumer-channel.md) +* Authors: [Franz Busch](https://github.com/FranzBusch) +* Status: **Implemented** + +## Revision +- 2023/12/18: Migrate proposal from Swift Evolution to Swift Async Algorithms. +- 2023/12/19: Add element size dependent strategy +- 2024/05/19: Rename to multi producer single consumer channel +- 2024/05/28: Add unbounded strategy +- 2025/03/24: Adopt `~Copyable` for correct semantics and better performance. + +## Introduction + +[SE-0314](https://github.com/apple/swift-evolution/blob/main/proposals/0314-async-stream.md) +introduced new `Async[Throwing]Stream` types which act as root asynchronous +sequences. These two types allow bridging from synchronous callbacks such as +delegates to an asynchronous sequence. This proposal adds a new root primitive +with the goal to model asynchronous multi-producer-single-consumer systems. + +## Motivation + +After using the `AsyncSequence` protocol, the `Async[Throwing]Stream` types, and +the `Async[Throwing]Channel` types extensively over the past years, we learned +that there is a gap in the ecosystem for a type that provides strict +multi-producer-single-consumer guarantees with external backpressure support. +Additionally, any stream/channel like type needs to have a clear definition +about the following behaviors: + +1. Backpressure +2. Multi/single consumer support +3. Downstream consumer termination +4. Upstream producer termination + +The below sections are providing a detailed explanation of each of those. + +### Backpressure + +In general, backpressure is the mechanism that prevents a fast producer from +overwhelming a slow consumer. It helps stability of the overall system by +regulating the flow of data between different components. Additionally, it +allows to put an upper bound on resource consumption of a system. In reality, +backpressure is used in almost all networked applications. + +In Swift, asynchronous sequence also have the concept of internal backpressure. +This modeled by the pull-based implementation where a consumer has to call +`next` on the `AsyncIterator`. In this model, there is no way for a consumer to +overwhelm a producer since the producer controls the rate of pulling elements. + +However, the internal backpressure of an asynchronous isn't the only +backpressure in play. There is also the source backpressure that is producing +the actual elements. For a backpressured system it is important that every +component of such a system is aware of the backpressure of its consumer and its +producer. + +Let's take a quick look how our current root asynchronous sequences are handling +this. + +`Async[Throwing]Stream` aims to support backpressure by providing a configurable +buffer and returning `Async[Throwing]Stream.Continuation.YieldResult` which +contains the current buffer depth from the `yield()` method. However, only +providing the current buffer depth on `yield()` is not enough to bridge a +backpressured system into an asynchronous sequence since this can only be used +as a "stop" signal but we are missing a signal to indicate resuming the +production. The only viable backpressure strategy that can be implemented with +the current API is a timed backoff where we stop producing for some period of +time and then speculatively produce again. This is a very inefficient pattern +that produces high latencies and inefficient use of resources. + +`Async[Throwing]Channel` is a multi-producer-multi-consumer channel that only +supports asynchronous producers. Additionally, the backpressure strategy is +fixed by a buffer size of 1 element per producer. + +We are currently lacking a type that supports a configurable backpressure +strategy and both asynchronous and synchronous producers. + +### Multi/single consumer support + +The `AsyncSequence` protocol itself makes no assumptions about whether the +implementation supports multiple consumers or not. This allows the creation of +unicast and multicast asynchronous sequences. The difference between a unicast +and multicast asynchronous sequence is if they allow multiple iterators to be +created. `AsyncStream` does support the creation of multiple iterators and it +does handle multiple consumers correctly. On the other hand, +`AsyncThrowingStream` also supports multiple iterators but does `fatalError` +when more than one iterator has to suspend. The original proposal states: + +> As with any sequence, iterating over an AsyncStream multiple times, or +creating multiple iterators and iterating over them separately, may produce an +unexpected series of values. + +While that statement leaves room for any behavior we learned that a clear distinction +of behavior for root asynchronous sequences is beneficial; especially, when it comes to +how transformation algorithms are applied on top. + +### Downstream consumer termination + +Downstream consumer termination allows the producer to notify the consumer that +no more values are going to be produced. `Async[Throwing]Stream` does support +this by calling the `finish()` or `finish(throwing:)` methods of the +`Async[Throwing]Stream.Continuation`. However, `Async[Throwing]Stream` does not +handle the case that the `Continuation` may be `deinit`ed before one of the +finish methods is called. This currently leads to async streams that never +terminate. + +### Upstream producer termination + +Upstream producer termination is the inverse of downstream consumer termination +where the producer is notified once the consumption has terminated. Currently, +`Async[Throwing]Stream` does expose the `onTermination` property on the +`Continuation`. The `onTermination` closure is invoked once the consumer has +terminated. The consumer can terminate in four separate cases: + +1. The asynchronous sequence was `deinit`ed and no iterator was created +2. The iterator was `deinit`ed and the asynchronous sequence is unicast +3. The consuming task is canceled +4. The asynchronous sequence returned `nil` or threw + +`Async[Throwing]Stream` currently invokes `onTermination` in all cases; however, +since `Async[Throwing]Stream` supports multiple consumers (as discussed in the +`Multi/single consumer support` section), a single consumer task being canceled +leads to the termination of all consumers. This is not expected from multicast +asynchronous sequences in general. + +## Proposed solution + +The above motivation lays out the expected behaviors for any consumer/producer +system and compares them to the behaviors of `Async[Throwing]Stream` and +`Async[Throwing]Channel`. + +This section proposes a new type called `MultiProducerSingleConsumerAsyncChannel` +that implement all of the above-mentioned behaviors. Importantly, this proposed +solution is taking advantage of `~Copyable` types to model the +multi-producer-single-consumer behavior. While the current `AsyncSequence` +protocols are not supporting `~Copyable` types we provide a way to convert the +proposed channel to an asynchronous sequence. This leaves us room to support any +potential future asynchronous streaming protocol that supports `~Copyable`. + +### Creating a MultiProducerSingleConsumerAsyncChannel + +You can create an `MultiProducerSingleConsumerAsyncChannel` instance using the +`makeChannel(of: backpressureStrategy:)` method. This method returns you the +channel and the source. The source can be used to send new values to the +asynchronous channel. The new API specifically provides a +multi-producer/single-consumer pattern. + +```swift +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) + +// The channel and source can be extracted from the returned type +let channel = consume channelAndSource.channel +let source = consume channelAndSource.source +``` + +The new proposed APIs offer two different backpressure strategies: +- Watermark: Using a low and high watermark +- Unbounded: Unbounded buffering of the channel. **Only** use this if the + production is limited through some other mean. + +The source is used to send values to the channel. It provides different APIs for +synchronous and asynchronous producers. All of the APIs are relaying the +backpressure of the channel. The synchronous multi-step APIs are the foundation +for all other APIs. Below is an example of how it can be used: + +```swift +do { + let sendResult = try source.send(contentsOf: sequence) + + switch sendResult { + case .produceMore: + // Trigger more production in the underlying system + + case .enqueueCallback(let callbackToken): + // There are enough values in the channel already. We need to enqueue + // a callback to get notified when we should produce more. + source.enqueueCallback(token: callbackToken, onProduceMore: { result in + switch result { + case .success: + // Trigger more production in the underlying system + case .failure(let error): + // Terminate the underlying producer + } + }) + } +} catch { + // `send(contentsOf:)` throws if the channel already terminated +} +``` + +The above API offers the most control and highest performance when bridging a +synchronous producer to a `MultiProducerSingleConsumerAsyncChannel`. First, you have +to send values using the `send(contentsOf:)` which returns a `SendResult`. The +result either indicates that more values should be produced or that a callback +should be enqueued by calling the `enqueueCallback(callbackToken: +onProduceMore:)` method. This callback is invoked once the backpressure strategy +decided that more values should be produced. This API aims to offer the most +flexibility with the greatest performance. The callback only has to be allocated +in the case where the producer needs to pause production. + +Additionally, the above API is the building block for some higher-level and +easier-to-use APIs to send values to the channel. Below is an +example of the two higher-level APIs. + +```swift +// Writing new values and providing a callback when to produce more +try source.send(contentsOf: sequence, onProduceMore: { result in + switch result { + case .success: + // Trigger more production + case .failure(let error): + // Terminate the underlying producer + } +}) + +// This method suspends until more values should be produced +try await source.send(contentsOf: sequence) +``` + +With the above APIs, we should be able to effectively bridge any system into a +`MultiProducerSingleConsumerAsyncChannel` regardless if the system is callback-based, +blocking, or asynchronous. + +### Multi producer + +To support multiple producers the source offers a `copy` method to produce a new +source. The source is returned `sending` so it is in a disconnected isolation +region than the original source allowing to pass it into a different isolation +region to concurrently produce elements. + +```swift +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) +) +var channel = consume channelAndSource.channel +var source1 = consume channelAndSource.source +var source2 = source1.copy() + +group.addTask { + try await source1.send(1) +} + +group.addTask() { + try await source2.send(2) +} + +print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first +print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first +``` + +### Downstream consumer termination + +> When reading the next two examples around termination behaviour keep in mind +that the newly proposed APIs are providing a strict a single consumer channel. + +Calling `finish()` terminates the downstream consumer. Below is an example of +this: + +```swift +// Termination through calling finish +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source + +try await source.send(1) +source.finish() + +print(await channel.next()) // Prints Optional(1) +print(await channel.next()) // Prints nil +``` + +If the channel has a failure type it can also be finished with an error. + +```swift +// Termination through calling finish +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: SomeError.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source + +try await source.send(1) +source.finish(throwing: SomeError) + +print(try await channel.next()) // Prints Optional(1) +print(try await channel.next()) // Throws SomeError +``` + +The other way to terminate the consumer is by deiniting the source. This has the +same effect as calling `finish()`. Since the source is a `~Copyable` type this +will happen automatically when the source is last used or explicitly consumed. + +```swift +// Termination through deiniting the source +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source + +try await source.send(1) +_ = consume source // Explicitly consume the source + +print(await channel.next()) // Prints Optional(1) +print(await channel.next()) // Prints nil +``` + +### Upstream producer termination + +The producer will get notified about termination through the `onTerminate` +callback. Termination of the producer happens in the following scenarios: + +```swift +// Termination through task cancellation +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source +source.setOnTerminationCallback { print("Terminated") } + +let task = Task { + await channel.next() +} +task.cancel() // Prints Terminated +``` + +```swift +// Termination through deiniting the channel +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source +source.setOnTerminationCallback { print("Terminated") } +_ = consume channel // Prints Terminated +``` + +```swift +// Termination through finishing the source and consuming the last element +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source = consume channelAndSource.source +source.setOnTerminationCallback { print("Terminated") } + +_ = try await source.send(1) +source.finish() + +print(await channel.next()) // Prints Optional(1) +await channel.next() // Prints Terminated +``` + +```swift +// Termination through deiniting the last source and consuming the last element +let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +var channel = consume channelAndSource.channel +var source1 = consume channelAndSource.source +var source2 = source1.copy() +source1.setOnTerminationCallback { print("Terminated") } + +_ = try await source1.send(1) +_ = consume source1 +_ = try await source2.send(2) + +print(await channel.next()) // Prints Optional(1) +print(await channel.next()) // Prints Optional(2) +_ = consume source2 +await channel.next() // Prints Terminated +``` + +Similar to the downstream consumer termination, trying to send more elements after the +producer has been terminated will result in an error thrown from the send methods. + +## Detailed design + +```swift +#if compiler(>=6.1) +/// An error that is thrown from the various `send` methods of the +/// ``MultiProducerSingleConsumerAsyncChannel/Source``. +/// +/// This error is thrown when the channel is already finished when +/// trying to send new elements to the source. +public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error { } + +/// A multi-producer single-consumer channel. +/// +/// The ``MultiProducerSingleConsumerAsyncChannel`` provides a ``MultiProducerSingleConsumerAsyncChannel/Source`` to +/// send values to the channel. The channel supports different back pressure strategies to control the +/// buffering and demand. The channel will buffer values until its backpressure strategy decides that the +/// producer have to wait. +/// +/// This channel is also suitable for the single-producer single-consumer use-case +/// +/// ## Using a MultiProducerSingleConsumerAsyncChannel +/// +/// To use a ``MultiProducerSingleConsumerAsyncChannel`` you have to create a new channel with its source first by calling +/// the ``MultiProducerSingleConsumerAsyncChannel/makeChannel(of:throwing:BackpressureStrategy:)`` method. +/// Afterwards, you can pass the source to the producer and the channel to the consumer. +/// +/// ``` +/// let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( +/// of: Int.self, +/// backpressureStrategy: .watermark(low: 2, high: 4) +/// ) +/// +/// // The channel and source can be extracted from the returned type +/// let channel = consume channelAndSource.channel +/// let source = consume channelAndSource.source +/// ``` +/// +/// ### Asynchronous producing +/// +/// Values can be send to the source from asynchronous contexts using ``MultiProducerSingleConsumerAsyncChannel/Source/send(_:)-8eo96`` +/// and ``MultiProducerSingleConsumerAsyncChannel/Source/send(contentsOf:)``. Backpressure results in calls +/// to the `send` methods to be suspended. Once more elements should be produced the `send` methods will be resumed. +/// +/// ``` +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await source.send(1) +/// try await source.send(2) +/// try await source.send(3) +/// } +/// +/// for await element in channel { +/// print(element) +/// } +/// } +/// ``` +/// +/// ### Synchronous produceing +/// +/// Values can also be send to the source from synchronous context. Backpressure is also exposed on the synchronous contexts; however, +/// it is up to the caller to decide how to properly translate the backpressure to underlying producer e.g. by blocking the thread. +/// +/// ```swift +/// do { +/// let sendResult = try source.send(contentsOf: sequence) +/// +/// switch sendResult { +/// case .produceMore: +/// // Trigger more production in the underlying system +/// +/// case .enqueueCallback(let callbackToken): +/// // There are enough values in the channel already. We need to enqueue +/// // a callback to get notified when we should produce more. +/// source.enqueueCallback(token: callbackToken, onProduceMore: { result in +/// switch result { +/// case .success: +/// // Trigger more production in the underlying system +/// case .failure(let error): +/// // Terminate the underlying producer +/// } +/// }) +/// } +/// } catch { +/// // `send(contentsOf:)` throws if the channel already terminated +/// } +/// ``` +/// +/// ### Multiple producers +/// +/// To support multiple producers the source offers a ``Source/copy()`` method to produce a new source. +/// +/// ### Terminating the production of values +/// +/// The consumer can be terminated through multiple ways: +/// - Calling ``Source/finish(throwing:)``. +/// - Deiniting all sources. +/// +/// In both cases, if there are still elements buffered by the channel, then the consumer will receive +/// all buffered elements. Afterwards it will be terminated. +/// +/// ### Observing termination of the consumer +/// +/// When the consumer stops consumption by either deiniting the channel or the task calling ``next(isolation:)`` +/// getting cancelled, the source will get notified about the termination if a termination callback has been set +/// before by calling ``Source/setOnTerminationCallback(_:)``. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +public struct MultiProducerSingleConsumerAsyncChannel: ~Copyable { + /// A struct containing the initialized channel and source. + /// + /// This struct can be deconstructed by consuming the individual + /// components from it. + /// + /// ```swift + /// let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + /// of: Int.self, + /// backpressureStrategy: .watermark(low: 5, high: 10) + /// ) + /// var channel = consume channelAndSource.channel + /// var source = consume channelAndSource.source + /// ``` + @frozen + public struct ChannelAndStream : ~Copyable { + /// The channel. + public var channel: MultiProducerSingleConsumerAsyncChannel + /// The source. + public var source: Source + } + + /// Initializes a new ``MultiProducerSingleConsumerAsyncChannel`` and an ``MultiProducerSingleConsumerAsyncChannel/Source``. + /// + /// - Parameters: + /// - elementType: The element type of the channel. + /// - failureType: The failure type of the channel. + /// - backpressureStrategy: The backpressure strategy that the channel should use. + /// - Returns: A tuple containing the channel and its source. The source should be passed to the + /// producer while the channel should be passed to the consumer. + public static func makeChannel( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: Source.BackpressureStrategy + ) -> ChannelAndStream + + /// Returns the next element. + /// + /// If this method returns `nil` it indicates that no further values can ever + /// be returned. The channel automatically closes when all sources have been deinited. + /// + /// If there are no elements and the channel has not been finished yet, this method will + /// suspend until an element is send to the channel. + /// + /// If the task calling this method is cancelled this method will return `nil`. + /// + /// - Parameter isolation: The callers isolation. + /// - Returns: The next buffered element. + public func next(isolation: isolated (any Actor)? = #isolation) async throws(Failure) -> Element? +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// A struct to send values to the channel. + /// + /// Use this source to provide elements to the channel by calling one of the `send` methods. + public struct Source: ~Copyable, Sendable { + /// A struct representing the backpressure of the channel. + public struct BackpressureStrategy: Sendable { + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + public static func watermark(low: Int, high: Int) -> BackpressureStrategy + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + /// - waterLevelForElement: A closure used to compute the contribution of each buffered element to the current water level. + /// + /// - Note, `waterLevelForElement` will be called on each element when it is written into the source and when + /// it is consumed from the channel, so it is recommended to provide a function that runs in constant time. + public static func watermark(low: Int, high: Int, waterLevelForElement: @escaping @Sendable (borrowing Element) -> Int) -> BackpressureStrategy + + /// An unbounded backpressure strategy. + /// + /// - Important: Only use this strategy if the production of elements is limited through some other mean. Otherwise + /// an unbounded backpressure strategy can result in infinite memory usage and cause + /// your process to run out of memory. + public static func unbounded() -> BackpressureStrategy + } + + /// A type that indicates the result of sending elements to the source. + public enum SendResult: ~Copyable, Sendable { + /// An opaque token that is returned when the channel's backpressure strategy indicated that production should + /// be suspended. Use this token to enqueue a callback by calling the ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Important: This token must only be passed once to ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)`` + /// and ``MultiProducerSingleConsumerAsyncChannel/Source/cancelCallback(callbackToken:)``. + public struct CallbackToken: Sendable, Hashable { } + + /// Indicates that more elements should be produced and send to the source. + case produceMore + + /// Indicates that a callback should be enqueued. + /// + /// The associated token should be passed to the ````MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)```` method. + case enqueueCallback(CallbackToken) + } + + /// A callback to invoke when the channel finished. + /// + /// This is called after the last element has been consumed by the channel. + public func setOnTerminationCallback(_ callback: @escaping @Sendable () -> Void) + + /// Creates a new source which can be used to send elements to the channel concurrently. + /// + /// The channel will only automatically be finished if all existing sources have been deinited. + /// + /// - Returns: A new source for sending elements to the channel. + public mutating func copy() -> Source + + /// Sends new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter sequence: The elements to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + public mutating func send( + contentsOf sequence: consuming sending S + ) throws -> SendResult where Element == S.Element, S: Sequence + + /// Send the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter element: The element to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + public mutating func send(_ element: sending consuming Element) throws -> SendResult + + /// Enqueues a callback that will be invoked once more elements should be produced. + /// + /// Call this method after ``send(contentsOf:)-5honm`` or ``send(_:)-3jxzb`` returned ``SendResult/enqueueCallback(_:)``. + /// + /// - Important: Enqueueing the same token multiple times is **not allowed**. + /// + /// - Parameters: + /// - callbackToken: The callback token. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. + public mutating func enqueueCallback( + callbackToken: consuming SendResult.CallbackToken, + onProduceMore: sending @escaping (Result + ) -> Void) + + /// Cancel an enqueued callback. + /// + /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and + /// will mark the passed `callbackToken` as cancelled. + /// + /// - Parameter callbackToken: The callback token. + public mutating func cancelCallback( + callbackToken: consuming SendResult.CallbackToken + ) + + /// Send new elements to the channel and provide a callback which will be invoked once more elements should be produced. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(contentsOf:onProduceMore:)``. + public mutating func send( + contentsOf sequence: consuming sending S, + onProduceMore: @escaping @Sendable (Result) -> Void + ) where Element == S.Element, S: Sequence + + /// Sends the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - element: The element to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(_:onProduceMore:)``. + public mutating func send( + _ element: consuming sending Element, + onProduceMore: @escaping @Sendable (Result + ) -> Void) + + /// Send new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + public mutating func send( + contentsOf sequence: consuming sending S + ) async throws where Element == S.Element, S: Sequence + + /// Send new element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - element: The element to send to the channel. + public mutating func send(_ element: consuming sending Element) async throws + + /// Send the elements of the asynchronous sequence to the channel. + /// + /// This method returns once the provided asynchronous sequence or the channel finished. + /// + /// - Important: This method does not finish the source if consuming the upstream sequence terminated. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + public mutating func send( + contentsOf sequence: consuming sending S + ) async throws where Element: Sendable, Element == S.Element, S: Sendable, S: AsyncSequence + + /// Indicates that the production terminated. + /// + /// After all buffered elements are consumed the subsequent call to ``MultiProducerSingleConsumerAsyncChannel/next(isolation:)`` will return + /// `nil` or throw an error. + /// + /// Calling this function more than once has no effect. After calling finish, the channel enters a terminal state and doesn't accept + /// new elements. + /// + /// - Parameters: + /// - error: The error to throw, or `nil`, to finish normally. + public consuming func finish(throwing error: Failure? = nil) + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// Converts the channel to an asynchronous sequence for consumption. + /// + /// - Important: The returned asynchronous sequence only supports a single iterator to be created and + /// will fatal error at runtime on subsequent calls to `makeAsyncIterator`. + public consuming func asyncSequence() -> some (AsyncSequence & Sendable) +} +``` + +## Comparison to other root asynchronous primitives + +### swift-async-algorithm: AsyncChannel + +The `AsyncChannel` is a multi-consumer/multi-producer root asynchronous sequence +which can be used to communicate between two tasks. It only offers asynchronous +production APIs and has an effective buffer of one per producer. This means that +any producer will be suspended until its value has been consumed. `AsyncChannel` +can handle multiple consumers and resumes them in FIFO order. + +### swift-nio: NIOAsyncSequenceProducer + +The NIO team have created their own root asynchronous sequence with the goal to +provide a high performance sequence that can be used to bridge a NIO `Channel` +inbound stream into Concurrency. The `NIOAsyncSequenceProducer` is a highly +generic and fully inlinable type and quite unwiedly to use. This proposal is +heavily inspired by the learnings from this type but tries to create a more +flexible and easier to use API that fits into the standard library. + +## Future directions + +### Adaptive backpressure strategy + +The high/low watermark strategy is common in networking code; however, there are +other strategies such as an adaptive strategy that we could offer in the future. +An adaptive strategy regulates the backpressure based on the rate of +consumption and production. With the proposed new APIs we can easily add further +strategies. + +### Support `~Copyable` elements + +In the future, we can extend the channel to support `~Copyable` elements. We +only need an underlying buffer primitive that can hold `~Copyable` types and the +continuations need to support `~Copyable` elements as well. By making the +channel not directly conform to `AsyncSequence` we can support this down the +road. + +## Alternatives considered + +### Provide an `onTermination` callback to the factory method + +During development of the new APIs, I first tried to provide the `onTermination` +callback in the `makeChannel` method. However, that showed significant usability +problems in scenarios where one wants to store the source in a type and +reference `self` in the `onTermination` closure at the same time; hence, I kept +the current pattern of setting the `onTermination` closure on the source. + +### Provide a `onConsumerCancellation` callback + +During the pitch phase, it was raised that we should provide a +`onConsumerCancellation` callback which gets invoked once the asynchronous +channel notices that the consuming task got cancelled. This callback could be +used to customize how cancellation is handled by the channel e.g. one could +imagine writing a few more elements to the channel before finishing it. Right now +the channel immediately returns `nil` or throws a `CancellationError` when it +notices cancellation. This proposal decided to not provide this customization +because it opens up the possiblity that asynchronous channels are not terminating +when implemented incorrectly. Additionally, asynchronous sequences are not the +only place where task cancellation leads to an immediate error being thrown i.e. +`Task.sleep()` does the same. Hence, the value of the asynchronous not +terminating immediately brings little value when the next call in the iterating +task might throw. However, the implementation is flexible enough to add this in +the future and we can just default it to the current behaviour. + +### Create a custom type for the `Result` of the `onProduceMore` callback + +The `onProducerMore` callback takes a `Result` which is used to +indicate if the producer should produce more or if the asynchronous channel +finished. We could introduce a new type for this but the proposal decided +against it since it effectively is a result type. + +### Use an initializer instead of factory methods + +Instead of providing a `makeChannel` factory method we could use an initializer +approach that takes a closure which gets the `Source` passed into. A similar API +has been offered with the `Continuation` based approach and +[SE-0388](https://github.com/apple/swift-evolution/blob/main/proposals/0388-async-stream-factory.md) +introduced new factory methods to solve some of the usability ergonomics with +the initializer based APIs. + +### Provide the type on older compilers + +To achieve maximum performance the implementation is using `~Copyable` extensively. +On Swift versions before 6.1, there is a https://github.com/swiftlang/swift/issues/78048 when using; hence, this type +is only usable with Swift 6.1 and later compilers. + +## Acknowledgements + +- [Johannes Weiss](https://github.com/weissi) - For making me aware how +important this problem is and providing great ideas on how to shape the API. +- [Philippe Hausler](https://github.com/phausler) - For helping me designing the +APIs and continuously providing feedback +- [George Barnett](https://github.com/glbrntt) - For providing extensive code +reviews and testing the implementation. +- [Si Beaumont](https://github.com/simonjbeaumont) - For implementing the element size dependent strategy diff --git a/Package.swift b/Package.swift index 1177d22d..8f7e804d 100644 --- a/Package.swift +++ b/Package.swift @@ -39,6 +39,13 @@ let package = Package( .enableExperimentalFeature("StrictConcurrency=complete") ] ), + .executableTarget( + name: "Example", + dependencies: ["AsyncAlgorithms"], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency=complete") + ] + ), .testTarget( name: "AsyncAlgorithmsTests", dependencies: ["AsyncAlgorithms", "AsyncSequenceValidation", "AsyncAlgorithms_XCTest"], diff --git a/Sources/AsyncAlgorithms/Internal/_TinyArray.swift b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift new file mode 100644 index 00000000..4d3e64a2 --- /dev/null +++ b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift @@ -0,0 +1,329 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftCertificates open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftCertificates project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftCertificates project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// ``_TinyArray`` is a ``RandomAccessCollection`` optimised to store zero or one ``Element``. +/// It supports arbitrary many elements but if only up to one ``Element`` is stored it does **not** allocate separate storage on the heap +/// and instead stores the ``Element`` inline. +@usableFromInline +struct _TinyArray { + @usableFromInline + enum Storage { + case one(Element) + case arbitrary([Element]) + } + + @usableFromInline + var storage: Storage +} + +// MARK: - TinyArray "public" interface + +extension _TinyArray: Equatable where Element: Equatable {} +extension _TinyArray: Hashable where Element: Hashable {} +extension _TinyArray: Sendable where Element: Sendable {} + +extension _TinyArray: RandomAccessCollection { + @usableFromInline + typealias Element = Element + + @usableFromInline + typealias Index = Int + + @inlinable + subscript(position: Int) -> Element { + get { + self.storage[position] + } + set { + self.storage[position] = newValue + } + } + + @inlinable + var startIndex: Int { + self.storage.startIndex + } + + @inlinable + var endIndex: Int { + self.storage.endIndex + } +} + +extension _TinyArray { + @inlinable + init(_ elements: some Sequence) { + self.storage = .init(elements) + } + + @inlinable + init() { + self.storage = .init() + } + + @inlinable + mutating func append(_ newElement: Element) { + self.storage.append(newElement) + } + + @inlinable + mutating func append(contentsOf newElements: some Sequence) { + self.storage.append(contentsOf: newElements) + } + + @discardableResult + @inlinable + mutating func remove(at index: Int) -> Element { + self.storage.remove(at: index) + } + + @inlinable + mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { + try self.storage.removeAll(where: shouldBeRemoved) + } + + @inlinable + mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { + try self.storage.sort(by: areInIncreasingOrder) + } +} + +// MARK: - TinyArray.Storage "private" implementation + +extension _TinyArray.Storage: Equatable where Element: Equatable { + @inlinable + static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.one(let lhs), .one(let rhs)): + return lhs == rhs + case (.arbitrary(let lhs), .arbitrary(let rhs)): + // we don't use lhs.elementsEqual(rhs) so we can hit the fast path from Array + // if both arrays share the same underlying storage: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1775 + return lhs == rhs + + case (.one(let element), .arbitrary(let array)), + (.arbitrary(let array), .one(let element)): + guard array.count == 1 else { + return false + } + return element == array[0] + + } + } +} +extension _TinyArray.Storage: Hashable where Element: Hashable { + @inlinable + func hash(into hasher: inout Hasher) { + // same strategy as Array: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1801 + hasher.combine(count) + for element in self { + hasher.combine(element) + } + } +} +extension _TinyArray.Storage: Sendable where Element: Sendable {} + +extension _TinyArray.Storage: RandomAccessCollection { + @inlinable + subscript(position: Int) -> Element { + get { + switch self { + case .one(let element): + guard position == 0 else { + fatalError("index \(position) out of bounds") + } + return element + case .arbitrary(let elements): + return elements[position] + } + } + set { + switch self { + case .one: + guard position == 0 else { + fatalError("index \(position) out of bounds") + } + self = .one(newValue) + case .arbitrary(var elements): + elements[position] = newValue + self = .arbitrary(elements) + } + } + } + + @inlinable + var startIndex: Int { + 0 + } + + @inlinable + var endIndex: Int { + switch self { + case .one: return 1 + case .arbitrary(let elements): return elements.endIndex + } + } +} + +extension _TinyArray.Storage { + @inlinable + init(_ elements: some Sequence) { + var iterator = elements.makeIterator() + guard let firstElement = iterator.next() else { + self = .arbitrary([]) + return + } + guard let secondElement = iterator.next() else { + // newElements just contains a single element + // and we hit the fast path + self = .one(firstElement) + return + } + + var elements: [Element] = [] + elements.reserveCapacity(elements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + while let nextElement = iterator.next() { + elements.append(nextElement) + } + self = .arbitrary(elements) + } + + @inlinable + init() { + self = .arbitrary([]) + } + + @inlinable + mutating func append(_ newElement: Element) { + self.append(contentsOf: CollectionOfOne(newElement)) + } + + @inlinable + mutating func append(contentsOf newElements: some Sequence) { + switch self { + case .one(let firstElement): + var iterator = newElements.makeIterator() + guard let secondElement = iterator.next() else { + // newElements is empty, nothing to do + return + } + var elements: [Element] = [] + elements.reserveCapacity(1 + newElements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + elements.appendRemainingElements(from: &iterator) + self = .arbitrary(elements) + + case .arbitrary(var elements): + if elements.isEmpty { + // if `self` is currently empty and `newElements` just contains a single + // element, we skip allocating an array and set `self` to `.one(firstElement)` + var iterator = newElements.makeIterator() + guard let firstElement = iterator.next() else { + // newElements is empty, nothing to do + return + } + guard let secondElement = iterator.next() else { + // newElements just contains a single element + // and we hit the fast path + self = .one(firstElement) + return + } + elements.reserveCapacity(elements.count + newElements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + elements.appendRemainingElements(from: &iterator) + self = .arbitrary(elements) + + } else { + elements.append(contentsOf: newElements) + self = .arbitrary(elements) + } + + } + } + + @discardableResult + @inlinable + mutating func remove(at index: Int) -> Element { + switch self { + case .one(let oldElement): + guard index == 0 else { + fatalError("index \(index) out of bounds") + } + self = .arbitrary([]) + return oldElement + + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + return elements.remove(at: index) + + } + } + + @inlinable + mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { + switch self { + case .one(let oldElement): + if try shouldBeRemoved(oldElement) { + self = .arbitrary([]) + } + + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + return try elements.removeAll(where: shouldBeRemoved) + + } + } + + @inlinable + mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { + switch self { + case .one: + // a collection of just one element is always sorted, nothing to do + break + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + + try elements.sort(by: areInIncreasingOrder) + } + } +} + +extension Array { + @inlinable + mutating func appendRemainingElements(from iterator: inout some IteratorProtocol) { + while let nextElement = iterator.next() { + append(nextElement) + } + } +} diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift new file mode 100644 index 00000000..419a2f31 --- /dev/null +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift @@ -0,0 +1,1666 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.1) +import DequeModule +import Synchronization + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + @usableFromInline + enum _InternalBackpressureStrategy: Sendable, CustomStringConvertible { + @usableFromInline + struct _Watermark: Sendable, CustomStringConvertible { + /// The low watermark where demand should start. + @usableFromInline + let _low: Int + + /// The high watermark where demand should be stopped. + @usableFromInline + let _high: Int + + /// The current watermark level. + @usableFromInline + var _currentWatermark: Int = 0 + + /// A closure that can be used to calculate the watermark impact of a single element + @usableFromInline + let _waterLevelForElement: (@Sendable (borrowing Element) -> Int)? + + @usableFromInline + var description: String { + "watermark(\(self._currentWatermark))" + } + + init(low: Int, high: Int, waterLevelForElement: (@Sendable (borrowing Element) -> Int)?) { + precondition(low <= high) + self._low = low + self._high = high + self._waterLevelForElement = waterLevelForElement + } + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + if let waterLevelForElement = self._waterLevelForElement { + for element in elements { + self._currentWatermark += waterLevelForElement(element) + } + } else { + self._currentWatermark += elements.count + } + precondition(self._currentWatermark >= 0) + // We are demanding more until we reach the high watermark + return self._currentWatermark < self._high + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + if let waterLevelForElement = self._waterLevelForElement { + self._currentWatermark -= waterLevelForElement(element) + } else { + self._currentWatermark -= 1 + } + precondition(self._currentWatermark >= 0) + // We start demanding again once we are below the low watermark + return self._currentWatermark < self._low + } + } + + @usableFromInline + struct _Unbounded: Sendable, CustomStringConvertible { + @usableFromInline + var description: String { + "unbounded" + } + + init() {} + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + true + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + true + } + } + + /// A watermark based strategy. + case watermark(_Watermark) + /// An unbounded based strategy. + case unbounded(_Unbounded) + + @usableFromInline + var description: String { + switch consume self { + case .watermark(let strategy): + return strategy.description + case .unbounded(let unbounded): + return unbounded.description + } + } + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + switch consume self { + case .watermark(var strategy): + let result = strategy.didSend(elements: elements) + self = .watermark(strategy) + return result + case .unbounded(var strategy): + let result = strategy.didSend(elements: elements) + self = .unbounded(strategy) + return result + } + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + switch consume self { + case .watermark(var strategy): + let result = strategy.didConsume(element: element) + self = .watermark(strategy) + return result + case .unbounded(var strategy): + let result = strategy.didConsume(element: element) + self = .unbounded(strategy) + return result + } + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + @usableFromInline + final class _Storage: Sendable { + @usableFromInline + let _stateMachine: Mutex<_StateMachine> + + var onTermination: (@Sendable () -> Void)? { + set { + self._stateMachine.withLock { + $0._onTermination = newValue + } + } + get { + self._stateMachine.withLock { + $0._onTermination + } + } + } + + @inlinable + init( + backpressureStrategy: _InternalBackpressureStrategy + ) { + self._stateMachine = Mutex<_StateMachine>(_StateMachine(backpressureStrategy: backpressureStrategy)) + } + + func channelDeinitialized() { + let action = self._stateMachine.withLock { + $0.channelDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + func sequenceInitialized() { + self._stateMachine.withLock { + $0.sequenceInitialized() + } + } + + func sequenceDeinitialized() { + let action = self._stateMachine.withLock { + $0.sequenceDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + func iteratorInitialized() { + self._stateMachine.withLock { + $0.iteratorInitialized() + } + } + + func iteratorDeinitialized() { + let action = self._stateMachine.withLock { + $0.iteratorDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + func sourceInitialized() { + self._stateMachine.withLock { + $0.sourceInitialized() + } + } + + func sourceDeinitialized() { + let action = self._stateMachine.withLock { + $0.sourceDeinitialized() + } + + switch action { + case .resumeConsumerAndCallOnTermination(let consumerContinuation, let failure, let onTermination): + switch failure { + case .some(let error): + consumerContinuation.resume(throwing: error) + case .none: + consumerContinuation.resume(returning: nil) + } + + onTermination?() + + case .none: + break + } + } + + @inlinable + func send( + contentsOf sequence: sending some Sequence + ) throws -> MultiProducerSingleConsumerAsyncChannel.Source.SendResult { + let action = self._stateMachine.withLock { + $0.send(sequence) + } + + switch action { + case .returnProduceMore: + return .produceMore + + case .returnEnqueue(let callbackToken): + return .enqueueCallback(.init(id: callbackToken)) + + case .resumeConsumerAndReturnProduceMore(let continuation, let element): + continuation.resume(returning: element) + return .produceMore + + case .resumeConsumerAndReturnEnqueue(let continuation, let element, let callbackToken): + continuation.resume(returning: element) + return .enqueueCallback(.init(id: callbackToken)) + + case .throwFinishedError: + throw MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + } + } + + @inlinable + func enqueueProducer( + callbackToken: UInt64, + continuation: UnsafeContinuation + ) { + let action = self._stateMachine.withLock { + $0.enqueueContinuation(callbackToken: callbackToken, continuation: continuation) + } + + switch action { + case .resumeProducer(let continuation): + continuation.resume() + + case .resumeProducerWithError(let continuation, let error): + continuation.resume(throwing: error) + + case .none: + break + } + } + + @inlinable + func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) { + let action = self._stateMachine.withLock { + $0.enqueueProducer(callbackToken: callbackToken, onProduceMore: onProduceMore) + } + + switch action { + case .resumeProducer(let onProduceMore): + onProduceMore(Result.success(())) + + case .resumeProducerWithError(let onProduceMore, let error): + onProduceMore(Result.failure(error)) + + case .none: + break + } + } + + @inlinable + func cancelProducer( + callbackToken: UInt64 + ) { + let action = self._stateMachine.withLock { + $0.cancelProducer(callbackToken: callbackToken) + } + + switch action { + case .resumeProducerWithCancellationError(let onProduceMore): + switch onProduceMore { + case .closure(let onProduceMore): + onProduceMore(.failure(CancellationError())) + case .continuation(let continuation): + continuation.resume(throwing: CancellationError()) + } + + case .none: + break + } + } + + @inlinable + func finish(_ failure: Failure?) { + let action = self._stateMachine.withLock { + $0.finish(failure) + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .resumeConsumerAndCallOnTermination(let consumerContinuation, let failure, let onTermination): + switch failure { + case .some(let error): + consumerContinuation.resume(throwing: error) + case .none: + consumerContinuation.resume(returning: nil) + } + + onTermination?() + + case .resumeProducers(let producerContinuations): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + + case .none: + break + } + } + + @inlinable + func next(isolation: isolated (any Actor)? = #isolation) async throws -> Element? { + let action = self._stateMachine.withLock { + $0.next() + } + + switch action { + case .returnElement(let element): + return element + + case .returnElementAndResumeProducers(let element, let producerContinuations): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.success(())) + case .continuation(let continuation): + continuation.resume() + } + } + + return element + + case .returnFailureAndCallOnTermination(let failure, let onTermination): + onTermination?() + switch failure { + case .some(let error): + throw error + + case .none: + return nil + } + + case .returnNil: + return nil + + case .suspendTask: + return try await self.suspendNext() + } + } + + @inlinable + func suspendNext(isolation: isolated (any Actor)? = #isolation) async throws -> Element? { + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + let action = self._stateMachine.withLock { + $0.suspendNext(continuation: continuation) + } + + switch action { + case .resumeConsumerWithElement(let continuation, let element): + continuation.resume(returning: element) + + case .resumeConsumerWithElementAndProducers( + let continuation, + let element, + let producerContinuations + ): + continuation.resume(returning: element) + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.success(())) + case .continuation(let continuation): + continuation.resume() + } + } + + case .resumeConsumerWithFailureAndCallOnTermination( + let continuation, + let failure, + let onTermination + ): + switch failure { + case .some(let error): + continuation.resume(throwing: error) + + case .none: + continuation.resume(returning: nil) + } + onTermination?() + + case .resumeConsumerWithNil(let continuation): + continuation.resume(returning: nil) + + case .none: + break + } + } + } onCancel: { + let action = self._stateMachine.withLock { + $0.cancelNext() + } + + switch action { + case .resumeConsumerWithNilAndCallOnTermination(let continuation, let onTermination): + continuation.resume(returning: nil) + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel._Storage { + /// The state machine of the channel. + @usableFromInline + struct _StateMachine: ~Copyable { + /// The state machine's current state. + @usableFromInline + var _state: _State + + @inlinable + var _onTermination: (@Sendable () -> Void)? { + set { + switch consume self._state { + case .channeling(var channeling): + channeling.onTermination = newValue + self = .init(state: .channeling(channeling)) + + case .sourceFinished(var sourceFinished): + sourceFinished.onTermination = newValue + self = .init(state: .sourceFinished(sourceFinished)) + + case .finished(let finished): + self = .init(state: .finished(finished)) + } + } + get { + switch self._state { + case .channeling(let channeling): + return channeling.onTermination + + case .sourceFinished(let sourceFinished): + return sourceFinished.onTermination + + case .finished: + return nil + } + } + } + + @usableFromInline + init( + backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy + ) { + self._state = .channeling( + .init( + backpressureStrategy: backpressureStrategy, + iteratorInitialized: false, + sequenceInitialized: false, + buffer: .init(), + producerContinuations: .init(), + cancelledAsyncProducers: .init(), + hasOutstandingDemand: true, + activeProducers: 0, + nextCallbackTokenID: 0 + ) + ) + } + + @inlinable + init(state: consuming _State) { + self._state = state + } + + @inlinable + mutating func sourceInitialized() { + switch consume self._state { + case .channeling(var channeling): + channeling.activeProducers += 1 + self = .init(state: .channeling(channeling)) + + case .sourceFinished(let sourceFinished): + self = .init(state: .sourceFinished(sourceFinished)) + + case .finished(let finished): + self = .init(state: .finished(finished)) + } + } + + /// Actions returned by `sourceDeinitialized()`. + @usableFromInline + enum SourceDeinitialized { + /// Indicates that the consumer should be resumed with the failure, the producers + /// should be resumed with an error and `onTermination` should be called. + case resumeConsumerAndCallOnTermination( + consumerContinuation: UnsafeContinuation, + failure: Failure?, + onTermination: (() -> Void)? + ) + } + + @inlinable + mutating func sourceDeinitialized() -> SourceDeinitialized? { + switch consume self._state { + case .channeling(var channeling): + channeling.activeProducers -= 1 + + guard channeling.activeProducers == 0 else { + // We still have more producers + self = .init(state: .channeling(channeling)) + + return nil + } + // This was the last producer so we can transition to source finished now + + guard let consumerContinuation = channeling.consumerContinuation else { + // We don't have a suspended consumer so we are just going to mark + // the source as finished. + self = .init( + state: .sourceFinished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + buffer: channeling.buffer, + failure: nil, + onTermination: channeling.onTermination + ) + ) + ) + + return nil + } + // We have a continuation, this means our buffer must be empty + // Furthermore, we can now transition to finished + // and resume the continuation with the failure + precondition(channeling.buffer.isEmpty, "Expected an empty buffer") + + self = .init( + state: .finished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .resumeConsumerAndCallOnTermination( + consumerContinuation: consumerContinuation, + failure: nil, + onTermination: channeling.onTermination + ) + + case .sourceFinished(let sourceFinished): + // If the source has finished, finishing again has no effect. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(var finished): + finished.sourceFinished = true + self = .init(state: .finished(finished)) + return .none + } + } + + @inlinable + mutating func sequenceInitialized() { + switch consume self._state { + case .channeling(var channeling): + channeling.sequenceInitialized = true + self = .init(state: .channeling(channeling)) + + case .sourceFinished(var sourceFinished): + sourceFinished.sequenceInitialized = true + self = .init(state: .sourceFinished(sourceFinished)) + + case .finished(var finished): + finished.sequenceInitialized = true + self = .init(state: .finished(finished)) + } + } + + /// Actions returned by `sequenceDeinitialized()`. + @usableFromInline + enum ChannelOrSequenceDeinitializedAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((@Sendable () -> Void)?) + /// Indicates that all producers should be failed and `onTermination` should be called. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (@Sendable () -> Void)? + ) + } + + @inlinable + mutating func sequenceDeinitialized() -> ChannelOrSequenceDeinitializedAction? { + switch consume self._state { + case .channeling(let channeling): + guard channeling.iteratorInitialized else { + precondition(channeling.sequenceInitialized, "Sequence was not initialized") + // No iterator was created so we can transition to finished right away. + self = .init( + state: .finished( + .init( + iteratorInitialized: false, + sequenceInitialized: true, + sourceFinished: false + ) + ) + ) + + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + self = .init(state: .channeling(channeling)) + + return .none + + case .sourceFinished(let sourceFinished): + guard sourceFinished.iteratorInitialized else { + precondition(sourceFinished.sequenceInitialized, "Sequence was not initialized") + // No iterator was created so we can transition to finished right away. + self = .init( + state: .finished( + .init( + iteratorInitialized: false, + sequenceInitialized: true, + sourceFinished: true + ) + ) + ) + + return .callOnTermination(sourceFinished.onTermination) + } + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + self = .init(state: .finished(finished)) + + return .none + } + } + + @inlinable + mutating func channelDeinitialized() -> ChannelOrSequenceDeinitializedAction? { + switch consume self._state { + case .channeling(let channeling): + guard channeling.sequenceInitialized else { + // No async sequence was created so we can transition to finished + self = .init( + state: .finished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + // An async sequence was created so we need to ignore this deinit + self = .init(state: .channeling(channeling)) + return nil + + case .sourceFinished(let sourceFinished): + guard sourceFinished.sequenceInitialized else { + // No async sequence was created so we can transition to finished + self = .init( + state: .finished( + .init( + iteratorInitialized: sourceFinished.iteratorInitialized, + sequenceInitialized: sourceFinished.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .callOnTermination(sourceFinished.onTermination) + } + // An async sequence was created so we need to ignore this deinit + self = .init(state: .sourceFinished(sourceFinished)) + return nil + + case .finished(let finished): + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + self = .init(state: .finished(finished)) + + return .none + } + } + + @inlinable + mutating func iteratorInitialized() { + switch consume self._state { + case .channeling(var channeling): + if channeling.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + // The first and only iterator was initialized. + channeling.iteratorInitialized = true + self = .init(state: .channeling(channeling)) + } + + case .sourceFinished(var sourceFinished): + if sourceFinished.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + // The first and only iterator was initialized. + sourceFinished.iteratorInitialized = true + self = .init(state: .sourceFinished(sourceFinished)) + } + + case .finished(let finished): + if finished.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + self = .init( + state: .finished( + .init( + iteratorInitialized: true, + sequenceInitialized: true, + sourceFinished: finished.sourceFinished + ) + ) + ) + } + } + } + + /// Actions returned by `iteratorDeinitialized()`. + @usableFromInline + enum IteratorDeinitializedAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((@Sendable () -> Void)?) + /// Indicates that all producers should be failed and `onTermination` should be called. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (@Sendable () -> Void)? + ) + } + + @inlinable + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch consume self._state { + case .channeling(let channeling): + if channeling.iteratorInitialized { + // An iterator was created and deinited. Since we only support + // a single iterator we can now transition to finish. + self = .init( + state: .finished( + .init( + iteratorInitialized: true, + sequenceInitialized: true, + sourceFinished: false + ) + ) + ) + + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } else { + // An iterator needs to be initialized before it can be deinitialized. + fatalError("MultiProducerSingleConsumerAsyncChannel internal inconsistency") + } + + case .sourceFinished(let sourceFinished): + if sourceFinished.iteratorInitialized { + // An iterator was created and deinited. Since we only support + // a single iterator we can now transition to finish. + self = .init( + state: .finished( + .init( + iteratorInitialized: true, + sequenceInitialized: true, + sourceFinished: true + ) + ) + ) + + return .callOnTermination(sourceFinished.onTermination) + } else { + // An iterator needs to be initialized before it can be deinitialized. + fatalError("MultiProducerSingleConsumerAsyncChannel internal inconsistency") + } + + case .finished(let finished): + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + self = .init(state: .finished(finished)) + + return .none + } + } + + /// Actions returned by `send()`. + @usableFromInline + enum SendAction { + /// Indicates that the producer should be notified to produce more. + case returnProduceMore + /// Indicates that the producer should be suspended to stop producing. + case returnEnqueue( + callbackToken: UInt64 + ) + /// Indicates that the consumer should be resumed and the producer should be notified to produce more. + case resumeConsumerAndReturnProduceMore( + continuation: UnsafeContinuation, + element: Element + ) + /// Indicates that the consumer should be resumed and the producer should be suspended. + case resumeConsumerAndReturnEnqueue( + continuation: UnsafeContinuation, + element: Element, + callbackToken: UInt64 + ) + /// Indicates that the producer has been finished. + case throwFinishedError + + @inlinable + init( + callbackToken: UInt64?, + continuationAndElement: (UnsafeContinuation, Element)? = nil + ) { + switch (callbackToken, continuationAndElement) { + case (.none, .none): + self = .returnProduceMore + + case (.some(let callbackToken), .none): + self = .returnEnqueue(callbackToken: callbackToken) + + case (.none, .some((let continuation, let element))): + self = .resumeConsumerAndReturnProduceMore( + continuation: continuation, + element: element + ) + + case (.some(let callbackToken), .some((let continuation, let element))): + self = .resumeConsumerAndReturnEnqueue( + continuation: continuation, + element: element, + callbackToken: callbackToken + ) + } + } + } + + @inlinable + mutating func send(_ sequence: sending some Sequence) -> SendAction { + switch consume self._state { + case .channeling(var channeling): + // We have an element and can resume the continuation + let bufferEndIndexBeforeAppend = channeling.buffer.endIndex + channeling.buffer.append(contentsOf: sequence) + var shouldProduceMore = channeling.backpressureStrategy.didSend( + elements: channeling.buffer[bufferEndIndexBeforeAppend...] + ) + channeling.hasOutstandingDemand = shouldProduceMore + + guard let consumerContinuation = channeling.consumerContinuation else { + // We don't have a suspended consumer so we just buffer the elements + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init( + callbackToken: callbackToken + ) + } + guard let element = channeling.buffer.popFirst() else { + // We got a send of an empty sequence. We just tolerate this. + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init(callbackToken: callbackToken) + } + // We need to tell the back pressure strategy that we consumed + shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + // We got a consumer continuation and an element. We can resume the consumer now + channeling.consumerContinuation = nil + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init( + callbackToken: callbackToken, + continuationAndElement: (consumerContinuation, element) + ) + + case .sourceFinished(let sourceFinished): + // If the source has finished we are dropping the elements. + self = .init(state: .sourceFinished(sourceFinished)) + + return .throwFinishedError + + case .finished(let finished): + // If the source has finished we are dropping the elements. + self = .init(state: .finished(finished)) + + return .throwFinishedError + } + } + + /// Actions returned by `enqueueProducer()`. + @usableFromInline + enum EnqueueProducerAction { + /// Indicates that the producer should be notified to produce more. + case resumeProducer((Result) -> Void) + /// Indicates that the producer should be notified about an error. + case resumeProducerWithError((Result) -> Void, Error) + } + + @inlinable + mutating func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) -> EnqueueProducerAction? { + switch consume self._state { + case .channeling(var channeling): + if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { + // Our producer got marked as cancelled. + channeling.cancelledAsyncProducers.remove(at: index) + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithError(onProduceMore, CancellationError()) + } else if channeling.hasOutstandingDemand { + // We hit an edge case here where we wrote but the consuming thread got interleaved + self = .init(state: .channeling(channeling)) + + return .resumeProducer(onProduceMore) + } else { + channeling.suspendedProducers.append((callbackToken, .closure(onProduceMore))) + self = .init(state: .channeling(channeling)) + + return .none + } + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + + /// Actions returned by `enqueueContinuation()`. + @usableFromInline + enum EnqueueContinuationAction { + /// Indicates that the producer should be notified to produce more. + case resumeProducer(UnsafeContinuation) + /// Indicates that the producer should be notified about an error. + case resumeProducerWithError(UnsafeContinuation, Error) + } + + @inlinable + mutating func enqueueContinuation( + callbackToken: UInt64, + continuation: UnsafeContinuation + ) -> EnqueueContinuationAction? { + switch consume self._state { + case .channeling(var channeling): + if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { + // Our producer got marked as cancelled. + channeling.cancelledAsyncProducers.remove(at: index) + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithError(continuation, CancellationError()) + } else if channeling.hasOutstandingDemand { + // We hit an edge case here where we wrote but the consuming thread got interleaved + self = .init(state: .channeling(channeling)) + + return .resumeProducer(continuation) + } else { + channeling.suspendedProducers.append((callbackToken, .continuation(continuation))) + self = .init(state: .channeling(channeling)) + + return .none + } + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeProducerWithError(continuation, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .resumeProducerWithError(continuation, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + + /// Actions returned by `cancelProducer()`. + @usableFromInline + enum CancelProducerAction { + /// Indicates that the producer should be notified about cancellation. + case resumeProducerWithCancellationError(_MultiProducerSingleConsumerSuspendedProducer) + } + + @inlinable + mutating func cancelProducer( + callbackToken: UInt64 + ) -> CancelProducerAction? { + switch consume self._state { + case .channeling(var channeling): + guard let index = channeling.suspendedProducers.firstIndex(where: { $0.0 == callbackToken }) else { + // The task that sends was cancelled before sending elements so the cancellation handler + // got invoked right away + channeling.cancelledAsyncProducers.append(callbackToken) + self = .init(state: .channeling(channeling)) + + return .none + } + // We have an enqueued producer that we need to resume now + let continuation = channeling.suspendedProducers.remove(at: index).1 + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithCancellationError(continuation) + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .none + } + } + + /// Actions returned by `finish()`. + @usableFromInline + enum FinishAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((() -> Void)?) + /// Indicates that the consumer should be resumed with the failure, the producers + /// should be resumed with an error and `onTermination` should be called. + case resumeConsumerAndCallOnTermination( + consumerContinuation: UnsafeContinuation, + failure: Failure?, + onTermination: (() -> Void)? + ) + /// Indicates that the producers should be resumed with an error. + case resumeProducers( + producerContinuations: _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + ) + } + + @inlinable + mutating func finish(_ failure: Failure?) -> FinishAction? { + switch consume self._state { + case .channeling(let channeling): + guard let consumerContinuation = channeling.consumerContinuation else { + // We don't have a suspended consumer so we are just going to mark + // the source as finished and terminate the current suspended producers. + self = .init( + state: .sourceFinished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + buffer: channeling.buffer, + failure: failure, + onTermination: channeling.onTermination + ) + ) + ) + + return .resumeProducers( + producerContinuations: .init(channeling.suspendedProducers.lazy.map { $0.1 }) + ) + } + // We have a continuation, this means our buffer must be empty + // Furthermore, we can now transition to finished + // and resume the continuation with the failure + precondition(channeling.buffer.isEmpty, "Expected an empty buffer") + + self = .init( + state: .finished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .resumeConsumerAndCallOnTermination( + consumerContinuation: consumerContinuation, + failure: failure, + onTermination: channeling.onTermination + ) + + case .sourceFinished(let sourceFinished): + // If the source has finished, finishing again has no effect. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(var finished): + finished.sourceFinished = true + self = .init(state: .finished(finished)) + return .none + } + } + + /// Actions returned by `next()`. + @usableFromInline + enum NextAction { + /// Indicates that the element should be returned to the caller. + case returnElement(Element) + /// Indicates that the element should be returned to the caller and that all producers should be called. + case returnElementAndResumeProducers(Element, _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>) + /// Indicates that the `Failure` should be returned to the caller and that `onTermination` should be called. + case returnFailureAndCallOnTermination(Failure?, (() -> Void)?) + /// Indicates that the `nil` should be returned to the caller. + case returnNil + /// Indicates that the `Task` of the caller should be suspended. + case suspendTask + } + + @inlinable + mutating func next() -> NextAction { + switch consume self._state { + case .channeling(var channeling): + guard channeling.consumerContinuation == nil else { + // We have multiple AsyncIterators iterating the sequence + fatalError("MultiProducerSingleConsumerAsyncChannel internal inconsistency") + } + + guard let element = channeling.buffer.popFirst() else { + // There is nothing in the buffer to fulfil the demand so we need to suspend. + // We are not interacting with the backpressure strategy here because + // we are doing this inside `suspendNext` + self = .init(state: .channeling(channeling)) + + return .suspendTask + } + // We have an element to fulfil the demand right away. + let shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + guard shouldProduceMore else { + // We don't have any new demand, so we can just return the element. + self = .init(state: .channeling(channeling)) + + return .returnElement(element) + } + // There is demand and we have to resume our producers + let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) + channeling.suspendedProducers.removeAll(keepingCapacity: true) + self = .init(state: .channeling(channeling)) + + return .returnElementAndResumeProducers(element, producers) + + case .sourceFinished(var sourceFinished): + // Check if we have an element left in the buffer and return it + guard let element = sourceFinished.buffer.popFirst() else { + // We are returning the queued failure now and can transition to finished + self = .init( + state: .finished( + .init( + iteratorInitialized: sourceFinished.iteratorInitialized, + sequenceInitialized: sourceFinished.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .returnFailureAndCallOnTermination(sourceFinished.failure, sourceFinished.onTermination) + } + self = .init(state: .sourceFinished(sourceFinished)) + + return .returnElement(element) + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .returnNil + } + } + + /// Actions returned by `suspendNext()`. + @usableFromInline + enum SuspendNextAction { + /// Indicates that the consumer should be resumed. + case resumeConsumerWithElement(UnsafeContinuation, Element) + /// Indicates that the consumer and all producers should be resumed. + case resumeConsumerWithElementAndProducers( + UnsafeContinuation, + Element, + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + ) + /// Indicates that the consumer should be resumed with the failure and that `onTermination` should be called. + case resumeConsumerWithFailureAndCallOnTermination( + UnsafeContinuation, + Failure?, + (() -> Void)? + ) + /// Indicates that the consumer should be resumed with `nil`. + case resumeConsumerWithNil(UnsafeContinuation) + } + + @inlinable + mutating func suspendNext(continuation: UnsafeContinuation) -> SuspendNextAction? { + switch consume self._state { + case .channeling(var channeling): + guard channeling.consumerContinuation == nil else { + // We have multiple AsyncIterators iterating the sequence + fatalError("MultiProducerSingleConsumerAsyncChannel internal inconsistency") + } + + // We have to check here again since we might have a producer interleave next and suspendNext + guard let element = channeling.buffer.popFirst() else { + // There is nothing in the buffer to fulfil the demand so we to store the continuation. + channeling.consumerContinuation = continuation + self = .init(state: .channeling(channeling)) + + return .none + } + // We have an element to fulfil the demand right away. + + let shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + guard shouldProduceMore else { + // We don't have any new demand, so we can just return the element. + self = .init(state: .channeling(channeling)) + + return .resumeConsumerWithElement(continuation, element) + } + // There is demand and we have to resume our producers + let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) + channeling.suspendedProducers.removeAll(keepingCapacity: true) + self = .init(state: .channeling(channeling)) + + return .resumeConsumerWithElementAndProducers(continuation, element, producers) + + case .sourceFinished(var sourceFinished): + // Check if we have an element left in the buffer and return it + guard let element = sourceFinished.buffer.popFirst() else { + // We are returning the queued failure now and can transition to finished + self = .init( + state: .finished( + .init( + iteratorInitialized: sourceFinished.iteratorInitialized, + sequenceInitialized: sourceFinished.sequenceInitialized, + sourceFinished: true + ) + ) + ) + + return .resumeConsumerWithFailureAndCallOnTermination( + continuation, + sourceFinished.failure, + sourceFinished.onTermination + ) + } + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeConsumerWithElement(continuation, element) + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .resumeConsumerWithNil(continuation) + } + } + + /// Actions returned by `cancelNext()`. + @usableFromInline + enum CancelNextAction { + /// Indicates that the continuation should be resumed with nil, the producers should be finished and call onTermination. + case resumeConsumerWithNilAndCallOnTermination(UnsafeContinuation, (() -> Void)?) + /// Indicates that the producers should be finished and call onTermination. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (() -> Void)? + ) + } + + @inlinable + mutating func cancelNext() -> CancelNextAction? { + switch consume self._state { + case .channeling(let channeling): + self = .init( + state: .finished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + sequenceInitialized: channeling.sequenceInitialized, + sourceFinished: false + ) + ) + ) + + guard let consumerContinuation = channeling.consumerContinuation else { + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + precondition( + channeling.suspendedProducers.isEmpty, + "Internal inconsistency. Unexpected producer continuations." + ) + return .resumeConsumerWithNilAndCallOnTermination( + consumerContinuation, + channeling.onTermination + ) + + case .sourceFinished(let sourceFinished): + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .none + } + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine { + @usableFromInline + enum _State: ~Copyable { + @usableFromInline + struct Channeling: ~Copyable { + /// The backpressure strategy. + @usableFromInline + var backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy + + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// Indicates if an async sequence was initialized. + @usableFromInline + var sequenceInitialized: Bool + + /// The onTermination callback. + @usableFromInline + var onTermination: (@Sendable () -> Void)? + + /// The buffer of elements. + @usableFromInline + var buffer: Deque + + /// The optional consumer continuation. + @usableFromInline + var consumerContinuation: UnsafeContinuation? + + /// The producer continuations. + @usableFromInline + var suspendedProducers: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)> + + /// The producers that have been cancelled. + @usableFromInline + var cancelledAsyncProducers: Deque + + /// Indicates if we currently have outstanding demand. + @usableFromInline + var hasOutstandingDemand: Bool + + /// The number of active producers. + @usableFromInline + var activeProducers: UInt64 + + /// The next callback token. + @usableFromInline + var nextCallbackTokenID: UInt64 + + var description: String { + "backpressure:\(self.backpressureStrategy.description) iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) consumerContinuation:\(self.consumerContinuation == nil) producerContinuations:\(self.suspendedProducers.count) cancelledProducers:\(self.cancelledAsyncProducers.count) hasOutstandingDemand:\(self.hasOutstandingDemand)" + } + + @inlinable + init( + backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy, + iteratorInitialized: Bool, + sequenceInitialized: Bool, + onTermination: (@Sendable () -> Void)? = nil, + buffer: Deque, + consumerContinuation: UnsafeContinuation? = nil, + producerContinuations: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)>, + cancelledAsyncProducers: Deque, + hasOutstandingDemand: Bool, + activeProducers: UInt64, + nextCallbackTokenID: UInt64 + ) { + self.backpressureStrategy = backpressureStrategy + self.iteratorInitialized = iteratorInitialized + self.sequenceInitialized = sequenceInitialized + self.onTermination = onTermination + self.buffer = buffer + self.consumerContinuation = consumerContinuation + self.suspendedProducers = producerContinuations + self.cancelledAsyncProducers = cancelledAsyncProducers + self.hasOutstandingDemand = hasOutstandingDemand + self.activeProducers = activeProducers + self.nextCallbackTokenID = nextCallbackTokenID + } + + /// Generates the next callback token. + @inlinable + mutating func nextCallbackToken() -> UInt64 { + let id = self.nextCallbackTokenID + self.nextCallbackTokenID += 1 + return id + } + } + + @usableFromInline + struct SourceFinished: ~Copyable { + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// Indicates if an async sequence was initialized. + @usableFromInline + var sequenceInitialized: Bool + + /// The buffer of elements. + @usableFromInline + var buffer: Deque + + /// The failure that should be thrown after the last element has been consumed. + @usableFromInline + var failure: Failure? + + /// The onTermination callback. + @usableFromInline + var onTermination: (@Sendable () -> Void)? + + var description: String { + "iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) failure:\(self.failure == nil)" + } + + @inlinable + init( + iteratorInitialized: Bool, + sequenceInitialized: Bool, + buffer: Deque, + failure: Failure? = nil, + onTermination: (@Sendable () -> Void)? = nil + ) { + self.iteratorInitialized = iteratorInitialized + self.sequenceInitialized = sequenceInitialized + self.buffer = buffer + self.failure = failure + self.onTermination = onTermination + } + } + + @usableFromInline + struct Finished: ~Copyable { + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// Indicates if an async sequence was initialized. + @usableFromInline + var sequenceInitialized: Bool + + /// Indicates if the source was finished. + @usableFromInline + var sourceFinished: Bool + + var description: String { + "iteratorInitialized:\(self.iteratorInitialized) sourceFinished:\(self.sourceFinished)" + } + + @inlinable + init( + iteratorInitialized: Bool, + sequenceInitialized: Bool, + sourceFinished: Bool + ) { + self.iteratorInitialized = iteratorInitialized + self.sequenceInitialized = sequenceInitialized + self.sourceFinished = sourceFinished + } + } + + /// The state once either any element was sent or `next()` was called. + case channeling(Channeling) + + /// The state once the underlying source signalled that it is finished. + case sourceFinished(SourceFinished) + + /// The state once there can be no outstanding demand. This can happen if: + /// 1. The iterator was deinited + /// 2. The underlying source finished and all buffered elements have been consumed + case finished(Finished) + + @usableFromInline + var description: String { + switch self { + case .channeling(let channeling): + return "channeling \(channeling.description)" + case .sourceFinished(let sourceFinished): + return "sourceFinished \(sourceFinished.description)" + case .finished(let finished): + return "finished \(finished.description)" + } + } + } +} + +@usableFromInline +enum _MultiProducerSingleConsumerSuspendedProducer { + case closure((Result) -> Void) + case continuation(UnsafeContinuation) +} +#endif diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift new file mode 100644 index 00000000..24150047 --- /dev/null +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift @@ -0,0 +1,624 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.1) +/// An error that is thrown from the various `send` methods of the +/// ``MultiProducerSingleConsumerAsyncChannel/Source``. +/// +/// This error is thrown when the channel is already finished when +/// trying to send new elements to the source. +public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error { + @usableFromInline + init() {} +} + +/// A multi-producer single-consumer channel. +/// +/// The ``MultiProducerSingleConsumerAsyncChannel`` provides a ``MultiProducerSingleConsumerAsyncChannel/Source`` to +/// send values to the channel. The channel supports different back pressure strategies to control the +/// buffering and demand. The channel will buffer values until its backpressure strategy decides that the +/// producer have to wait. +/// +/// This channel is also suitable for the single-producer single-consumer use-case +/// +/// ## Using a MultiProducerSingleConsumerAsyncChannel +/// +/// To use a ``MultiProducerSingleConsumerAsyncChannel`` you have to create a new channel with its source first by calling +/// the ``MultiProducerSingleConsumerAsyncChannel/makeChannel(of:throwing:BackpressureStrategy:)`` method. +/// Afterwards, you can pass the source to the producer and the channel to the consumer. +/// +/// ``` +/// let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( +/// of: Int.self, +/// backpressureStrategy: .watermark(low: 2, high: 4) +/// ) +/// +/// // The channel and source can be extracted from the returned type +/// let channel = consume channelAndSource.channel +/// let source = consume channelAndSource.source +/// ``` +/// +/// ### Asynchronous producing +/// +/// Values can be send to the source from asynchronous contexts using ``MultiProducerSingleConsumerAsyncChannel/Source/send(_:)-8eo96`` +/// and ``MultiProducerSingleConsumerAsyncChannel/Source/send(contentsOf:)``. Backpressure results in calls +/// to the `send` methods to be suspended. Once more elements should be produced the `send` methods will be resumed. +/// +/// ``` +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await source.send(1) +/// try await source.send(2) +/// try await source.send(3) +/// } +/// +/// for await element in channel { +/// print(element) +/// } +/// } +/// ``` +/// +/// ### Synchronous produceing +/// +/// Values can also be send to the source from synchronous context. Backpressure is also exposed on the synchronous contexts; however, +/// it is up to the caller to decide how to properly translate the backpressure to underlying producer e.g. by blocking the thread. +/// +/// ```swift +/// do { +/// let sendResult = try source.send(contentsOf: sequence) +/// +/// switch sendResult { +/// case .produceMore: +/// // Trigger more production in the underlying system +/// +/// case .enqueueCallback(let callbackToken): +/// // There are enough values in the channel already. We need to enqueue +/// // a callback to get notified when we should produce more. +/// source.enqueueCallback(token: callbackToken, onProduceMore: { result in +/// switch result { +/// case .success: +/// // Trigger more production in the underlying system +/// case .failure(let error): +/// // Terminate the underlying producer +/// } +/// }) +/// } +/// } catch { +/// // `send(contentsOf:)` throws if the channel already terminated +/// } +/// ``` +/// +/// ### Multiple producers +/// +/// To support multiple producers the source offers a ``Source/copy()`` method to produce a new source. +/// +/// ### Terminating the production of values +/// +/// The consumer can be terminated through multiple ways: +/// - Calling ``Source/finish(throwing:)``. +/// - Deiniting all sources. +/// +/// In both cases, if there are still elements buffered by the channel, then the consumer will receive +/// all buffered elements. Afterwards it will be terminated. +/// +/// ### Observing termination of the consumer +/// +/// When the consumer stops consumption by either deiniting the channel or the task calling ``next(isolation:)`` +/// getting cancelled, the source will get notified about the termination if a termination callback has been set +/// before by calling ``Source/setOnTerminationCallback(_:)``. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +public struct MultiProducerSingleConsumerAsyncChannel: ~Copyable { + /// The backing storage. + @usableFromInline + let storage: _Storage + + /// A struct containing the initialized channel and source. + /// + /// This struct can be deconstructed by consuming the individual + /// components from it. + /// + /// ```swift + /// let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + /// of: Int.self, + /// backpressureStrategy: .watermark(low: 5, high: 10) + /// ) + /// var channel = consume channelAndSource.channel + /// var source = consume channelAndSource.source + /// ``` + @frozen + public struct ChannelAndStream: ~Copyable { + /// The channel. + public var channel: MultiProducerSingleConsumerAsyncChannel + /// The source. + public var source: Source + + init( + channel: consuming MultiProducerSingleConsumerAsyncChannel, + source: consuming Source + ) { + self.channel = channel + self.source = source + } + } + + /// Initializes a new ``MultiProducerSingleConsumerAsyncChannel`` and an ``MultiProducerSingleConsumerAsyncChannel/Source``. + /// + /// - Parameters: + /// - elementType: The element type of the channel. + /// - failureType: The failure type of the channel. + /// - backpressureStrategy: The backpressure strategy that the channel should use. + /// - Returns: A struct containing the channel and its source. The source should be passed to the + /// producer while the channel should be passed to the consumer. + public static func makeChannel( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: Source.BackpressureStrategy + ) -> sending ChannelAndStream { + let storage = _Storage( + backpressureStrategy: backpressureStrategy.internalBackpressureStrategy + ) + let source = Source(storage: storage) + + return .init(channel: .init(storage: storage), source: source) + } + + init(storage: _Storage) { + self.storage = storage + } + + deinit { + self.storage.channelDeinitialized() + } + + /// Returns the next element. + /// + /// If this method returns `nil` it indicates that no further values can ever + /// be returned. The channel automatically closes when all sources have been deinited. + /// + /// If there are no elements and the channel has not been finished yet, this method will + /// suspend until an element is send to the channel. + /// + /// If the task calling this method is cancelled this method will return `nil`. + /// + /// - Parameter isolation: The callers isolation. + /// - Returns: The next buffered element. + @inlinable + public mutating func next( + isolation: isolated (any Actor)? = #isolation + ) async throws(Failure) -> Element? { + do { + return try await self.storage.next() + } catch { + // This force-cast is safe since we only allow closing the source with this failure + // We only need this force cast since continuations don't support typed throws yet. + throw error as! Failure + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// A struct to send values to the channel. + /// + /// Use this source to provide elements to the channel by calling one of the `send` methods. + public struct Source: ~Copyable { + /// A struct representing the backpressure of the channel. + public struct BackpressureStrategy: Sendable { + var internalBackpressureStrategy: _InternalBackpressureStrategy + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + public static func watermark(low: Int, high: Int) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: nil) + ) + ) + } + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + /// - waterLevelForElement: A closure used to compute the contribution of each buffered element to the current water level. + /// + /// - Important: `waterLevelForElement` will be called during a lock on each element when it is written into the source and when + /// it is consumed from the channel, so it must be side-effect free and at best constant in time. + public static func watermark( + low: Int, + high: Int, + waterLevelForElement: @escaping @Sendable (borrowing Element) -> Int + ) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: waterLevelForElement) + ) + ) + } + + /// An unbounded backpressure strategy. + /// + /// - Important: Only use this strategy if the production of elements is limited through some other mean. Otherwise + /// an unbounded backpressure strategy can result in infinite memory usage and cause + /// your process to run out of memory. + public static func unbounded() -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .unbounded(.init()) + ) + } + } + + /// A type that indicates the result of sending elements to the source. + public enum SendResult: ~Copyable, Sendable { + /// An opaque token that is returned when the channel's backpressure strategy indicated that production should + /// be suspended. Use this token to enqueue a callback by calling the ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Important: This token must only be passed once to ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)`` + /// and ``MultiProducerSingleConsumerAsyncChannel/Source/cancelCallback(callbackToken:)``. + public struct CallbackToken: Sendable, Hashable { + @usableFromInline + let _id: UInt64 + + @usableFromInline + init(id: UInt64) { + self._id = id + } + } + + /// Indicates that more elements should be produced and send to the source. + case produceMore + + /// Indicates that a callback should be enqueued. + /// + /// The associated token should be passed to the ````MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)```` method. + case enqueueCallback(CallbackToken) + } + + @usableFromInline + let _storage: _Storage + + internal init(storage: _Storage) { + self._storage = storage + self._storage.sourceInitialized() + } + + deinit { + self._storage.sourceDeinitialized() + } + + /// Sets a callback to invoke when the channel terminated. + /// + /// This is called after the last element has been consumed by the channel. + public func setOnTerminationCallback(_ callback: @escaping @Sendable () -> Void) { + self._storage.onTermination = callback + } + + /// Creates a new source which can be used to send elements to the channel concurrently. + /// + /// The channel will only automatically be finished if all existing sources have been deinited. + /// + /// - Returns: A new source for sending elements to the channel. + public mutating func copy() -> sending Self { + .init(storage: self._storage) + } + + /// Sends new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter sequence: The elements to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + @inlinable + public mutating func send( + contentsOf sequence: consuming sending S + ) throws -> SendResult where Element == S.Element, S: Sequence, Element: Copyable { + try self._storage.send(contentsOf: sequence) + } + + /// Send the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter element: The element to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + @inlinable + public mutating func send(_ element: consuming sending Element) throws -> SendResult { + try self._storage.send(contentsOf: CollectionOfOne(element)) + } + + /// Enqueues a callback that will be invoked once more elements should be produced. + /// + /// Call this method after ``send(contentsOf:)-65yju`` or ``send(_:)`` returned ``SendResult/enqueueCallback(_:)``. + /// + /// - Important: Enqueueing the same token multiple times is **not allowed**. + /// + /// - Parameters: + /// - callbackToken: The callback token. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. + @inlinable + public mutating func enqueueCallback( + callbackToken: consuming SendResult.CallbackToken, + onProduceMore: sending @escaping (Result) -> Void + ) { + self._storage.enqueueProducer(callbackToken: callbackToken._id, onProduceMore: onProduceMore) + } + + /// Cancel an enqueued callback. + /// + /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and + /// will mark the passed `callbackToken` as cancelled. + /// + /// - Parameter callbackToken: The callback token. + @inlinable + public mutating func cancelCallback(callbackToken: consuming SendResult.CallbackToken) { + self._storage.cancelProducer(callbackToken: callbackToken._id) + } + + /// Send new elements to the channel and provide a callback which will be invoked once more elements should be produced. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(contentsOf:onProduceMore:)``. + @inlinable + public mutating func send( + contentsOf sequence: consuming sending S, + onProduceMore: @escaping @Sendable (Result) -> Void + ) where Element == S.Element, S: Sequence, Element: Copyable { + do { + let sendResult = try self.send(contentsOf: sequence) + + switch consume sendResult { + case .produceMore: + onProduceMore(Result.success(())) + + case .enqueueCallback(let callbackToken): + self.enqueueCallback(callbackToken: callbackToken, onProduceMore: onProduceMore) + } + } catch { + onProduceMore(.failure(error)) + } + } + + /// Sends the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - element: The element to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(_:onProduceMore:)``. + @inlinable + public mutating func send( + _ element: consuming sending Element, + onProduceMore: @escaping @Sendable (Result) -> Void + ) { + do { + let sendResult = try self.send(element) + + switch consume sendResult { + case .produceMore: + onProduceMore(Result.success(())) + + case .enqueueCallback(let callbackToken): + self.enqueueCallback(callbackToken: callbackToken, onProduceMore: onProduceMore) + } + } catch { + onProduceMore(.failure(error)) + } + } + + /// Send new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating func send( + contentsOf sequence: consuming sending S + ) async throws where Element == S.Element, S: Sequence, Element: Copyable { + let syncSend: (sending S, inout Self) throws -> SendResult = { try $1.send(contentsOf: $0) } + let sendResult = try syncSend(sequence, &self) + + switch consume sendResult { + case .produceMore: + return () + + case .enqueueCallback(let callbackToken): + let id = callbackToken._id + let storage = self._storage + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { continuation in + self._storage.enqueueProducer( + callbackToken: id, + continuation: continuation + ) + } + } onCancel: { + storage.cancelProducer(callbackToken: id) + } + } + } + + /// Send new element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - element: The element to send to the channel. + @inlinable + public mutating func send(_ element: consuming sending Element) async throws { + let syncSend: (consuming sending Element, inout Self) throws -> SendResult = { try $1.send($0) } + let sendResult = try syncSend(element, &self) + + switch consume sendResult { + case .produceMore: + return () + + case .enqueueCallback(let callbackToken): + let id = callbackToken._id + let storage = self._storage + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { continuation in + self._storage.enqueueProducer( + callbackToken: id, + continuation: continuation + ) + } + } onCancel: { + storage.cancelProducer(callbackToken: id) + } + } + } + + /// Send the elements of the asynchronous sequence to the channel. + /// + /// This method returns once the provided asynchronous sequence or the channel finished. + /// + /// - Important: This method does not finish the source if consuming the upstream sequence terminated. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating func send(contentsOf sequence: consuming sending S) async throws + where Element == S.Element, S: AsyncSequence, Element: Copyable, S: Sendable, Element: Sendable { + for try await element in sequence { + try await self.send(contentsOf: CollectionOfOne(element)) + } + } + + /// Indicates that the production terminated. + /// + /// After all buffered elements are consumed the subsequent call to ``MultiProducerSingleConsumerAsyncChannel/next(isolation:)`` will return + /// `nil` or throw an error. + /// + /// Calling this function more than once has no effect. After calling finish, the channel enters a terminal state and doesn't accept + /// new elements. + /// + /// - Parameters: + /// - error: The error to throw, or `nil`, to finish normally. + @inlinable + public consuming func finish(throwing error: Failure? = nil) { + self._storage.finish(error) + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel where Element: Copyable { + struct ChannelAsyncSequence: AsyncSequence { + @usableFromInline + final class _Backing: Sendable { + @usableFromInline + let storage: MultiProducerSingleConsumerAsyncChannel._Storage + + init(storage: MultiProducerSingleConsumerAsyncChannel._Storage) { + self.storage = storage + self.storage.sequenceInitialized() + } + + deinit { + self.storage.sequenceDeinitialized() + } + } + + @usableFromInline + let _backing: _Backing + + public func makeAsyncIterator() -> Self.Iterator { + .init(storage: self._backing.storage) + } + } + + /// Converts the channel to an asynchronous sequence for consumption. + /// + /// - Important: The returned asynchronous sequence only supports a single iterator to be created and + /// will fatal error at runtime on subsequent calls to `makeAsyncIterator`. + public consuming func asyncSequence() -> some (AsyncSequence & Sendable) { + ChannelAsyncSequence(_backing: .init(storage: self.storage)) + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel.ChannelAsyncSequence where Element: Copyable { + struct Iterator: AsyncIteratorProtocol { + @usableFromInline + final class _Backing { + @usableFromInline + let storage: MultiProducerSingleConsumerAsyncChannel._Storage + + init(storage: MultiProducerSingleConsumerAsyncChannel._Storage) { + self.storage = storage + self.storage.iteratorInitialized() + } + + deinit { + self.storage.iteratorDeinitialized() + } + } + + @usableFromInline + let _backing: _Backing + + init(storage: MultiProducerSingleConsumerAsyncChannel._Storage) { + self._backing = .init(storage: storage) + } + + @inlinable + mutating func next() async throws -> Element? { + do { + return try await self._backing.storage.next(isolation: nil) + } catch { + throw error as! Failure + } + } + + @inlinable + mutating func next( + isolation actor: isolated (any Actor)? = #isolation + ) async throws(Failure) -> Element? { + do { + return try await self._backing.storage.next(isolation: actor) + } catch { + throw error as! Failure + } + } + } +} +// +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel.ChannelAsyncSequence: Sendable {} +#endif diff --git a/Sources/Example/Example.swift b/Sources/Example/Example.swift new file mode 100644 index 00000000..3eb472bf --- /dev/null +++ b/Sources/Example/Example.swift @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.1) +import AsyncAlgorithms + +@available(macOS 15.0, *) +@main +struct Example { + static func main() async throws { + let durationUnboundedMPSC = await ContinuousClock().measure { + await testMPSCChannel(count: 1_000_000, backpressureStrategy: .unbounded()) + } + print("Unbounded MPSC:", durationUnboundedMPSC) + let durationHighLowMPSC = await ContinuousClock().measure { + await testMPSCChannel(count: 1_000_000, backpressureStrategy: .watermark(low: 100, high: 500)) + } + print("HighLow MPSC:", durationHighLowMPSC) + let durationAsyncStream = await ContinuousClock().measure { + await testAsyncStream(count: 1_000_000) + } + print("AsyncStream:", durationAsyncStream) + } + + static func testMPSCChannel( + count: Int, + backpressureStrategy: MultiProducerSingleConsumerAsyncChannel.Source.BackpressureStrategy + ) async { + await withTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: backpressureStrategy + ) + var channel = channelAndSource.channel + var source = Optional.some(consume channelAndSource.source) + + group.addTask { + var source = source.take()! + for i in 0..=6.1) +import AsyncAlgorithms +import XCTest + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +final class MultiProducerSingleConsumerAsyncChannelTests: XCTestCase { + // MARK: - sourceDeinitialized + + func testSourceDeinitialized_whenChanneling_andNoSuspendedConsumer() async throws { + let manualExecutor = ManualTaskExecutor() + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + var channel = consume channelAndSource.channel + let source = consume channelAndSource.source + + nonisolated(unsafe) var didTerminate = false + source.setOnTerminationCallback { + didTerminate = true + } + + group.addTask(executorPreference: manualExecutor) { + _ = await channel.next() + } + + withExtendedLifetime(source) {} + _ = consume source + XCTAssertFalse(didTerminate) + manualExecutor.run() + _ = try await group.next() + XCTAssertTrue(didTerminate) + } + } + + func testSourceDeinitialized_whenChanneling_andSuspendedConsumer() async throws { + let manualExecutor = ManualTaskExecutor() + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + var channel = channelAndSource.channel + let source = consume channelAndSource.source + nonisolated(unsafe) var didTerminate = false + source.setOnTerminationCallback { + didTerminate = true + } + + group.addTask(executorPreference: manualExecutor) { + _ = await channel.next() + } + manualExecutor.run() + XCTAssertFalse(didTerminate) + + withExtendedLifetime(source) {} + _ = consume source + XCTAssertTrue(didTerminate) + manualExecutor.run() + _ = try await group.next() + } + } + + func testSourceDeinitialized_whenMultipleSources() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + var channel = consume channelAndSource.channel + var source1 = consume channelAndSource.source + var source2 = source1.copy() + nonisolated(unsafe) var didTerminate = false + source1.setOnTerminationCallback { + didTerminate = true + } + + _ = try await source1.send(1) + XCTAssertFalse(didTerminate) + _ = consume source1 + XCTAssertFalse(didTerminate) + _ = try await source2.send(2) + XCTAssertFalse(didTerminate) + + _ = await channel.next() + XCTAssertFalse(didTerminate) + _ = await channel.next() + XCTAssertFalse(didTerminate) + _ = consume source2 + _ = await channel.next() + XCTAssertTrue(didTerminate) + } + + func testSourceDeinitialized_whenSourceFinished() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + var source: MultiProducerSingleConsumerAsyncChannel.Source? = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source?.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + try await source?.send(1) + try await source?.send(2) + source?.finish(throwing: nil) + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator = Optional.some(channel.asyncSequence().makeAsyncIterator()) + _ = try await iterator?.next() + + _ = await onTerminationIterator.next() + + _ = try await iterator?.next() + _ = try await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testSourceDeinitialized_whenFinished() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let source: MultiProducerSingleConsumerAsyncChannel.Source? = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source?.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + source?.finish(throwing: nil) + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + _ = channel.asyncSequence().makeAsyncIterator() + + _ = await onTerminationIterator.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + // MARK: Channel deinitialized + + func testChannelDeinitialized() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let source = consume channelAndSource.source + nonisolated(unsafe) var didTerminate = false + source.setOnTerminationCallback { didTerminate = true } + + XCTAssertFalse(didTerminate) + _ = consume channel + XCTAssertTrue(didTerminate) + } + + // MARK: - sequenceDeinitialized + + func testSequenceDeinitialized_whenChanneling_andNoSuspendedConsumer() async throws { + let manualExecutor = ManualTaskExecutor() + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let asyncSequence = channel.asyncSequence() + let source = consume channelAndSource.source + nonisolated(unsafe) var didTerminate = false + source.setOnTerminationCallback { didTerminate = true } + + group.addTask(executorPreference: manualExecutor) { + _ = await asyncSequence.first { _ in true } + } + + withExtendedLifetime(source) {} + _ = consume source + XCTAssertFalse(didTerminate) + manualExecutor.run() + _ = try await group.next() + XCTAssertTrue(didTerminate) + } + } + + func testSequenceDeinitialized_whenChanneling_andSuspendedConsumer() async throws { + let manualExecutor = ManualTaskExecutor() + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let asyncSequence = channel.asyncSequence() + let source = consume channelAndSource.source + nonisolated(unsafe) var didTerminate = false + source.setOnTerminationCallback { didTerminate = true } + + group.addTask(executorPreference: manualExecutor) { + _ = await asyncSequence.first { _ in true } + } + manualExecutor.run() + XCTAssertFalse(didTerminate) + + withExtendedLifetime(source) {} + _ = consume source + XCTAssertTrue(didTerminate) + manualExecutor.run() + _ = try await group.next() + } + } + + // MARK: - iteratorInitialized + + func testIteratorInitialized_whenInitial() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + _ = consume channelAndSource.source + + _ = channel.asyncSequence().makeAsyncIterator() + } + + func testIteratorInitialized_whenChanneling() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + + var iterator = channel.asyncSequence().makeAsyncIterator() + let element = await iterator.next(isolation: nil) + XCTAssertEqual(element, 1) + } + + func testIteratorInitialized_whenSourceFinished() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + source.finish(throwing: nil) + + var iterator = channel.asyncSequence().makeAsyncIterator() + let element1 = await iterator.next(isolation: nil) + XCTAssertEqual(element1, 1) + let element2 = await iterator.next(isolation: nil) + XCTAssertNil(element2) + } + + func testIteratorInitialized_whenFinished() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let source = consume channelAndSource.source + + source.finish(throwing: nil) + + var iterator = channel.asyncSequence().makeAsyncIterator() + let element = await iterator.next(isolation: nil) + XCTAssertNil(element) + } + + // MARK: - iteratorDeinitialized + + func testIteratorDeinitialized_whenInitial() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let source = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator = Optional.some(channel.asyncSequence().makeAsyncIterator()) + iterator = nil + _ = await iterator?.next(isolation: nil) + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenChanneling() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + try await source.send(1) + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator = Optional.some(channel.asyncSequence().makeAsyncIterator()) + iterator = nil + _ = await iterator?.next(isolation: nil) + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenSourceFinished() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + try await source.send(1) + source.finish(throwing: nil) + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator = Optional.some(channel.asyncSequence().makeAsyncIterator()) + iterator = nil + _ = await iterator?.next(isolation: nil) + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenFinished() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndSource.channel + let source = consume channelAndSource.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.setOnTerminationCallback { + onTerminationContinuation.finish() + } + + source.finish(throwing: nil) + + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator = Optional.some(channel.asyncSequence().makeAsyncIterator()) + iterator = nil + _ = try await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenChanneling_andSuspendedProducer() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + var channel: MultiProducerSingleConsumerAsyncChannel? = channelAndSource.channel + var source = consume channelAndSource.source + + var iterator = channel?.asyncSequence().makeAsyncIterator() + channel = nil + + _ = try { try source.send(1) }() + + do { + try await withCheckedThrowingContinuation { continuation in + source.send(1) { result in + continuation.resume(with: result) + } + + iterator = nil + } + } catch { + XCTAssertTrue(error is MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError) + } + + _ = try await iterator?.next() + } + + // MARK: - write + + func testWrite_whenInitial() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + + var iterator = channel.asyncSequence().makeAsyncIterator() + let element = await iterator.next(isolation: nil) + XCTAssertEqual(element, 1) + } + + func testWrite_whenChanneling_andNoConsumer() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + try await source.send(2) + + var iterator = channel.asyncSequence().makeAsyncIterator() + let element1 = await iterator.next(isolation: nil) + XCTAssertEqual(element1, 1) + let element2 = await iterator.next(isolation: nil) + XCTAssertEqual(element2, 2) + } + + func testWrite_whenChanneling_andSuspendedConsumer() async throws { + try await withThrowingTaskGroup(of: Int?.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + + group.addTask { + await channel.next() + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + try await source.send(1) + let element = try await group.next() + XCTAssertEqual(element, 1) + } + } + + func testWrite_whenChanneling_andSuspendedConsumer_andEmptySequence() async throws { + try await withThrowingTaskGroup(of: Int?.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + group.addTask { + await channel.next() + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + try await source.send(contentsOf: []) + try await source.send(contentsOf: [1]) + let element = try await group.next() + XCTAssertEqual(element, 1) + } + } + + func testWrite_whenSourceFinished() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + var channel = consume channelAndSource.channel + var source1 = consume channelAndSource.source + var source2 = source1.copy() + + try await source1.send(1) + source1.finish() + do { + try await source2.send(1) + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError) + } + let element1 = await channel.next() + XCTAssertEqual(element1, 1) + let element2 = await channel.next() + XCTAssertNil(element2) + } + + func testWrite_whenConcurrentProduction() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + var channel = consume channelAndSource.channel + var source1 = consume channelAndSource.source + var source2 = Optional.some(source1.copy()) + + let manualExecutor1 = ManualTaskExecutor() + group.addTask(executorPreference: manualExecutor1) { + try await source1.send(1) + } + + let manualExecutor2 = ManualTaskExecutor() + group.addTask(executorPreference: manualExecutor2) { + var source2 = source2.take()! + try await source2.send(2) + source2.finish() + } + + manualExecutor1.run() + let element1 = await channel.next() + XCTAssertEqual(element1, 1) + + manualExecutor2.run() + let element2 = await channel.next() + XCTAssertEqual(element2, 2) + + let element3 = await channel.next() + XCTAssertNil(element3) + } + } + + // MARK: - enqueueProducer + + func testEnqueueProducer_whenChanneling_andAndCancelled() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + try await source.send(1) + + let writeResult = try { try source.send(2) }() + + switch consume writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.cancelCallback(callbackToken: callbackToken) + + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + do { + _ = try await producerStream.first { _ in true } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + let element = await channel.next() + XCTAssertEqual(element, 1) + } + + func testEnqueueProducer_whenChanneling_andAndCancelled_andAsync() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + + group.addTask { + try await source.send(2) + } + + group.cancelAll() + do { + try await group.next() + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + let element = await channel.next() + XCTAssertEqual(element, 1) + } + } + + func testEnqueueProducer_whenChanneling_andInterleaving() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + var iterator = channel.asyncSequence().makeAsyncIterator() + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + let writeResult = try { try source.send(1) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + let element = await iterator.next(isolation: nil) + XCTAssertEqual(element, 1) + + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + do { + _ = try await producerStream.first { _ in true } + } catch { + XCTFail("Expected no error to be thrown") + } + } + + func testEnqueueProducer_whenChanneling_andSuspending() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + var iterator = channel.asyncSequence().makeAsyncIterator() + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + let writeResult = try { try source.send(1) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + let element = await iterator.next(isolation: nil) + XCTAssertEqual(element, 1) + + do { + _ = try await producerStream.first { _ in true } + } catch { + XCTFail("Expected no error to be thrown") + } + } + + // MARK: - cancelProducer + + func testCancelProducer_whenChanneling() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + try await source.send(1) + + let writeResult = try { try source.send(2) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + + source.cancelCallback(callbackToken: callbackToken) + } + + do { + _ = try await producerStream.first { _ in true } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + let element = await channel.next() + XCTAssertEqual(element, 1) + } + + // MARK: - finish + + func testFinish_whenChanneling_andConsumerSuspended() async throws { + try await withThrowingTaskGroup(of: Int?.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + var channel = channelAndSource.channel + var source: MultiProducerSingleConsumerAsyncChannel.Source? = consume channelAndSource.source + + group.addTask { + while let element = await channel.next() { + if element == 2 { + return element + } + } + return nil + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + source?.finish(throwing: nil) + source = nil + + let element = try await group.next() + XCTAssertEqual(element, .some(nil)) + } + } + + func testFinish_whenInitial() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndSource.channel + let source = consume channelAndSource.source + + source.finish(throwing: CancellationError()) + + do { + for try await _ in channel.asyncSequence() {} + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + } + + // MARK: - Backpressure + + func testBackpressure() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await source.send(contentsOf: [1]) + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.asyncSequence().makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } + + func testBackpressureSync() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await withCheckedThrowingContinuation { continuation in + source.send(contentsOf: [1]) { result in + continuation.resume(with: result) + } + } + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.asyncSequence().makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } + + func testWatermarkWithCustomCoount() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: [Int].self, + backpressureStrategy: .watermark(low: 2, high: 4, waterLevelForElement: { $0.count }) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + var iterator = channel.asyncSequence().makeAsyncIterator() + + try await source.send([1, 1, 1]) + + _ = await iterator.next(isolation: nil) + + try await source.send([1, 1, 1]) + + _ = await iterator.next(isolation: nil) + } + + func testWatermarWithLotsOfElements() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + // This test should in the future use a custom task executor to schedule to avoid sending + // 1000 elements. + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndSource.channel + var source: MultiProducerSingleConsumerAsyncChannel.Source! = consume channelAndSource.source + + group.addTask { + var source = source.take()! + for i in 0...10000 { + try await source.send(i) + } + source.finish() + } + + let asyncSequence = channel.asyncSequence() + + group.addTask { + var sum = 0 + for try await element in asyncSequence { + sum += element + } + } + } + } + + func testThrowsError() async throws { + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + try await source.send(1) + try await source.send(2) + source.finish(throwing: CancellationError()) + + var elements = [Int]() + var iterator = channel.asyncSequence().makeAsyncIterator() + + do { + while let element = try await iterator.next() { + elements.append(element) + } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + XCTAssertEqual(elements, [1, 2]) + } + + let element = try await iterator.next() + XCTAssertNil(element) + } + + func testAsyncSequenceWrite() async throws { + let (stream, continuation) = AsyncStream.makeStream() + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + var channel = channelAndSource.channel + var source = consume channelAndSource.source + + continuation.yield(1) + continuation.yield(2) + continuation.finish() + + try await source.send(contentsOf: stream) + source.finish(throwing: nil) + + let elements = await channel.collect() + XCTAssertEqual(elements, [1, 2]) + } + + // MARK: NonThrowing + + func testNonThrowing() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndSource.channel + var source = consume channelAndSource.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await source.send(contentsOf: [1]) + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.asyncSequence().makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// Collect all elements in the sequence into an array. + fileprivate mutating func collect() async throws(Failure) -> [Element] { + var elements = [Element]() + while let element = try await self.next() { + elements.append(element) + } + return elements + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel.Source.SendResult { + func assertIsProducerMore() { + switch self { + case .produceMore: + return () + + case .enqueueCallback: + XCTFail("Expected produceMore") + } + } + + func assertIsEnqueueCallback() { + switch self { + case .produceMore: + XCTFail("Expected enqueueCallback") + + case .enqueueCallback: + return () + } + } +} + +extension Optional where Wrapped: ~Copyable { + fileprivate mutating func take() -> Self { + let result = consume self + self = nil + return result + } +} +#endif diff --git a/Tests/AsyncAlgorithmsTests/Support/ManualExecutor.swift b/Tests/AsyncAlgorithmsTests/Support/ManualExecutor.swift new file mode 100644 index 00000000..b07fb835 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/ManualExecutor.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.0) +import DequeModule +import Synchronization + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +final class ManualTaskExecutor: TaskExecutor { + private let jobs = Mutex>(.init()) + + func enqueue(_ job: UnownedJob) { + self.jobs.withLock { $0.append(job) } + } + + func run() { + while let job = self.jobs.withLock({ $0.popFirst() }) { + job.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } +} +#endif