Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit de2e096

Browse files
Adding CancelInvocation message (#979)
* For Streaming only. And C# client only.
1 parent 665f166 commit de2e096

File tree

18 files changed

+368
-69
lines changed

18 files changed

+368
-69
lines changed

samples/ClientSample/HubSample.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
7272
break;
7373
}
7474

75-
await connection.InvokeAsync<object>("Send", cts.Token, line);
75+
await connection.InvokeAsync<object>("Send", line, cts.Token);
7676
}
7777
}
7878
catch (AggregateException aex) when (aex.InnerExceptions.All(e => e is OperationCanceledException))

specs/HubProtocol.md

+70-16
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ This document describes three encodings of the SignalR protocol: [JSON](http://w
2020

2121
In the SignalR protocol, the following types of messages can be sent:
2222

23-
* `Negotiation` Message - Sent by the client to negotiate the message format
23+
* `Negotiation` Message - Sent by the client to negotiate the message format.
2424
* `Invocation` Message - Indicates a request to invoke a particular method (the Target) with provided Arguments on the remote endpoint.
2525
* `StreamItem` Message - Indicates individual items of streamed response data from a previous Invocation message.
2626
* `Completion` Message - Indicates a previous Invocation has completed, and no further `StreamItem` messages will be received. Contains an error if the invocation concluded with an error, or the result if the invocation is not a streaming invocation.
27+
* `CancelInvocation` Message - Sent by the client to cancel a streaming invocation on the server.
2728

2829
After opening a connection to the server the client must send a `Negotiation` message to the server as its first message. The negotiation message is **always** a JSON message and contains the name of the format (protocol) that will be used for the duration of the connection. If the server does not support the protocol requested by the client or the first message received from the client is not a `Negotiation` message the server must close the connection.
2930

@@ -41,9 +42,9 @@ Example:
4142

4243
## Communication between the Caller and the Callee
4344

44-
There a three kinds of interactions between the Caller and the Calle:
45+
There are three kinds of interactions between the Caller and the Callee:
4546

46-
* Invocations - the Caller sends a message to the Calle and expects a message indicating that the invocation has been completed and optionally a result of the invocation
47+
* Invocations - the Caller sends a message to the Callee and expects a message indicating that the invocation has been completed and optionally a result of the invocation
4748
* Non-Blocking Invocations - the Caller sends a message to the Callee and does not expect any further messages for this invocation
4849
* Streaming Invocations - the Caller sends a message to the Callee and expects one or more results returned by the Callee followed by a message indicating the end of invocation
4950

@@ -74,7 +75,7 @@ The SignalR protocol allows for multiple `StreamItem` messages to be transmitted
7475

7576
On the Callee side, it is up to the Callee's Binder to determine if a method call will yield multiple results. For example, in .NET certain return types may indicate multiple results, while others may indicate a single result. Even then, applications may wish for multiple results to be buffered and returned in a single `Completion` frame. It is up to the Binder to decide how to map this. The Callee's Binder must encode each result in separate `StreamItem` messages, indicating the end of results by sending a `Completion` message.
7677

77-
On the Caller side, the user code which performs the invocation indicates how it would like to receive the results and it is up the Caller's Binder to determine how to handle the result. If the Caller expects only a single result, but multiple results are returned, the Caller's Binder should yield an error indicating that multiple results were returned. However, if a Caller expects multiple results, but only a single result is returned, the Caller's Binder should yield that single result and indicate there are no further results.
78+
On the Caller side, the user code which performs the invocation indicates how it would like to receive the results and it is up the Caller's Binder to determine how to handle the result. If the Caller expects only a single result, but multiple results are returned, the Caller's Binder should yield an error indicating that multiple results were returned. However, if a Caller expects multiple results, but only a single result is returned, the Caller's Binder should yield that single result and indicate there are no further results. If the Caller wants to stop receiving `StreamItem` messages before the Callee sends a `Completion` message, the Caller can send a `CancelInvocation` message with the same `Invocation ID` used for the `Invocation` message that started the stream. It is possible to receive `StreamItem` messages or a `Completion` message after a `CancelInvocation` message has been sent, these can be ignored.
7879

7980
## Completion and results
8081

@@ -223,6 +224,16 @@ S->C: Completion { Id = 42, Error = "Ran out of data!" }
223224

224225
This should manifest to the Calling code as a sequence which emits `0`, `1`, `2`, `3`, `4`, but then fails with the error `Ran out of data!`.
225226

227+
### Streamed Result closed early (`Stream` example above)
228+
229+
```
230+
C->S: Invocation { Id = 42, Target = "Stream", Arguments = [ 5 ] }
231+
S->C: StreamItem { Id = 42, Item = 0 }
232+
S->C: StreamItem { Id = 42, Item = 1 }
233+
C->S: CancelInvocation { Id = 42 }
234+
S->C: StreamItem { Id = 42, Item = 2} // This can be ignored
235+
```
236+
226237
### Non-Blocking Call (`NonBlocking` example above)
227238

228239
```
@@ -248,7 +259,7 @@ Example:
248259
```json
249260
{
250261
"type": 1,
251-
"invocationId": 123,
262+
"invocationId": "123",
252263
"target": "Send",
253264
"arguments": [
254265
42,
@@ -261,7 +272,7 @@ Example (Non-Blocking):
261272
```json
262273
{
263274
"type": 1,
264-
"invocationId": 123,
275+
"invocationId": "123",
265276
"nonblocking": true,
266277
"target": "Send",
267278
"arguments": [
@@ -284,7 +295,7 @@ Example
284295
```json
285296
{
286297
"type": 2,
287-
"invocationId": 123,
298+
"invocationId": "123",
288299
"item": 42
289300
}
290301
```
@@ -305,7 +316,7 @@ Example - A `Completion` message with no result or error
305316
```json
306317
{
307318
"type": 3,
308-
"invocationId": 123
319+
"invocationId": "123"
309320
}
310321
```
311322

@@ -314,7 +325,7 @@ Example - A `Completion` message with a result
314325
```json
315326
{
316327
"type": 3,
317-
"invocationId": 123,
328+
"invocationId": "123",
318329
"result": 42
319330
}
320331
```
@@ -324,7 +335,7 @@ Example - A `Completion` message with an error
324335
```json
325336
{
326337
"type": 3,
327-
"invocationId": 123,
338+
"invocationId": "123",
328339
"error": "It didn't work!"
329340
}
330341
```
@@ -334,12 +345,26 @@ Example - The following `Completion` message is a protocol error because it has
334345
```json
335346
{
336347
"type": 3,
337-
"invocationId": 123,
348+
"invocationId": "123",
338349
"result": 42,
339350
"error": "It didn't work!"
340351
}
341352
```
342353

354+
### CancelInvocation Message Encoding
355+
A `CancelInvocation` message is a JSON object with the following properties
356+
357+
* `type` - A `Number` with the literal value `5`, indicationg that this is a `CancelInvocation`.
358+
* `invocationId` - A `String` encoding the `Invocation ID` for a message.
359+
360+
Example
361+
```json
362+
{
363+
"type": 5,
364+
"invocationId": "123"
365+
}
366+
```
367+
343368
### JSON Payload Encoding
344369

345370
Items in the arguments array within the `Invocation` message type, as well as the `item` value of the `StreamItem` message and the `result` value of the `Completion` message, encode values which have meaning to each particular Binder. A general guideline for encoding/decoding these values is provided in the "Type Mapping" section at the end of this document, but Binders should provide configuration to applications to allow them to customize these mappings. These mappings need not be self-describing, because when decoding the value, the Binder is expected to know the destination type (by looking up the definition of the method indicated by the Target).
@@ -378,7 +403,7 @@ is decoded as follows:
378403

379404
* `0x95` - 5-element array
380405
* `0x01` - `1` (Message Type - `Invocation` message)
381-
* `0xa3` - string of length 3 (Target)
406+
* `0xa3` - string of length 3 (InvocationId)
382407
* `0x78` - `x`
383408
* `0x79` - `y`
384409
* `0x7a` - `z`
@@ -397,7 +422,9 @@ is decoded as follows:
397422

398423
`StreamItem` messages have the following structure:
399424

425+
```
400426
[2, InvocationId, Item]
427+
```
401428

402429
* `2` - Message Type - `2` indicates this is a `StreamItem` message
403430
* InvocationId - A `String` encoding the Invocation ID for the message
@@ -414,7 +441,7 @@ is decoded as follows:
414441

415442
* `0x93` - 3-element array
416443
* `0x02` - `2` (Message Type - `StreamItem` message)
417-
* `0xa3` - string of length 3 (Target)
444+
* `0xa3` - string of length 3 (InvocationId)
418445
* `0x78` - `x`
419446
* `0x79` - `y`
420447
* `0x7a` - `z`
@@ -449,7 +476,7 @@ is decoded as follows:
449476

450477
* `0x94` - 4-element array
451478
* `0x03` - `3` (Message Type - `Result` message)
452-
* `0xa3` - string of length 3 (Target)
479+
* `0xa3` - string of length 3 (InvocationId)
453480
* `0x78` - `x`
454481
* `0x79` - `y`
455482
* `0x7a` - `z`
@@ -472,7 +499,7 @@ is decoded as follows:
472499

473500
* `0x93` - 3-element array
474501
* `0x03` - `3` (Message Type - `Result` message)
475-
* `0xa3` - string of length 3 (Target)
502+
* `0xa3` - string of length 3 (InvocationId)
476503
* `0x78` - `x`
477504
* `0x79` - `y`
478505
* `0x7a` - `z`
@@ -489,13 +516,40 @@ is decoded as follows:
489516

490517
* `0x94` - 4-element array
491518
* `0x03` - `3` (Message Type - `Result` message)
492-
* `0xa3` - string of length 3 (Target)
519+
* `0xa3` - string of length 3 (InvocationId)
493520
* `0x78` - `x`
494521
* `0x79` - `y`
495522
* `0x7a` - `z`
496523
* `0x03` - `3` (ResultKind - Non-Void result)
497524
* `0x2a` - `42` (Result)
498525

526+
### CancelInvocation Message Encoding
527+
528+
`CancelInvocation` messages have the following structure
529+
530+
```
531+
[5, InvocationId]
532+
```
533+
534+
* `5` - Message Type - `5` indicates this is a `CancelInvocation` message
535+
* InvocationId - A `String` encoding the Invocation ID for the message
536+
537+
Example:
538+
539+
The following payload:
540+
```
541+
0x92 0x05 0xa3 0x78 0x79 0x7a
542+
```
543+
544+
is decoded as follows:
545+
546+
* `0x92` - 2-element array
547+
* `0x05` - `5` (Message Type `CancelInvocation` message)
548+
* `0xa3` - string of length 3 (InvocationId)
549+
* `0x78` - `x`
550+
* `0x79` - `y`
551+
* `0x7a` - `z`
552+
499553
## Protocol Buffers (ProtoBuf) Encoding
500554

501555
**Protobuf encoding is currently not implemented**

src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs

+38-11
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,35 @@ public void On(string methodName, Type[] parameterTypes, Func<object[], Task> ha
132132

133133
private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
134134
{
135-
var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var channel);
136-
await InvokeCore(methodName, irq, args, nonBlocking: false);
135+
var invokeCts = new CancellationTokenSource();
136+
var irq = InvocationRequest.Stream(invokeCts.Token, returnType, GetNextId(), _loggerFactory, this, out var channel);
137+
// After InvokeCore we don't want the irq cancellation token to be triggered.
138+
// The stream invocation will be canceled by the CancelInvocationMessage, connection closing, or channel finishing.
139+
using (cancellationToken.Register(token => ((CancellationTokenSource)token).Cancel(), invokeCts))
140+
{
141+
await InvokeCore(methodName, irq, args, nonBlocking: false);
142+
}
143+
144+
if (cancellationToken.CanBeCanceled)
145+
{
146+
cancellationToken.Register(state =>
147+
{
148+
var invocationReq = (InvocationRequest)state;
149+
if (!invocationReq.HubConnection._connectionActive.IsCancellationRequested)
150+
{
151+
// Fire and forget, if it fails that means we aren't connected anymore.
152+
_ = invocationReq.HubConnection.SendHubMessage(new CancelInvocationMessage(invocationReq.InvocationId), invocationReq);
153+
154+
if (invocationReq.HubConnection.TryRemoveInvocation(invocationReq.InvocationId, out _))
155+
{
156+
invocationReq.Complete(null);
157+
}
158+
159+
invocationReq.Dispose();
160+
}
161+
}, irq);
162+
}
163+
137164
return channel;
138165
}
139166

@@ -142,7 +169,7 @@ private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, T
142169

143170
private async Task<object> InvokeAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
144171
{
145-
var irq = InvocationRequest.Invoke(cancellationToken, returnType, GetNextId(), _loggerFactory, out var task);
172+
var irq = InvocationRequest.Invoke(cancellationToken, returnType, GetNextId(), _loggerFactory, this, out var task);
146173
await InvokeCore(methodName, irq, args, nonBlocking: false);
147174
return await task;
148175
}
@@ -152,7 +179,7 @@ private async Task<object> InvokeAsyncCore(string methodName, Type returnType, o
152179

153180
private Task SendAsyncCore(string methodName, object[] args, CancellationToken cancellationToken)
154181
{
155-
var irq = InvocationRequest.Invoke(cancellationToken, typeof(void), GetNextId(), _loggerFactory, out _);
182+
var irq = InvocationRequest.Invoke(cancellationToken, typeof(void), GetNextId(), _loggerFactory, this, out _);
156183
return InvokeCore(methodName, irq, args, nonBlocking: true);
157184
}
158185

@@ -184,24 +211,24 @@ private Task InvokeCore(string methodName, InvocationRequest irq, object[] args,
184211
_logger.IssueInvocation(invocationMessage.InvocationId, irq.ResultType.FullName, methodName, args);
185212

186213
// We don't need to wait for this to complete. It will signal back to the invocation request.
187-
return SendInvocation(invocationMessage, irq);
214+
return SendHubMessage(invocationMessage, irq);
188215
}
189216

190-
private async Task SendInvocation(InvocationMessage invocationMessage, InvocationRequest irq)
217+
private async Task SendHubMessage(HubMessage hubMessage, InvocationRequest irq)
191218
{
192219
try
193220
{
194-
var payload = _protocolReaderWriter.WriteMessage(invocationMessage);
195-
_logger.SendInvocation(invocationMessage.InvocationId);
221+
var payload = _protocolReaderWriter.WriteMessage(hubMessage);
222+
_logger.SendInvocation(hubMessage.InvocationId);
196223

197224
await _connection.SendAsync(payload, irq.CancellationToken);
198-
_logger.SendInvocationCompleted(invocationMessage.InvocationId);
225+
_logger.SendInvocationCompleted(hubMessage.InvocationId);
199226
}
200227
catch (Exception ex)
201228
{
202-
_logger.SendInvocationFailed(invocationMessage.InvocationId, ex);
229+
_logger.SendInvocationFailed(hubMessage.InvocationId, ex);
203230
irq.Fail(ex);
204-
TryRemoveInvocation(invocationMessage.InvocationId, out _);
231+
TryRemoveInvocation(hubMessage.InvocationId, out _);
205232
}
206233
}
207234

src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs

+13-10
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,33 @@ internal abstract class InvocationRequest : IDisposable
1919
public Type ResultType { get; }
2020
public CancellationToken CancellationToken { get; }
2121
public string InvocationId { get; }
22+
public HubConnection HubConnection { get; private set; }
2223

23-
protected InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILogger logger)
24+
protected InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILogger logger, HubConnection hubConnection)
2425
{
2526
_cancellationTokenRegistration = cancellationToken.Register(self => ((InvocationRequest)self).Cancel(), this);
2627

2728
InvocationId = invocationId;
2829
CancellationToken = cancellationToken;
2930
ResultType = resultType;
3031
Logger = logger;
32+
HubConnection = hubConnection;
3133

3234
Logger.InvocationCreated(InvocationId);
3335
}
3436

35-
public static InvocationRequest Invoke(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out Task<object> result)
37+
public static InvocationRequest Invoke(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, HubConnection hubConnection, out Task<object> result)
3638
{
37-
var req = new NonStreaming(cancellationToken, resultType, invocationId, loggerFactory);
39+
var req = new NonStreaming(cancellationToken, resultType, invocationId, loggerFactory, hubConnection);
3840
result = req.Result;
3941
return req;
4042
}
4143

4244

43-
public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out ReadableChannel<object> result)
45+
public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId,
46+
ILoggerFactory loggerFactory, HubConnection hubConnection, out ReadableChannel<object> result)
4447
{
45-
var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory);
48+
var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory, hubConnection);
4649
result = req.Result;
4750
return req;
4851
}
@@ -67,8 +70,8 @@ private class Streaming : InvocationRequest
6770
{
6871
private readonly Channel<object> _channel = Channel.CreateUnbounded<object>();
6972

70-
public Streaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
71-
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<Streaming>())
73+
public Streaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, HubConnection hubConnection)
74+
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<Streaming>(), hubConnection)
7275
{
7376
}
7477

@@ -115,16 +118,16 @@ public override async ValueTask<bool> StreamItem(object item)
115118

116119
protected override void Cancel()
117120
{
118-
_channel.Out.TryComplete(new OperationCanceledException("Connection terminated"));
121+
_channel.Out.TryComplete(new OperationCanceledException("Invocation terminated"));
119122
}
120123
}
121124

122125
private class NonStreaming : InvocationRequest
123126
{
124127
private readonly TaskCompletionSource<object> _completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
125128

126-
public NonStreaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
127-
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<NonStreaming>())
129+
public NonStreaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, HubConnection hubConnection)
130+
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<NonStreaming>(), hubConnection)
128131
{
129132
}
130133

0 commit comments

Comments
 (0)