From a01644c0ae5f9340807aff97d489faa98aed8a37 Mon Sep 17 00:00:00 2001 From: "Han Verstraete (OpenFaaS Ltd)" Date: Thu, 19 Jun 2025 16:51:43 +0200 Subject: [PATCH] Support ndjson streams for function invocations Additional fixes: - Support Accept headers with multiple Accept values - Accept header values should be mathced case insensitive. As stated in RFC 7321: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1 Media Types are case-insensitive Signed-off-by: Han Verstraete (OpenFaaS Ltd) --- proxy/proxy.go | 14 ++++++-- proxy/proxy_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/proxy/proxy.go b/proxy/proxy.go index 2a7ab1e..284a9d2 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -28,6 +28,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "strings" "time" "github.com/gorilla/mux" @@ -189,8 +190,7 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient }() } - if v := originalReq.Header.Get("Accept"); v == "text/event-stream" || - originalReq.Header.Get("Upgrade") == "websocket" { + if requiresStdlibProxy(originalReq) { originalReq.URL = proxyReq.URL reverseProxy.ServeHTTP(w, originalReq) @@ -222,6 +222,16 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient } } +// requiresStdlibProxy checks if the request should be proxied using the standard library reverse proxy. +// Support SSE, NDSJON and WebSockets through the stdlib reverse proxy +func requiresStdlibProxy(req *http.Request) bool { + acceptHeader := strings.ToLower(req.Header.Get("Accept")) + + return strings.Contains(acceptHeader, "text/event-stream") || + strings.Contains(acceptHeader, "application/x-ndjson") || + req.Header.Get("Upgrade") == "websocket" +} + // buildProxyRequest creates a request object for the proxy request, it will ensure that // the original request headers are preserved as well as setting openfaas system headers func buildProxyRequest(originalReq *http.Request, baseURL url.URL, extraPath string) (*http.Request, error) { diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index d9a57d3..d14c2ab 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -528,3 +528,83 @@ func Test_NewProxyClientConfig(t *testing.T) { }) } } + +func Test_requiresStdlibProxy(t *testing.T) { + testCases := []struct { + name string + headers map[string]string + want bool + }{ + { + name: "SSE request", + headers: map[string]string{"Accept": "text/event-stream"}, + want: true, + }, + { + name: "SSE request with multiple accept values", + headers: map[string]string{"Accept": "application/json, text/event-stream;q=0.9, text/plain"}, + want: true, + }, + { + name: "NDJSON request", + headers: map[string]string{"Accept": "application/x-ndjson"}, + want: true, + }, + { + name: "NDJSON request with multiple accept values", + headers: map[string]string{"Accept": "text/plain, application/x-ndjson;q=0.9, application/json;q=0.8"}, + want: true, + }, + { + name: "WebSocket request", + headers: map[string]string{"Upgrade": "websocket"}, + want: true, + }, + { + name: "Regular JSON request", + headers: map[string]string{"Accept": "application/json"}, + want: false, + }, + { + name: "Regular request with multiple values", + headers: map[string]string{"Accept": "text/plain, application/json;q=0.9"}, + want: false, + }, + { + name: "Request without headers", + headers: map[string]string{}, + want: false, + }, + { + name: "Request with non-websocket Upgrade header", + headers: map[string]string{"Accept": "application/json", "Upgrade": "h2c"}, + want: false, + }, + + { + name: "Case insensitive headers", + headers: map[string]string{"Accept": "APPLICATION/X-NDJSON"}, + want: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest("GET", "/function/test", nil) + if err != nil { + t.Fatal(err) + } + + // Set headers from test case + for key, value := range tc.headers { + req.Header.Set(key, value) + } + + got := requiresStdlibProxy(req) + + if got != tc.want { + t.Errorf("Want %t, got %t", tc.want, got) + } + }) + } +}