Skip to content

Commit 7644d93

Browse files
Producer-side fixes. New scheduler, new iterators. Fixes for Tasks that instantly complete and so cannot be scheduled.
1 parent 756e801 commit 7644d93

File tree

2 files changed

+65
-107
lines changed

2 files changed

+65
-107
lines changed

RSocket.Core/RSocket.Receiver.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ public async Task<T> ExecuteAsync(CancellationToken cancellation = default)
3434
Subscriber(observer).ConfigureAwait(false);
3535
return Disposable.Empty;
3636
});
37-
var (metadata, data) = await observable.ToTask(cancellation);
38-
return Mapper((data, metadata));
37+
38+
var (metadata, data) = await observable.ToTask(cancellation);
39+
return Mapper((data, metadata));
3940
}
4041

4142
public async Task<T> ExecuteAsync(T result, CancellationToken cancellation = default)

RSocket.Core/RSocket.cs

+62-105
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ void IRSocketProtocol.Payload(in RSocketProtocol.Payload message, ReadOnlySequen
149149
//Console.WriteLine($"{value.Header.Stream:0000}===>{Encoding.UTF8.GetString(value.Data.ToArray())}");
150150
if (Dispatcher.TryGetValue(message.Stream, out var transform))
151151
{
152-
//TODO FIXIE!
153152
if (message.IsNext) { transform.OnNext((metadata, data)); }
154153
if (message.IsComplete) { transform.OnCompleted(); }
155154
}
@@ -159,23 +158,51 @@ void IRSocketProtocol.Payload(in RSocketProtocol.Payload message, ReadOnlySequen
159158
}
160159
}
161160

161+
//void Schedule(Task task)
162+
//{
163+
// //if (!task.IsCompleted) { task.Start(); } //FUTURE Someday might want to schedule these in a different pool or perhaps track all in-flight tasks.
164+
//}
165+
166+
void Schedule(int stream, Func<int, CancellationToken, Task> operation, CancellationToken cancel = default)
167+
{
168+
var task = operation(stream, cancel);
169+
if (!task.IsCompleted) { task.ConfigureAwait(false); } //FUTURE Someday might want to schedule these in a different pool or perhaps track all in-flight tasks.
170+
}
171+
162172

163173
//void IRSocketProtocol.Setup(in RSocketProtocol.Setup message) => Setup(message);
164174
public virtual void Setup(in RSocketProtocol.Setup value) => throw new InvalidOperationException($"Client cannot process Setup frames"); //TODO This exception just stalls processing. Need to make sure it's handled.
165175
void IRSocketProtocol.Error(in RSocketProtocol.Error message) { throw new NotImplementedException(); } //TODO Handle Errors!
166176
void IRSocketProtocol.RequestFireAndForget(in RSocketProtocol.RequestFireAndForget message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => throw new NotImplementedException();
167177

168-
public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), Task<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)>> Responder { get; set; } = request => throw new NotImplementedException();
178+
179+
public void Respond<TRequest, TResult>(
180+
Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), TRequest> requestTransform,
181+
Func<TRequest, IAsyncEnumerable<TResult>> producer,
182+
Func<TResult, (ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)> resultTransform) =>
183+
Responder = (request) => (from result in producer(requestTransform(request)) select resultTransform(result)).FirstAsync();
184+
185+
public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), ValueTask<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)>> Responder { get; set; } = request => throw new NotImplementedException();
169186

170187
void IRSocketProtocol.RequestResponse(in RSocketProtocol.RequestResponse message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
171188
{
172-
Respond(message.Stream).Start();
173-
async Task Respond(int stream)
189+
Schedule(message.Stream, async (stream, cancel) =>
174190
{
175-
var (Data, Metadata) = await Responder((data, metadata)); //TODO Handle Errors.
176-
new RSocketProtocol.Payload(stream, Data, Metadata, next: true, complete: true).Write(Transport.Output, Data, Metadata);
177-
await Transport.Output.FlushAsync();
178-
}
191+
var value = await Responder((data, metadata)); //TODO Handle Errors.
192+
await new RSocketProtocol.Payload(stream, value.Data, value.Metadata, next: true, complete: true).WriteFlush(Transport.Output, value.Data, value.Metadata);
193+
});
194+
//async Task Respond(int stream)
195+
//{
196+
// var value = await Responder((data, metadata)); //TODO Handle Errors.
197+
// await new RSocketProtocol.Payload(stream, value.Data, value.Metadata, next: true).WriteFlush(Transport.Output, value.Data, value.Metadata);
198+
//}
199+
200+
//ScheduleTask(Respond(message.Stream));
201+
//async Task Respond(int stream)
202+
//{
203+
// var value = await Responder((data, metadata)); //TODO Handle Errors.
204+
// await new RSocketProtocol.Payload(stream, value.Data, value.Metadata, next: true).WriteFlush(Transport.Output, value.Data, value.Metadata);
205+
//}
179206
}
180207

181208

@@ -185,60 +212,30 @@ public void Stream<TRequest, TResult>(
185212
Func<TResult, (ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)> resultTransform) =>
186213
Streamer = (request) => from result in producer(requestTransform(request)) select resultTransform(result);
187214

188-
public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)>> Streamer { get; set; } = request => throw new NotImplementedException();
215+
public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), IAsyncEnumerable<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)>> Streamer { get; set; } = request => throw new NotImplementedException();
189216

190217
void IRSocketProtocol.RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
191218
{
192-
Stream(message.Stream).Start();
193-
async Task Stream(int stream)
219+
Schedule(message.Stream, async (stream, cancel) =>
194220
{
195221
var source = Streamer((data, metadata)); //TODO Handle Errors.
196222
await ForEach(source,
197-
action: value => new RSocketProtocol.Payload(stream, value.data, value.metadata, next: true).WriteFlush(Transport.Output, value.data, value.metadata),
223+
action: value => new RSocketProtocol.Payload(stream, value.Data, value.Metadata, next: true).WriteFlush(Transport.Output, value.Data, value.Metadata),
198224
final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
199-
200-
//var source = Streamer((data, metadata)); //TODO Handle Errors.
201-
//var enumerator = source.GetAsyncEnumerator();
202-
//try
203-
//{
204-
// while (await enumerator.MoveNextAsync())
205-
// {
206-
// var (Data, Metadata) = enumerator.Current;
207-
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
208-
// await Transport.Output.FlushAsync();
209-
// }
210-
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
211-
// await Transport.Output.FlushAsync();
212-
//}
213-
//finally { await enumerator.DisposeAsync(); }
214-
}
225+
});
226+
227+
//Schedule(Stream(message.Stream));
228+
//async Task Stream(int stream)
229+
//{
230+
// var source = Streamer((data, metadata)); //TODO Handle Errors.
231+
// await ForEach(source,
232+
// action: value => new RSocketProtocol.Payload(stream, value.Data, value.Metadata, next: true).WriteFlush(Transport.Output, value.Data, value.Metadata),
233+
// final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
234+
//}
215235
}
216236

217237

218-
//public void Channel<TSource, TResult>(Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata),
219-
// (IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Incoming,
220-
// IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Outgoing)>
221-
222-
223-
// Func<TSource, (ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> outgoingMapper,
224-
// Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), TResult> incomingMapper)
225-
//{
226-
// var observable = Observable.Create<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(observer => {
227-
// Subscriber(observer).ConfigureAwait(false);
228-
// });
229-
// return observable
230-
// .Select(value => Mapper((value.data, value.metadata)))
231-
// .ToAsyncEnumerable()
232-
233-
// .GetAsyncEnumerator(cancellation);
234-
235-
// var receiver = new Receiver<TResult>(stream => Task.CompletedTask, incomingMapper);
236-
237-
// Channeler = request =>
238-
// (
239-
// receiver,
240-
// );
241-
//}
238+
//TODO, probably need to have an IAE<T> pipeline overload too.
242239

243240
public void Channel<TRequest, TIncoming, TOutgoing>(Func<TRequest, IObservable<TIncoming>, IAsyncEnumerable<TOutgoing>> pipeline,
244241
Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata), TRequest> requestTransform,
@@ -252,66 +249,26 @@ public void Channel<TRequest, TIncoming, TOutgoing>(Func<TRequest, IObservable<T
252249

253250
void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
254251
{
255-
Channel(message.Stream).Start();
256-
async Task Channel(int stream, CancellationToken cancel = default)
252+
Schedule(message.Stream, async (stream, cancel) =>
257253
{
258-
//TODO Elsewhere in previous changes, bad Disposable.Empty
259254
var inc = Observable.Create<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(observer => () => StreamDispatch(stream, observer));
260255
var outgoing = Channeler((data, metadata), inc); //TODO Handle Errors.
261256

262257
await ForEach(outgoing,
263258
action: value => new RSocketProtocol.Payload(stream, value.data, value.metadata, next: true).WriteFlush(Transport.Output, value.data, value.metadata),
264259
final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
265-
266-
//var outgoingstream = ForEach(outgoing,
267-
// action: value => new RSocketProtocol.Payload(stream, value.data, value.metadata, next: true).WriteFlush(Transport.Output, value.data, value.metadata),
268-
// final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
269-
270-
//var enumerator = outgoing.GetAsyncEnumerator();
271-
//try
272-
//{
273-
// while (await enumerator.MoveNextAsync())
274-
// {
275-
// var (Data, Metadata) = enumerator.Current;
276-
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
277-
// await Transport.Output.FlushAsync();
278-
// }
279-
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
280-
// await Transport.Output.FlushAsync();
281-
//}
282-
//finally { await enumerator.DisposeAsync(); }
283-
}
284-
285-
286-
// Channel(message.Stream).Start();
287-
288-
// //new Receiver<bool>()
289-
290-
// //new Receiver<bool>(stream => RequestFireAndForget(stream, data, metadata), _ => true).ExecuteAsync(result: true);
291-
// //var id = StreamDispatch(stream);
292-
293-
// async Task Channel(int stream)
294-
// {
295-
// var (Incoming, Outoing) = Channeler((data, metadata)); //TODO Handle Errors.
296-
297-
298-
// using (observable.Subscribe())
299-
// {
300-
// var enumerator = source.GetAsyncEnumerator();
301-
// try
302-
// {
303-
// while (await enumerator.MoveNextAsync())
304-
// {
305-
// var (Data, Metadata) = enumerator.Current;
306-
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
307-
// await Transport.Output.FlushAsync();
308-
// }
309-
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
310-
// await Transport.Output.FlushAsync();
311-
// }
312-
// finally { await enumerator.DisposeAsync(); }
313-
// }
314-
// }
260+
});
261+
//Schedule(Channel(message.Stream));
262+
//async Task Channel(int stream, CancellationToken cancel = default)
263+
//{
264+
// //TODO Elsewhere in previous changes, bad Disposable.Empty
265+
// var inc = Observable.Create<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(observer => () => StreamDispatch(stream, observer));
266+
// var outgoing = Channeler((data, metadata), inc); //TODO Handle Errors.
267+
268+
// await ForEach(outgoing,
269+
// action: value => new RSocketProtocol.Payload(stream, value.data, value.metadata, next: true).WriteFlush(Transport.Output, value.data, value.metadata),
270+
// final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
271+
//}
315272
}
316273

317274
internal static async Task ForEach<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> action, CancellationToken cancel = default, Func<Task> final = default)

0 commit comments

Comments
 (0)