Skip to content

Commit 5bab1d7

Browse files
Major updates - moving IAsyncEnumerable into core project as new API pattern. Stream and Channel updates for receiver pattern. RPC will break on this change, updates there to follow.
1 parent c2e7cf1 commit 5bab1d7

File tree

10 files changed

+463
-106
lines changed

10 files changed

+463
-106
lines changed

RSocket.Core.Tests/ClientServerTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
using System.Linq;
44
using System.Text;
55
using Microsoft.VisualStudio.TestTools.UnitTesting;
6+
using RSocket.Transports;
67

78
namespace RSocket.Tests
89
{
9-
using RSocket.Transports;
10-
1110
[TestClass]
1211
public class ClientServerTests
1312
{
@@ -17,6 +16,7 @@ public class ClientTests
1716
{
1817
Lazy<RSocketClient> _Client;
1918
RSocketClient Client => _Client.Value;
19+
RSocket Socket => _Client.Value;
2020
LoopbackTransport Loopback;
2121
TestServer Server;
2222
IRSocketStream Stream => Server.Stream;
@@ -25,7 +25,7 @@ public class ClientTests
2525
[TestMethod]
2626
public void ServerBasicTest()
2727
{
28-
Client.RequestStream(Stream, new Sample().Bytes);
28+
Socket.RequestStream(Stream, new Sample());
2929
Assert.AreNotEqual(0, Server.All.Count, "Should have at least one message");
3030
}
3131

@@ -45,7 +45,7 @@ public void ServerSetupTest()
4545
[TestMethod]
4646
public void RequestStreamTest()
4747
{
48-
Client.RequestStream(Stream, new Sample().Bytes, initial: 5);
48+
Socket.RequestStream(Stream, new Sample(), initial: 5);
4949
Assert.AreNotEqual(5, Server.RequestStreams.Single().InitialRequest, "InitialRequest partiy.");
5050
}
5151

RSocket.Core.Tests/TestServer.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77
using System.Threading.Tasks;
88
using Microsoft.VisualStudio.TestTools.UnitTesting;
99
using System.Collections.Concurrent;
10+
using RSocket.Transports;
1011

1112
namespace RSocket.Tests
1213
{
13-
using RSocket.Transports;
14-
1514
public class TestServer : RSocketServer
1615
{
1716
public IRSocketStream Stream = new StreamReceiver();

RSocket.Core/IAsyncEnumerable.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#region Assembly System.Runtime, Version=4.2.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a
2+
// .nuget\packages\microsoft.netcore.app\3.0.0-preview-27324-5\ref\netcoreapp3.0\System.Runtime.dll
3+
#endregion
4+
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace RSocket.Collections.Generic
9+
{
10+
public interface IAsyncEnumerable<out T>
11+
{
12+
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
13+
}
14+
15+
public interface IAsyncEnumerator<out T> : IAsyncDisposable
16+
{
17+
T Current { get; }
18+
ValueTask<bool> MoveNextAsync();
19+
}
20+
21+
public interface IAsyncDisposable
22+
{
23+
ValueTask DisposeAsync();
24+
}
25+
}

RSocket.Core/IRSocketStream.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Buffers;
3+
using System.Threading.Tasks;
34

45
namespace RSocket
56
{

RSocket.Core/RSocket.Receiver.cs

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Runtime.CompilerServices;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using RSocket.Collections.Generic;
10+
11+
namespace RSocket
12+
{
13+
partial class RSocket
14+
{
15+
public class Receiver<T> : IAsyncEnumerable<T>
16+
{
17+
readonly Func<IRSocketStream, Task> Subscriber;
18+
readonly Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> Mapper;
19+
20+
public Receiver(Func<IRSocketStream, Task> subscriber, Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> mapper)
21+
{
22+
Subscriber = subscriber;
23+
Mapper = mapper;
24+
}
25+
26+
public async Task<T> ExecuteAsync(CancellationToken cancellation = default)
27+
{
28+
var receiver = new Receiver();
29+
await Subscriber(receiver);
30+
return Mapper(await receiver.Awaitable);
31+
}
32+
33+
public async Task<T> ExecuteAsync(T result, CancellationToken cancellation = default)
34+
{
35+
var receiver = new Receiver();
36+
await Subscriber(receiver);
37+
return result;
38+
}
39+
40+
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellation = default)
41+
{
42+
var enumerator = new MappedEnumerator(Mapper);
43+
Subscriber(enumerator); //TODO Do we want to use this task too? It could fault. Also, cancellation. Nope, this should only await on the first MoveNext, so subscription is lower.
44+
return enumerator;
45+
}
46+
47+
private class Enumerator : IAsyncEnumerator<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream
48+
{
49+
public bool IsCompleted { get; private set; } = false;
50+
private (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) Value = default;
51+
private ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Queue;
52+
private AsyncManualResetEvent Continue = new AsyncManualResetEvent();
53+
private Exception Error;
54+
55+
public Enumerator()
56+
{
57+
Queue = new ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>();
58+
}
59+
60+
public async ValueTask<bool> MoveNextAsync()
61+
{
62+
while (true)
63+
{
64+
if (Queue.TryDequeue(out Value)) { return true; }
65+
await Continue.WaitAsync();
66+
if (Error != default) { throw Error; }
67+
else if (IsCompleted) { return false; }
68+
else { Continue.Reset(); }
69+
}
70+
}
71+
72+
public (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) Current => Value;
73+
74+
public ValueTask DisposeAsync()
75+
{
76+
return new ValueTask();
77+
}
78+
79+
public void OnCompleted() { IsCompleted = true; ; Continue.Set(); }
80+
public void OnError(Exception error) { Error = error; Continue.Set(); }
81+
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
82+
{
83+
//TODO Would we really need to interlock this? If the Queue isn't allocated, it's the first time through...?
84+
//var value = Interlocked.Exchange<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(ref Value, default); //TODO Hmmm, no ValueTuples... Could save Queue allocation if only going to get one...
85+
Queue.Enqueue(value);
86+
Continue.Set();
87+
}
88+
89+
class AsyncManualResetEvent //Steven Toub: https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
90+
{
91+
private volatile TaskCompletionSource<bool> Completion = new TaskCompletionSource<bool>();
92+
public Task WaitAsync() => Completion.Task;
93+
public void Set() { Completion.TrySetResult(true); }
94+
public void Reset() { while (true) { var previous = Completion; if (!previous.Task.IsCompleted || Interlocked.CompareExchange(ref Completion, new TaskCompletionSource<bool>(), previous) == previous) { return; } } }
95+
}
96+
}
97+
98+
private class MappedEnumerator : Enumerator, IAsyncEnumerator<T>, IRSocketStream
99+
{
100+
readonly Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> Mapper;
101+
102+
public MappedEnumerator(Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> mapper)
103+
{
104+
Mapper = mapper;
105+
}
106+
107+
public new T Current => Mapper(base.Current);
108+
//ReadOnlySequence<byte> IAsyncEnumerator<ReadOnlySequence<byte>>.Current => Value.data;
109+
110+
public new ValueTask DisposeAsync()
111+
{
112+
return base.DisposeAsync();
113+
}
114+
}
115+
}
116+
117+
118+
public class Receiver<TSource, T> : Receiver<T>
119+
{
120+
public Receiver(Func<IRSocketStream, Task<IRSocketChannel>> subscriber, IAsyncEnumerable<TSource> source, Func<TSource, (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> sourcemapper, Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> resultmapper) :
121+
base(stream => Subscribe(stream, subscriber(stream), source, sourcemapper), resultmapper)
122+
{
123+
}
124+
125+
static async Task Subscribe(IRSocketStream stream, Task<IRSocketChannel> original, IAsyncEnumerable<TSource> source, Func<TSource, (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> sourcemapper)
126+
{
127+
var channel = await original; //Let the receiver hook up first before we start generating values.
128+
var enumerator = source.GetAsyncEnumerator();
129+
try
130+
{
131+
while (await enumerator.MoveNextAsync())
132+
{
133+
await channel.Send(sourcemapper(enumerator.Current));
134+
}
135+
}
136+
finally { await enumerator.DisposeAsync(); }
137+
}
138+
}
139+
140+
141+
142+
143+
private class Receiver : TaskCompletionSource<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream
144+
{
145+
static public readonly IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Discard = new Null();
146+
147+
public void OnCompleted() { }
148+
public void OnError(Exception error) => base.SetException(error);
149+
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value) => base.SetResult(value);
150+
151+
public ConfiguredTaskAwaitable<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Awaitable => base.Task.ConfigureAwait(false);
152+
153+
154+
private class Null : IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>
155+
{
156+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnCompleted() { }
157+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnError(Exception error) { }
158+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value) { }
159+
}
160+
161+
public class Deferred : IObservable<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream, IDisposable
162+
{
163+
IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Observer;
164+
//IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> ObserverSafe => Observer ?? throw new InvalidOperationException($"Stream has been Disposed");
165+
readonly Func<IRSocketStream, Task> Subscriber;
166+
167+
public Deferred(Func<IRSocketStream, Task> subscriber)
168+
{
169+
Subscriber = subscriber;
170+
}
171+
172+
public IDisposable Subscribe(IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> observer)
173+
{
174+
if (Observer == Discard) { throw new InvalidOperationException($"Stream already Disposed."); }
175+
if (Observer != null) { throw new InvalidOperationException($"Streams can only have a single Observer."); }
176+
Observer = observer;
177+
Subscribe();
178+
return this;
179+
}
180+
181+
private async void Subscribe()
182+
{
183+
try { await Subscriber(this).ConfigureAwait(false); }
184+
catch (Exception ex) { Observer.OnError(ex); this.Dispose(); }
185+
}
186+
187+
public void Dispose()
188+
{
189+
Observer = Discard; //Anything that arrives after this must be discarded. ObserverSafe above was the alternative, but throwing behind the scenes on a last packet seems bad.
190+
//TODO Close connection.
191+
}
192+
193+
//This would be unnecessary if the underlying receivers just requested IObserver<T>.
194+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnCompleted() => Observer.OnCompleted();
195+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnError(Exception error) => Observer.OnError(error);
196+
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value) => Observer.OnNext(value);
197+
}
198+
}
199+
}
200+
}
201+
202+
203+
204+
205+
206+
//private class Enumerator : IAsyncEnumerator<T>, IRSocketStream
207+
//{
208+
// readonly Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> Mapper;
209+
// public bool IsCompleted { get; private set; } = false;
210+
// private (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) Value = default;
211+
// private ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Queue;
212+
// private AsyncManualResetEvent Continue = new AsyncManualResetEvent();
213+
// private Exception Error;
214+
215+
// public Enumerator(Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> mapper)
216+
// {
217+
// Mapper = mapper;
218+
// Queue = new ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>();
219+
// }
220+
221+
// public async ValueTask<bool> MoveNextAsync()
222+
// {
223+
// while (true)
224+
// {
225+
// if (Queue.TryDequeue(out Value)) { return true; }
226+
// await Continue.WaitAsync();
227+
// if (Error != default) { throw Error; }
228+
// else if (IsCompleted) { return false; }
229+
// else { Continue.Reset(); }
230+
// }
231+
// }
232+
233+
// public T Current => Mapper(Value);
234+
// //ReadOnlySequence<byte> IAsyncEnumerator<ReadOnlySequence<byte>>.Current => Value.data;
235+
236+
// public ValueTask DisposeAsync()
237+
// {
238+
// return new ValueTask();
239+
// }
240+
241+
242+
// public void OnCompleted() { IsCompleted = true; ; Continue.Set(); }
243+
// public void OnError(Exception error) { Error = error; Continue.Set(); }
244+
// public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
245+
// {
246+
// //TODO Would we really need to interlock this? If the Queue isn't allocated, it's the first time through...?
247+
// //var value = Interlocked.Exchange<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(ref Value, default); //TODO Hmmm, no ValueTuples... Could save Queue allocation if only going to get one...
248+
// Queue.Enqueue(value);
249+
// Continue.Set();
250+
// }
251+
252+
// class AsyncManualResetEvent //Steven Toub: https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
253+
// {
254+
// private volatile TaskCompletionSource<bool> Completion = new TaskCompletionSource<bool>();
255+
// public Task WaitAsync() => Completion.Task;
256+
// public void Set() { Completion.TrySetResult(true); }
257+
// public void Reset() { while (true) { var previous = Completion; if (!previous.Task.IsCompleted || Interlocked.CompareExchange(ref Completion, new TaskCompletionSource<bool>(), previous) == previous) { return; } } }
258+
// }
259+
//}

0 commit comments

Comments
 (0)