fix: buffer gzip data for anthropic messages #1247
Conversation
Signed-off-by: Suren Vartanian <[email protected]>
Codecov Report

@@ Coverage Diff @@
## main #1247 +/- ##
=======================================
Coverage 79.21% 79.21%
=======================================
Files 97 97
Lines 11125 11178 +53
=======================================
+ Hits 8813 8855 +42
- Misses 1919 1930 +11
Partials 393 393
internal/extproc/util.go
Outdated
// Try to decompress the accumulated buffer
if len(*gzipBuffer) > 0 {
	gzipReader, err := gzip.NewReader(bytes.NewReader(*gzipBuffer))
	if err != nil {
Do we want to check for the specific error, like "unexpected EOF"?
Not sure — I'm not that familiar with gzip errors, so I don't know what the other errors are, or for which of them we can keep buffering and for which we can't.
Signed-off-by: Suren Vartanian <[email protected]>
@mathetake do you remember why we do not use the decompression and compression filter?
// ResponseBody implements [AnthropicMessagesTranslator.ResponseBody] for Anthropic to GCP Anthropic.
// This is essentially a passthrough since both use the same Anthropic response format.
-func (a *anthropicToGCPAnthropicTranslator) ResponseBody(_ map[string]string, body io.Reader, endOfStream bool) (
+func (a *anthropicToGCPAnthropicTranslator) ResponseBody(_ map[string]string, body io.Reader, isStreaming bool) (
Why rename this? endOfStream is different from isStreaming.
Good question, but I do indeed mean isStreaming rather than endOfStream.
Basically we have two cases:
- translation for a streaming response
- translation for a regular request
This boolean tells us which one to use.
Since you raised the question, let me elaborate: previously we would translate while the response was still streaming; now, because we accumulate the whole stream, we no longer care about endOfStream. Let me know if that makes sense.
Can you leave a comment so other maintainers are aware of the difference?
lgtm
Thanks. Mainly I am concerned about how this implementation diverges from the others in chat/completions. At the end of the day, the exact same logic is applicable, so I would like the same logic to appear here as well.
In particular, special-casing gzip and buffering on the processor side feels confusing when it exists just to solve the underlying issue inside translate/anthropic_gcpanthropic.go, compared with the other chat/completions streaming code.
Let me push a fix to bring this in line with the others.
edited: I am getting to understand the underlying problem better. I think I need more time to think about this one.
Sorry, there are a couple of things I need to know.
Had a quick offline chat with Dan and I'm getting more context. I think the problem here is bigger than something Anthropic-specific, and we should be able to handle it at a more fundamental level. Let me dig into this one more tomorrow.
my comment in the chat with Dan
Let's be clear: having buffering and gzip handling in the current code doesn't look right to me either. If the compression/decompression filters work, I'd go that way, but we already have some gzip logic and I don't want to mess with other processors. Additionally, there is time pressure to get this done; several teams are now blocked by this.
Anthropic doesn't send all events in one response; they arrive one by one, and you never know when you will have a complete chunk to decompress, so we buffer. You could avoid waiting until the end of the stream, but because I didn't want to accidentally double-parse usage in intermediate chunks, I do it at end of stream once the chunks are over. The alternative would be to translate as soon as a chunk is complete (i.e., can be decompressed).
Synced with @mathetake offline; we will get this fix in first and then explore a proper long-term fix using the decompression filter. @VarSuren, can you help create an issue to track that?
Sure — do we want an internal ticket or an open-source issue?
Issues go here: https://github.com/envoyproxy/ai-gateway/issues
Description
Requests with gzip were failing with EOF (the code tried to read incomplete chunks).
Example:
Failing request prior to the change, where gcp-claude.json is any Claude request body.
gcp-claude.json:
curl -X POST \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -H "accept-encoding: br, gzip, deflate" \
  -d @gcp-claude.json \
  "http://localhost:8080/anthropic/v1/messages" -v --compressed