Skip to content

Simplify SSE parsing #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions openai/init.moon
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,6 @@ parse_completion_chunk = types.partial {
}
}

-- lpeg pattern to read a json data block from the front of a string, returns
-- the json blob and the rest of the string if it could parse one
consume_json_head = do
import C, S, P from require "lpeg"

-- this pattern reads from the front just enough characters to consume a
-- valid json object
consume_json = P (str, pos) ->
str_len = #str
for k=pos+1,str_len
candidate = str\sub pos, k
parsed = false
pcall -> parsed = cjson.decode candidate
if parsed
return k + 1

return nil -- fail

S("\t\n\r ")^0 * P("data: ") * C(consume_json) * C(P(1)^0)


parse_error_message = types.partial {
error: types.partial {
message: types.string\tag "message"
Expand Down Expand Up @@ -240,21 +219,36 @@ class OpenAI
assert types.function(chunk_callback), "Must provide chunk_callback function when streaming response"

accumulation_buffer = ""
streamed = false

(...) ->
chunk = ...

if type(chunk) == "string"
if chunk == nil
assert not streamed or accumulation_buffer\match"^%s*$", "buffer not empty"
elseif type(chunk) == "string"
accumulation_buffer ..= chunk

while true
json_blob, rest = consume_json_head\match accumulation_buffer
unless json_blob
event, rest = accumulation_buffer\match "^(.-)\r?\n\r?\n(.-)$"
unless event
break

accumulation_buffer = rest
if chunk = parse_completion_chunk cjson.decode json_blob
chunk_callback chunk
while event and #event>0
field, value, rest_evt = event\match "^(.-):%s+([^\r\n]+)(.-)$"
switch field
when "data"
streamed = true
unless value=="[DONE]"
chunk_callback (cjson.decode value)
when "event","id","retry","" --comment
nil -- noop
when nil
error "Cannot parse SSE event: "..event
else
error ('Unknown field "%s" with value "%s"')\format(field, value)
event = #rest_evt>0 and rest_evt\match"^\r?\n(.*)"

...

Expand Down
Loading