diff --git a/Evolution/NNNN-deferred.md b/Evolution/NNNN-deferred.md new file mode 100644 index 00000000..4000b36a --- /dev/null +++ b/Evolution/NNNN-deferred.md @@ -0,0 +1,114 @@ +# Deferred + +* Proposal: [NNNN](NNNN-deferred.md) +* Authors: [Tristan Celder](https://github.com/tcldr) +* Review Manager: TBD +* Status: **Awaiting implementation** + +* Implementation: [[Source](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift) | +[Tests](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Tests/AsyncAlgorithmsTests/TestDeferred.swift)] +* Decision Notes: [Additional Commentary](https://forums.swift.org/) +* Bugs: + +## Introduction + +`AsyncDeferredSequence` provides a convenient way to postpone the initialization of a sequence to the point where it is requested by a sequence consumer. + +## Motivation + +Some source sequences may perform expensive work on initialization. This could be network activity, sensor activity, or anything else that consumes system resources. While this can be mitigated in some simple situtations by only passing around a sequence at the point of use, often it is favorable to be able to pass a sequence to its eventual point of use without commencing its initialization process. This is especially true for sequences which are intended for multicast/broadcast for which a reliable startup and shutdown procedure is essential. + +A simple example of a seqeunce which may benefit from being deferred is provided in the documentation for AsyncStream: + +```swift +extension QuakeMonitor { + + static var quakes: AsyncStream { + AsyncStream { continuation in + let monitor = QuakeMonitor() + monitor.quakeHandler = { quake in + continuation.yield(quake) + } + continuation.onTermination = { @Sendable _ in + monitor.stopMonitoring() + } + monitor.startMonitoring() + } + } +} +``` + +In the supplied code sample, the closure provided to the AsyncStream initializer will be executed immediately upon initialization; `QuakeMonitor.startMonitoring()` will be called, and the stream will then begin buffering its contents waiting to be iterated. Whilst this behavior is sometimes desirable, on other occasions it can cause system resources to be consumed unnecessarily. + +```swift +let nonDeferredSequence = QuakeMonitor.quakes // `Quake.startMonitoring()` is called now! + +... +// at some arbitrary point, possibly hours later... +for await quake in nonDeferredSequence { + print("Quake: \(quake.date)") +} +// Prints out hours of previously buffered quake data before showing the latest +``` + +## Proposed solution + +`AsyncDeferredSequence` uses a supplied closure to create a new asynchronous sequence. The closure is executed for each iterator on the first call to `next`. This has the effect of postponing the initialization of an arbitrary async sequence until the point of first demand: + +```swift +let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed + +... +// at some arbitrary point, possibly hours later... +for await quake in deferredSequence { // `Quake.startMonitoring()` is now called + print("Quake: \(quake.date)") +} +// Prints out only the latest quake data +``` + +Now, potentially expensive system resources are consumed only at the point they're needed. + +## Detailed design + +`AsyncDeferredSequence` is a trivial algorithm supported by some convenience functions. + +### Functions + +```swift +public func deferred( + _ createSequence: @escaping @Sendable () async -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable + +public func deferred( + _ createSequence: @autoclosure @escaping @Sendable () -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable +``` + +The synchronous function can be auto-escaped, simplifying the call-site. While the async variant allows a sequence to be initialized within a concurrency context other than that of the end consumer. + +```swift +public struct AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { + public typealias Element = Base.Element + public struct Iterator: AsyncIteratorProtocol { + public mutating func next() async rethrows -> Element? + } + public func makeAsyncIterator() -> Iterator +} +``` + +### Naming + +The `deferred(_:)` function takes its inspiration from the Combine publisher of the same name with similar functionality. However, `lazy(_:)` could be quite fitting, too. + +### Comparison with other libraries + +**ReactiveX** ReactiveX has an [API definition of Defer](https://reactivex.io/documentation/operators/defer.html) as a top level operator for generating observables. + +**Combine** Combine has an [API definition of Deferred](https://developer.apple.com/documentation/combine/deferred) as a top-level convenience publisher. + + +## Effect on API resilience + +Deferred has a trivial implementation and is marked as `@frozen` and `@inlinable`. This removes the ability of this type and functions to be ABI resilient boundaries at the benefit of being highly optimizable. + +## Alternatives considered diff --git a/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift new file mode 100644 index 00000000..97b2b71d --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift @@ -0,0 +1,101 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Creates a ``AsyncDeferredSequence`` that uses the supplied closure to create a new `AsyncSequence`. +/// The closure is executed for each iterator on the first call to `next`. +/// This has the effect of postponing the initialization of an arbitrary `AsyncSequence` until the point of first demand. +@inlinable +public func deferred( + _ createSequence: @escaping @Sendable () async -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { + AsyncDeferredSequence(createSequence) +} + +/// Creates a ``AsyncDeferredSequence`` that uses the supplied closure to create a new `AsyncSequence`. +/// The closure is executed for each iterator on the first call to `next`. +/// This has the effect of postponing the initialization of an arbitrary `AsyncSequence` until the point of first demand. +@inlinable +public func deferred( + _ createSequence: @autoclosure @escaping @Sendable () -> Base +) -> AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { + AsyncDeferredSequence(createSequence) +} + +@frozen +public struct AsyncDeferredSequence where Base: AsyncSequence, Base: Sendable { + + @usableFromInline + let createSequence: @Sendable () async -> Base + + @inlinable + init(_ createSequence: @escaping @Sendable () async -> Base) { + self.createSequence = createSequence + } +} + +extension AsyncDeferredSequence: AsyncSequence { + + public typealias Element = Base.Element + + public struct Iterator: AsyncIteratorProtocol { + + @usableFromInline + enum State { + case pending(@Sendable () async -> Base) + case active(Base.AsyncIterator) + case terminal + } + + @usableFromInline + var state: State + + @inlinable + init(_ createSequence: @escaping @Sendable () async -> Base) { + self.state = .pending(createSequence) + } + + @inlinable + public mutating func next() async rethrows -> Element? { + switch state { + case .pending(let generator): + state = .active(await generator().makeAsyncIterator()) + return try await next() + case .active(var base): + do { + if let value = try await base.next() { + state = .active(base) + return value + } + else { + state = .terminal + return nil + } + } + catch let error { + state = .terminal + throw error + } + case .terminal: + return nil + } + } + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(createSequence) + } +} + +extension AsyncDeferredSequence: Sendable { } + +@available(*, unavailable) +extension AsyncDeferredSequence.Iterator: Sendable { } diff --git a/Tests/AsyncAlgorithmsTests/TestDeferred.swift b/Tests/AsyncAlgorithmsTests/TestDeferred.swift new file mode 100644 index 00000000..80b35e58 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestDeferred.swift @@ -0,0 +1,114 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@preconcurrency import XCTest +import AsyncAlgorithms + +final class TestDeferred: XCTestCase { + func test_deferred() async { + let expected = [0,1,2,3,4] + let sequence = deferred { + return expected.async + } + var iterator = sequence.makeAsyncIterator() + var actual = [Int]() + while let item = await iterator.next() { + actual.append(item) + } + XCTAssertEqual(expected, actual) + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_deferred_remains_idle_pending_consumer() async { + let expectation = expectation(description: "pending") + expectation.isInverted = true + let _ = deferred { + AsyncStream { continuation in + expectation.fulfill() + continuation.yield(0) + continuation.finish() + } + } + wait(for: [expectation], timeout: 1.0) + } + + func test_deferred_generates_new_sequence_per_consumer() async { + let expectation = expectation(description: "started") + expectation.expectedFulfillmentCount = 3 + let sequence = deferred { + AsyncStream { continuation in + expectation.fulfill() + continuation.yield(0) + continuation.finish() + } + } + for await _ in sequence { } + for await _ in sequence { } + for await _ in sequence { } + wait(for: [expectation], timeout: 1.0) + } + + func test_deferred_throws() async { + let expectation = expectation(description: "throws") + let sequence = deferred { + AsyncThrowingStream { continuation in + continuation.finish(throwing: Failure()) + } + } + var iterator = sequence.makeAsyncIterator() + do { + while let _ = try await iterator.next() { } + } + catch { + expectation.fulfill() + } + wait(for: [expectation], timeout: 1.0) + let pastEnd = try! await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_deferred_autoclosure() async { + let expected = [0,1,2,3,4] + let sequence = deferred(expected.async) + var iterator = sequence.makeAsyncIterator() + var actual = [Int]() + while let item = await iterator.next() { + actual.append(item) + } + XCTAssertEqual(expected, actual) + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_deferred_cancellation() async { + let source = Indefinite(value: 0) + let sequence = deferred(source.async) + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + let task = Task { + var firstIteration = false + for await _ in sequence { + if !firstIteration { + firstIteration = true + iterated.fulfill() + } + } + finished.fulfill() + } + // ensure the other task actually starts + wait(for: [iterated], timeout: 1.0) + // cancellation should ensure the loop finishes + // without regards to the remaining underlying sequence + task.cancel() + wait(for: [finished], timeout: 1.0) + } +}