diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a4c8c9fb0..f873c09da 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -651,7 +651,6 @@ C89AB1DA1DAAC3350065FBE6 /* Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B11DAAC3350065FBE6 /* Driver.swift */; }; C89AB1DE1DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */; }; C89AB1EA1DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */; }; - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */; }; C89AB1F61DAAC3350065FBE6 /* SharedSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */; }; C89AB2021DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */; }; C89AB2061DAAC3350065FBE6 /* KVORepresentable+Swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */; }; @@ -789,6 +788,8 @@ CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */; }; CDDEF16A1D4FB40000CA8546 /* Disposables.swift in Sources */ = {isa = PBXBuildFile; fileRef = CDDEF1691D4FB40000CA8546 /* Disposables.swift */; }; + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */; }; + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */; }; D9080ACF1EA05AE0002B433B /* RxNavigationControllerDelegateProxy.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */; }; D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */; }; D9080AD81EA06189002B433B /* UINavigationController+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */; }; @@ -1353,7 +1354,6 @@ C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObservableConvertibleType+Driver.swift"; sourceTree = ""; }; C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+arity.swift"; sourceTree = ""; }; C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = "SharedSequence+Operators+arity.tt"; sourceTree = ""; }; - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SharedSequence.swift; sourceTree = ""; }; C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+CoreGraphics.swift"; sourceTree = ""; }; C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+Swift.swift"; sourceTree = ""; }; @@ -1453,6 +1453,8 @@ CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Infallible+Driver.swift"; sourceTree = ""; }; CDDEF1691D4FB40000CA8546 /* Disposables.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Disposables.swift; sourceTree = ""; }; + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+MainActor.swift"; sourceTree = ""; }; D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxNavigationControllerDelegateProxy.swift; sourceTree = ""; }; D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+Rx.swift"; sourceTree = ""; }; D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+RxTests.swift"; sourceTree = ""; }; @@ -2346,7 +2348,8 @@ children = ( C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */, C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */, - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */, + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */, + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */, DB08833626FB0637005805BE /* SharedSequence+Concurrency.swift */, C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */, C85E6FBD1F53025700C5681E /* SchedulerType+SharedSequence.swift */, @@ -3045,6 +3048,7 @@ B562478F203515DD00D3EE75 /* RxCollectionViewDataSourcePrefetchingProxy.swift in Sources */, 84E4D3921C9AFD3400ADFDC9 /* UISearchController+Rx.swift in Sources */, C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */, + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */, CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */, C89AB1A61DAAC25A0065FBE6 /* RxCocoaObjCRuntimeError+Extensions.swift in Sources */, C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */, @@ -3080,7 +3084,6 @@ C89AB1CA1DAAC3350065FBE6 /* ControlProperty.swift in Sources */, ECBBA59E1DF8C0D400DDDC2E /* RxTabBarControllerDelegateProxy.swift in Sources */, 78F2D93E24C8D35700D13F0C /* RxWKNavigationDelegateProxy.swift in Sources */, - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */, 9BA1CBD31C0F7D550044B50A /* UIActivityIndicatorView+Rx.swift in Sources */, 842A5A2C1C357F92003568D5 /* NSTextStorage+Rx.swift in Sources */, C88254241B8A752B00B02D69 /* RxTextViewDelegateProxy.swift in Sources */, @@ -3098,6 +3101,7 @@ C89AB2501DAAC3A60065FBE6 /* _RXObjCRuntime.m in Sources */, C89AB21E1DAAC3350065FBE6 /* NSObject+Rx.swift in Sources */, D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */, + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */, 88718CFE1CE5D80000D88D60 /* UITabBar+Rx.swift in Sources */, 88D98F2E1CE7549A00D50457 /* RxTabBarDelegateProxy.swift in Sources */, C88254331B8A752B00B02D69 /* UISwitch+Rx.swift in Sources */, diff --git a/RxCocoa/Traits/Driver/Driver+Subscription.swift b/RxCocoa/Traits/Driver/Driver+Subscription.swift index 0b9024c75..bab00ec51 100644 --- a/RxCocoa/Traits/Driver/Driver+Subscription.swift +++ b/RxCocoa/Traits/Driver/Driver+Subscription.swift @@ -155,10 +155,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func drive( with object: Object, - onNext: ((Object, Element) -> Void)? = nil, - onCompleted: ((Object) -> Void)? = nil, + onNext: (@MainActor (Object, Element) -> Void)? = nil, + onCompleted: (@MainActor (Object) -> Void)? = nil, onDisposed: ((Object) -> Void)? = nil ) -> Disposable { MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) @@ -178,9 +179,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func drive( - onNext: ((Element) -> Void)? = nil, - onCompleted: (() -> Void)? = nil, + onNext: (@MainActor (Element) -> Void)? = nil, + onCompleted: (@MainActor () -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) diff --git a/RxCocoa/Traits/Driver/Driver.swift b/RxCocoa/Traits/Driver/Driver.swift index 5de8b3a56..0301315d2 100644 --- a/RxCocoa/Traits/Driver/Driver.swift +++ b/RxCocoa/Traits/Driver/Driver.swift @@ -37,7 +37,7 @@ import RxSwift */ public typealias Driver = SharedSequence -public struct DriverSharingStrategy: SharingStrategyProtocol { +public struct DriverSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { source.share(replay: 1, scope: .whileConnected) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift new file mode 100644 index 000000000..42bf4e680 --- /dev/null +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift @@ -0,0 +1,540 @@ +// +// SharedSequence+Operators.swift +// RxCocoa +// +// Created by Krunoslav Zaher on 9/19/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +// MARK: map +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into a new form. + + - parameter selector: A transform function to apply to each source element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. + */ + @preconcurrency + public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { + let source = self + .asObservable() + .map(selector) + return SharedSequence(source) + } +} + +// MARK: compactMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into an optional form and filters all optional results. + + - parameter selector: A transform function to apply to each source element and which returns an element or nil. + - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. + + */ + @preconcurrency + public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { + let source = self + .asObservable() + .compactMap(selector) + return SharedSequence(source) + } +} + +// MARK: filter +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Filters the elements of an observable sequence based on a predicate. + + - parameter predicate: A function to test each source element for a condition. + - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. + */ + @preconcurrency + public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { + let source = self + .asObservable() + .filter(predicate) + return SharedSequence(source) + } +} + +// MARK: switchLatest +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Transforms an observable sequence of observable sequences into an observable sequence + producing values only from the most recent observable sequence. + + Each time a new inner observable sequence is received, unsubscribe from the + previous inner observable sequence. + + - returns: The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + public func switchLatest() -> SharedSequence { + let source: Observable = self + .asObservable() + .map { $0.asSharedSequence() } + .switchLatest() + return SharedSequence(source) + } +} + +// MARK: flatMapLatest +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Projects each element of an observable sequence into a new sequence of observable sequences and then + transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. + + It is a combination of `map` + `switchLatest` operator + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an + Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + @preconcurrency + public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapLatest(selector) + return SharedSequence(source) + } +} + +// MARK: flatMapFirst +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + If element is received while there is some projected observable sequence being merged it will simply be ignored. + + - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. + */ + @preconcurrency + public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapFirst(selector) + return SharedSequence(source) + } +} + +// MARK: do +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Invokes an action for each event in the observable sequence, and propagates all observer messages through the result sequence. + + - parameter onNext: Action to invoke for each element in the observable sequence. + - parameter afterNext: Action to invoke for each element after the observable has passed an onNext event along to its downstream. + - parameter onCompleted: Action to invoke upon graceful termination of the observable sequence. + - parameter afterCompleted: Action to invoke after graceful termination of the observable sequence. + - parameter onSubscribe: Action to invoke before subscribing to source observable sequence. + - parameter onSubscribed: Action to invoke after subscribing to source observable sequence. + - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. + - returns: The source sequence with the side-effecting behavior applied. + */ + @preconcurrency + public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) + -> SharedSequence { + let source = self.asObservable() + .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) + + return SharedSequence(source) + } +} + +// MARK: debug +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Prints received events for all observers on standard output. + + - parameter identifier: Identifier that is printed together with event description to standard output. + - returns: An observable sequence whose events are printed to standard output. + */ + public func debug(_ identifier: String? = nil, trimOutput: Bool = false, file: String = #file, line: UInt = #line, function: String = #function) -> SharedSequence { + let source = self.asObservable() + .debug(identifier, trimOutput: trimOutput, file: file, line: line, function: function) + return SharedSequence(source) + } +} + +// MARK: distinctUntilChanged +extension SharedSequenceConvertibleType where Element: Equatable, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to equality operator. + + - returns: An observable sequence only containing the distinct contiguous elements, based on equality operator, from the source sequence. + */ + public func distinctUntilChanged() + -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: { ($0 == $1) }) + + return SharedSequence(source) + } +} + +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `keySelector`. + + - parameter keySelector: A function to compute the comparison key for each element. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. + */ + @preconcurrency + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `comparer`. + + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. + */ + @preconcurrency + public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: comparer) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the keySelector and the comparer. + + - parameter keySelector: A function to compute the comparison key for each element. + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. + */ + @preconcurrency + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: comparer) + return SharedSequence(source) + } +} + + +// MARK: flatMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + */ + @preconcurrency + public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { + let source = self.asObservable() + .flatMap(selector) + + return SharedSequence(source) + } +} + +// MARK: merge +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences from collection into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences from array into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Array of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: [SharedSequence]) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: SharedSequence...) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + +} + +// MARK: merge +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence. + + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public func merge() -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge() + return SharedSequence(source) + } + + /** + Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences. + + - parameter maxConcurrent: Maximum number of inner observable sequences being subscribed to concurrently. + - returns: The observable sequence that merges the elements of the inner sequences. + */ + public func merge(maxConcurrent: Int) + -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge(maxConcurrent: maxConcurrent) + return SharedSequence(source) + } +} + +// MARK: throttle +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an Observable that emits the first and the latest item emitted by the source Observable during sequential time windows of a specified duration. + + This operator makes sure that no two elements are emitted in less then dueTime. + + - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html) + + - parameter dueTime: Throttling duration for each element. + - parameter latest: Should latest element received in a dueTime wide time window since last element emission be emitted. + - returns: The throttled sequence. + */ + public func throttle(_ dueTime: RxTimeInterval, latest: Bool = true) + -> SharedSequence { + let source = self.asObservable() + .throttle(dueTime, latest: latest, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } + + /** + Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers. + + - parameter dueTime: Throttling duration for each element. + - returns: The throttled sequence. + */ + public func debounce(_ dueTime: RxTimeInterval) + -> SharedSequence { + let source = self.asObservable() + .debounce(dueTime, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } +} + +// MARK: scan +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value. + + For aggregation behavior with no intermediate results, see `reduce`. + + - parameter seed: The initial accumulator value. + - parameter accumulator: An accumulator function to be invoked on each element. + - returns: An observable sequence containing the accumulated values. + */ + @preconcurrency + public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) + -> SharedSequence { + let source = self.asObservable() + .scan(seed, accumulator: accumulator) + return SharedSequence(source) + } +} + +// MARK: concat + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ sequence: Sequence) -> SharedSequence + where Sequence.Element == SharedSequence { + let source = Observable.concat(sequence.lazy.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.concat(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: zip + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. + + - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency + public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence all of the observable sequences have produced an element at a corresponding index. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func zip(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }) + return SharedSequence(source) + } +} + +// MARK: combineLatest + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element. + + - parameter resultSelector: Function to invoke whenever any of the sources produces an element. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency + public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence whenever any of the observable sequences produces an element. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func combineLatest(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: - withUnretained +extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. + - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. + */ + @preconcurrency + public func withUnretained( + _ obj: Object, + resultSelector: @escaping @MainActor (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. + */ + public func withUnretained(_ obj: Object) -> SharedSequence { + withUnretained(obj) { ($0, $1) } + } +} + +extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained( + _ obj: Object, + resultSelector: @escaping (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained(_ obj: Object) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) + } +} + +// MARK: withLatestFrom +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Merges two observable sequences into one observable sequence by combining each element from self with the latest element from the second source, if any. + + - parameter second: Second observable source. + - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + @preconcurrency + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) + + return SharedSequence(source) + } + + /** + Merges two observable sequences into one observable sequence by using latest element from the second sequence every time when `self` emits an element. + + - parameter second: Second observable source. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + public func withLatestFrom(_ second: SecondO) -> SharedSequence { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence()) + + return SharedSequence(source) + } +} diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift index 1d53b03d9..7a207826f 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift @@ -443,56 +443,6 @@ extension SharedSequence { } } -// MARK: - withUnretained -extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. - - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. - */ - public func withUnretained( - _ obj: Object, - resultSelector: @escaping (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. - */ - public func withUnretained(_ obj: Object) -> SharedSequence { - withUnretained(obj) { ($0, $1) } - } -} - -extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained( - _ obj: Object, - resultSelector: @escaping (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained(_ obj: Object) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) - } -} - // MARK: withLatestFrom extension SharedSequenceConvertibleType { diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence.swift b/RxCocoa/Traits/SharedSequence/SharedSequence.swift index 4596c8ec0..b86ff97c1 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence.swift @@ -81,6 +81,11 @@ public protocol SharingStrategyProtocol { static func share(_ source: Observable) -> Observable } +/** + A marker protocol for all sharing strategies, which are guaranteed to run on the main thread. + */ +public protocol MainActorSharingStrategyProtocol: SharingStrategyProtocol {} + /** A type that can be converted to `SharedSequence`. */ diff --git a/RxCocoa/Traits/Signal/Signal+Subscription.swift b/RxCocoa/Traits/Signal/Signal+Subscription.swift index 4a6add336..bb4f23f2d 100644 --- a/RxCocoa/Traits/Signal/Signal+Subscription.swift +++ b/RxCocoa/Traits/Signal/Signal+Subscription.swift @@ -130,10 +130,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency public func emit( with object: Object, - onNext: ((Object, Element) -> Void)? = nil, - onCompleted: ((Object) -> Void)? = nil, + onNext: (@MainActor (Object, Element) -> Void)? = nil, + onCompleted: (@MainActor (Object) -> Void)? = nil, onDisposed: ((Object) -> Void)? = nil ) -> Disposable { self.asObservable().subscribe( @@ -156,9 +157,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency public func emit( - onNext: ((Element) -> Void)? = nil, - onCompleted: (() -> Void)? = nil, + onNext: (@MainActor (Element) -> Void)? = nil, + onCompleted: (@MainActor () -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed) diff --git a/RxCocoa/Traits/Signal/Signal.swift b/RxCocoa/Traits/Signal/Signal.swift index e066b7ec2..db2153241 100644 --- a/RxCocoa/Traits/Signal/Signal.swift +++ b/RxCocoa/Traits/Signal/Signal.swift @@ -29,7 +29,7 @@ import RxSwift */ public typealias Signal = SharedSequence -public struct SignalSharingStrategy: SharingStrategyProtocol { +public struct SignalSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { diff --git a/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift new file mode 120000 index 000000000..49285c88e --- /dev/null +++ b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift @@ -0,0 +1 @@ +../../RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift \ No newline at end of file diff --git a/Tests/RxCocoaTests/SharedSequence+Test.swift b/Tests/RxCocoaTests/SharedSequence+Test.swift index e2406f477..74aa6cbd4 100644 --- a/Tests/RxCocoaTests/SharedSequence+Test.swift +++ b/Tests/RxCocoaTests/SharedSequence+Test.swift @@ -107,4 +107,61 @@ extension SharedSequenceTest { return firstElements } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testDriverWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asDriver(onErrorDriveWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testSignalWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asSignal(onErrorSignalWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testBackgroundSharingSequence() async { + func testBackgroundSharingSequence() async { + for await value in await Observable.just(1) + .asSharedSequence( + sharingStrategy: BackgroundSharingStrategy.self, + onErrorRecover: { _ in .empty() }) + .map({ one in + if Thread.isMainThread { + return 0 + } + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + } } + +private struct BackgroundSharingStrategy: SharingStrategyProtocol { + public static var scheduler: SchedulerType { ConcurrentDispatchQueueScheduler(qos: .default) } + + public static func share(_ source: Observable) -> Observable { + source.share(scope: .whileConnected) + } +} + +private typealias TestSequence = SharedSequence