Skip to content

Enable application-defined Synchronize gate in AsyncRx.NET #2153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Disposables
{
public sealed class CompositeAsyncDisposable : IAsyncDisposable
{
private readonly AsyncGate _gate = new();
private readonly IAsyncGate _gate = AsyncGate.Create();
private readonly List<IAsyncDisposable> _disposables;
private bool _disposed;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Disposables
{
public sealed class RefCountAsyncDisposable : IAsyncDisposable
{
private readonly AsyncGate _gate = new();
private readonly IAsyncGate _gate = AsyncGate.Create();
private IAsyncDisposable _disposable;
private bool _primaryDisposed;
private int _count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Disposables
{
public sealed class SerialAsyncDisposable : IAsyncDisposable
{
private readonly AsyncGate _gate = new();
private readonly IAsyncGate _gate = AsyncGate.Create();

private IAsyncDisposable _disposable;
private bool _disposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,7 +13,7 @@ internal abstract class ScheduledAsyncObserverBase<T> : AsyncObserverBase<T>, IS
{
private readonly IAsyncObserver<T> _observer;

private readonly AsyncGate _lock = new();
private readonly IAsyncGate _lock = AsyncGate.Create();
private readonly Queue<T> _queue = new();

private bool _hasFaulted = false;
Expand Down
6 changes: 3 additions & 3 deletions AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Joins
Expand All @@ -18,7 +18,7 @@ internal sealed class AsyncJoinObserver<T> : AsyncObserverBase<Notification<T>>,
private readonly List<ActiveAsyncPlan> _activePlans = new();
private readonly SingleAssignmentAsyncDisposable _subscription = new();

private AsyncGate _gate;
private IAsyncGate _gate;
private bool _isDisposed;

public AsyncJoinObserver(IAsyncObservable<T> source, Func<Exception, ValueTask> onError)
Expand Down Expand Up @@ -56,7 +56,7 @@ public async ValueTask DisposeAsync()
}
}

public async Task SubscribeAsync(AsyncGate gate)
public async Task SubscribeAsync(IAsyncGate gate)
{
_gate = gate;

Expand Down
4 changes: 2 additions & 2 deletions AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Joins
{
internal interface IAsyncJoinObserver : IAsyncDisposable
{
Task SubscribeAsync(AsyncGate gate);
Task SubscribeAsync(IAsyncGate gate);

void Dequeue();
}
Expand Down
6 changes: 3 additions & 3 deletions AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
Expand Down Expand Up @@ -83,7 +83,7 @@ public static (IAsyncObserver<TSource>, IAsyncObserver<TSource>) Amb<TSource>(IA
if (second == null)
throw new ArgumentNullException(nameof(second));

var gate = new AsyncGate();
var gate = AsyncGate.Create();

var state = AmbState.None;

Expand Down Expand Up @@ -199,7 +199,7 @@ public static IAsyncObserver<TSource>[] Amb<TSource>(IAsyncObserver<TSource> obs
if (subscriptions == null)
throw new ArgumentNullException(nameof(subscriptions));

var gate = new AsyncGate();
var gate = AsyncGate.Create();

var winner = default(int?);

Expand Down
11 changes: 6 additions & 5 deletions AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Threading;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -306,7 +307,7 @@ public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSour

async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
{
var gate = new AsyncGate();
var gate = AsyncGate.Create();

var buffer = new List<TSource>();

Expand Down Expand Up @@ -378,7 +379,7 @@ public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSour

async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
{
var gate = new AsyncGate();
var gate = AsyncGate.Create();

var queue = new Queue<List<TSource>>();

Expand Down Expand Up @@ -509,7 +510,7 @@ TimeSpan GetNextDue()

async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
{
var gate = new AsyncGate();
var gate = AsyncGate.Create();

var timer = new SerialAsyncDisposable();

Expand Down Expand Up @@ -586,7 +587,7 @@ public static (IAsyncObserver<TSource>, IAsyncObserver<TBufferBoundary>) Buffer<
if (observer == null)
throw new ArgumentNullException(nameof(observer));

var gate = new AsyncGate();
var gate = AsyncGate.Create();

var buffer = new List<TSource>();

Expand Down Expand Up @@ -660,7 +661,7 @@ public static (IAsyncObserver<TSource>, IAsyncObserver<TBufferBoundary>) Buffer<
{
var closeSubscription = new SerialAsyncDisposable();

var gate = new AsyncGate();
var gate = AsyncGate.Create();
var queueLock = new AsyncQueueLock();

var buffer = new List<TSource>();
Expand Down
Loading
Loading