Skip to content

Commit e3ff552

Browse files
authored
Improve testing for StreamableHttpHandler and IdleTrackingBackgroundService (#345)
* Add idle session pruning tests * Remove redundant CTS from StreamableHttpHandler * StreamableHttpServerTransport.HandlePostRequest already creates the equivelant CTS which DeleteRequest_CompletesSession_WhichCancelsLongRunningToolCalls verifies * Fix accept header validation for GET * Use InvalidOperationException for transport-not-connected errors in doc comments * Fix flaky Cancellation_ThrowsCancellationException test
1 parent 4fd3ebf commit e3ff552

25 files changed

+315
-291
lines changed

Directory.Packages.props

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.4" />
5555
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.4" />
5656
<PackageVersion Include="Microsoft.Extensions.Options" Version="9.0.4" />
57+
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="9.4.0" />
5758
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
5859
<PackageVersion Include="Moq" Version="4.20.72" />
5960
<PackageVersion Include="OpenTelemetry" Version="1.11.2" />

src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public static class HttpMcpServerBuilderExtensions
2222
public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action<HttpServerTransportOptions>? configureOptions = null)
2323
{
2424
ArgumentNullException.ThrowIfNull(builder);
25+
2526
builder.Services.TryAddSingleton<StreamableHttpHandler>();
2627
builder.Services.TryAddSingleton<SseHandler>();
2728
builder.Services.AddHostedService<IdleTrackingBackgroundService>();

src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs

+10-5
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,18 @@ public async ValueTask DisposeAsync()
4747
}
4848
finally
4949
{
50-
if (Server is not null)
50+
try
5151
{
52-
await Server.DisposeAsync();
52+
if (Server is not null)
53+
{
54+
await Server.DisposeAsync();
55+
}
56+
}
57+
finally
58+
{
59+
await Transport.DisposeAsync();
60+
_disposeCts.Dispose();
5361
}
54-
55-
await Transport.DisposeAsync();
56-
_disposeCts.Dispose();
5762
}
5863
}
5964

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@ public class HttpServerTransportOptions
2626
/// Represents the duration of time the server will wait between any active requests before timing out an
2727
/// MCP session. This is checked in background every 5 seconds. A client trying to resume a session will
2828
/// receive a 404 status code and should restart their session. A client can keep their session open by
29-
/// keeping a GET request open. The default value is set to 2 minutes.
29+
/// keeping a GET request open. The default value is set to 2 hours.
3030
/// </summary>
31-
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromMinutes(2);
31+
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromHours(2);
32+
33+
/// <summary>
34+
/// The maximum number of idle sessions to track. This is used to limit the number of sessions that can be idle at once.
35+
/// Past this limit, the server will log a critical error and terminate the oldest idle sessions even if they have not reached
36+
/// their <see cref="IdleTimeout"/> until the idle session count is below this limit. Clients that keep their session open by
37+
/// keeping a GET request open will not count towards this limit. The default value is set to 100,000 sessions.
38+
/// </summary>
39+
public int MaxIdleSessionCount { get; set; } = 100_000;
3240

3341
/// <summary>
3442
/// Used for testing the <see cref="IdleTimeout"/>.

src/ModelContextProtocol.AspNetCore/IdleTrackingBackgroundService.cs

+69-26
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,40 @@ namespace ModelContextProtocol.AspNetCore;
88
internal sealed partial class IdleTrackingBackgroundService(
99
StreamableHttpHandler handler,
1010
IOptions<HttpServerTransportOptions> options,
11+
IHostApplicationLifetime appLifetime,
1112
ILogger<IdleTrackingBackgroundService> logger) : BackgroundService
1213
{
1314
// The compiler will complain about the parameter being unused otherwise despite the source generator.
1415
private ILogger _logger = logger;
1516

16-
// We can make this configurable once we properly harden the MCP server. In the meantime, anyone running
17-
// this should be taking a cattle not pets approach to their servers and be able to launch more processes
18-
// to handle more than 10,000 idle sessions at a time.
19-
private const int MaxIdleSessionCount = 10_000;
20-
2117
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
2218
{
23-
var timeProvider = options.Value.TimeProvider;
24-
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);
19+
// Still run loop given infinite IdleTimeout to enforce the MaxIdleSessionCount and assist graceful shutdown.
20+
if (options.Value.IdleTimeout != Timeout.InfiniteTimeSpan)
21+
{
22+
ArgumentOutOfRangeException.ThrowIfLessThan(options.Value.IdleTimeout, TimeSpan.Zero);
23+
}
24+
ArgumentOutOfRangeException.ThrowIfLessThan(options.Value.MaxIdleSessionCount, 0);
2525

2626
try
2727
{
28+
var timeProvider = options.Value.TimeProvider;
29+
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);
30+
31+
var idleTimeoutTicks = options.Value.IdleTimeout.Ticks;
32+
var maxIdleSessionCount = options.Value.MaxIdleSessionCount;
33+
34+
// The default ValueTuple Comparer will check the first item then the second which preserves both order and uniqueness.
35+
var idleSessions = new SortedSet<(long Timestamp, string SessionId)>();
36+
2837
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
2938
{
30-
var idleActivityCutoff = timeProvider.GetTimestamp() - options.Value.IdleTimeout.Ticks;
39+
var idleActivityCutoff = idleTimeoutTicks switch
40+
{
41+
< 0 => long.MinValue,
42+
var ticks => timeProvider.GetTimestamp() - ticks,
43+
};
3144

32-
var idleCount = 0;
3345
foreach (var (_, session) in handler.Sessions)
3446
{
3547
if (session.IsActive || session.SessionClosed.IsCancellationRequested)
@@ -38,34 +50,40 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
3850
continue;
3951
}
4052

41-
idleCount++;
42-
if (idleCount == MaxIdleSessionCount)
43-
{
44-
// Emit critical log at most once every 5 seconds the idle count it exceeded,
45-
//since the IdleTimeout will no longer be respected.
46-
LogMaxSessionIdleCountExceeded();
47-
}
48-
else if (idleCount < MaxIdleSessionCount && session.LastActivityTicks > idleActivityCutoff)
53+
if (session.LastActivityTicks < idleActivityCutoff)
4954
{
55+
RemoveAndCloseSession(session.Id);
5056
continue;
5157
}
5258

53-
if (handler.Sessions.TryRemove(session.Id, out var removedSession))
59+
idleSessions.Add((session.LastActivityTicks, session.Id));
60+
61+
// Emit critical log at most once every 5 seconds the idle count it exceeded,
62+
// since the IdleTimeout will no longer be respected.
63+
if (idleSessions.Count == maxIdleSessionCount + 1)
5464
{
55-
LogSessionIdle(removedSession.Id);
65+
LogMaxSessionIdleCountExceeded(maxIdleSessionCount);
66+
}
67+
}
5668

57-
// Don't slow down the idle tracking loop. DisposeSessionAsync logs. We only await during graceful shutdown.
58-
_ = DisposeSessionAsync(removedSession);
69+
if (idleSessions.Count > maxIdleSessionCount)
70+
{
71+
var sessionsToPrune = idleSessions.ToArray()[..^maxIdleSessionCount];
72+
foreach (var (_, id) in sessionsToPrune)
73+
{
74+
RemoveAndCloseSession(id);
5975
}
6076
}
77+
78+
idleSessions.Clear();
6179
}
6280
}
6381
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
6482
{
6583
}
6684
finally
6785
{
68-
if (stoppingToken.IsCancellationRequested)
86+
try
6987
{
7088
List<Task> disposeSessionTasks = [];
7189

@@ -79,7 +97,29 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
7997

8098
await Task.WhenAll(disposeSessionTasks);
8199
}
100+
finally
101+
{
102+
if (!stoppingToken.IsCancellationRequested)
103+
{
104+
// Something went terribly wrong. A very unexpected exception must be bubbling up, but let's ensure we also stop the application,
105+
// so that it hopefully gets looked at and restarted. This shouldn't really be reachable.
106+
appLifetime.StopApplication();
107+
IdleTrackingBackgroundServiceStoppedUnexpectedly();
108+
}
109+
}
110+
}
111+
}
112+
113+
private void RemoveAndCloseSession(string sessionId)
114+
{
115+
if (!handler.Sessions.TryRemove(sessionId, out var session))
116+
{
117+
return;
82118
}
119+
120+
LogSessionIdle(session.Id);
121+
// Don't slow down the idle tracking loop. DisposeSessionAsync logs. We only await during graceful shutdown.
122+
_ = DisposeSessionAsync(session);
83123
}
84124

85125
private async Task DisposeSessionAsync(HttpMcpSession<StreamableHttpServerTransport> session)
@@ -97,9 +137,12 @@ private async Task DisposeSessionAsync(HttpMcpSession<StreamableHttpServerTransp
97137
[LoggerMessage(Level = LogLevel.Information, Message = "Closing idle session {sessionId}.")]
98138
private partial void LogSessionIdle(string sessionId);
99139

100-
[LoggerMessage(Level = LogLevel.Critical, Message = "Exceeded static maximum of 10,000 idle connections. Now clearing all inactive connections regardless of timeout.")]
101-
private partial void LogMaxSessionIdleCountExceeded();
102-
103-
[LoggerMessage(Level = LogLevel.Error, Message = "Error disposing the IMcpServer for session {sessionId}.")]
140+
[LoggerMessage(Level = LogLevel.Error, Message = "Error disposing session {sessionId}.")]
104141
private partial void LogSessionDisposeError(string sessionId, Exception ex);
142+
143+
[LoggerMessage(Level = LogLevel.Critical, Message = "Exceeded maximum of {maxIdleSessionCount} idle sessions. Now closing sessions active more recently than configured IdleTimeout.")]
144+
private partial void LogMaxSessionIdleCountExceeded(int maxIdleSessionCount);
145+
146+
[LoggerMessage(Level = LogLevel.Critical, Message = "The IdleTrackingBackgroundService has stopped unexpectedly.")]
147+
private partial void IdleTrackingBackgroundServiceStoppedUnexpectedly();
105148
}

src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs

+19-21
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Microsoft.AspNetCore.WebUtilities;
44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.Options;
6+
using Microsoft.Net.Http.Headers;
67
using ModelContextProtocol.Protocol.Messages;
78
using ModelContextProtocol.Protocol.Transport;
89
using ModelContextProtocol.Server;
@@ -23,18 +24,19 @@ internal sealed class StreamableHttpHandler(
2324
IServiceProvider applicationServices)
2425
{
2526
private static JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
27+
private static MediaTypeHeaderValue ApplicationJsonMediaType = new("application/json");
28+
private static MediaTypeHeaderValue TextEventStreamMediaType = new("text/event-stream");
2629

2730
public ConcurrentDictionary<string, HttpMcpSession<StreamableHttpServerTransport>> Sessions { get; } = new(StringComparer.Ordinal);
2831

2932
public async Task HandlePostRequestAsync(HttpContext context)
3033
{
3134
// The Streamable HTTP spec mandates the client MUST accept both application/json and text/event-stream.
32-
// ASP.NET Core Minimal APIs mostly ry to stay out of the business of response content negotiation, so
33-
// we have to do this manually. The spec doesn't mandate that servers MUST reject these requests, but it's
34-
// probably good to at least start out trying to be strict.
35-
var acceptHeader = context.Request.Headers.Accept.ToString();
36-
if (!acceptHeader.Contains("application/json", StringComparison.Ordinal) ||
37-
!acceptHeader.Contains("text/event-stream", StringComparison.Ordinal))
35+
// ASP.NET Core Minimal APIs mostly try to stay out of the business of response content negotiation,
36+
// so we have to do this manually. The spec doesn't mandate that servers MUST reject these requests,
37+
// but it's probably good to at least start out trying to be strict.
38+
var acceptHeaders = context.Request.GetTypedHeaders().Accept;
39+
if (!acceptHeaders.Contains(ApplicationJsonMediaType) || !acceptHeaders.Contains(TextEventStreamMediaType))
3840
{
3941
await WriteJsonRpcErrorAsync(context,
4042
"Not Acceptable: Client must accept both application/json and text/event-stream",
@@ -49,9 +51,8 @@ await WriteJsonRpcErrorAsync(context,
4951
}
5052

5153
using var _ = session.AcquireReference();
52-
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, session.SessionClosed);
5354
InitializeSseResponse(context);
54-
var wroteResponse = await session.Transport.HandlePostRequest(new HttpDuplexPipe(context), cts.Token);
55+
var wroteResponse = await session.Transport.HandlePostRequest(new HttpDuplexPipe(context), context.RequestAborted);
5556
if (!wroteResponse)
5657
{
5758
// We wound up writing nothing, so there should be no Content-Type response header.
@@ -62,8 +63,8 @@ await WriteJsonRpcErrorAsync(context,
6263

6364
public async Task HandleGetRequestAsync(HttpContext context)
6465
{
65-
var acceptHeader = context.Request.Headers.Accept.ToString();
66-
if (!acceptHeader.Contains("application/json", StringComparison.Ordinal))
66+
var acceptHeaders = context.Request.GetTypedHeaders().Accept;
67+
if (!acceptHeaders.Contains(TextEventStreamMediaType))
6768
{
6869
await WriteJsonRpcErrorAsync(context,
6970
"Not Acceptable: Client must accept text/event-stream",
@@ -105,12 +106,6 @@ public async Task HandleDeleteRequestAsync(HttpContext context)
105106
}
106107
}
107108

108-
private void InitializeSessionResponse(HttpContext context, HttpMcpSession<StreamableHttpServerTransport> session)
109-
{
110-
context.Response.Headers["mcp-session-id"] = session.Id;
111-
context.Features.Set(session.Server);
112-
}
113-
114109
private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>?> GetSessionAsync(HttpContext context, string sessionId)
115110
{
116111
if (Sessions.TryGetValue(sessionId, out var existingSession))
@@ -123,7 +118,8 @@ await WriteJsonRpcErrorAsync(context,
123118
return null;
124119
}
125120

126-
InitializeSessionResponse(context, existingSession);
121+
context.Response.Headers["mcp-session-id"] = existingSession.Id;
122+
context.Features.Set(existingSession.Server);
127123
return existingSession;
128124
}
129125

@@ -138,11 +134,10 @@ await WriteJsonRpcErrorAsync(context,
138134
private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>?> GetOrCreateSessionAsync(HttpContext context)
139135
{
140136
var sessionId = context.Request.Headers["mcp-session-id"].ToString();
141-
HttpMcpSession<StreamableHttpServerTransport>? session;
142137

143138
if (string.IsNullOrEmpty(sessionId))
144139
{
145-
session = await CreateSessionAsync(context);
140+
var session = await CreateSessionAsync(context);
146141

147142
if (!Sessions.TryAdd(session.Id, session))
148143
{
@@ -159,6 +154,9 @@ await WriteJsonRpcErrorAsync(context,
159154

160155
private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>> CreateSessionAsync(HttpContext context)
161156
{
157+
var sessionId = MakeNewSessionId();
158+
context.Response.Headers["mcp-session-id"] = sessionId;
159+
162160
var mcpServerOptions = mcpServerOptionsSnapshot.Value;
163161
if (httpMcpServerOptions.Value.ConfigureSessionOptions is { } configureSessionOptions)
164162
{
@@ -169,16 +167,16 @@ private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>> CreateSes
169167
var transport = new StreamableHttpServerTransport();
170168
// Use application instead of request services, because the session will likely outlive the first initialization request.
171169
var server = McpServerFactory.Create(transport, mcpServerOptions, loggerFactory, applicationServices);
170+
context.Features.Set(server);
172171

173-
var session = new HttpMcpSession<StreamableHttpServerTransport>(MakeNewSessionId(), transport, context.User, httpMcpServerOptions.Value.TimeProvider)
172+
var session = new HttpMcpSession<StreamableHttpServerTransport>(sessionId, transport, context.User, httpMcpServerOptions.Value.TimeProvider)
174173
{
175174
Server = server,
176175
};
177176

178177
var runSessionAsync = httpMcpServerOptions.Value.RunSessionHandler ?? RunSessionAsync;
179178
session.ServerRunTask = runSessionAsync(context, server, session.SessionClosed);
180179

181-
InitializeSessionResponse(context, session);
182180
return session;
183181
}
184182

src/ModelContextProtocol/IMcpEndpoint.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public interface IMcpEndpoint : IAsyncDisposable
3434
/// <param name="request">The JSON-RPC request to send.</param>
3535
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
3636
/// <returns>A task containing the endpoint's response.</returns>
37-
/// <exception cref="McpException">The transport is not connected, or another error occurs during request processing.</exception>
37+
/// <exception cref="InvalidOperationException">The transport is not connected, or another error occurs during request processing.</exception>
38+
/// <exception cref="McpException">An error occured during request processing.</exception>
3839
/// <remarks>
3940
/// This method provides low-level access to send raw JSON-RPC requests. For most use cases,
4041
/// consider using the strongly-typed extension methods that provide a more convenient API.
@@ -50,7 +51,7 @@ public interface IMcpEndpoint : IAsyncDisposable
5051
/// </param>
5152
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
5253
/// <returns>A task that represents the asynchronous send operation.</returns>
53-
/// <exception cref="McpException">The transport is not connected.</exception>
54+
/// <exception cref="InvalidOperationException">The transport is not connected.</exception>
5455
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <see langword="null"/>.</exception>
5556
/// <remarks>
5657
/// <para>

src/ModelContextProtocol/Protocol/Transport/SseWriter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
5151
{
5252
Throw.IfNull(message);
5353

54-
using var _ = await _disposeLock.LockAsync().ConfigureAwait(false);
54+
using var _ = await _disposeLock.LockAsync(cancellationToken).ConfigureAwait(false);
5555

5656
if (_disposed)
5757
{

src/ModelContextProtocol/Protocol/Transport/StreamableHttpServerTransport.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public async Task HandleGetRequest(Stream sseResponseStream, CancellationToken c
5151
throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session.");
5252
}
5353

54-
using var getCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
55-
await _sseWriter.WriteAllAsync(sseResponseStream, getCts.Token).ConfigureAwait(false);
54+
// We do not need to reference _disposeCts like in HandlePostRequest, because the session ending completes the _sseWriter gracefully.
55+
await _sseWriter.WriteAllAsync(sseResponseStream, cancellationToken).ConfigureAwait(false);
5656
}
5757

5858
/// <summary>

0 commit comments

Comments
 (0)