File tree Expand file tree Collapse file tree 2 files changed +48
-3
lines changed
AsyncRx.NET/System.Reactive.Async Expand file tree Collapse file tree 2 files changed +48
-3
lines changed Original file line number Diff line number Diff line change 22// The .NET Foundation licenses this file to you under the MIT License.
33// See the LICENSE file in the project root for more information.
44
5+ using System . Reactive . Internal ;
56using System . Threading . Tasks ;
67
78namespace System . Reactive
@@ -124,7 +125,7 @@ protected override async ValueTask OnCompletedAsyncCore()
124125 return ;
125126 }
126127
127- _task = _observer . OnCompletedAsync ( ) ;
128+ _task = _observer . OnCompletedAsync_EnsureAsync ( ) ;
128129 }
129130
130131 try
@@ -146,7 +147,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error)
146147 return ;
147148 }
148149
149- _task = _observer . OnErrorAsync ( error ) ;
150+ _task = _observer . OnErrorAsync_EnsureAsync ( error ) ;
150151 }
151152
152153 try
@@ -168,7 +169,7 @@ protected override async ValueTask OnNextAsyncCore(T value)
168169 return ;
169170 }
170171
171- _task = _observer . OnNextAsync ( value ) ;
172+ _task = _observer . OnNextAsync_EnsureAsync ( value ) ;
172173 }
173174
174175 try
Original file line number Diff line number Diff line change 1+ using System . Threading . Tasks ;
2+
3+ namespace System . Reactive . Internal ;
4+
5+ // Helpers methods that ensure that calls to IAsyncObserver methods don't throw synchronously.
6+ // Those methods will always return a ValueTask, and any exception will be propagated through that ValueTask.
7+ internal static class AsyncObserverEnsureAsyncHelpers
8+ {
9+ public static ValueTask OnNextAsync_EnsureAsync < T > ( this IAsyncObserver < T > source , T value )
10+ {
11+ try
12+ {
13+ return source . OnNextAsync ( value ) ;
14+ }
15+ catch ( Exception e )
16+ {
17+ return new ValueTask ( Task . FromException ( e ) ) ;
18+ }
19+ }
20+
21+ public static ValueTask OnErrorAsync_EnsureAsync < T > ( this IAsyncObserver < T > source , Exception error )
22+ {
23+ try
24+ {
25+ return source . OnErrorAsync ( error ) ;
26+ }
27+ catch ( Exception e )
28+ {
29+ return new ValueTask ( Task . FromException ( e ) ) ;
30+ }
31+ }
32+
33+ public static ValueTask OnCompletedAsync_EnsureAsync < T > ( this IAsyncObserver < T > source )
34+ {
35+ try
36+ {
37+ return source . OnCompletedAsync ( ) ;
38+ }
39+ catch ( Exception e )
40+ {
41+ return new ValueTask ( Task . FromException ( e ) ) ;
42+ }
43+ }
44+ }
You can’t perform that action at this time.
0 commit comments