Skip to content

Commit d0c8b35

Browse files
committed
fix stream reconnect logic
1 parent 74c4d5c commit d0c8b35

File tree

3 files changed

+144
-78
lines changed

3 files changed

+144
-78
lines changed

src/LaunchDarkly.EventSource/EventSource.cs

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ public class EventSource : IEventSource
2525
private string _eventName = Constants.MessageField;
2626
private string _lastEventId;
2727
private TimeSpan _retryDelay;
28-
private CancellationTokenSource _pendingRequest;
2928
private readonly ExponentialBackoffWithDecorrelation _backOff;
29+
private CancellationTokenSource _currentRequestToken;
30+
private ReadyState _readyState;
3031

3132
#endregion
3233

@@ -65,8 +66,20 @@ public class EventSource : IEventSource
6566
/// </value>
6667
public ReadyState ReadyState
6768
{
68-
get;
69-
private set;
69+
get
70+
{
71+
lock(this)
72+
{
73+
return _readyState;
74+
}
75+
}
76+
private set
77+
{
78+
lock(this)
79+
{
80+
_readyState = value;
81+
}
82+
}
7083
}
7184

7285
internal TimeSpan BackOffDelay
@@ -88,14 +101,12 @@ internal TimeSpan BackOffDelay
88101
/// configuration</exception>
89102
public EventSource(Configuration configuration)
90103
{
91-
ReadyState = ReadyState.Raw;
104+
_readyState = ReadyState.Raw;
92105

93106
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
94107

95108
_logger = _configuration.Logger ?? LogManager.GetLogger(typeof(EventSource));
96109

97-
_pendingRequest = new CancellationTokenSource();
98-
99110
_retryDelay = _configuration.DelayRetryDuration;
100111

101112
_backOff = new ExponentialBackoffWithDecorrelation(_retryDelay,
@@ -114,14 +125,22 @@ public EventSource(Configuration configuration)
114125
/// <exception cref="InvalidOperationException">The method was called after the connection <see cref="ReadyState"/> was Open or Connecting.</exception>
115126
public async Task StartAsync()
116127
{
117-
var cancellationToken = _pendingRequest.Token;
118-
while (!cancellationToken.IsCancellationRequested)
128+
while (ReadyState != ReadyState.Shutdown)
119129
{
120130
await MaybeWaitWithBackOff();
121131
try
122132
{
123-
await ConnectToEventSourceAsync(cancellationToken);
124-
_backOff.ResetReconnectAttemptCount();
133+
var newRequestTokenSource = new CancellationTokenSource();
134+
lock (this)
135+
{
136+
if (_readyState == ReadyState.Shutdown)
137+
{
138+
// in case Close() was called in between the previous ReadyState check and the creation of the new token
139+
return;
140+
}
141+
_currentRequestToken = newRequestTokenSource;
142+
}
143+
await ConnectToEventSourceAsync(newRequestTokenSource.Token);
125144
}
126145
catch (Exception e)
127146
{
@@ -152,21 +171,24 @@ public void Close()
152171
if (ReadyState == ReadyState.Raw || ReadyState == ReadyState.Shutdown) return;
153172

154173
Close(ReadyState.Shutdown);
155-
156-
// Cancel token to cancel requests.
157-
CancelToken();
158-
174+
CancelCurrentRequest();
159175
}
160176

161177
#endregion
162178

163179
#region Private Methods
164180

165-
private void CancelToken()
181+
private void CancelCurrentRequest()
166182
{
167-
CancellationTokenSource cancellationTokenSource = Interlocked.Exchange(ref _pendingRequest, new CancellationTokenSource());
168-
cancellationTokenSource.Cancel();
169-
cancellationTokenSource.Dispose();
183+
CancellationTokenSource requestTokenSource = null;
184+
lock (this)
185+
{
186+
requestTokenSource = _currentRequestToken;
187+
}
188+
if (requestTokenSource != null)
189+
{
190+
requestTokenSource.Cancel();
191+
}
170192
}
171193

172194
internal virtual EventSourceService GetEventSourceService(Configuration configuration)
@@ -189,7 +211,10 @@ private async Task ConnectToEventSourceAsync(CancellationToken cancellationToken
189211

190212
var svc = GetEventSourceService(_configuration);
191213

192-
svc.ConnectionOpened += (o, e) => { SetReadyState(ReadyState.Open, OnOpened); };
214+
svc.ConnectionOpened += (o, e) => {
215+
_backOff.ResetReconnectAttemptCount();
216+
SetReadyState(ReadyState.Open, OnOpened);
217+
};
193218
svc.ConnectionClosed += (o, e) => { SetReadyState(ReadyState.Closed, OnClosed); };
194219

195220
await svc.GetDataAsync(
@@ -199,7 +224,7 @@ await svc.GetDataAsync(
199224
}
200225
catch (EventSourceServiceCancelledException e)
201226
{
202-
CancelToken();
227+
CancelCurrentRequest();
203228

204229
CloseAndRaiseError(e);
205230
}
@@ -212,7 +237,6 @@ await svc.GetDataAsync(
212237

213238
throw;
214239
}
215-
216240
}
217241
}
218242

@@ -252,12 +276,19 @@ private void ProcessResponseContent(string content)
252276

253277
private void SetReadyState(ReadyState state, Action<StateChangedEventArgs> action = null)
254278
{
255-
if (ReadyState == state) return;
256-
257-
ReadyState = state;
279+
lock (this)
280+
{
281+
if (_readyState == state || _readyState == ReadyState.Shutdown)
282+
{
283+
return;
284+
}
285+
_readyState = state;
286+
}
258287

259-
if (action != null)
260-
action(new StateChangedEventArgs(ReadyState));
288+
if (action != null)
289+
{
290+
action(new StateChangedEventArgs(state));
291+
}
261292
}
262293

263294
private void ProcessField(string field, string value)

src/LaunchDarkly.EventSource/EventSourceService.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public async Task GetDataAsync(Action<string> processResponse, CancellationToken
6363
cancellationToken.ThrowIfCancellationRequested();
6464

6565
await ConnectToEventSourceApi(processResponse, cancellationToken);
66-
6766
}
6867

6968
#endregion

test/LaunchDarkly.EventSource.Tests/EventSourceTests.cs

Lines changed: 87 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -353,18 +353,15 @@ public async Task Given_content_type_not_equal_to_eventstream_when_the_http_resp
353353

354354
var evt = new EventSource(config);
355355

356-
var wasErrorEventRaised = false;
357-
evt.Error += (s, e) =>
358-
{
359-
wasErrorEventRaised = true;
360-
};
356+
var receiver = new ErrorReceiver(evt);
361357

362358
//// Act
363359
await evt.StartAsync();
364360

365361
//// Assert
366-
Assert.True(wasErrorEventRaised);
367-
Assert.True(evt.ReadyState == ReadyState.Closed);
362+
Assert.NotNull(receiver.ErrorReceived);
363+
Assert.Equal(ReadyState.Closed, receiver.SourceStateReceived);
364+
Assert.Equal(ReadyState.Shutdown, evt.ReadyState);
368365
}
369366

370367
[Fact]
@@ -377,21 +374,19 @@ public async Task Given_204_when_the_http_response_is_received_then_error_event_
377374

378375
var evt = new EventSource(new Configuration(_uri, handler));
379376

377+
var receiver = new ErrorReceiver(evt);
378+
380379
//Act
381-
var raisedEvent = await Assert.RaisesAsync<ExceptionEventArgs>(
382-
h => evt.Error += h,
383-
h => evt.Error -= h,
384-
() => evt.StartAsync());
380+
await evt.StartAsync();
385381

386382
//// Assert
387-
Assert.NotNull(raisedEvent);
388-
Assert.Equal(evt, raisedEvent.Sender);
389-
Assert.IsType<EventSourceServiceUnsuccessfulResponseException>(raisedEvent.Arguments.Exception);
390-
Assert.Equal(204, ((EventSourceServiceUnsuccessfulResponseException)raisedEvent.Arguments.Exception).StatusCode);
391-
Assert.True(evt.ReadyState == ReadyState.Closed);
383+
Assert.NotNull(receiver.ErrorReceived);
384+
var ex = Assert.IsType<EventSourceServiceUnsuccessfulResponseException>(receiver.ErrorReceived);
385+
Assert.Equal(204, ex.StatusCode);
386+
Assert.Equal(ReadyState.Closed, receiver.SourceStateReceived);
387+
Assert.Equal(ReadyState.Shutdown, evt.ReadyState);
392388
}
393389

394-
395390
[Theory]
396391
[InlineData(HttpStatusCode.InternalServerError)]
397392
[InlineData(HttpStatusCode.BadRequest)]
@@ -406,18 +401,16 @@ public async Task Given_status_code_when_the_http_response_is_received_then_erro
406401

407402
var evt = new EventSource(new Configuration(_uri, handler));
408403

409-
//Act
410-
var raisedEvent = await Assert.RaisesAsync<ExceptionEventArgs>(
411-
h => evt.Error += h,
412-
h => evt.Error -= h,
413-
() => evt.StartAsync());
404+
ErrorReceiver receiver = new ErrorReceiver(evt);
405+
406+
await evt.StartAsync();
414407

415408
//// Assert
416-
Assert.NotNull(raisedEvent);
417-
Assert.Equal(evt, raisedEvent.Sender);
418-
Assert.IsType<EventSourceServiceUnsuccessfulResponseException>(raisedEvent.Arguments.Exception);
419-
Assert.Equal((int)statusCode, ((EventSourceServiceUnsuccessfulResponseException)raisedEvent.Arguments.Exception).StatusCode);
420-
Assert.True(evt.ReadyState == ReadyState.Closed);
409+
Assert.NotNull(receiver.ErrorReceived);
410+
var ex = Assert.IsType<EventSourceServiceUnsuccessfulResponseException>(receiver.ErrorReceived);
411+
Assert.Equal((int)statusCode, ex.StatusCode);
412+
Assert.Equal(ReadyState.Closed, receiver.SourceStateReceived);
413+
Assert.Equal(ReadyState.Shutdown, evt.ReadyState);
421414
}
422415

423416
[Fact]
@@ -426,7 +419,8 @@ public async Task Given_bad_http_responses_then_retry_delay_durations_should_be_
426419
// Arrange
427420
var handler = new StubMessageHandler();
428421

429-
for (int i = 0; i < 2; i++)
422+
var nAttempts = 2;
423+
for (int i = 0; i < nAttempts; i++)
430424
{
431425
var response = new HttpResponseMessageWithError();
432426

@@ -436,7 +430,6 @@ public async Task Given_bad_http_responses_then_retry_delay_durations_should_be_
436430
"text/event-stream");
437431

438432
handler.QueueResponse(response);
439-
440433
}
441434

442435
handler.QueueResponse(new HttpResponseMessage(HttpStatusCode.NoContent));
@@ -447,14 +440,18 @@ public async Task Given_bad_http_responses_then_retry_delay_durations_should_be_
447440
evt.Error += (_, e) =>
448441
{
449442
backoffs.Add(evt.BackOffDelay);
443+
if (backoffs.Count >= nAttempts)
444+
{
445+
evt.Close();
446+
}
450447
};
451448

452449
//Act
453450
await evt.StartAsync();
454451

455452
//// Assert
456453
Assert.NotEmpty(backoffs);
457-
Assert.True(backoffs.Distinct().Count() == backoffs.Count());
454+
Assert.Equal(backoffs.Distinct().Count(), backoffs.Count());
458455
}
459456

460457

@@ -471,25 +468,14 @@ public async Task When_response_exceeds_read_timeout_then_read_timeout_exception
471468

472469
var evt = new StubEventSource(new Configuration(_uri, handler, readTimeout: readTimeout), (int)timeout.TotalMilliseconds);
473470

474-
var exceptionMessage = string.Empty;
475-
476-
try
477-
{
478-
479-
evt.Error += (_, e) =>
480-
{
481-
exceptionMessage = e.Exception.Message;
482-
evt.Close();
483-
};
484-
485-
await evt.StartAsync();
486-
487-
}
488-
catch (TaskCanceledException) {}
471+
var receiver = new ErrorReceiver(evt);
489472

490-
Assert.Contains(exceptionMessage, Resources.EventSourceService_Read_Timeout);
491-
Assert.True(evt.ReadyState == ReadyState.Shutdown);
473+
await evt.StartAsync();
492474

475+
Assert.NotNull(receiver.ErrorReceived);
476+
Assert.Contains(receiver.ErrorReceived.Message, Resources.EventSourceService_Read_Timeout);
477+
Assert.Equal(ReadyState.Closed, receiver.SourceStateReceived);
478+
Assert.Equal(ReadyState.Shutdown, evt.ReadyState);
493479
}
494480

495481
[Fact]
@@ -520,7 +506,36 @@ public async Task When_response_does_not_exceed_read_timeout_then_expected_messa
520506

521507
Assert.Equal("put", eventName);
522508
Assert.True(wasMessageReceivedEventRaised);
509+
}
510+
511+
[Fact]
512+
public async Task When_server_returns_HTTP_error_a_reconnect_attempt_is_made()
513+
{
514+
var messageData = "hello";
523515

516+
var handler = new StubMessageHandler();
517+
handler.QueueResponse(new HttpResponseMessage(HttpStatusCode.Unauthorized));
518+
handler.QueueStringResponse("event: put\ndata: " + messageData + "\n\n");
519+
520+
var evt = new EventSource(new Configuration(_uri, handler));
521+
522+
ErrorReceiver receiver = new ErrorReceiver(evt);
523+
receiver.CloseEventSourceOnError = false;
524+
525+
string messageReceived = null;
526+
evt.MessageReceived += (_, e) =>
527+
{
528+
messageReceived = e.Message.Data;
529+
evt.Close();
530+
};
531+
532+
await evt.StartAsync();
533+
534+
Assert.Equal(2, handler.GetRequests().Count());
535+
Assert.NotNull(receiver.ErrorReceived);
536+
var ex = Assert.IsType<EventSourceServiceUnsuccessfulResponseException>(receiver.ErrorReceived);
537+
Assert.Equal((int)HttpStatusCode.Unauthorized, ex.StatusCode);
538+
Assert.Equal(messageData, messageReceived);
524539
}
525540

526541
[Fact]
@@ -531,14 +546,35 @@ public async Task When_error_handler_closes_event_source_no_reconnect_attempt_is
531546

532547
var evt = new EventSource(new Configuration(_uri, handler));
533548

534-
evt.Error += (_, e) =>
535-
{
536-
evt.Close();
537-
};
549+
var receiver = new ErrorReceiver(evt);
538550

539551
await evt.StartAsync();
540552

541553
Assert.Equal(1, handler.GetRequests().Count());
542554
}
543555
}
556+
557+
class ErrorReceiver
558+
{
559+
public bool CloseEventSourceOnError = true;
560+
public Exception ErrorReceived = null;
561+
public ReadyState SourceStateReceived;
562+
private readonly EventSource _source;
563+
564+
public ErrorReceiver(EventSource source)
565+
{
566+
_source = source;
567+
source.Error += HandleError;
568+
}
569+
570+
public void HandleError(object sender, ExceptionEventArgs e)
571+
{
572+
ErrorReceived = e.Exception;
573+
SourceStateReceived = _source.ReadyState;
574+
if (CloseEventSourceOnError)
575+
{
576+
_source.Close();
577+
}
578+
}
579+
}
544580
}

0 commit comments

Comments
 (0)