1
1
using System ;
2
2
using System . Collections . Concurrent ;
3
3
using System . Collections . Generic ;
4
+ using System . Linq ;
4
5
using System . Net ;
5
6
using System . Threading ;
6
7
using System . Threading . Tasks ;
7
8
using Microsoft . AspNetCore . Connections ;
9
+ using Microsoft . AspNetCore . Server . Kestrel . Transport . Sockets ;
8
10
using Microsoft . Extensions . Logging ;
11
+ using Microsoft . Extensions . Options ;
9
12
10
13
namespace Bedrock . Framework
11
14
{
12
15
public class Server
13
16
{
14
17
private readonly ServerBuilder _builder ;
15
18
private readonly ILogger < Server > _logger ;
16
- private readonly List < RunningListener > _listeners = new List < RunningListener > ( ) ;
19
+ private readonly Dictionary < EndPoint , RunningListener > _listeners = new Dictionary < EndPoint , RunningListener > ( ) ;
17
20
private readonly TaskCompletionSource < object > _shutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
18
21
private readonly TimerAwaitable _timerAwaitable ;
22
+ private readonly SemaphoreSlim _listenerSemaphore = new SemaphoreSlim ( initialCount : 1 ) ;
19
23
private Task _timerTask = Task . CompletedTask ;
24
+ private int _stopping ;
20
25
21
26
internal Server ( ServerBuilder builder )
22
27
{
@@ -29,7 +34,7 @@ public IEnumerable<EndPoint> EndPoints
29
34
{
30
35
get
31
36
{
32
- foreach ( var listener in _listeners )
37
+ foreach ( var listener in _listeners . Values )
33
38
{
34
39
yield return listener . Listener . EndPoint ;
35
40
}
@@ -42,12 +47,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
42
47
{
43
48
foreach ( var binding in _builder . Bindings )
44
49
{
45
- await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
46
- {
47
- var runningListener = new RunningListener ( this , binding , listener ) ;
48
- _listeners . Add ( runningListener ) ;
49
- runningListener . Start ( ) ;
50
- }
50
+ await StartRunningListenersAsync ( binding , cancellationToken ) . ConfigureAwait ( false ) ;
51
51
}
52
52
}
53
53
catch
@@ -67,7 +67,7 @@ private async Task StartTimerAsync()
67
67
{
68
68
while ( await _timerAwaitable )
69
69
{
70
- foreach ( var listener in _listeners )
70
+ foreach ( var listener in _listeners . Values )
71
71
{
72
72
listener . TickHeartbeat ( ) ;
73
73
}
@@ -77,40 +77,132 @@ private async Task StartTimerAsync()
77
77
78
78
public async Task StopAsync ( CancellationToken cancellationToken = default )
79
79
{
80
- var tasks = new Task [ _listeners . Count ] ;
80
+ if ( Interlocked . Exchange ( ref _stopping , 1 ) == 1 )
81
+ {
82
+ return ;
83
+ }
84
+
85
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
86
+ try
87
+ {
88
+ var listeners = _listeners . Values . ToList ( ) ;
89
+
90
+ var tasks = new Task [ listeners . Count ] ;
91
+
92
+ for ( int i = 0 ; i < listeners . Count ; i ++ )
93
+ {
94
+ tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
95
+ }
96
+
97
+ await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
98
+
99
+ // Signal to all of the listeners that it's time to start the shutdown process
100
+ // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
101
+ _shutdownTcs . TrySetResult ( null ) ;
102
+
103
+ for ( int i = 0 ; i < listeners . Count ; i ++ )
104
+ {
105
+ tasks [ i ] = listeners [ i ] . ExecutionTask ;
106
+ }
107
+
108
+ var shutdownTask = Task . WhenAll ( tasks ) ;
109
+
110
+ if ( cancellationToken . CanBeCanceled )
111
+ {
112
+ await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
113
+ }
114
+ else
115
+ {
116
+ await shutdownTask . ConfigureAwait ( false ) ;
117
+ }
118
+
119
+ if ( _timerAwaitable != null )
120
+ {
121
+ _timerAwaitable . Stop ( ) ;
81
122
82
- for ( int i = 0 ; i < _listeners . Count ; i ++ )
123
+ await _timerTask . ConfigureAwait ( false ) ;
124
+ }
125
+ }
126
+ finally
83
127
{
84
- tasks [ i ] = _listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
128
+ _listenerSemaphore . Release ( ) ;
85
129
}
130
+ }
86
131
87
- await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
132
+ public Task AddSocketListenerAsync ( EndPoint endpoint , Action < IConnectionBuilder > configure , CancellationToken cancellationToken = default )
133
+ {
134
+ var socketTransportFactory = new SocketTransportFactory ( Options . Create ( new SocketTransportOptions ( ) ) , _builder . ApplicationServices . GetLoggerFactory ( ) ) ;
135
+ var connectionBuilder = new ConnectionBuilder ( _builder . ApplicationServices ) ;
88
136
89
- // Signal to all of the listeners that it's time to start the shutdown process
90
- // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
91
- _shutdownTcs . TrySetResult ( null ) ;
137
+ configure ( connectionBuilder ) ;
92
138
93
- for ( int i = 0 ; i < _listeners . Count ; i ++ )
139
+ var binding = new EndPointBinding ( endpoint , connectionBuilder . Build ( ) , socketTransportFactory ) ;
140
+ return StartRunningListenersAsync ( binding , cancellationToken ) ;
141
+ }
142
+
143
+ public async Task RemoveSocketListener ( EndPoint endpoint , CancellationToken cancellationToken = default )
144
+ {
145
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
146
+
147
+ if ( _stopping == 1 )
94
148
{
95
- tasks [ i ] = _listeners [ i ] . ExecutionTask ;
149
+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
96
150
}
97
151
98
- var shutdownTask = Task . WhenAll ( tasks ) ;
152
+ try
153
+ {
154
+ if ( ! _listeners . Remove ( endpoint , out var listener ) )
155
+ {
156
+ return ;
157
+ }
158
+
159
+ await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
99
160
100
- if ( cancellationToken . CanBeCanceled )
161
+ // Signal to the listener that it's time to start the shutdown process
162
+ // We call this after unbind so that we're not touching the listener anymore
163
+ listener . ShutdownTcs . TrySetResult ( null ) ;
164
+
165
+ if ( cancellationToken . CanBeCanceled )
166
+ {
167
+ await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
168
+ }
169
+ else
170
+ {
171
+ await listener . ExecutionTask . ConfigureAwait ( false ) ;
172
+ }
173
+ }
174
+ finally
101
175
{
102
- await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
176
+ _listenerSemaphore . Release ( ) ;
103
177
}
104
- else
178
+ }
179
+
180
+ private async Task StartRunningListenersAsync ( ServerBinding binding , CancellationToken cancellationToken = default )
181
+ {
182
+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
183
+
184
+ if ( _stopping == 1 )
105
185
{
106
- await shutdownTask . ConfigureAwait ( false ) ;
186
+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
107
187
}
108
188
109
- if ( _timerAwaitable != null )
189
+ try
110
190
{
111
- _timerAwaitable . Stop ( ) ;
191
+ await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
192
+ {
193
+ var runningListener = new RunningListener ( this , binding , listener ) ;
194
+ if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
195
+ {
196
+ _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
197
+ continue ;
198
+ }
112
199
113
- await _timerTask . ConfigureAwait ( false ) ;
200
+ runningListener . Start ( ) ;
201
+ }
202
+ }
203
+ finally
204
+ {
205
+ _listenerSemaphore . Release ( ) ;
114
206
}
115
207
}
116
208
@@ -130,10 +222,12 @@ public RunningListener(Server server, ServerBinding binding, IConnectionListener
130
222
public void Start ( )
131
223
{
132
224
ExecutionTask = RunListenerAsync ( ) ;
225
+ ShutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
133
226
}
134
227
135
228
public IConnectionListener Listener { get ; }
136
229
public Task ExecutionTask { get ; private set ; }
230
+ public TaskCompletionSource < object > ShutdownTcs { get ; private set ; }
137
231
138
232
public void TickHeartbeat ( )
139
233
{
@@ -215,8 +309,11 @@ async Task ExecuteConnectionAsync(ServerConnection serverConnection)
215
309
id ++ ;
216
310
}
217
311
218
- // Don't shut down connections until entire server is shutting down
219
- await _server . _shutdownTcs . Task . ConfigureAwait ( false ) ;
312
+ // Don't shut down connections until this listener or the entire server is shutting down
313
+ await Task . WhenAny (
314
+ ShutdownTcs . Task ,
315
+ _server . _shutdownTcs . Task )
316
+ . ConfigureAwait ( false ) ;
220
317
221
318
// Give connections a chance to close gracefully
222
319
var tasks = new List < Task > ( _connections . Count ) ;
@@ -241,7 +338,6 @@ async Task ExecuteConnectionAsync(ServerConnection serverConnection)
241
338
await listener . DisposeAsync ( ) . ConfigureAwait ( false ) ;
242
339
}
243
340
244
-
245
341
private IDisposable BeginConnectionScope ( ServerConnection connection )
246
342
{
247
343
if ( _server . _logger . IsEnabled ( LogLevel . Critical ) )
@@ -253,4 +349,4 @@ private IDisposable BeginConnectionScope(ServerConnection connection)
253
349
}
254
350
}
255
351
}
256
- }
352
+ }
0 commit comments