diff --git a/openai/init.moon b/openai/init.moon index c844ac3..89b6198 100644 --- a/openai/init.moon +++ b/openai/init.moon @@ -97,27 +97,6 @@ parse_completion_chunk = types.partial { } } --- lpeg pattern to read a json data block from the front of a string, returns --- the json blob and the rest of the string if it could parse one -consume_json_head = do - import C, S, P from require "lpeg" - - -- this pattern reads from the front just enough characters to consume a - -- valid json object - consume_json = P (str, pos) -> - str_len = #str - for k=pos+1,str_len - candidate = str\sub pos, k - parsed = false - pcall -> parsed = cjson.decode candidate - if parsed - return k + 1 - - return nil -- fail - - S("\t\n\r ")^0 * P("data: ") * C(consume_json) * C(P(1)^0) - - parse_error_message = types.partial { error: types.partial { message: types.string\tag "message" @@ -240,21 +219,36 @@ class OpenAI assert types.function(chunk_callback), "Must provide chunk_callback function when streaming response" accumulation_buffer = "" + streamed = false (...) -> chunk = ... - if type(chunk) == "string" + if chunk == nil + assert not streamed or accumulation_buffer\match"^%s*$", "buffer not empty" + elseif type(chunk) == "string" accumulation_buffer ..= chunk while true - json_blob, rest = consume_json_head\match accumulation_buffer - unless json_blob + event, rest = accumulation_buffer\match "^(.-)\r?\n\r?\n(.-)$" + unless event break accumulation_buffer = rest - if chunk = parse_completion_chunk cjson.decode json_blob - chunk_callback chunk + while event and #event>0 + field, value, rest_evt = event\match "^(.-):%s+([^\r\n]+)(.-)$" + switch field + when "data" + streamed = true + unless value=="[DONE]" + chunk_callback (cjson.decode value) + when "event","id","retry","" --comment + nil -- noop + when nil + error "Cannot parse SSE event: "..event + else + error ('Unknown field "%s" with value "%s"')\format(field, value) + event = #rest_evt>0 and rest_evt\match"^\r?\n(.*)" ...