Skip to content

Commit f1f54e7

Browse files
author
Johan 't Hart
committed
Add possibility for callee instance to return an observable
#238
1 parent fe62d3e commit f1f54e7

File tree

5 files changed

+163
-18
lines changed

5 files changed

+163
-18
lines changed

src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,57 @@ public async Task ProgressiveCallsCallerProgress()
4141
Assert.That(callback.Task.Result, Is.EqualTo(10));
4242
}
4343

44+
[Test]
45+
public async Task ProgressiveCallsCallerProgressObservable()
46+
{
47+
WampPlayground playground = new WampPlayground();
48+
49+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
50+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
51+
IWampChannel callerChannel = dualChannel.CallerChannel;
52+
53+
var service = new LongOpObsService();
54+
await calleeChannel.RealmProxy.Services.RegisterCallee(service);
55+
56+
MyCallback callback = new MyCallback();
57+
58+
callerChannel.RealmProxy.RpcCatalog.Invoke
59+
(callback,
60+
new CallOptions() { ReceiveProgress = true },
61+
"com.myapp.longop",
62+
new object[] { 10, false });
63+
64+
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called));
65+
Assert.That(callback.Task.Result, Is.EqualTo(-1));
66+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
67+
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Completed));
68+
}
69+
70+
[Test]
71+
public async Task ProgressiveCallsCallerProgressCancelObservable()
72+
{
73+
WampPlayground playground = new WampPlayground();
74+
75+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
76+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
77+
IWampChannel callerChannel = dualChannel.CallerChannel;
78+
79+
var service = new LongOpObsService();
80+
await calleeChannel.RealmProxy.Services.RegisterCallee(service);
81+
82+
MyCallback callback = new MyCallback();
83+
84+
var invocation = callerChannel.RealmProxy.RpcCatalog.Invoke
85+
(callback,
86+
new CallOptions() { ReceiveProgress = true },
87+
"com.myapp.longop",
88+
new object[] { 10, false });
89+
90+
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called));
91+
invocation.Cancel(new CancelOptions());
92+
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Cancelled));
93+
}
94+
4495
[Test]
4596
public async Task ProgressiveCallsCalleeProxyProgress()
4697
{
@@ -180,19 +231,35 @@ public interface ILongOpObsService
180231

181232
public class LongOpObsService : ILongOpObsService
182233
{
183-
public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async obs =>
234+
public enum EState
235+
{
236+
Nothing,
237+
Called,
238+
Completed,
239+
Cancelled
240+
}
241+
242+
public EState State { get; set; } = EState.Nothing;
243+
244+
public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async (obs, ct) =>
184245
{
246+
State = EState.Called;
247+
ct.Register(() =>
248+
{
249+
if (State == EState.Called)
250+
State = EState.Cancelled;
251+
});
185252
for (int i = 0; i < n; i++)
186253
{
187254
obs.OnNext(i);
188-
await Task.Delay(100);
255+
await Task.Delay(100, ct);
256+
ct.ThrowIfCancellationRequested();
189257
}
258+
State = EState.Completed;
190259
if (endWithError)
191260
obs.OnError(new WampException("wamp.error", "Something bad happened"));
192261
else
193262
obs.OnCompleted();
194-
195-
return Disposable.Empty;
196263
});
197264
}
198265

@@ -206,7 +273,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback
206273

207274
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details)
208275
{
209-
throw new NotImplementedException();
276+
mTask.SetResult(-1); // -1 indicates no final return value
210277
}
211278

212279
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details, TMessage[] arguments)

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption
7777
{
7878
caller.Result(ObjectFormatter, options, arguments, argumentKeywords);
7979
}
80-
else if (!this.HasResult)
80+
else if (!this.HasResult || arguments == null)
8181
{
8282
caller.Result(ObjectFormatter, options);
8383
}
@@ -93,7 +93,7 @@ protected IEnumerable<object> UnpackParameters<TMessage>(IWampFormatter<TMessage
9393
{
9494
ArgumentUnpacker unpacker = new ArgumentUnpacker(Parameters);
9595

96-
IEnumerable<object> result =
96+
IEnumerable<object> result =
9797
unpacker.UnpackParameters(formatter, arguments, argumentsKeywords);
9898

9999
return result;

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ protected IWampRpcOperation CreateRpcMethod(Func<object> instanceProvider, ICall
5757
string procedureUri =
5858
interceptor.GetProcedureUri(method);
5959

60-
if (!typeof (Task).IsAssignableFrom(method.ReturnType))
60+
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>))
61+
{
62+
return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri);
63+
}
64+
else if (!typeof (Task).IsAssignableFrom(method.ReturnType))
6165
{
6266
MethodInfoValidation.ValidateSyncMethod(method);
6367
return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri);
@@ -97,5 +101,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func<object> instanc
97101

98102
return operation;
99103
}
104+
105+
private static IWampRpcOperation CreateProgressiveObservableOperation(Func<object> instanceProvider, MethodInfo method, string procedureUri)
106+
{
107+
//return new ProgressiveObservableMethodInfoRpcOperation<returnType>
108+
// (instance, method, procedureUri);
109+
110+
Type returnType = method.ReturnType.GetGenericArguments()[0];
111+
112+
Type operationType =
113+
typeof(ProgressiveObservableMethodInfoRpcOperation<>)
114+
.MakeGenericType(returnType);
115+
116+
IWampRpcOperation operation =
117+
(IWampRpcOperation)Activator.CreateInstance(operationType,
118+
instanceProvider,
119+
method,
120+
procedureUri);
121+
122+
return operation;
123+
}
100124
}
101-
}
125+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Reflection;
5+
using System.Threading;
6+
using WampSharp.Core.Utilities;
7+
using WampSharp.Core.Serialization;
8+
using WampSharp.V2.Core.Contracts;
9+
10+
namespace WampSharp.V2.Rpc
11+
{
12+
public class ProgressiveObservableMethodInfoRpcOperation<T> : SyncMethodInfoRpcOperation
13+
{
14+
public ProgressiveObservableMethodInfoRpcOperation(Func<object> instanceProvider, MethodInfo method, string procedureName) :
15+
base(instanceProvider, method, procedureName)
16+
{
17+
}
18+
19+
protected override IWampCancellableInvocation OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs)
20+
{
21+
var ctSource = new CancellationTokenSource();
22+
((IObservable<T>)result).Subscribe(
23+
it => CallResult(caller, it, outputs, new YieldOptions { Progress = true }),
24+
ex =>
25+
{
26+
if (ex is WampException wampex)
27+
HandleException(caller, wampex);
28+
else
29+
HandleException(caller, ex);
30+
},
31+
// An observable does not emit any value when completing, so result without arguments
32+
() => caller.Result(ObjectFormatter, new YieldOptions()),
33+
ctSource.Token
34+
);
35+
return new CancellationTokenSourceInvocation(ctSource);
36+
}
37+
}
38+
}

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,46 @@ protected override IWampCancellableInvocation InnerInvoke<TMessage>(IWampRawRpcO
2929
argumentsKeywords,
3030
out IDictionary<string, object> outputs);
3131

32-
CallResult(caller, result, outputs);
32+
return OnResult(caller, result, outputs);
3333
}
3434
catch (WampException ex)
3535
{
36-
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
37-
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
38-
callback.Error(ex);
36+
HandleException(caller, ex);
3937
}
4038
catch (Exception ex)
4139
{
42-
WampException wampException = ConvertExceptionToRuntimeException(ex);
43-
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
44-
callback.Error(wampException);
40+
HandleException(caller, ex);
4541
}
4642

4743
return null;
4844
}
4945

46+
protected void HandleException(IWampRawRpcOperationRouterCallback caller, WampException ex)
47+
{
48+
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
49+
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
50+
callback.Error(ex);
51+
}
52+
53+
protected void HandleException(IWampRawRpcOperationRouterCallback caller, Exception ex)
54+
{
55+
WampException wampException = ConvertExceptionToRuntimeException(ex);
56+
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
57+
callback.Error(wampException);
58+
}
59+
60+
protected virtual IWampCancellableInvocation OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs)
61+
{
62+
CallResult(caller, result, outputs);
63+
return null;
64+
}
65+
5066
protected void CallResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs, YieldOptions yieldOptions = null)
5167
{
5268
yieldOptions = yieldOptions ?? new YieldOptions();
5369
object[] resultArguments = GetResultArguments(result);
5470

55-
IDictionary<string, object> argumentKeywords =
71+
IDictionary<string, object> argumentKeywords =
5672
GetResultArgumentKeywords(result, outputs);
5773

5874
CallResult(caller,
@@ -69,4 +85,4 @@ protected virtual IDictionary<string, object> GetResultArgumentKeywords(object r
6985
protected abstract object InvokeSync<TMessage>
7086
(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter, InvocationDetails details, TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords, out IDictionary<string, object> outputs);
7187
}
72-
}
88+
}

0 commit comments

Comments
 (0)