diff --git a/Evolution/0000-implement-withLatestFrom.md b/Evolution/0000-implement-withLatestFrom.md new file mode 100644 index 00000000..5ee0b2ea --- /dev/null +++ b/Evolution/0000-implement-withLatestFrom.md @@ -0,0 +1,161 @@ +# Feature name + +* Proposal: [NNNN](NNNN-filename.md) +* Authors: [Thibault Wittemberg](https://github.com/twittemb) +* Review Manager: TBD +* Status: **Awaiting implementation** + +*During the review process, add the following fields as needed:* + +* Implementation: [apple/swift-async-algorithms#NNNNN](https://github.com/apple/swift-async-algorithms/pull/NNNNN) +* Decision Notes: [Rationale](https://forums.swift.org/), [Additional Commentary](https://forums.swift.org/) +* Bugs: [NNNN](https://github.com/apple/swift-async-algorithms/issues) + +## Introduction + +There are several strategies when it comes to combining several sequences of events each having their own temporality. This proposal describes an operator that combines an async sequence values with the latest known values from other ones. + +Swift forums thread: [[Pitch] withLatestFrom](https://forums.swift.org/t/pitch-withlatestfrom/56487/28) + +## Motivation + +Being able to combine values happening over time is a common practice in software engineering. The goal is to synchronize events from several sources by applying some strategies. + +This is an area where reactive programming frameworks are particularly suited. Whether it is [Combine](https://developer.apple.com/documentation/combine), [RxSwift](https://github.com/ReactiveX/RxSwift) or [ReactiveCocoa](https://github.com/ReactiveCocoa/ReactiveSwift), they all provide operators that combine streams of events using some common patterns. + +The field of possibilities is generally summarized by `zip` and `combineLatest`. + +### zip + +`zip` combines elements from several streams and delivers groups of elements. The returned stream waits until all upstream streams have produced an element, then delivers the latest elements from each stream as a tuple. + +That kind of operator can be used to synchronize elements from several concurrent works. A common usecase is to synchronize values coming from concurrent network calls. + +The following example from the [zip guide](https://github.com/apple/swift-async-algorithms/blob/main/Guides/Zip.md) illustrates the synchronization mechanism in the case of two streams of stock values: + + +| Timestamp | appleFeed | nasdaqFeed | combined output | +| ----------- | --------- | ---------- | ----------------------------- | +| 11:40 AM | 173.91 | | | +| 12:25 AM | | 14236.78 | AAPL: 173.91 NASDAQ: 14236.78 | +| 12:40 AM | | 14218.34 | | +| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 | + +### combineLatest + +The `combineLatest` operator behaves in a similar way to `zip`, but while `zip` produces elements only when each of the zipped streams have produced an element, `combineLatest` produces an element whenever any of the source stream produces one. + +The following example from the [combineLatest guide](https://github.com/apple/swift-async-algorithms/blob/main/Guides/CombineLatest.md) illustrates the synchronization mechanism in the case of two streams of stock values: + + +| Timestamp | appleFeed | nasdaqFeed | combined output | +| ----------- | --------- | ---------- | ----------------------------- | +| 11:40 AM | 173.91 | | | +| 12:25 AM | | 14236.78 | AAPL: 173.91 NASDAQ: 14236.78 | +| 12:40 AM | | 14218.34 | AAPL: 173.91 NASDAQ: 14218.34 | +| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 | + + +### When self should impose its pace! + +With `zip` and `combineLatest` all streams have equal weight in the aggregation algorithm that forms the tuples. Input streams can be interchanged without changing the operator's behavior. We can see `zip` as an `AND` boolean operator and `combineLatest` as an `OR` boolean operator: in boolean algebra they are commutative properties. + +There can be usecases where a particular stream should impose its pace to the others. + +What if we want a new value of the tuple (`AAPL`, `NASDAQ`) to be produced **ONLY WHEN** the `appleFeed` produces an element? + +Although `combineLatest` is close to the desired behavior, it is not exactly it: a new tuple will be produced also when `nasdaqFeed` produces a new element. + +Following the stock example, the desired behavior would be: + +| Timestamp | appleFeed | nasdaqFeed | combined output | +| ----------- | --------- | ---------- | ----------------------------- | +| 11:40 AM | 173.91 | | | +| 12:25 AM | | 14236.78 | | +| 12:40 AM | | 14218.34 | | +| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 | + +Unlike `zip` and `combineLatest`, we cannot interchange the 2 feeds without changing the awaited behavior. + +## Proposed solution + +We propose to introduce an new operator that applies to `self` (self being an `AsyncSequence`), and that takes other AsyncSequences as parameters. + +The temporary name for this operator is: `.withLatest(from:)`. + +`.withLatest(from:)` combines elements from `self` with elements from other asynchronous sequences and delivers groups of elements as tuples. The returned `AsyncSequence` produces elements when `self` produces an element and groups it with the latest known elements from the other sequences to form the output tuples. + + +## Detailed design + +This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted, these will be implemented in terms of two- and three-base sequence cases. + +```swift +public extension AsyncSequence { + func withLatest(from other: Other) -> AsyncWithLatestFromSequence { + AsyncWithLatestFromSequence(self, other) + } + + func withLatest(from other1: Other1, _ other2: Other2) -> AsyncWithLatestFrom2Sequence { + AsyncWithLatestFrom2Sequence(self, other1, other2) + } +} + +public struct AsyncWithLatestFromSequence { + public typealias Element = (Base.Element, Other.Element) + public typealias AsyncIterator = Iterator + + public struct Iterator: AsyncIteratorProtocol { + public mutating func next() async rethrows -> Element? + } + + public func makeAsyncIterator() -> Iterator +} + +public struct AsyncWithLatestFrom2Sequence { + public typealias Element = (Base.Element, Other1.Element, Other2.Element) + public typealias AsyncIterator = Iterator + + public struct Iterator: AsyncIteratorProtocol { + public mutating func next() async rethrows -> Element? + } + + public func makeAsyncIterator() -> Iterator +} +``` + +The `withLatest(from:...)` function takes one or two asynchronous sequences as arguments and produces an `AsyncWithLatestFromSequence`/`AsyncWithLatestFrom2Sequence` which is an asynchronous sequence. + +As we must know the latest elements from `others` to form the output tuple when `self` produces a new element, we must iterate over `others` asynchronously using Tasks. + +For the first iteration of `AsyncWithLatestFromSequence` to produce an element, `AsyncWithLatestFromSequence` will wait for `self` and `others` to produce a first element. + +Each subsequent iteration of an `AsyncWithLatestFromSequence` will wait for `self` to produce an element. + +If self` terminates by returning nil from its iteration, the `AsyncWithLatestFromSequence` iteration is immediately considered unsatisfiable and returns nil and all iterations of other bases will be cancelled. + +If `others` terminates by returning nil from their iteration, the `AsyncWithLatestFromSequence` iteration continues by agregating elements from `self` and last known elements from `others`. + +If any iteration of `self` or `others` throws an error, then the `others` iterations are cancelled and the produced error is rethrown, terminating the iteration. + +The source of throwing of `AsyncWithLatestFromSequence` is determined by `Self` and `Others`. That means that if `self` or any `other` can throw an error then the iteration of the `AsyncWithLatestFromSequence` can throw. If `self` and `others` cannot throw, then the `AsyncWithLatestFromSequence` cannot throw. + +## Effect on API resilience + +None. + +## Alternatives names + +Those alternate names were suggested: + +- `zip(sampling: other1, other2, atRateOf: self)` +- `zip(other1, other2, elementOn: .newElementFrom(self))` +- `self.zipWhen(other1, other2)` + +## Comparison with other libraries + +[RxSwift](https://github.com/ReactiveX/RxSwift/blob/main/RxSwift/Observables/WithLatestFrom.swift) provides an implementation of such an operator under the name `withLatestFrom` ([RxMarble](https://rxmarbles.com/#withLatestFrom)) + +## Acknowledgments + +Thanks to everyone on the forum for giving great feedback. diff --git a/README.md b/README.md index cde1208f..2d32904a 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ This package is the home for these APIs. Development and API design take place o - [`merge(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Merge.md): Merges two or more asynchronous sequence into a single asynchronous sequence producing the elements of all of the underlying asynchronous sequences. - [`zip(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Zip.md): Creates an asynchronous sequence of pairs built out of underlying asynchronous sequences. - [`joined(separator:)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Joined.md): Concatenated elements of an asynchronous sequence of asynchronous sequences, inserting the given separator between each element. +- [`zipLatest(from:)`](https://github.com/twittemb/swift-async-algorithms/blob/zipLatestFrom/Evolution/0000-implement-zipLatestFrom.md): Combines self with another AsyncSequence into a single AsyncSequence +wher$ #### Creating asynchronous sequences diff --git a/Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift b/Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift new file mode 100644 index 00000000..61896e13 --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift @@ -0,0 +1,231 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +public extension AsyncSequence { + /// Combines `self` with two other ``AsyncSequence`` into a single ``AsyncSequence`` where each + /// element from `self` is aggregated to the latest known elements from the `other` sequences (if any) as a tuple. + /// + /// Remark: as the `other` sequences are being iterated over in the context of their own ``Task``, there is no guarantee + /// that their latest know elements are the ones that have just been produced when the base sequence produces its next element. + /// + /// ``` + /// let base = AsyncChannel() + /// let other1 = AsyncChannel() + /// let other2 = AsyncChannel() + /// let sequence = base.withLatest(from: other1, other2) + /// + /// Task { + /// for element in await sequence { + /// print(element) + /// } + /// } + /// + /// await other1.send("a") + /// await other1.send("b") + /// + /// await other2.send("c") + /// await other2.send("d") + /// + /// ... later in the application flow + /// + /// await base.send(1) + /// + /// // will print: (1, "b", "d") + /// ``` + /// + /// - Parameters: + /// - other1: the first other ``AsyncSequence`` + /// - other2: the second other ``AsyncSequence`` + /// - Returns: an ``AsyncWithLatestFrom2Sequence`` where elements are a tuple of an element from `self` and the + /// latest known elements (if any) from the `other` sequences. + func withLatest( + from other1: Other1, + _ other2: Other2 + ) -> AsyncWithLatestFrom2Sequence { + AsyncWithLatestFrom2Sequence(self, other1, other2) + } +} + +/// ``AsyncWithLatestFrom2Sequence`` is an ``AsyncSequence`` where elements are a tuple of an element from `base` and the +/// latest known element (if any) from the `other` sequences. +public struct AsyncWithLatestFrom2Sequence: AsyncSequence +where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Element: Sendable { + public typealias Element = (Base.Element, Other1.Element, Other2.Element) + public typealias AsyncIterator = Iterator + + let base: Base + let other1: Other1 + let other2: Other2 + + // for testability purpose + var onBaseElement: (@Sendable (Base.Element) -> Void)? + var onOther1Element: (@Sendable (Other1.Element?) -> Void)? + var onOther2Element: (@Sendable (Other2.Element?) -> Void)? + + init(_ base: Base, _ other1: Other1, _ other2: Other2) { + self.base = base + self.other1 = other1 + self.other2 = other2 + } + + public func makeAsyncIterator() -> Iterator { + var iterator = Iterator( + base: self.base.makeAsyncIterator(), + other1: self.other1, + other2: self.other2 + ) + iterator.onBaseElement = onBaseElement + iterator.onOther1Element = onOther1Element + iterator.onOther2Element = onOther2Element + iterator.startOthers() + return iterator + } + + public struct Iterator: AsyncIteratorProtocol { + enum Other1State { + case idle + case element(Result) + } + + enum Other2State { + case idle + case element(Result) + } + + struct OthersState { + var other1State: Other1State + var other2State: Other2State + + static var idle: OthersState { + OthersState(other1State: .idle, other2State: .idle) + } + } + + enum BaseDecision { + case pass + case returnElement(Result) + } + + var base: Base.AsyncIterator + let other1: Other1 + let other2: Other2 + + let othersState: ManagedCriticalState + var othersTask: Task? + + var isTerminated: ManagedCriticalState + + // for testability purpose + var onBaseElement: (@Sendable (Base.Element) -> Void)? + var onOther1Element: (@Sendable (Other1.Element?) -> Void)? + var onOther2Element: (@Sendable (Other2.Element?) -> Void)? + + public init(base: Base.AsyncIterator, other1: Other1, other2: Other2) { + self.base = base + self.other1 = other1 + self.other2 = other2 + self.othersState = ManagedCriticalState(.idle) + self.isTerminated = ManagedCriticalState(false) + } + + mutating func startOthers() { + self.othersTask = Task { [othersState, other1, other2, onOther1Element, onOther2Element] in + await withTaskGroup(of: Void.self) { group in + group.addTask { + do { + for try await element in other1 { + othersState.withCriticalRegion { state in + state.other1State = .element(.success(element)) + } + onOther1Element?(element) + } + } catch { + othersState.withCriticalRegion { state in + state.other1State = .element(.failure(error)) + } + } + } + + group.addTask { + do { + for try await element in other2 { + othersState.withCriticalRegion { state in + state.other2State = .element(.success(element)) + } + onOther2Element?(element) + } + } catch { + othersState.withCriticalRegion { state in + state.other2State = .element(.failure(error)) + } + } + } + } + } + } + + public mutating func next() async rethrows -> Element? { + let shouldReturnNil = self.isTerminated.withCriticalRegion { $0 } + guard !shouldReturnNil else { return nil } + + return try await withTaskCancellationHandler { [isTerminated, othersTask] in + isTerminated.withCriticalRegion { isTerminated in + isTerminated = true + } + othersTask?.cancel() + } operation: { [othersTask, othersState, onBaseElement] in + do { + while true { + guard let baseElement = try await self.base.next() else { + isTerminated.withCriticalRegion { isTerminated in + isTerminated = true + } + othersTask?.cancel() + return nil + } + + onBaseElement?(baseElement) + + let decision = othersState.withCriticalRegion { state -> BaseDecision in + switch (state.other1State, state.other2State) { + case (.idle, _): + return .pass + case (_, .idle): + return .pass + case (.element(.success(let other1Element)), .element(.success(let other2Element))): + return .returnElement(.success((baseElement, other1Element, other2Element))) + case (.element(.failure(let otherError)), _): + return .returnElement(.failure(otherError)) + case (_, .element(.failure(let otherError))): + return .returnElement(.failure(otherError)) + } + } + + switch decision { + case .pass: + continue + case .returnElement(let result): + return try result._rethrowGet() + } + } + } catch { + isTerminated.withCriticalRegion { isTerminated in + isTerminated = true + } + othersTask?.cancel() + throw error + } + } + } + } +} + +extension AsyncWithLatestFrom2Sequence: Sendable where Base: Sendable { } diff --git a/Sources/AsyncAlgorithms/AsyncWithLatestFromSequence.swift b/Sources/AsyncAlgorithms/AsyncWithLatestFromSequence.swift new file mode 100644 index 00000000..947d5530 --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncWithLatestFromSequence.swift @@ -0,0 +1,169 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +public extension AsyncSequence { + /// Combines `self` with another ``AsyncSequence`` into a single ``AsyncSequence`` where each + /// element from `self` is aggregated to the latest known element from the `other` sequence (if any) as a tuple. + /// + /// Remark: as the `other` sequence is being iterated over in the context of its own ``Task``, there is no guarantee + /// that its latest know element is the one that has just been produced when the base sequence produces its next element. + /// + /// ``` + /// let base = AsyncChannel() + /// let other = AsyncChannel() + /// let sequence = base.withLatest(from: other) + /// + /// Task { + /// for element in await sequence { + /// print(element) + /// } + /// } + /// + /// await other.send("a") + /// await other.send("b") + /// + /// ... later in the application flow + /// + /// await base.send(1) + /// + /// // will print: (1, "b") + /// ``` + /// + /// - Parameter other: the other ``AsyncSequence`` + /// - Returns: an ``AsyncWithLatestFromSequence`` where elements are a tuple of an element from `self` and the + /// latest known element (if any) from the `other` sequence. + func withLatest( + from other: Other + ) -> AsyncWithLatestFromSequence { + AsyncWithLatestFromSequence(self, other) + } +} + +/// ``AsyncWithLatestFromSequence`` is an ``AsyncSequence`` where elements are a tuple of an element from `base` and the +/// latest known element (if any) from the `other` sequence. +public struct AsyncWithLatestFromSequence: AsyncSequence +where Other: Sendable, Other.Element: Sendable { + public typealias Element = (Base.Element, Other.Element) + public typealias AsyncIterator = Iterator + + let base: Base + let other: Other + + // for testability purpose + var onBaseElement: (@Sendable (Base.Element) -> Void)? + var onOtherElement: (@Sendable (Other.Element?) -> Void)? + + init(_ base: Base, _ other: Other) { + self.base = base + self.other = other + } + + public func makeAsyncIterator() -> Iterator { + var iterator = Iterator( + base: self.base.makeAsyncIterator(), + other: self.other + ) + iterator.onBaseElement = onBaseElement + iterator.onOtherElement = onOtherElement + iterator.startOther() + return iterator + } + + public struct Iterator: AsyncIteratorProtocol { + enum OtherState { + case idle + case element(Result) + } + + enum BaseDecision { + case pass + case returnElement(Result) + } + + var base: Base.AsyncIterator + let other: Other + let otherState: ManagedCriticalState + var otherTask: Task? + var isTerminated: Bool + + // for testability purpose + var onBaseElement: (@Sendable (Base.Element) -> Void)? + var onOtherElement: (@Sendable (Other.Element?) -> Void)? + + public init(base: Base.AsyncIterator, other: Other) { + self.base = base + self.other = other + self.otherState = ManagedCriticalState(.idle) + self.isTerminated = false + } + + mutating func startOther() { + self.otherTask = Task { [other, otherState, onOtherElement] in + do { + for try await element in other { + otherState.withCriticalRegion { state in + state = .element(.success(element)) + } + onOtherElement?(element) + } + } catch { + otherState.withCriticalRegion { state in + state = .element(.failure(error)) + } + } + } + } + + public mutating func next() async rethrows -> Element? { + guard !self.isTerminated else { return nil } + + return try await withTaskCancellationHandler { [otherTask] in + otherTask?.cancel() + } operation: { [otherTask, otherState, onBaseElement] in + do { + while true { + guard let baseElement = try await self.base.next() else { + self.isTerminated = true + otherTask?.cancel() + return nil + } + + onBaseElement?(baseElement) + + let decision = otherState.withCriticalRegion { state -> BaseDecision in + switch state { + case .idle: + return .pass + case .element(.success(let otherElement)): + return .returnElement(.success((baseElement, otherElement))) + case .element(.failure(let otherError)): + return .returnElement(.failure(otherError)) + } + } + + switch decision { + case .pass: + continue + case .returnElement(let result): + return try result._rethrowGet() + } + } + } catch { + self.isTerminated = true + otherTask?.cancel() + throw error + } + } + } + } +} + +extension AsyncWithLatestFromSequence: Sendable where Base: Sendable {} diff --git a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift index 2d490cc9..05e19b95 100644 --- a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift +++ b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift @@ -64,5 +64,15 @@ final class TestThroughput: XCTestCase { zip($0, $1, $2) } } + func test_withLatest() async { + await measureSequenceThroughput(firstOutput: 1, secondOutput: 2) { + $1.withLatest(from: $0) + } + } + func test_withLatest2() async { + await measureSequenceThroughput(firstOutput: 1, secondOutput: 2, thirdOutput: 3) { + $0.withLatest(from: $1, $2) + } + } } #endif diff --git a/Tests/AsyncAlgorithmsTests/TestWithLatestFrom.swift b/Tests/AsyncAlgorithmsTests/TestWithLatestFrom.swift new file mode 100644 index 00000000..87302ef3 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestWithLatestFrom.swift @@ -0,0 +1,168 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@testable import AsyncAlgorithms +@preconcurrency import XCTest + +final class TestWithLatestFrom: XCTestCase { + func test_withLatestFrom_uses_latest_element_from_other() async { + // Timeline + // base: -0---1 -2 -----3 ---4-----x--| + // other: ---a-- -- -b-c-- -x----------| + // expected: -----(1,a)-(2,a)-----(3,c)---(4,c)-x--| + let baseHasProduced0 = expectation(description: "Base has produced 0") + + let otherHasProducedA = expectation(description: "Other has produced 'a'") + let otherHasProducedC = expectation(description: "Other has produced 'c'") + + let base = AsyncChannel() + let other = AsyncChannel() + + let sequence = base.withLatest(from: other) + var iterator = sequence.makeAsyncIterator() + + // expectations that ensure that "other" has really delivered + // its elements before requesting the next element from "base" + iterator.onOtherElement = { @Sendable element in + if element == "a" { + otherHasProducedA.fulfill() + } + + if element == "c" { + otherHasProducedC.fulfill() + } + } + + iterator.onBaseElement = { @Sendable element in + if element == 0 { + baseHasProduced0.fulfill() + } + } + + Task { + await base.send(0) + wait(for: [baseHasProduced0], timeout: 1.0) + await other.send("a") + wait(for: [otherHasProducedA], timeout: 1.0) + await base.send(1) + } + + let element1 = await iterator.next() + XCTAssertEqual(element1!, (1, "a")) + + Task { + await base.send(2) + } + + let element2 = await iterator.next() + XCTAssertEqual(element2!, (2, "a")) + + Task { + await other.send("b") + await other.send("c") + wait(for: [otherHasProducedC], timeout: 1.0) + await base.send(3) + } + + let element3 = await iterator.next() + XCTAssertEqual(element3!, (3, "c")) + + Task { + other.finish() + await base.send(4) + } + + let element4 = await iterator.next() + XCTAssertEqual(element4!, (4, "c")) + + base.finish() + + let element5 = await iterator.next() + XCTAssertNil(element5) + + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_throws_when_base_throws_and_pastEnd_is_nil() async throws { + let base = [1, 2, 3] + let other = Indefinite(value: "a") + + let sequence = base.async.map { try throwOn(1, $0) }.withLatest(from: other.async) + var iterator = sequence.makeAsyncIterator() + + do { + let value = try await iterator.next() + XCTFail("got \(value as Any) but expected throw") + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_throws_when_other_throws_and_pastEnd_is_nil() async throws { + let base = Indefinite(value: 1) + let other = AsyncThrowingChannel() + let sequence = base.async.withLatest(from: other) + var iterator = sequence.makeAsyncIterator() + + other.fail(Failure()) + + do { + var element: (Int, String)? + repeat { + element = try await iterator.next() + } while element == nil + XCTFail("got \(element as Any) but expected throw") + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_finishes_loop_when_task_is_cancelled() async { + let iterated = expectation(description: "The iteration has produced 1 element") + let finished = expectation(description: "The iteration has finished") + + let base = Indefinite(value: 1).async + let other = Indefinite(value: "a").async + + let sequence = base.withLatest(from: other) + + let task = Task { + var iterator = sequence.makeAsyncIterator() + + var firstIteration = false + var firstElement: (Int, String)? + while let element = await iterator.next() { + if !firstIteration { + firstElement = element + firstIteration = true + iterated.fulfill() + } + } + XCTAssertEqual(firstElement!, (1, "a")) + finished.fulfill() + } + + // ensure the other task actually starts + wait(for: [iterated], timeout: 1.0) + + // cancellation should ensure the loop finishes + // without regards to the remaining underlying sequence + task.cancel() + wait(for: [finished], timeout: 1.0) + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestWithLatestFrom2.swift b/Tests/AsyncAlgorithmsTests/TestWithLatestFrom2.swift new file mode 100644 index 00000000..3f89a0a0 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestWithLatestFrom2.swift @@ -0,0 +1,222 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@testable import AsyncAlgorithms +@preconcurrency import XCTest + +final class TestWithLatestFrom2: XCTestCase { + func test_withLatestFrom_uses_latest_element_from_others() async { + // Timeline + // base: -0-----1 ---2 ---3 ---4 ---5 -x--| + // other1: ---a---- ---- -b-- -x-- ---- ----| + // other2: -----x-- -y-- ---- ---- -x-- ----| + // expected: -------(1,a,x)---(2,a,y)---(3,b,y)---(4,b,y)---(5,b,y)-x--| + let baseHasProduced0 = expectation(description: "Base has produced 0") + + let other1HasProducedA = expectation(description: "Other has produced 'a'") + let other1HasProducedB = expectation(description: "Other has produced 'b'") + + let other2HasProducedX = expectation(description: "Other has produced 'x'") + let other2HasProducedY = expectation(description: "Other has produced 'y'") + + let base = AsyncChannel() + let other1 = AsyncChannel() + let other2 = AsyncChannel() + + let sequence = base.withLatest(from: other1, other2) + var iterator = sequence.makeAsyncIterator() + + // expectations that ensure that "others" has really delivered + // their elements before requesting the next element from "base" + iterator.onOther1Element = { @Sendable element in + if element == "a" { + other1HasProducedA.fulfill() + } + + if element == "b" { + other1HasProducedB.fulfill() + } + } + + iterator.onOther2Element = { @Sendable element in + if element == "x" { + other2HasProducedX.fulfill() + } + + if element == "y" { + other2HasProducedY.fulfill() + } + } + + iterator.onBaseElement = { @Sendable element in + if element == 0 { + baseHasProduced0.fulfill() + } + } + + Task { + await base.send(0) + wait(for: [baseHasProduced0], timeout: 1.0) + await other1.send("a") + wait(for: [other1HasProducedA], timeout: 1.0) + await other2.send("x") + wait(for: [other2HasProducedX], timeout: 1.0) + await base.send(1) + } + + let element1 = await iterator.next() + XCTAssertEqual(element1!, (1, "a", "x")) + + Task { + await other2.send("y") + wait(for: [other2HasProducedY], timeout: 1.0) + await base.send(2) + } + + let element2 = await iterator.next() + XCTAssertEqual(element2!, (2, "a", "y")) + + Task { + await other1.send("b") + wait(for: [other1HasProducedB], timeout: 1.0) + await base.send(3) + } + + let element3 = await iterator.next() + XCTAssertEqual(element3!, (3, "b", "y")) + + Task { + other1.finish() + await base.send(4) + } + + let element4 = await iterator.next() + XCTAssertEqual(element4!, (4, "b", "y")) + + Task { + other2.finish() + await base.send(5) + } + + let element5 = await iterator.next() + XCTAssertEqual(element5!, (5, "b", "y")) + + base.finish() + + let element6 = await iterator.next() + XCTAssertNil(element6) + + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_throws_when_base_throws_and_pastEnd_is_nil() async throws { + let base = [1, 2, 3] + let other1 = Indefinite(value: "a") + let other2 = Indefinite(value: "x") + + let sequence = base.async.map { try throwOn(1, $0) }.withLatest(from: other1.async, other2.async) + var iterator = sequence.makeAsyncIterator() + + do { + let value = try await iterator.next() + XCTFail("got \(value as Any) but expected throw") + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_throws_when_other1_throws_and_pastEnd_is_nil() async throws { + let base = Indefinite(value: 1) + let other1 = AsyncThrowingChannel() + let other2 = Indefinite(value: "x").async + + let sequence = base.async.withLatest(from: other1, other2) + var iterator = sequence.makeAsyncIterator() + + other1.fail(Failure()) + + do { + var element: (Int, String, String)? + repeat { + element = try await iterator.next() + } while element == nil + XCTFail("got \(element as Any) but expected throw") + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_throws_when_other2_throws_and_pastEnd_is_nil() async throws { + let base = Indefinite(value: 1) + let other1 = Indefinite(value: "x").async + let other2 = AsyncThrowingChannel() + + let sequence = base.async.withLatest(from: other1, other2) + var iterator = sequence.makeAsyncIterator() + + other2.fail(Failure()) + + do { + var element: (Int, String, String)? + repeat { + element = try await iterator.next() + } while element == nil + XCTFail("got \(element as Any) but expected throw") + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + + func test_withLatestFrom_finishes_loop_when_task_is_cancelled() async { + let iterated = expectation(description: "The iteration has produced 1 element") + let finished = expectation(description: "The iteration has finished") + + let base = Indefinite(value: 1).async + let other1 = Indefinite(value: "a").async + let other2 = Indefinite(value: "x").async + + let sequence = base.withLatest(from: other1, other2) + + let task = Task { + var iterator = sequence.makeAsyncIterator() + + var firstIteration = false + var firstElement: (Int, String, String)? + while let element = await iterator.next() { + if !firstIteration { + firstElement = element + firstIteration = true + iterated.fulfill() + } + } + XCTAssertEqual(firstElement!, (1, "a", "x")) + finished.fulfill() + } + + // ensure the other task actually starts + wait(for: [iterated], timeout: 1.0) + + // cancellation should ensure the loop finishes + // without regards to the remaining underlying sequence + task.cancel() + wait(for: [finished], timeout: 1.0) + } +}