Skip to content

Commit 2bee21b

Browse files
bigbearzhudarkl
authored andcommitted
fix intermittent OperationCancelledException on closing websocket connection (#177)
* fix build. * fix intermittent OperationCancelledException on closing websocket connection. * always close connection after RunAsync has finished. * Make sure token is not created multiple times as ObjectDisposedException could throw. * added ConnectionId to log messages. * do not forcefully close connection unless exception is caught. * capture any exception on CloseAsync. * Some corrections to OWIN/System.WebSockets implementation Code format Removed IsConnected property Code formatting Use CancellationToken.None Fixing if Checking for multiple disposes in AsyncWampConnection, checking for cancellation requests in WebSocketWrapperConnection
1 parent 13c9e63 commit 2bee21b

File tree

6 files changed

+151
-58
lines changed

6 files changed

+151
-58
lines changed

src/net45/Extensions/WampSharp.Owin/Owin/OwinWebSocketWrapper.cs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ internal class OwinWebSocketWrapper : IWebSocketWrapper
1313
private readonly Func<ArraySegment<byte>, int, bool, CancellationToken, Task> mSendAsync;
1414
private readonly Func<ArraySegment<byte>, CancellationToken, Task<Tuple<int, bool, int>>> mReceiveAsync;
1515
private readonly Func<int, string, CancellationToken, Task> mCloseAsync;
16+
private WebSocketState mState = WebSocketState.Open;
1617

1718
private const string WebSocketSendAsync = "websocket.SendAsync";
1819
private const string WebSocketReceiveAsync = "websocket.ReceiveAsync";
@@ -95,7 +96,29 @@ public async Task<WebSocketReceiveResult> ReceiveAsync
9596
await mReceiveAsync(arraySegment, callCancelled)
9697
.ConfigureAwait(false);
9798

98-
return new WebSocketReceiveResult(count: result.Item3, messageType: GetMessageType(result.Item1), endOfMessage: result.Item2);
99+
WebSocketMessageType webSocketMessageType = GetMessageType(result.Item1);
100+
101+
if (webSocketMessageType == WebSocketMessageType.Close)
102+
{
103+
ChangeState(actionDone: WebSocketState.CloseReceived,
104+
dualAction: WebSocketState.CloseSent);
105+
106+
return new WebSocketReceiveResult(count: result.Item3,
107+
messageType: webSocketMessageType,
108+
endOfMessage: result.Item2,
109+
closeStatus: this.ClientCloseStatus,
110+
closeStatusDescription: WebSocketClientCloseDescription);
111+
}
112+
113+
return new WebSocketReceiveResult(count: result.Item3, messageType: webSocketMessageType, endOfMessage: result.Item2);
114+
}
115+
116+
public WebSocketState State
117+
{
118+
get
119+
{
120+
return mState;
121+
}
99122
}
100123

101124
public Task SendAsync(ArraySegment<byte> data, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancel)
@@ -105,16 +128,21 @@ public Task SendAsync(ArraySegment<byte> data, WebSocketMessageType messageType,
105128

106129
public Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription, CancellationToken cancel)
107130
{
131+
ChangeState(actionDone: WebSocketState.CloseSent,
132+
dualAction: WebSocketState.CloseReceived);
133+
108134
return mCloseAsync((int) closeStatus, closeDescription, cancel);
109135
}
110136

111-
public bool IsConnected
137+
private void ChangeState(WebSocketState actionDone, WebSocketState dualAction)
112138
{
113-
get
139+
if (State == WebSocketState.Open)
114140
{
115-
WebSocketCloseStatus? closeStatus = ClientCloseStatus;
116-
117-
return ((closeStatus == null) || (closeStatus == 0));
141+
mState = actionDone;
142+
}
143+
else if (mState == dualAction)
144+
{
145+
mState = WebSocketState.Closed;
118146
}
119147
}
120148

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
{
1+
{
22
"frameworks": {
33
"net45": {}
44
},
55
"runtimes": {
66
"win": {}
7+
},
8+
"dependencies": {
9+
"Microsoft.Owin": "3.0.1"
710
}
811
}

src/net45/Extensions/WampSharp.WebSockets/WebSockets/WebSocketWrapperConnection.cs

Lines changed: 71 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using WampSharp.Core.Listener;
77
using WampSharp.Core.Message;
8+
using WampSharp.Logging;
89
using WampSharp.V2.Authentication;
910
using WampSharp.V2.Binding.Parsers;
1011

@@ -16,15 +17,17 @@ public abstract class WebSocketWrapperConnection<TMessage> : AsyncWebSocketWampC
1617
{
1718
private readonly IWampStreamingMessageParser<TMessage> mParser;
1819
private readonly IWebSocketWrapper mWebSocket;
19-
private readonly CancellationTokenSource mCancellationTokenSource;
20+
private CancellationTokenSource mCancellationTokenSource;
2021
private readonly Uri mAddressUri;
22+
private CancellationToken mCancellationToken;
2123

2224
public WebSocketWrapperConnection(IWebSocketWrapper webSocket, IWampStreamingMessageParser<TMessage> parser, ICookieProvider cookieProvider, ICookieAuthenticatorFactory cookieAuthenticatorFactory) :
2325
base(cookieProvider, cookieAuthenticatorFactory)
2426
{
2527
mWebSocket = webSocket;
2628
mParser = parser;
2729
mCancellationTokenSource = new CancellationTokenSource();
30+
mCancellationToken = mCancellationTokenSource.Token;
2831
}
2932

3033
protected WebSocketWrapperConnection(IClientWebSocketWrapper clientWebSocket, Uri addressUri, string protocolName, IWampStreamingMessageParser<TMessage> parser) :
@@ -36,8 +39,9 @@ protected WebSocketWrapperConnection(IClientWebSocketWrapper clientWebSocket, Ur
3639

3740
protected override Task SendAsync(WampMessage<object> message)
3841
{
42+
mLogger.Debug("Attempting to send a message");
3943
ArraySegment<byte> messageToSend = GetMessageInBytes(message);
40-
return mWebSocket.SendAsync(messageToSend, WebSocketMessageType, true, mCancellationTokenSource.Token);
44+
return mWebSocket.SendAsync(messageToSend, WebSocketMessageType, true, mCancellationToken);
4145
}
4246

4347
protected abstract ArraySegment<byte> GetMessageInBytes(WampMessage<object> message);
@@ -48,12 +52,12 @@ protected async void Connect()
4852
{
4953
try
5054
{
51-
await this.ClientWebSocket.ConnectAsync(mAddressUri, mCancellationTokenSource.Token)
55+
await this.ClientWebSocket.ConnectAsync(mAddressUri, mCancellationToken)
5256
.ConfigureAwait(false);
5357

5458
RaiseConnectionOpen();
5559

56-
Task task = Task.Run((Func<Task>) this.RunAsync, mCancellationTokenSource.Token);
60+
Task task = Task.Run((Func<Task>) this.RunAsync, mCancellationToken);
5761
}
5862
catch (Exception ex)
5963
{
@@ -87,36 +91,13 @@ data is very small.
8791
MemoryStream memoryStream = new MemoryStream();
8892

8993
// Checks WebSocket state.
90-
while (mWebSocket.IsConnected)
94+
while (IsConnected && !mCancellationToken.IsCancellationRequested)
9195
{
9296
// Reads data.
93-
WebSocketReceiveResult webSocketReceiveResult;
97+
WebSocketReceiveResult webSocketReceiveResult =
98+
await ReadMessage(receivedDataBuffer, memoryStream);
9499

95-
long length = 0;
96-
do
97-
{
98-
webSocketReceiveResult =
99-
await mWebSocket.ReceiveAsync(receivedDataBuffer, mCancellationTokenSource.Token)
100-
.ConfigureAwait(false);
101-
102-
length += webSocketReceiveResult.Count;
103-
104-
await memoryStream.WriteAsync(receivedDataBuffer.Array, receivedDataBuffer.Offset,
105-
webSocketReceiveResult.Count, mCancellationTokenSource.Token)
106-
.ConfigureAwait(false);
107-
108-
} while (!webSocketReceiveResult.EndOfMessage);
109-
110-
// If input frame is cancelation frame, send close command.
111-
if (webSocketReceiveResult.MessageType == WebSocketMessageType.Close)
112-
{
113-
this.RaiseConnectionClosed();
114-
115-
await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
116-
String.Empty, mCancellationTokenSource.Token)
117-
.ConfigureAwait(false);
118-
}
119-
else
100+
if (webSocketReceiveResult.MessageType != WebSocketMessageType.Close)
120101
{
121102
memoryStream.Position = 0;
122103
OnNewMessage(memoryStream);
@@ -128,9 +109,62 @@ await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
128109
}
129110
catch (Exception ex)
130111
{
131-
RaiseConnectionError(ex);
132-
RaiseConnectionClosed();
112+
// Cancellation token could be cancelled in Dispose if a
113+
// Goodbye message has been received.
114+
if (!(ex is OperationCanceledException) ||
115+
!mCancellationToken.IsCancellationRequested)
116+
{
117+
RaiseConnectionError(ex);
118+
}
133119
}
120+
121+
if (mWebSocket.State != WebSocketState.CloseReceived &&
122+
mWebSocket.State != WebSocketState.Closed)
123+
{
124+
await CloseWebSocket().ConfigureAwait(false);
125+
}
126+
127+
RaiseConnectionClosed();
128+
}
129+
130+
private async Task CloseWebSocket()
131+
{
132+
try
133+
{
134+
await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
135+
String.Empty,
136+
CancellationToken.None)
137+
.ConfigureAwait(false);
138+
}
139+
catch (Exception ex)
140+
{
141+
mLogger.WarnException("Failed sending a close message to client", ex);
142+
}
143+
}
144+
145+
private async Task<WebSocketReceiveResult> ReadMessage(ArraySegment<byte> receivedDataBuffer, MemoryStream memoryStream)
146+
{
147+
WebSocketReceiveResult webSocketReceiveResult;
148+
149+
long length = 0;
150+
151+
do
152+
{
153+
webSocketReceiveResult =
154+
await mWebSocket.ReceiveAsync(receivedDataBuffer, mCancellationToken)
155+
.ConfigureAwait(false);
156+
157+
length += webSocketReceiveResult.Count;
158+
159+
await memoryStream.WriteAsync(receivedDataBuffer.Array,
160+
receivedDataBuffer.Offset,
161+
webSocketReceiveResult.Count,
162+
mCancellationToken)
163+
.ConfigureAwait(false);
164+
}
165+
while (!webSocketReceiveResult.EndOfMessage);
166+
167+
return webSocketReceiveResult;
134168
}
135169

136170
private void OnNewMessage(MemoryStream payloadData)
@@ -142,14 +176,16 @@ private void OnNewMessage(MemoryStream payloadData)
142176
protected override void Dispose()
143177
{
144178
mCancellationTokenSource.Cancel();
179+
mCancellationTokenSource.Dispose();
180+
mCancellationTokenSource = null;
145181
}
146182

147183
protected override bool IsConnected
148184
{
149185
get
150186
{
151-
return mWebSocket.IsConnected;
187+
return mWebSocket.State == WebSocketState.Open;
152188
}
153189
}
154190
}
155-
}
191+
}

src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/IWebSocketWrapper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace WampSharp.WebSockets
77
{
88
public interface IWebSocketWrapper
99
{
10-
bool IsConnected { get; }
10+
WebSocketState State { get; }
1111
Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> arraySegment, CancellationToken callCancelled);
1212
Task SendAsync(ArraySegment<byte> data, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancel);
1313
Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription, CancellationToken cancel);

src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/WebSocketWrapper.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,6 @@ public WebSocketWrapper(WebSocket webSocket)
1414
mWebSocket = webSocket;
1515
}
1616

17-
public bool IsConnected
18-
{
19-
get
20-
{
21-
return mWebSocket.State == WebSocketState.Open;
22-
}
23-
}
24-
2517
public Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> arraySegment, CancellationToken callCancelled)
2618
{
2719
return mWebSocket.ReceiveAsync(arraySegment, callCancelled);
@@ -36,5 +28,10 @@ public Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription
3628
{
3729
return mWebSocket.CloseAsync(closeStatus, closeDescription, cancel);
3830
}
31+
32+
public WebSocketState State
33+
{
34+
get { return mWebSocket.State; }
35+
}
3936
}
4037
}

src/net45/WampSharp/Core/Listener/Connections/AsyncConnection/AsyncWampConnection.cs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ public abstract class AsyncWampConnection<TMessage> : IWampConnection<TMessage>,
1212
{
1313
private readonly ActionBlock<WampMessage<object>> mSendBlock;
1414
protected readonly ILog mLogger;
15+
private int mDisposeCalled = 0;
1516

1617
protected AsyncWampConnection()
1718
{
18-
mLogger = LogProvider.GetLogger(this.GetType());
19+
mLogger = new LoggerWithConnectionId(LogProvider.GetLogger(this.GetType()));
1920
mSendBlock = new ActionBlock<WampMessage<object>>(x => InnerSend(x));
2021
}
2122

@@ -112,6 +113,7 @@ protected virtual void RaiseMessageArrived(WampMessage<TMessage> message)
112113

113114
protected virtual void RaiseConnectionClosed()
114115
{
116+
mLogger.Debug("Connection has been closed");
115117
var handler = ConnectionClosed;
116118
if (handler != null) handler(this, EventArgs.Empty);
117119
}
@@ -125,9 +127,12 @@ protected virtual void RaiseConnectionError(Exception ex)
125127

126128
void IDisposable.Dispose()
127129
{
128-
mSendBlock.Complete();
129-
mSendBlock.Completion.Wait();
130-
this.Dispose();
130+
if (Interlocked.CompareExchange(ref mDisposeCalled, 1, 0) == 0)
131+
{
132+
mSendBlock.Complete();
133+
mSendBlock.Completion.Wait();
134+
this.Dispose();
135+
}
131136
}
132137

133138
protected abstract void Dispose();
@@ -136,9 +141,12 @@ void IDisposable.Dispose()
136141

137142
async Task IAsyncDisposable.DisposeAsync()
138143
{
139-
mSendBlock.Complete();
140-
await mSendBlock.Completion;
141-
this.Dispose();
144+
if (Interlocked.CompareExchange(ref mDisposeCalled, 1, 0) == 0)
145+
{
146+
mSendBlock.Complete();
147+
await mSendBlock.Completion;
148+
this.Dispose();
149+
}
142150
}
143151

144152
#else
@@ -151,5 +159,26 @@ Task IAsyncDisposable.DisposeAsync()
151159

152160
#endif
153161

162+
// TODO: move this to another file (after making it more generic)
163+
// TODO: or get rid of this.
164+
private class LoggerWithConnectionId : ILog
165+
{
166+
private readonly ILog mLogger;
167+
private readonly string mConnectionId;
168+
169+
public LoggerWithConnectionId(ILog logger)
170+
{
171+
mConnectionId = Guid.NewGuid().ToString();
172+
mLogger = logger;
173+
}
174+
175+
public bool Log(LogLevel logLevel, Func<string> messageFunc, Exception exception = null, params object[] formatParameters)
176+
{
177+
using (LogProvider.OpenMappedContext("ConncetionId", mConnectionId))
178+
{
179+
return mLogger.Log(logLevel, messageFunc, exception, formatParameters);
180+
}
181+
}
182+
}
154183
}
155184
}

0 commit comments

Comments
 (0)