|
| 1 | +using System.Buffers; |
| 2 | +using System.Net.ServerSentEvents; |
| 3 | +using System.Text.Json; |
| 4 | +using System.Threading.Channels; |
| 5 | +using McpDotNet.Protocol.Messages; |
| 6 | +using McpDotNet.Protocol.Transport; |
| 7 | +using McpDotNet.Utils.Json; |
| 8 | + |
| 9 | +namespace AspNetCoreSseServer; |
| 10 | + |
| 11 | +public class SseServerStreamTransport(Stream sseResponseStream) : ITransport |
| 12 | +{ |
| 13 | + private readonly Channel<IJsonRpcMessage> _incomingChannel = CreateSingleItemChannel<IJsonRpcMessage>(); |
| 14 | + private readonly Channel<SseItem<IJsonRpcMessage?>> _outgoingSseChannel = CreateSingleItemChannel<SseItem<IJsonRpcMessage?>>(); |
| 15 | + |
| 16 | + private Task? _sseWriteTask; |
| 17 | + private Utf8JsonWriter? _jsonWriter; |
| 18 | + |
| 19 | + public bool IsConnected => _sseWriteTask?.IsCompleted == false; |
| 20 | + |
| 21 | + public Task RunAsync(CancellationToken cancellationToken) |
| 22 | + { |
| 23 | + void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer) |
| 24 | + { |
| 25 | + if (item.EventType == "endpoint") |
| 26 | + { |
| 27 | + writer.Write("/message"u8); |
| 28 | + return; |
| 29 | + } |
| 30 | + |
| 31 | + JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, JsonSerializerOptionsExtensions.DefaultOptions); |
| 32 | + } |
| 33 | + |
| 34 | + // The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type, |
| 35 | + // so we fib and special-case the "endpoint" event type in the formatter. |
| 36 | + _outgoingSseChannel.Writer.TryWrite(new SseItem<IJsonRpcMessage?>(null, "endpoint")); |
| 37 | + |
| 38 | + var sseItems = _outgoingSseChannel.Reader.ReadAllAsync(cancellationToken); |
| 39 | + return _sseWriteTask = SseFormatter.WriteAsync(sseItems, sseResponseStream, WriteJsonRpcMessageToBuffer, cancellationToken); |
| 40 | + } |
| 41 | + |
| 42 | + public ChannelReader<IJsonRpcMessage> MessageReader => _incomingChannel.Reader; |
| 43 | + |
| 44 | + public ValueTask DisposeAsync() |
| 45 | + { |
| 46 | + _incomingChannel.Writer.TryComplete(); |
| 47 | + _outgoingSseChannel.Writer.TryComplete(); |
| 48 | + return new ValueTask(_sseWriteTask ?? Task.CompletedTask); |
| 49 | + } |
| 50 | + |
| 51 | + public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken = default) => |
| 52 | + _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken).AsTask(); |
| 53 | + |
| 54 | + public Task OnMessageReceivedAsync(IJsonRpcMessage message, CancellationToken cancellationToken) |
| 55 | + { |
| 56 | + if (!IsConnected) |
| 57 | + { |
| 58 | + throw new McpTransportException("Transport is not connected"); |
| 59 | + } |
| 60 | + |
| 61 | + return _incomingChannel.Writer.WriteAsync(message, cancellationToken).AsTask(); |
| 62 | + } |
| 63 | + |
| 64 | + private static Channel<T> CreateSingleItemChannel<T>() => |
| 65 | + Channel.CreateBounded<T>(new BoundedChannelOptions(1) |
| 66 | + { |
| 67 | + SingleReader = true, |
| 68 | + SingleWriter = false, |
| 69 | + }); |
| 70 | + |
| 71 | + private Utf8JsonWriter GetUtf8JsonWriter(IBufferWriter<byte> writer) |
| 72 | + { |
| 73 | + if (_jsonWriter is null) |
| 74 | + { |
| 75 | + _jsonWriter = new Utf8JsonWriter(writer); |
| 76 | + } |
| 77 | + else |
| 78 | + { |
| 79 | + _jsonWriter.Reset(writer); |
| 80 | + } |
| 81 | + |
| 82 | + return _jsonWriter; |
| 83 | + } |
| 84 | +} |
0 commit comments