diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs index ab78b7a00..3742fe52a 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs @@ -4,14 +4,14 @@ using System.Collections.Generic; using System.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Disposables { public sealed class CompositeAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private readonly List _disposables; private bool _disposed; diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs index 93f801af2..90f2aca8b 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ namespace System.Reactive.Disposables { public sealed class RefCountAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private IAsyncDisposable _disposable; private bool _primaryDisposed; private int _count; diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs index 94bb1a6b7..2020a8f50 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs @@ -2,14 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Disposables { public sealed class SerialAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private IAsyncDisposable _disposable; private bool _disposed; diff --git a/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs b/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs index 36fe60b46..371195d64 100644 --- a/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -12,7 +13,7 @@ internal abstract class ScheduledAsyncObserverBase : AsyncObserverBase, IS { private readonly IAsyncObserver _observer; - private readonly AsyncGate _lock = new(); + private readonly IAsyncGate _lock = AsyncGate.Create(); private readonly Queue _queue = new(); private bool _hasFaulted = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs b/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs index e1e8df311..5803eba6d 100644 --- a/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs +++ b/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Joins @@ -18,7 +18,7 @@ internal sealed class AsyncJoinObserver : AsyncObserverBase>, private readonly List _activePlans = new(); private readonly SingleAssignmentAsyncDisposable _subscription = new(); - private AsyncGate _gate; + private IAsyncGate _gate; private bool _isDisposed; public AsyncJoinObserver(IAsyncObservable source, Func onError) @@ -56,7 +56,7 @@ public async ValueTask DisposeAsync() } } - public async Task SubscribeAsync(AsyncGate gate) + public async Task SubscribeAsync(IAsyncGate gate) { _gate = gate; diff --git a/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs b/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs index df2d71c59..eb3c488c9 100644 --- a/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs +++ b/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs @@ -2,14 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Joins { internal interface IAsyncJoinObserver : IAsyncDisposable { - Task SubscribeAsync(AsyncGate gate); + Task SubscribeAsync(IAsyncGate gate); void Dequeue(); } diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs index 9105d85dc..037688eac 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -83,7 +83,7 @@ public static (IAsyncObserver, IAsyncObserver) Amb(IA if (second == null) throw new ArgumentNullException(nameof(second)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var state = AmbState.None; @@ -199,7 +199,7 @@ public static IAsyncObserver[] Amb(IAsyncObserver obs if (subscriptions == null) throw new ArgumentNullException(nameof(subscriptions)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var winner = default(int?); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs index 000c3b50b..1b59fbb08 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -306,7 +307,7 @@ public static IAsyncObserver Buffer(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var buffer = new List(); @@ -378,7 +379,7 @@ public static IAsyncObserver Buffer(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue>(); @@ -509,7 +510,7 @@ TimeSpan GetNextDue() async Task<(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var timer = new SerialAsyncDisposable(); @@ -586,7 +587,7 @@ public static (IAsyncObserver, IAsyncObserver) Buffer< if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var buffer = new List(); @@ -660,7 +661,7 @@ public static (IAsyncObserver, IAsyncObserver) Buffer< { var closeSubscription = new SerialAsyncDisposable(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queueLock = new AsyncQueueLock(); var buffer = new List(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs index c8ae78381..c5cf24f3a 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -1827,7 +1828,7 @@ public static (IAsyncObserver, IAsyncObserver) CombineLatest(IAs bool isDone2 = false; T2 latestValue2 = default(T2); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -1946,7 +1947,7 @@ public static (IAsyncObserver, IAsyncObserver) CombineLatest, IAsyncObserver, IAsyncObserver) Combi bool isDone3 = false; T3 latestValue3 = default(T3); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2243,7 +2244,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Combi bool isDone3 = false; T3 latestValue3 = default(T3); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2432,7 +2433,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone4 = false; T4 latestValue4 = default(T4); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2643,7 +2644,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone4 = false; T4 latestValue4 = default(T4); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2889,7 +2890,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone5 = false; T5 latestValue5 = default(T5); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3146,7 +3147,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone5 = false; T5 latestValue5 = default(T5); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3449,7 +3450,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone6 = false; T6 latestValue6 = default(T6); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3752,7 +3753,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone6 = false; T6 latestValue6 = default(T6); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4112,7 +4113,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone7 = false; T7 latestValue7 = default(T7); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4461,7 +4462,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone7 = false; T7 latestValue7 = default(T7); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4878,7 +4879,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone8 = false; T8 latestValue8 = default(T8); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -5273,7 +5274,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone8 = false; T8 latestValue8 = default(T8); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -5747,7 +5748,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone9 = false; T9 latestValue9 = default(T9); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -6188,7 +6189,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone9 = false; T9 latestValue9 = default(T9); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -6719,7 +6720,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone10 = false; T10 latestValue10 = default(T10); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -7206,7 +7207,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone10 = false; T10 latestValue10 = default(T10); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -7794,7 +7795,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone11 = false; T11 latestValue11 = default(T11); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -8327,7 +8328,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone11 = false; T11 latestValue11 = default(T11); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -8972,7 +8973,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone12 = false; T12 latestValue12 = default(T12); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -9551,7 +9552,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone12 = false; T12 latestValue12 = default(T12); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -10253,7 +10254,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone13 = false; T13 latestValue13 = default(T13); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -10878,7 +10879,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone13 = false; T13 latestValue13 = default(T13); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -11637,7 +11638,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone14 = false; T14 latestValue14 = default(T14); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -12308,7 +12309,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone14 = false; T14 latestValue14 = default(T14); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -13124,7 +13125,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone15 = false; T15 latestValue15 = default(T15); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -13841,7 +13842,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone15 = false; T15 latestValue15 = default(T15); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt index 5e42119a4..e6b44999d 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt @@ -9,6 +9,7 @@ <#@ import namespace="System.Collections.Generic" #> <#@ output extension=".cs" #> using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -161,7 +162,7 @@ for (var j = 1; j <= i; j++) } #> - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -248,7 +249,7 @@ for (var j = 1; j <= i; j++) } #> - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs index 918871500..c5510bfe4 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -76,7 +77,7 @@ public partial class AsyncObserver var semaphore = new SemaphoreSlim(0); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue>(); var isDone = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index 5221045a9..a724d17b9 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -609,7 +609,7 @@ public partial class AsyncObserver groups = new ConcurrentDictionary>(Environment.ProcessorCount * 4, capacity, comparer); } - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var nullGate = new object(); var nullGroup = default(IAsyncSubject); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs index 93d855bc9..1718da00b 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -57,7 +57,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncDisposable) if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var group = new CompositeAsyncDisposable(subscriptions); var refCount = new RefCountAsyncDisposable(group); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs index 5d9971479..0c78aa789 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs @@ -4,7 +4,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -59,7 +59,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncDisposable) if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var group = new CompositeAsyncDisposable(subscriptions); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs index f03a496db..4f111931a 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -35,7 +35,7 @@ public static (IAsyncObserver>, IAsyncDisposable) Merg if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var count = 1; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs index d4949b1e1..3fc84e0f3 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -44,7 +45,7 @@ public partial class AsyncObserver var semaphore = new SemaphoreSlim(0); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue(); var error = default(Exception); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs index 60da700b3..309c01845 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs @@ -4,7 +4,7 @@ using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; namespace System.Reactive.Linq { @@ -15,7 +15,7 @@ public static IAsyncObservable RefCount(this IConnectableAsync if (source == null) throw new ArgumentNullException(nameof(source)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var count = 0; var connectable = default(IAsyncDisposable); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs index e8752d3d8..aa326863e 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -78,7 +78,7 @@ public static (IAsyncObserver, IAsyncObserver) Sample, IAsyncDisposable) SelectMany, IAsyncObserver) SequenceEqual(); var queueRight = new Queue(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs index 126dbb00d..1b59adeb1 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -130,7 +130,7 @@ public static IAsyncObserver Skip(IAsyncObserver obse // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer? // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable). - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var open = false; return diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs index 7f6c478d7..75b9ab35d 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -87,7 +87,7 @@ public static (IAsyncObserver, IAsyncObserver) SkipUntil, IAsyncObserver) SkipUntil>, IAsyncDisposable) Swit if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var isStopped = false; var hasLatest = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs index 27d016799..32fd3c3da 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs @@ -2,7 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Threading; +using System.Reactive.Threading; namespace System.Reactive.Linq { @@ -16,7 +16,7 @@ public static IAsyncObservable Synchronize(this IAsyncObservab return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer))); } - public static IAsyncObservable Synchronize(this IAsyncObservable source, AsyncGate gate) + public static IAsyncObservable Synchronize(this IAsyncObservable source, IAsyncGate gate) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -37,10 +37,10 @@ public static IAsyncObserver Synchronize(IAsyncObserver Synchronize(IAsyncObserver observer, AsyncGate gate) + public static IAsyncObserver Synchronize(IAsyncObserver observer, IAsyncGate gate) { if (observer == null) throw new ArgumentNullException(nameof(observer)); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs index d0064c23a..63cce1f73 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -130,7 +130,7 @@ public static IAsyncObserver Take(IAsyncObserver obse // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer? // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable). - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs index b71eb727a..5d70568b3 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -87,7 +87,7 @@ public static (IAsyncObserver, IAsyncObserver) TakeUntil, IAsyncObserver) TakeUntil, IAsyncDisposable) Throttle(IAsy if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var timer = new SerialAsyncDisposable(); @@ -187,7 +187,7 @@ public static (IAsyncObserver, IAsyncDisposable) Throttle, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var switched = false; var id = 0UL; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs index 562c28ad3..515554035 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Joins; -using System.Threading; +using System.Reactive.Threading; namespace System.Reactive.Linq { @@ -19,7 +19,7 @@ public static IAsyncObservable When(IEnumerable(async observer => { var externalSubscriptions = new Dictionary(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var activePlans = new List(); var outputObserver = AsyncObserver.Create( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs index aafbe08e3..886d51187 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs @@ -6,6 +6,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -296,7 +297,7 @@ public static (IAsyncObserver, IAsyncDisposable) Window(IAsync if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var window = default(IAsyncSubject); var d = new CompositeAsyncDisposable(); @@ -382,7 +383,7 @@ async Task CreateWindowAsync() if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var d = new CompositeAsyncDisposable(); var timer = new SerialAsyncDisposable(); @@ -538,7 +539,7 @@ async Task CreateTimer() if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var n = 0; var window = default(IAsyncSubject); @@ -649,7 +650,7 @@ async Task> CreateWindowAsync() if (subscription == null) throw new ArgumentNullException(nameof(subscription)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var refCount = new RefCountAsyncDisposable(subscription); var window = default(IAsyncSubject); @@ -736,7 +737,7 @@ async Task CreateWindowAsync() var closeSubscription = new SerialAsyncDisposable(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queueLock = new AsyncQueueLock(); var refCount = new RefCountAsyncDisposable(subscription); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs index 859aaaca2..c15d3d2c0 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -103,7 +103,7 @@ public static (IAsyncObserver, IAsyncObserver) WithLatestFrom, IAsyncObserver) WithLatestFrom, IAsyncObserver) Zip(IAsyncObserve if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -1913,7 +1914,7 @@ public static (IAsyncObserver, IAsyncObserver) Zip(IAsy if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2007,7 +2008,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(); var values2 = new Queue(); @@ -2103,7 +2104,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(); var values2 = new Queue(); @@ -2199,7 +2200,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2297,7 +2298,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2395,7 +2396,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2495,7 +2496,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2595,7 +2596,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2697,7 +2698,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2799,7 +2800,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2903,7 +2904,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3007,7 +3008,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3113,7 +3114,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3219,7 +3220,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3327,7 +3328,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3435,7 +3436,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3545,7 +3546,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3655,7 +3656,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3767,7 +3768,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3879,7 +3880,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3993,7 +3994,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4107,7 +4108,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4223,7 +4224,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4339,7 +4340,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4457,7 +4458,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4575,7 +4576,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4695,7 +4696,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt index f21866efc..fd34ced99 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt @@ -10,6 +10,7 @@ <#@ output extension=".cs" #> using System.Collections.Generic; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -152,7 +153,7 @@ for (var i = 2; i <= 15; i++) if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); <# for (var j = 1; j <= i; j++) @@ -258,7 +259,7 @@ for (var j = 1; j <= i; j++) if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); <# for (var j = 1; j <= i; j++) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs index 4deb6427c..f61d83348 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -50,7 +50,7 @@ public static IAsyncObserver[] Zip(IAsyncObserver[count]; var isDone = new bool[count]; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs index 5f12f775c..94c871c03 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs @@ -4,14 +4,14 @@ using System.Collections.Generic; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects { public abstract class BehaviorAsyncSubject : IAsyncSubject { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private readonly List> _observers = new(); private T _value; private bool _done; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs index 3723ce014..daa732882 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects @@ -12,7 +12,7 @@ internal sealed class ConnectableAsyncObservable : IConnectabl { private readonly IAsyncSubject _subject; private readonly IAsyncObservable _source; - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private Connection _connection; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs index 5928dc995..49f614ba1 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs @@ -6,7 +6,7 @@ using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects @@ -120,7 +120,7 @@ public ReplayAsyncSubject(bool concurrent, int bufferSize, TimeSpan window, IAsy private abstract class ReplayBase : IAsyncSubject { private readonly bool _concurrent; - private readonly AsyncGate _lock = new(); + private readonly IAsyncGate _lock = AsyncGate.Create(); private readonly List> _observers = new(); // TODO: immutable array private bool _done; private Exception _error; diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs index 507aa676f..766594653 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs @@ -3,17 +3,43 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; -namespace System.Threading +namespace System.Reactive.Threading { - public sealed class AsyncGate + /// + /// Provides an implementation of , enabling mutually exclusive locking + /// in async code. + /// + public sealed class AsyncGate : IAsyncGate, IAsyncGateReleaser { private readonly object _gate = new(); private readonly SemaphoreSlim _semaphore = new(1, 1); private readonly AsyncLocal _recursionCount = new(); - public ValueTask LockAsync() + /// + /// Creates an . + /// + /// + /// This is private because we hope that one day, the .NET runtime will provide a built-in + /// asynchronous mutual exclusion primitive, and that we might be able to use that instead of + /// our own implementation. Although that might be something we could do by modifying this + /// class, it might prove useful to be able to provide the old implementation for backwards + /// compatibility, so we don't want AsyncRx.NET consumers to depend on a specific concrete type + /// as the implementation. + /// + private AsyncGate() + { + } + + /// + /// Creates a new instance of an implementation. + /// + /// + public static IAsyncGate Create() => new AsyncGate(); + + ValueTask IAsyncGate.AcquireAsync() { var shouldAcquire = false; @@ -32,13 +58,18 @@ public ValueTask LockAsync() if (shouldAcquire) { - return new ValueTask(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this))); + Task acquireTask = _semaphore.WaitAsync(); + if (acquireTask.IsCompleted) + { + return new ValueTask(this); + } + return new ValueTask(acquireTask.ContinueWith(_ => this)); } - return new ValueTask(new Releaser(this)); + return new ValueTask(this); } - private void Release() + void IAsyncGateReleaser.Release() { lock (_gate) { @@ -50,14 +81,5 @@ private void Release() } } } - - public readonly struct Releaser : IDisposable - { - private readonly AsyncGate _parent; - - public Releaser(AsyncGate parent) => _parent = parent; - - public void Dispose() => _parent.Release(); - } } } diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs new file mode 100644 index 000000000..f801251e7 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs @@ -0,0 +1,43 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Reactive.Threading; + +/// +/// Extension methods for . +/// +public static class AsyncGateExtensions +{ + /// + /// Acquires an in a way enables the gate to be released with a + /// statement or declaration. + /// + /// The gate to lock. + /// + /// A that produces a that will call + /// when disposed. + /// + public static ValueTask LockAsync(this IAsyncGate gate) + { + // Note, we are avoiding async/await here because we MUST NOT create a new child ExecutionContext + // (The AsyncGate.LockAsync method does not use async/await either, and for the same reason.) + // + // IAsyncGate implementations are allowed to require that their LockAsync method is called from the same + // execution context as Release will be called. For example, AsyncGate uses an AsyncLocal to track + // the recursion count, and when you update an AsyncLocal's value, that modified value is visible only + // in the current ExecutionContext and its descendants. An async method effectively introduces a new child + // context, so any AsyncLocal value changes are lost when an async method returns, but we need the + // recursion count to live in our caller's context, which is why we must make sure we don't introduce a + // new child context here. That's why this needs to be old-school manual task management, and not async/await. + ValueTask releaserValueTask = gate.AcquireAsync(); + if (releaserValueTask.IsCompleted) + { + return new ValueTask(new DisposableGateReleaser(releaserValueTask.Result)); + } + + return new ValueTask(releaserValueTask.AsTask().ContinueWith(t => new DisposableGateReleaser(t.Result))); + } +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs index 407924a45..5b9d8e6a2 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Threading @@ -10,7 +11,7 @@ namespace System.Threading public sealed class AsyncQueueLock : IAsyncDisposable { private readonly Queue> _queue = new(); - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private bool _isAcquired; private bool _hasFaulted; diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs new file mode 100644 index 000000000..5d0bce691 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs @@ -0,0 +1,15 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +namespace System.Reactive.Threading; + +/// +/// Enables a statement or declaration to be used to release an +/// . Typically obtained through +/// +/// +public struct DisposableGateReleaser(IAsyncGateReleaser gateReleaser) : IDisposable +{ + public void Dispose() => gateReleaser.Release(); +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs new file mode 100644 index 000000000..68892a448 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs @@ -0,0 +1,63 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System.Reactive.Linq; +using System.Threading.Tasks; + +namespace System.Reactive.Threading +{ + /// + /// Synchronization primitive that provides -style + /// exclusive access semantics, but with an asynchronous API. + /// + /// + /// + /// This enables + /// and + /// to be used to synchronize access to an observer with a custom synchronization primitive. + /// + /// + /// These methods model the equivalents for and + /// in System.Reactive. Those offer overloads accepting a 'gate' parameter, and if you pass + /// the same object to multiple calls to these methods, they will all synchronize their operation + /// through that same gate object. The gate parameter in those methods is of type + /// , which works because all .NET objects have an associated monitor. + /// (It's created on demand when you first use lock or something equivalent.) + /// + /// + /// That approach is problematic in an async world, because this built-in monitor blocks the + /// calling thread when contention occurs. The basic idea of AsyncRx.NET is to avoid such + /// blocking. It can't always be avoided, and in cases where we can be certain that lock + /// acquisition times will be short, the conventional .NET monitor is still a good choice. + /// But since these Synchronize operators allow the caller to pass a gate which the + /// application code itself might lock, we have no control over how long the lock might be + /// held. So it would be inappropriate to use a monitor here. + /// + /// + /// Since the .NET runtime does not currently offer any asynchronous direct equivalent to + /// monitor, this interface defines the required API. The class + /// provide a basic implementation. If applications require additional features, (e.g. + /// if they want cancellation support when the application tries to acquire the lock) + /// they can provide their own implementation. + /// + /// + public interface IAsyncGate + { + /// + /// Acquires the lock. + /// + /// + /// A task that completes when the lock has been acquired, returning an + /// with which to release the lock. + /// + /// + /// + /// Applications release the lock by calling on the object + /// returned by this method. Typically this is done with a using statement or declaration by + /// using the extension method. + /// + /// + public ValueTask AcquireAsync(); + } +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs new file mode 100644 index 000000000..4b1d8df33 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs @@ -0,0 +1,24 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +namespace System.Reactive.Threading; + +/// +/// Releases a lock acquired from . +/// +/// +/// +/// Note that implementations of may return a reference to themselves +/// as the , so callers should not depend on each lock +/// acquisition returning a distinct . (This enables gate +/// implementations to avoid unnecessary allocation during lock acquisition.) +/// +/// +public interface IAsyncGateReleaser +{ + /// + /// Releases a lock acquired from . + /// + void Release(); +}