Skip to content

Commit 8c42f17

Browse files
committed
Add withLatestFrom operator
1 parent eb2b8b5 commit 8c42f17

File tree

5 files changed

+738
-0
lines changed

5 files changed

+738
-0
lines changed

README.md

+40
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ This repo contains XCombine, a Swift module, developed on top of the Combine fra
1818
- [Example of Usage](#example-of-usage)
1919
- [ShareReplay Operator](#sharereplay-operator)
2020
- [Example of Usage](#example-of-usage-sharereplay)
21+
- [WithLatestFrom Operator](#withlatestfrom-operator)
22+
- [Example of Usage](#example-of-usage-withlatestfrom)
2123
- [License](#license)
2224

2325
## Installation
@@ -136,6 +138,44 @@ measurements.send(100)
136138
measurements.send(110)
137139
```
138140

141+
## WithLatestFrom Operator
142+
143+
Another extension of the Combine framework that XCombine offers is the `withLatestFrom` operator which merges two publishers into one publisher by combining each element from self with the latest element from the second source. It returns a publisher that emits pairs of elements from the upstream publishers as tuples.
144+
145+
### <a name="example-of-usage-withlatestfrom"></a>Example of Usage
146+
147+
The following example demonstrates the use of the XCombine's `withLatestFrom` operator.
148+
149+
```swift
150+
import Combine
151+
import XCombine
152+
153+
let up1 = PassthroughSubject<Int, Never>()
154+
let up2 = PassthroughSubject<String, Never>()
155+
156+
var subscriber = up1.x.withLatestFrom(up2)
157+
.sink(
158+
receiveCompletion: { completion in
159+
print(completion)
160+
},
161+
receiveValue: { v1, v2 in
162+
print(v1, v2)
163+
})
164+
165+
up1.send(1)
166+
up2.send("foo")
167+
up1.send(2)
168+
up1.send(3)
169+
up2.send("bar")
170+
up2.send("baz")
171+
up1.send(4)
172+
173+
// The console output:
174+
// 2 foo
175+
// 3 foo
176+
// 4 baz
177+
```
178+
139179
## License
140180

141181
This project is licensed under the MIT license.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import Combine
2+
3+
extension XCombine {
4+
enum UpstreamCompletionPolicy {
5+
case transparent
6+
case completeAll
7+
}
8+
9+
final class UpstreamGroupCompleter {
10+
private var completionHandlers = [() -> Void]()
11+
func register(_ handler: @escaping () -> Void) {
12+
completionHandlers.append(handler)
13+
}
14+
func complete() {
15+
completionHandlers.forEach { $0() }
16+
}
17+
}
18+
19+
final class UpstreamCompletionObserver<Upstream: Publisher>: Publisher
20+
{
21+
// MARK: - Types
22+
23+
typealias Output = Upstream.Output
24+
typealias Failure = Upstream.Failure
25+
26+
// MARK: - Properties
27+
28+
private let upstream: Upstream
29+
private var subscriber: AnySubscriber<Output, Failure>!
30+
31+
private let completer: UpstreamGroupCompleter
32+
private let policy: UpstreamCompletionPolicy
33+
private var isCompleted: Bool = false
34+
35+
// MARK: - Initialization
36+
37+
init(upstream: Upstream,
38+
completer: UpstreamGroupCompleter,
39+
policy: UpstreamCompletionPolicy)
40+
{
41+
self.upstream = upstream
42+
self.completer = completer
43+
self.policy = policy
44+
}
45+
46+
// MARK: - Publisher Lifecycle
47+
48+
func receive<S: Subscriber>(subscriber: S)
49+
where S.Failure == Failure, S.Input == Output
50+
{
51+
completer.register({
52+
guard !self.isCompleted else { return; }
53+
self.isCompleted = true
54+
subscriber.receive(completion: .finished)
55+
})
56+
57+
let innerSubscriber = AnySubscriber<Output, Failure>(
58+
receiveSubscription: { subscription in
59+
subscriber.receive(subscription: subscription)
60+
},
61+
receiveValue: { value in
62+
subscriber.receive(value)
63+
},
64+
receiveCompletion: { completion in
65+
switch completion {
66+
case .finished:
67+
switch self.policy {
68+
case .transparent:
69+
self.isCompleted = true
70+
subscriber.receive(completion: completion)
71+
case .completeAll:
72+
self.completer.complete()
73+
}
74+
default:
75+
subscriber.receive(completion: completion)
76+
}
77+
})
78+
79+
upstream.subscribe(innerSubscriber)
80+
}
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//
2+
// XCombine+WithLatestFrom.swift
3+
// XCombine
4+
//
5+
// Created by Serge Bouts on 12/11/2019.
6+
// Copyright © 2019 Serge Bouts. All rights reserved.
7+
//
8+
9+
import Combine
10+
11+
extension XCombine {
12+
/// A "cold" publisher, that creates a new identity for each downstream subscriber.
13+
public struct WithLatestFrom<Upstream: Publisher, Other: Publisher>: Publisher where Upstream.Failure == Other.Failure {
14+
// MARK: - Types
15+
16+
public typealias Output = (Upstream.Output, Other.Output)
17+
public typealias Failure = Upstream.Failure
18+
19+
// MARK: - Properties
20+
21+
private let upstream: Upstream
22+
private let other: Other
23+
24+
private let completer = XCombine.UpstreamGroupCompleter()
25+
26+
// MARK: - Initialization
27+
28+
init(upstream: Upstream, other: Other) {
29+
self.upstream = upstream
30+
self.other = other
31+
}
32+
33+
// MARK: - Publisher Lifecycle
34+
35+
public func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
36+
let merged = mergedStream(upstream, other)
37+
let result = resultStream(from: merged)
38+
result.subscribe(subscriber)
39+
}
40+
}
41+
}
42+
43+
// MARK: - Helpers
44+
private extension XCombine.WithLatestFrom {
45+
// MARK: - Types
46+
47+
enum MergedElement {
48+
case upstream1(Upstream.Output)
49+
case upstream2(Other.Output)
50+
}
51+
52+
typealias ScanResult = (value1: Upstream.Output?, value2: Other.Output?, shouldEmit: Bool)
53+
54+
// MARK: - Pipelines
55+
56+
func mergedStream(_ upstream1: Upstream, _ upstream2: Other) -> AnyPublisher<MergedElement, Failure> {
57+
let mergedElementUpstream1 = upstream1
58+
.x.upstreamCompletionObserver(completer, policy: .completeAll)
59+
.map { MergedElement.upstream1($0) }
60+
let mergedElementUpstream2 = upstream2
61+
.x.upstreamCompletionObserver(completer, policy: .transparent)
62+
.map { MergedElement.upstream2($0) }
63+
return mergedElementUpstream1
64+
.merge(with: mergedElementUpstream2)
65+
.eraseToAnyPublisher()
66+
}
67+
68+
func resultStream(from mergedStream: AnyPublisher<MergedElement, Failure>) -> AnyPublisher<Output, Failure> {
69+
mergedStream
70+
.scan(nil) { (prevResult: ScanResult?, mergedElement: MergedElement) -> ScanResult? in
71+
var newValue1: Upstream.Output?
72+
var newValue2: Other.Output?
73+
let shouldEmit: Bool
74+
75+
switch mergedElement {
76+
case .upstream1(let v):
77+
newValue1 = v
78+
shouldEmit = prevResult?.value2 != nil
79+
case .upstream2(let v):
80+
newValue2 = v
81+
shouldEmit = false
82+
}
83+
84+
return ScanResult(value1: newValue1 ?? prevResult?.value1,
85+
value2: newValue2 ?? prevResult?.value2,
86+
shouldEmit: shouldEmit)
87+
}
88+
.compactMap { $0 }
89+
.filter { $0.shouldEmit }
90+
.map { Output($0.value1!, $0.value2!) }
91+
.eraseToAnyPublisher()
92+
}
93+
}

Sources/XCombine/XCombine.swift

+19
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,23 @@ extension XCombineNamespace {
3434
{
3535
return .init(upstream: self.upstream, capacity: replay)
3636
}
37+
38+
internal func upstreamCompletionObserver(
39+
_ completer: XCombine.UpstreamGroupCompleter,
40+
policy: XCombine.UpstreamCompletionPolicy
41+
) -> XCombine.UpstreamCompletionObserver<Upstream>
42+
{
43+
return .init(upstream: self.upstream,
44+
completer: completer,
45+
policy: policy)
46+
}
47+
48+
/// Merges two publishers into one publisher by combining each element from self with the latest element from the second source, if any.
49+
/// - Parameter other: Another publisher.
50+
/// - Returns: A publisher that emits pairs of elements from the upstream publishers as tuples.
51+
public func withLatestFrom<Other: Publisher>(_ other: Other)
52+
-> XCombine.WithLatestFrom<Upstream, Other>
53+
{
54+
return .init(upstream: self.upstream, other: other)
55+
}
3756
}

0 commit comments

Comments
 (0)