@@ -25,8 +25,9 @@ public class EventSource : IEventSource
2525 private string _eventName = Constants . MessageField ;
2626 private string _lastEventId ;
2727 private TimeSpan _retryDelay ;
28- private CancellationTokenSource _pendingRequest ;
2928 private readonly ExponentialBackoffWithDecorrelation _backOff ;
29+ private CancellationTokenSource _currentRequestToken ;
30+ private ReadyState _readyState ;
3031
3132 #endregion
3233
@@ -65,8 +66,20 @@ public class EventSource : IEventSource
6566 /// </value>
6667 public ReadyState ReadyState
6768 {
68- get ;
69- private set ;
69+ get
70+ {
71+ lock ( this )
72+ {
73+ return _readyState ;
74+ }
75+ }
76+ private set
77+ {
78+ lock ( this )
79+ {
80+ _readyState = value ;
81+ }
82+ }
7083 }
7184
7285 internal TimeSpan BackOffDelay
@@ -88,14 +101,12 @@ internal TimeSpan BackOffDelay
88101 /// configuration</exception>
89102 public EventSource ( Configuration configuration )
90103 {
91- ReadyState = ReadyState . Raw ;
104+ _readyState = ReadyState . Raw ;
92105
93106 _configuration = configuration ?? throw new ArgumentNullException ( nameof ( configuration ) ) ;
94107
95108 _logger = _configuration . Logger ?? LogManager . GetLogger ( typeof ( EventSource ) ) ;
96109
97- _pendingRequest = new CancellationTokenSource ( ) ;
98-
99110 _retryDelay = _configuration . DelayRetryDuration ;
100111
101112 _backOff = new ExponentialBackoffWithDecorrelation ( _retryDelay ,
@@ -114,14 +125,22 @@ public EventSource(Configuration configuration)
114125 /// <exception cref="InvalidOperationException">The method was called after the connection <see cref="ReadyState"/> was Open or Connecting.</exception>
115126 public async Task StartAsync ( )
116127 {
117- var cancellationToken = _pendingRequest . Token ;
118- while ( ! cancellationToken . IsCancellationRequested )
128+ while ( ReadyState != ReadyState . Shutdown )
119129 {
120130 await MaybeWaitWithBackOff ( ) ;
121131 try
122132 {
123- await ConnectToEventSourceAsync ( cancellationToken ) ;
124- _backOff . ResetReconnectAttemptCount ( ) ;
133+ var newRequestTokenSource = new CancellationTokenSource ( ) ;
134+ lock ( this )
135+ {
136+ if ( _readyState == ReadyState . Shutdown )
137+ {
138+ // in case Close() was called in between the previous ReadyState check and the creation of the new token
139+ return ;
140+ }
141+ _currentRequestToken = newRequestTokenSource ;
142+ }
143+ await ConnectToEventSourceAsync ( newRequestTokenSource . Token ) ;
125144 }
126145 catch ( Exception e )
127146 {
@@ -152,21 +171,24 @@ public void Close()
152171 if ( ReadyState == ReadyState . Raw || ReadyState == ReadyState . Shutdown ) return ;
153172
154173 Close ( ReadyState . Shutdown ) ;
155-
156- // Cancel token to cancel requests.
157- CancelToken ( ) ;
158-
174+ CancelCurrentRequest ( ) ;
159175 }
160176
161177 #endregion
162178
163179 #region Private Methods
164180
165- private void CancelToken ( )
181+ private void CancelCurrentRequest ( )
166182 {
167- CancellationTokenSource cancellationTokenSource = Interlocked . Exchange ( ref _pendingRequest , new CancellationTokenSource ( ) ) ;
168- cancellationTokenSource . Cancel ( ) ;
169- cancellationTokenSource . Dispose ( ) ;
183+ CancellationTokenSource requestTokenSource = null ;
184+ lock ( this )
185+ {
186+ requestTokenSource = _currentRequestToken ;
187+ }
188+ if ( requestTokenSource != null )
189+ {
190+ requestTokenSource . Cancel ( ) ;
191+ }
170192 }
171193
172194 internal virtual EventSourceService GetEventSourceService ( Configuration configuration )
@@ -189,7 +211,10 @@ private async Task ConnectToEventSourceAsync(CancellationToken cancellationToken
189211
190212 var svc = GetEventSourceService ( _configuration ) ;
191213
192- svc . ConnectionOpened += ( o , e ) => { SetReadyState ( ReadyState . Open , OnOpened ) ; } ;
214+ svc . ConnectionOpened += ( o , e ) => {
215+ _backOff . ResetReconnectAttemptCount ( ) ;
216+ SetReadyState ( ReadyState . Open , OnOpened ) ;
217+ } ;
193218 svc . ConnectionClosed += ( o , e ) => { SetReadyState ( ReadyState . Closed , OnClosed ) ; } ;
194219
195220 await svc . GetDataAsync (
@@ -199,7 +224,7 @@ await svc.GetDataAsync(
199224 }
200225 catch ( EventSourceServiceCancelledException e )
201226 {
202- CancelToken ( ) ;
227+ CancelCurrentRequest ( ) ;
203228
204229 CloseAndRaiseError ( e ) ;
205230 }
@@ -212,7 +237,6 @@ await svc.GetDataAsync(
212237
213238 throw ;
214239 }
215-
216240 }
217241 }
218242
@@ -252,12 +276,19 @@ private void ProcessResponseContent(string content)
252276
253277 private void SetReadyState ( ReadyState state , Action < StateChangedEventArgs > action = null )
254278 {
255- if ( ReadyState == state ) return ;
256-
257- ReadyState = state ;
279+ lock ( this )
280+ {
281+ if ( _readyState == state || _readyState == ReadyState . Shutdown )
282+ {
283+ return ;
284+ }
285+ _readyState = state ;
286+ }
258287
259- if ( action != null )
260- action ( new StateChangedEventArgs ( ReadyState ) ) ;
288+ if ( action != null )
289+ {
290+ action ( new StateChangedEventArgs ( state ) ) ;
291+ }
261292 }
262293
263294 private void ProcessField ( string field , string value )
0 commit comments