@@ -16,10 +16,12 @@ public class Server
16
16
{
17
17
private readonly ServerBuilder _builder ;
18
18
private readonly ILogger < Server > _logger ;
19
- private readonly ConcurrentDictionary < EndPoint , RunningListener > _listeners = new ConcurrentDictionary < EndPoint , RunningListener > ( ) ;
19
+ private readonly Dictionary < EndPoint , RunningListener > _listeners = new Dictionary < EndPoint , RunningListener > ( ) ;
20
20
private readonly TaskCompletionSource < object > _shutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
21
21
private readonly TimerAwaitable _timerAwaitable ;
22
+ private readonly SemaphoreSlim _listenerSemaphore = new SemaphoreSlim ( initialCount : 1 ) ;
22
23
private Task _timerTask = Task . CompletedTask ;
24
+ private int _stopping ;
23
25
24
26
internal Server ( ServerBuilder builder )
25
27
{
@@ -75,59 +77,55 @@ private async Task StartTimerAsync()
75
77
76
78
public async Task StopAsync ( CancellationToken cancellationToken = default )
77
79
{
78
- // TODO: Svar: Oh, you're right! Didn't think about that :) I don't often come across areas where I have to think about thread safety in my daily work, but I've given it a go here. Let me know if there are any gotchas or something special to look out for...
79
- // TODO: Question: When stopping the server, do we need to prevent new listeners from being added?
80
- // TODO: Question: The "take values and clear" isn't an atomic operation. Is that ok?
81
- // TODO: Question: Should Add/RemoveSocketListenerAsync take Endpoint as input?
82
-
83
- // Svar: To be honest, I'm not really sure how to solve this the best way. I started out with a ConcurrentDictionary
84
- // for the RunningListeners, but I ended up with an issue where I can't guarantee that RunningListeners aren't added/removed
85
- // while stopping the server. So I went for a simple locking instead. This way, the server will have to wait until any
86
- // Add/Remove operation has finished. But it would have felt better if I could have solved it with the ConcurrentDictionary
87
- // and no locking...
88
-
89
- // TODO: KOLLA MED ENDPOINTPOOL OCH LOCKING
90
- // Get the listeners and clear the ConcurrentDictionary, so that they can't be touched anywhere else
91
- // TODO: lock or something else that we check in add/remove
92
- // TODO: Om jag kör lock, behöver jag kanske inte ha concurrentdictionary heller?
93
- // TODO: Får inte kör StopAsync om det pågår en Add/RemoveSockerListener
94
- var listeners = _listeners . Values . ToList ( ) ;
95
- _listeners . Clear ( ) ;
96
-
97
- var tasks = new Task [ listeners . Count ] ;
98
-
99
- for ( int i = 0 ; i < listeners . Count ; i ++ )
80
+ if ( Interlocked . Exchange ( ref _stopping , 1 ) == 1 )
100
81
{
101
- tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
82
+ return ;
102
83
}
103
84
104
- await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
85
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
86
+ try
87
+ {
88
+ var listeners = _listeners . Values . ToList ( ) ;
105
89
106
- // Signal to all of the listeners that it's time to start the shutdown process
107
- // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
108
- _shutdownTcs . TrySetResult ( null ) ;
90
+ var tasks = new Task [ listeners . Count ] ;
109
91
110
- for ( int i = 0 ; i < listeners . Count ; i ++ )
111
- {
112
- tasks [ i ] = listeners [ i ] . ExecutionTask ;
113
- }
92
+ for ( int i = 0 ; i < listeners . Count ; i ++ )
93
+ {
94
+ tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
95
+ }
114
96
115
- var shutdownTask = Task . WhenAll ( tasks ) ;
97
+ await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
116
98
117
- if ( cancellationToken . CanBeCanceled )
118
- {
119
- await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
120
- }
121
- else
122
- {
123
- await shutdownTask . ConfigureAwait ( false ) ;
124
- }
99
+ // Signal to all of the listeners that it's time to start the shutdown process
100
+ // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
101
+ _shutdownTcs . TrySetResult ( null ) ;
125
102
126
- if ( _timerAwaitable != null )
127
- {
128
- _timerAwaitable . Stop ( ) ;
103
+ for ( int i = 0 ; i < listeners . Count ; i ++ )
104
+ {
105
+ tasks [ i ] = listeners [ i ] . ExecutionTask ;
106
+ }
107
+
108
+ var shutdownTask = Task . WhenAll ( tasks ) ;
109
+
110
+ if ( cancellationToken . CanBeCanceled )
111
+ {
112
+ await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
113
+ }
114
+ else
115
+ {
116
+ await shutdownTask . ConfigureAwait ( false ) ;
117
+ }
118
+
119
+ if ( _timerAwaitable != null )
120
+ {
121
+ _timerAwaitable . Stop ( ) ;
129
122
130
- await _timerTask . ConfigureAwait ( false ) ;
123
+ await _timerTask . ConfigureAwait ( false ) ;
124
+ }
125
+ }
126
+ finally
127
+ {
128
+ _listenerSemaphore . Release ( ) ;
131
129
}
132
130
}
133
131
@@ -144,39 +142,67 @@ public Task AddSocketListenerAsync(EndPoint endpoint, Action<IConnectionBuilder>
144
142
145
143
public async Task RemoveSocketListener ( EndPoint endpoint , CancellationToken cancellationToken = default )
146
144
{
147
- if ( ! _listeners . TryRemove ( endpoint , out var listener ) )
145
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
146
+
147
+ if ( _stopping == 1 )
148
148
{
149
- return ;
149
+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
150
150
}
151
151
152
- await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
152
+ try
153
+ {
154
+ if ( ! _listeners . Remove ( endpoint , out var listener ) )
155
+ {
156
+ return ;
157
+ }
158
+
159
+ await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
153
160
154
- // Signal to the listener that it's time to start the shutdown process
155
- // We call this after unbind so that we're not touching the listener anymore
156
- listener . ShutdownTcs . TrySetResult ( null ) ;
161
+ // Signal to the listener that it's time to start the shutdown process
162
+ // We call this after unbind so that we're not touching the listener anymore
163
+ listener . ShutdownTcs . TrySetResult ( null ) ;
157
164
158
- if ( cancellationToken . CanBeCanceled )
159
- {
160
- await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
165
+ if ( cancellationToken . CanBeCanceled )
166
+ {
167
+ await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
168
+ }
169
+ else
170
+ {
171
+ await listener . ExecutionTask . ConfigureAwait ( false ) ;
172
+ }
161
173
}
162
- else
174
+ finally
163
175
{
164
- await listener . ExecutionTask . ConfigureAwait ( false ) ;
176
+ _listenerSemaphore . Release ( ) ;
165
177
}
166
178
}
167
179
168
180
private async Task StartRunningListenersAsync ( ServerBinding binding , CancellationToken cancellationToken = default )
169
181
{
170
- await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
182
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
183
+
184
+ if ( _stopping == 1 )
185
+ {
186
+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
187
+ }
188
+
189
+ try
171
190
{
172
- var runningListener = new RunningListener ( this , binding , listener ) ;
173
- if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
191
+ await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
174
192
{
175
- _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
176
- continue ;
177
- }
193
+ var runningListener = new RunningListener ( this , binding , listener ) ;
194
+ if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
195
+ {
196
+ _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
197
+ continue ;
198
+ }
178
199
179
- runningListener . Start ( ) ;
200
+ runningListener . Start ( ) ;
201
+ }
202
+ }
203
+ finally
204
+ {
205
+ _listenerSemaphore . Release ( ) ;
180
206
}
181
207
}
182
208
0 commit comments