Skip to content

Commit 674e0f7

Browse files
authored
Replace custom SSE reader with source for System.Net.ServerSentEvents (#33)
* Replace custom SSE reader with source for System.Net.ServerSentEvents A stable package for the new System.Net.ServerSentEvents library isn't yet available, but until one is we can switch the code over to using a source copy. * Add comment to csproj * Fix NRT handling in ServerSentEvents.cs to account for lack of NRT at project level
1 parent 207d597 commit 674e0f7

13 files changed

+657
-469
lines changed

src/Custom/Assistants/Streaming/AsyncStreamingUpdateCollection.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
using System.ClientModel.Primitives;
44
using System.Collections.Generic;
55
using System.Diagnostics;
6+
using System.Linq;
7+
using System.Net.ServerSentEvents;
68
using System.Threading;
79
using System.Threading.Tasks;
810

@@ -31,7 +33,7 @@ public override IAsyncEnumerator<StreamingUpdate> GetAsyncEnumerator(Cancellatio
3133

3234
private sealed class AsyncStreamingUpdateEnumerator : IAsyncEnumerator<StreamingUpdate>
3335
{
34-
private const string _terminalData = "[DONE]";
36+
private static ReadOnlySpan<byte> TerminalData => "[DONE]"u8;
3537

3638
private readonly Func<Task<ClientResult>> _getResultAsync;
3739
private readonly AsyncStreamingUpdateCollection _enumerable;
@@ -44,7 +46,7 @@ private sealed class AsyncStreamingUpdateEnumerator : IAsyncEnumerator<Streaming
4446
// // get _updates from sse event
4547
// foreach (var update in _updates) { ... }
4648
// }
47-
private IAsyncEnumerator<ServerSentEvent>? _events;
49+
private IAsyncEnumerator<SseItem<byte[]>>? _events;
4850
private IEnumerator<StreamingUpdate>? _updates;
4951

5052
private StreamingUpdate? _current;
@@ -84,7 +86,7 @@ async ValueTask<bool> IAsyncEnumerator<StreamingUpdate>.MoveNextAsync()
8486

8587
if (await _events.MoveNextAsync().ConfigureAwait(false))
8688
{
87-
if (_events.Current.Data == _terminalData)
89+
if (_events.Current.Data.AsSpan().SequenceEqual(TerminalData))
8890
{
8991
_current = default;
9092
return false;
@@ -104,7 +106,7 @@ async ValueTask<bool> IAsyncEnumerator<StreamingUpdate>.MoveNextAsync()
104106
return false;
105107
}
106108

107-
private async Task<IAsyncEnumerator<ServerSentEvent>> CreateEventEnumeratorAsync()
109+
private async Task<IAsyncEnumerator<SseItem<byte[]>>> CreateEventEnumeratorAsync()
108110
{
109111
ClientResult result = await _getResultAsync().ConfigureAwait(false);
110112
PipelineResponse response = result.GetRawResponse();
@@ -115,7 +117,7 @@ private async Task<IAsyncEnumerator<ServerSentEvent>> CreateEventEnumeratorAsync
115117
throw new InvalidOperationException("Unable to create result from response with null ContentStream");
116118
}
117119

118-
AsyncServerSentEventEnumerable enumerable = new(response.ContentStream);
120+
IAsyncEnumerable<SseItem<byte[]>> enumerable = SseParser.Create(response.ContentStream, (_, bytes) => bytes.ToArray()).EnumerateAsync();
119121
return enumerable.GetAsyncEnumerator(_cancellationToken);
120122
}
121123

src/Custom/Assistants/Streaming/StreamingUpdate.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Generic;
2+
using System.Net.ServerSentEvents;
23
using System.Text.Json;
34

45
namespace OpenAI.Assistants;
@@ -38,7 +39,7 @@ internal StreamingUpdate(StreamingUpdateReason updateKind)
3839
UpdateKind = updateKind;
3940
}
4041

41-
internal static IEnumerable<StreamingUpdate> FromEvent(ServerSentEvent sseItem)
42+
internal static IEnumerable<StreamingUpdate> FromEvent(SseItem<byte[]> sseItem)
4243
{
4344
StreamingUpdateReason updateKind = StreamingUpdateReasonExtensions.FromSseEventLabel(sseItem.EventType);
4445
using JsonDocument dataDocument = JsonDocument.Parse(sseItem.Data);

src/Custom/Assistants/Streaming/StreamingUpdateCollection.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Collections;
55
using System.Collections.Generic;
66
using System.Diagnostics;
7+
using System.Net.ServerSentEvents;
78

89
#nullable enable
910

@@ -30,7 +31,7 @@ public override IEnumerator<StreamingUpdate> GetEnumerator()
3031

3132
private sealed class StreamingUpdateEnumerator : IEnumerator<StreamingUpdate>
3233
{
33-
private const string _terminalData = "[DONE]";
34+
private static ReadOnlySpan<byte> TerminalData => "[DONE]"u8;
3435

3536
private readonly Func<ClientResult> _getResult;
3637
private readonly StreamingUpdateCollection _enumerable;
@@ -42,7 +43,7 @@ private sealed class StreamingUpdateEnumerator : IEnumerator<StreamingUpdate>
4243
// // get _updates from sse event
4344
// foreach (var update in _updates) { ... }
4445
// }
45-
private IEnumerator<ServerSentEvent>? _events;
46+
private IEnumerator<SseItem<byte[]>>? _events;
4647
private IEnumerator<StreamingUpdate>? _updates;
4748

4849
private StreamingUpdate? _current;
@@ -81,7 +82,7 @@ public bool MoveNext()
8182

8283
if (_events.MoveNext())
8384
{
84-
if (_events.Current.Data == _terminalData)
85+
if (_events.Current.Data.AsSpan().SequenceEqual(TerminalData))
8586
{
8687
_current = default;
8788
return false;
@@ -101,7 +102,7 @@ public bool MoveNext()
101102
return false;
102103
}
103104

104-
private IEnumerator<ServerSentEvent> CreateEventEnumerator()
105+
private IEnumerator<SseItem<byte[]>> CreateEventEnumerator()
105106
{
106107
ClientResult result = _getResult();
107108
PipelineResponse response = result.GetRawResponse();
@@ -112,7 +113,7 @@ private IEnumerator<ServerSentEvent> CreateEventEnumerator()
112113
throw new InvalidOperationException("Unable to create result from response with null ContentStream");
113114
}
114115

115-
ServerSentEventEnumerable enumerable = new(response.ContentStream);
116+
IEnumerable<SseItem<byte[]>> enumerable = SseParser.Create(response.ContentStream, (_, bytes) => bytes.ToArray()).Enumerate();
116117
return enumerable.GetEnumerator();
117118
}
118119

src/Custom/Chat/Internal/AsyncStreamingChatCompletionUpdateCollection.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.ClientModel.Primitives;
44
using System.Collections.Generic;
55
using System.Diagnostics;
6+
using System.Net.ServerSentEvents;
67
using System.Text.Json;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -32,7 +33,7 @@ public override IAsyncEnumerator<StreamingChatCompletionUpdate> GetAsyncEnumerat
3233

3334
private sealed class AsyncStreamingChatUpdateEnumerator : IAsyncEnumerator<StreamingChatCompletionUpdate>
3435
{
35-
private const string _terminalData = "[DONE]";
36+
private static ReadOnlySpan<byte> TerminalData => "[DONE]"u8;
3637

3738
private readonly Func<Task<ClientResult>> _getResultAsync;
3839
private readonly AsyncStreamingChatCompletionUpdateCollection _enumerable;
@@ -45,7 +46,7 @@ private sealed class AsyncStreamingChatUpdateEnumerator : IAsyncEnumerator<Strea
4546
// // get _updates from sse event
4647
// foreach (var update in _updates) { ... }
4748
// }
48-
private IAsyncEnumerator<ServerSentEvent>? _events;
49+
private IAsyncEnumerator<SseItem<byte[]>>? _events;
4950
private IEnumerator<StreamingChatCompletionUpdate>? _updates;
5051

5152
private StreamingChatCompletionUpdate? _current;
@@ -85,7 +86,7 @@ async ValueTask<bool> IAsyncEnumerator<StreamingChatCompletionUpdate>.MoveNextAs
8586

8687
if (await _events.MoveNextAsync().ConfigureAwait(false))
8788
{
88-
if (_events.Current.Data == _terminalData)
89+
if (_events.Current.Data.AsSpan().SequenceEqual(TerminalData))
8990
{
9091
_current = default;
9192
return false;
@@ -106,7 +107,7 @@ async ValueTask<bool> IAsyncEnumerator<StreamingChatCompletionUpdate>.MoveNextAs
106107
return false;
107108
}
108109

109-
private async Task<IAsyncEnumerator<ServerSentEvent>> CreateEventEnumeratorAsync()
110+
private async Task<IAsyncEnumerator<SseItem<byte[]>>> CreateEventEnumeratorAsync()
110111
{
111112
ClientResult result = await _getResultAsync().ConfigureAwait(false);
112113
PipelineResponse response = result.GetRawResponse();
@@ -117,7 +118,7 @@ private async Task<IAsyncEnumerator<ServerSentEvent>> CreateEventEnumeratorAsync
117118
throw new InvalidOperationException("Unable to create result from response with null ContentStream");
118119
}
119120

120-
AsyncServerSentEventEnumerable enumerable = new(response.ContentStream);
121+
IAsyncEnumerable<SseItem<byte[]>> enumerable = SseParser.Create(response.ContentStream, (_, bytes) => bytes.ToArray()).EnumerateAsync();
121122
return enumerable.GetAsyncEnumerator(_cancellationToken);
122123
}
123124

src/Custom/Chat/Internal/StreamingChatCompletionUpdateCollection.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Collections;
55
using System.Collections.Generic;
66
using System.Diagnostics;
7+
using System.Net.ServerSentEvents;
78
using System.Text.Json;
89

910
#nullable enable
@@ -31,7 +32,7 @@ public override IEnumerator<StreamingChatCompletionUpdate> GetEnumerator()
3132

3233
private sealed class StreamingChatUpdateEnumerator : IEnumerator<StreamingChatCompletionUpdate>
3334
{
34-
private const string _terminalData = "[DONE]";
35+
private static ReadOnlySpan<byte> TerminalData => "[DONE]"u8;
3536

3637
private readonly Func<ClientResult> _getResult;
3738
private readonly StreamingChatCompletionUpdateCollection _enumerable;
@@ -43,7 +44,7 @@ private sealed class StreamingChatUpdateEnumerator : IEnumerator<StreamingChatCo
4344
// // get _updates from sse event
4445
// foreach (var update in _updates) { ... }
4546
// }
46-
private IEnumerator<ServerSentEvent>? _events;
47+
private IEnumerator<SseItem<byte[]>>? _events;
4748
private IEnumerator<StreamingChatCompletionUpdate>? _updates;
4849

4950
private StreamingChatCompletionUpdate? _current;
@@ -82,7 +83,7 @@ public bool MoveNext()
8283

8384
if (_events.MoveNext())
8485
{
85-
if (_events.Current.Data == _terminalData)
86+
if (_events.Current.Data.AsSpan().SequenceEqual(TerminalData))
8687
{
8788
_current = default;
8889
return false;
@@ -103,7 +104,7 @@ public bool MoveNext()
103104
return false;
104105
}
105106

106-
private IEnumerator<ServerSentEvent> CreateEventEnumerator()
107+
private IEnumerator<SseItem<byte[]>> CreateEventEnumerator()
107108
{
108109
ClientResult result = _getResult();
109110
PipelineResponse response = result.GetRawResponse();
@@ -114,7 +115,7 @@ private IEnumerator<ServerSentEvent> CreateEventEnumerator()
114115
throw new InvalidOperationException("Unable to create result from response with null ContentStream");
115116
}
116117

117-
ServerSentEventEnumerable enumerable = new(response.ContentStream);
118+
IEnumerable<SseItem<byte[]>> enumerable = SseParser.Create(response.ContentStream, (_, bytes) => bytes.ToArray()).Enumerate();
118119
return enumerable.GetEnumerator();
119120
}
120121

src/OpenAI.csproj

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@
4141
<NoWarn>$(NoWarn),0169</NoWarn>
4242
</PropertyGroup>
4343

44+
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
45+
<!-- Allow use of unsafe code, for System.Net.ServerSentEvents polyfill on netstandard2.0
46+
TODO https://github.com/openai/openai-dotnet/issues/41: Remove once polyfill for
47+
System.Net.ServerSentEvents is removed in favor of referencing the package -->
48+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
49+
</PropertyGroup>
50+
4451
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
4552
<!-- Normalize stored file paths in symbols when in a CI build. -->
4653
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>

src/Utility/AsyncServerSentEventEnumerable.cs

Lines changed: 0 additions & 82 deletions
This file was deleted.

src/Utility/ServerSentEvent.cs

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)