From cb43661c7fa338da6a4db9d08a941a68a3c95b6d Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 21:50:42 -0400 Subject: [PATCH 1/6] feat: cleanly implement HEALTHCHECK for aigw docker Signed-off-by: Adrian Cole --- cmd/aigw/healthcheck.go | 52 ++++++----------- cmd/aigw/healthcheck_test.go | 106 ++++++++++++++++------------------- cmd/aigw/main.go | 8 +-- cmd/aigw/run.go | 32 ++++++----- cmd/aigw/run_test.go | 10 ++-- internal/aigw/admin.go | 76 +++++++++---------------- internal/aigw/admin_test.go | 73 ++++-------------------- tests/e2e-aigw/aigw_test.go | 4 +- 8 files changed, 135 insertions(+), 226 deletions(-) diff --git a/cmd/aigw/healthcheck.go b/cmd/aigw/healthcheck.go index 673c31f9df..4d4c6b44ae 100644 --- a/cmd/aigw/healthcheck.go +++ b/cmd/aigw/healthcheck.go @@ -7,44 +7,28 @@ package main import ( "context" - "fmt" "io" - "net/http" - "time" -) - -// healthcheck performs an HTTP GET request to the admin server health endpoint. -// This is used by Docker HEALTHCHECK to verify the aigw admin server is responsive. -// It exits with code 0 on success (healthy) or 1 on failure (unhealthy). -func healthcheck(ctx context.Context, port int, stdout, _ io.Writer) error { - url := fmt.Sprintf("http://localhost:%d/health", port) - - client := &http.Client{ - Timeout: 5 * time.Second, - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } + "log/slog" - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to connect to admin server") - } - defer resp.Body.Close() + "github.com/envoyproxy/ai-gateway/internal/aigw" +) - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("unhealthy: status %d, body: %s", resp.StatusCode, string(body)) - } +// healthcheck performs looks up the Envoy subprocess, gets its admin port, +// and returns no error when ready. +func healthcheck(ctx context.Context, _, stderr io.Writer) error { + logger := slog.New(slog.NewTextHandler(stderr, &slog.HandlerOptions{})) + // In docker, pid 1 is the aigw process + return doHealthcheck(ctx, 1, logger) +} - // Optionally read and print the response for debugging - body, err := io.ReadAll(resp.Body) +func doHealthcheck(ctx context.Context, aigwPid int, logger *slog.Logger) error { + envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, aigwPid, 0) if err != nil { - return fmt.Errorf("failed to read response: %w", err) + logger.Error("Failed to find Envoy admin server", "error", err) + return err + } else if err = envoyAdmin.IsReady(ctx); err != nil { + logger.Error("Envoy admin server is not ready", "adminPort", envoyAdmin.Port(), "error", err) + return err } - - _, _ = fmt.Fprintf(stdout, "%s", body) - return nil + return err } diff --git a/cmd/aigw/healthcheck_test.go b/cmd/aigw/healthcheck_test.go index 3d4689cc7d..917c854b93 100644 --- a/cmd/aigw/healthcheck_test.go +++ b/cmd/aigw/healthcheck_test.go @@ -7,75 +7,67 @@ package main import ( "bytes" + "context" + "fmt" + "log/slog" "net/http" "net/http/httptest" "net/url" + "os" + "os/exec" + "path/filepath" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) func Test_healthcheck(t *testing.T) { - tests := []struct { - name string - closeServer bool - statusCode int - respBody string - expOut string - expErr string - }{ - { - name: "success", - statusCode: http.StatusOK, - respBody: "OK", - expOut: "OK", - }, - { - name: "unhealthy status", - statusCode: http.StatusServiceUnavailable, - respBody: "not ready", - expErr: "unhealthy: status 503, body: not ready", - }, - { - name: "internal error", - statusCode: http.StatusInternalServerError, - respBody: "server error", - expErr: "unhealthy: status 500, body: server error", - }, - { - name: "connection failure", - closeServer: true, - expErr: "failed to connect to admin server", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(tt.statusCode) - _, _ = w.Write([]byte(tt.respBody)) - })) - t.Cleanup(s.Close) + pid := os.Getpid() - u, err := url.Parse(s.URL) - require.NoError(t, err) - port, err := strconv.Atoi(u.Port()) - require.NoError(t, err) + t.Run("returns error when no envoy subprocess", func(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, nil)) + err := doHealthcheck(t.Context(), pid, logger) + require.EqualError(t, err, "failed to get aigw process: process does not exist") + // Contains not Equal because there's a timestamp + require.Contains(t, buf.String(), "Failed to find Envoy admin server") + }) - if tt.closeServer { - s.Close() - } + t.Run("returns nil when ready", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/ready", r.URL.Path) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("live")) + })) + defer server.Close() - stdout := &bytes.Buffer{} - err = healthcheck(t.Context(), port, stdout, nil) + u, err := url.Parse(server.URL) + require.NoError(t, err) + port, err := strconv.Atoi(u.Port()) + require.NoError(t, err) - if tt.expErr != "" { - require.Equal(t, tt.expErr, err.Error()) - require.Empty(t, stdout.String()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expOut, stdout.String()) - } - }) - } + adminFile := filepath.Join(t.TempDir(), "admin-address.txt") + require.NoError(t, os.WriteFile(adminFile, []byte(fmt.Sprintf("127.0.0.1:%d", port)), 0o600)) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile) + cmd := exec.CommandContext(ctx, "sh", "-c", cmdStr) + require.NoError(t, cmd.Start()) + defer func() { + _ = cmd.Process.Kill() + _, _ = cmd.Process.Wait() + }() + + time.Sleep(100 * time.Millisecond) + + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, nil)) + err = doHealthcheck(t.Context(), pid, logger) + require.NoError(t, err) + require.Empty(t, buf) + }) } diff --git a/cmd/aigw/main.go b/cmd/aigw/main.go index f5986158f6..3d1413ff10 100644 --- a/cmd/aigw/main.go +++ b/cmd/aigw/main.go @@ -41,9 +41,7 @@ type ( mcpConfig *autoconfig.MCPServers `kong:"-"` // Internal field: normalized MCP JSON data } // cmdHealthcheck corresponds to `aigw healthcheck` command. - cmdHealthcheck struct { - AdminPort int `help:"HTTP port for the admin server (serves /metrics and /health endpoints)." default:"1064"` - } + cmdHealthcheck struct{} ) // Validate is called by Kong after parsing to validate the cmdRun arguments. @@ -78,7 +76,7 @@ func (c *cmdRun) Validate() error { type ( runFn func(context.Context, cmdRun, runOpts, io.Writer, io.Writer) error - healthcheckFn func(context.Context, int, io.Writer, io.Writer) error + healthcheckFn func(context.Context, io.Writer, io.Writer) error ) func main() { @@ -117,7 +115,7 @@ func doMain(ctx context.Context, stdout, stderr io.Writer, args []string, exitFn log.Fatalf("Error running: %v", err) } case "healthcheck": - err = hf(ctx, c.Healthcheck.AdminPort, stdout, stderr) + err = hf(ctx, stdout, stderr) if err != nil { log.Fatalf("Health check failed: %v", err) } diff --git a/cmd/aigw/run.go b/cmd/aigw/run.go index 333c1bef5d..6cd9fc54f3 100644 --- a/cmd/aigw/run.go +++ b/cmd/aigw/run.go @@ -152,7 +152,7 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err if err != nil { return err } - fakeClient, extProxDone, envoyAdminPort, envoyPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml) + fakeClient, extProxDone, envoyAdminPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml) if err != nil { return fmt.Errorf("failed to write envoy resources and run extproc: %w", err) } @@ -208,7 +208,13 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err // Start a monitoring goroutine to poll Envoy's readiness. This starts // before the server to ensure we don't miss the readiness window. go func() { - envoyAdmin := aigw.NewEnvoyAdminClient(ctx, stderrLogger, os.Getpid(), envoyAdminPort, envoyPort) + envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), envoyAdminPort) + if err != nil { + stderrLogger.Error("Failed to find Envoy admin server", "error", err) + serverCancel() // Likely a crashed envoy process + return + } + stderrLogger.Info("Found Envoy admin server", "adminPort", envoyAdmin.Port()) pollEnvoyReady(ctx, stderrLogger, envoyAdmin, 2*time.Second) }() @@ -254,33 +260,33 @@ func recreateDir(path string) error { // writeEnvoyResourcesAndRunExtProc reads all resources from the given string, writes them to the output file, and runs // external processes for EnvoyExtensionPolicy resources. -func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, int, error) { +func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, error) { aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, envoyProxies, err := collectObjects(original, runCtx.envoyGatewayResourcesOut, runCtx.stderrLogger) if err != nil { - return nil, nil, 0, 0, fmt.Errorf("error collecting: %w", err) + return nil, nil, 0, fmt.Errorf("error collecting: %w", err) } if len(gateways) > 1 { - return nil, nil, 0, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name) + return nil, nil, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name) } for _, bsp := range backendSecurityPolicies { spec := bsp.Spec if spec.AWSCredentials != nil && spec.AWSCredentials.OIDCExchangeToken != nil { // TODO: We can make it work by generalizing the rotation logic. - return nil, nil, 0, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name) + return nil, nil, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name) } } // Do the substitution for the secrets. for _, s := range secrets { if err = runCtx.rewriteSecretWithAnnotatedLocation(s); err != nil { - return nil, nil, 0, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err) + return nil, nil, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err) } } var secretList *corev1.SecretList fakeClient, _fakeClientSet, httpRoutes, eps, httpRouteFilters, backends, secretList, backendTrafficPolicies, securityPolicies, err := translateCustomResourceObjects(ctx, aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, runCtx.stderrLogger) if err != nil { - return nil, nil, 0, 0, fmt.Errorf("error translating: %w", err) + return nil, nil, 0, fmt.Errorf("error translating: %w", err) } runCtx.fakeClientSet = _fakeClientSet @@ -304,7 +310,7 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex } gw := gateways[0] if len(gw.Spec.Listeners) == 0 { - return nil, nil, 0, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name) + return nil, nil, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name) } runCtx.mustClearSetOwnerReferencesAndStatusAndWriteObj(&gw.TypeMeta, gw) for _, ep := range eps.Items { @@ -315,20 +321,20 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex Secrets("").Get(ctx, controller.FilterConfigSecretPerGatewayName(gw.Name, gw.Namespace), metav1.GetOptions{}) if err != nil { - return nil, nil, 0, 0, fmt.Errorf("failed to get filter config secret: %w", err) + return nil, nil, 0, fmt.Errorf("failed to get filter config secret: %w", err) } rawConfig, ok := filterConfigSecret.StringData[controller.FilterConfigKeyInSecret] if !ok { - return nil, nil, 0, 0, fmt.Errorf("failed to get filter config from secret: %w", err) + return nil, nil, 0, fmt.Errorf("failed to get filter config from secret: %w", err) } var fc filterapi.Config if err = yaml.Unmarshal([]byte(rawConfig), &fc); err != nil { - return nil, nil, 0, 0, fmt.Errorf("failed to unmarshal filter config: %w", err) + return nil, nil, 0, fmt.Errorf("failed to unmarshal filter config: %w", err) } runCtx.stderrLogger.Info("Running external process", "config", fc) done := runCtx.mustStartExtProc(ctx, &fc) - return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), runCtx.tryFindEnvoyListenerPort(gw), nil + return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), nil } // mustStartExtProc starts the external process with the given working directory, port, and filter configuration. diff --git a/cmd/aigw/run_test.go b/cmd/aigw/run_test.go index b5adb8696f..23f0394d8f 100644 --- a/cmd/aigw/run_test.go +++ b/cmd/aigw/run_test.go @@ -226,7 +226,7 @@ func TestRunCmdContext_writeEnvoyResourcesAndRunExtProc(t *testing.T) { } config := readFileFromProjectRoot(t, "examples/aigw/ollama.yaml") ctx, cancel := context.WithCancel(t.Context()) - _, done, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, config) + _, done, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, config) require.NoError(t, err) time.Sleep(time.Second) cancel() @@ -251,7 +251,7 @@ func TestRunCmdContext_writeEnvoyResourcesAndRunExtProc_noListeners(t *testing.T adminPort: adminPort, } - _, _, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(t.Context(), gatewayNoListenersConfig) + _, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(t.Context(), gatewayNoListenersConfig) require.EqualError(t, err, "gateway aigw-run has no listeners configured") } @@ -449,7 +449,8 @@ func TestPollEnvoyReady(t *testing.T) { t.Run("ready", func(t *testing.T) { t.Cleanup(func() { callCount = 0 }) - envoyAdmin := aigw.NewEnvoyAdminClientFromPort(adminPort) + envoyAdmin, err := aigw.NewEnvoyAdminClient(t.Context(), os.Getpid(), adminPort) + require.NoError(t, err) pollEnvoyReady(t.Context(), l, envoyAdmin, 50*time.Millisecond) require.Equal(t, successAt, callCount) }) @@ -458,7 +459,8 @@ func TestPollEnvoyReady(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) t.Cleanup(cancel) t.Cleanup(func() { callCount = 0 }) - envoyAdmin := aigw.NewEnvoyAdminClientFromPort(adminPort) + envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), adminPort) + require.NoError(t, err) pollEnvoyReady(ctx, l, envoyAdmin, 50*time.Millisecond) require.Less(t, callCount, successAt) }) diff --git a/internal/aigw/admin.go b/internal/aigw/admin.go index a11ffdce94..e290b75da7 100644 --- a/internal/aigw/admin.go +++ b/internal/aigw/admin.go @@ -10,8 +10,6 @@ import ( "errors" "fmt" "io" - "log/slog" - "net" "net/http" "net/url" "os" @@ -26,58 +24,54 @@ const adminAddressPathFlag = `--admin-address-path` // EnvoyAdminClient provides methods to check if Envoy is ready. type EnvoyAdminClient interface { + Port() int // IsReady returns true if Envoy is ready to accept requests. // This method has a 1-second timeout for the readiness check. IsReady(ctx context.Context) error } -func NewEnvoyAdminClientFromPort(envoyAdminPort int) EnvoyAdminClient { - return &envoyAdminAPIClient{adminPort: envoyAdminPort} -} - // NewEnvoyAdminClient creates an EnvoyAdminClient based on the provided parameters. // If envoyAdminPort > 0, it creates an admin API client using 127.0.0.1:{envoyAdminPort}. -// If envoyAdminPort == 0, it attempts to discover the admin adminPort from the Envoy subprocess. -// On discovery failure, it logs a warning and returns a fallback client that checks the listener adminPort. -// The envoyParentPid parameter specifies which process to check for Envoy child processes. -func NewEnvoyAdminClient(ctx context.Context, logger *slog.Logger, envoyParentPid int, envoyAdminPort int, envoyListenerPort int) EnvoyAdminClient { +// If envoyAdminPort == 0, it attempts to discover the admin port from the Envoy subprocess. +// The aigwPid parameter specifies which process to check for Envoy child processes. +func NewEnvoyAdminClient(ctx context.Context, aigwPid int, envoyAdminPort int) (EnvoyAdminClient, error) { if envoyAdminPort > 0 { - logger.Info("Using configured Envoy admin adminPort", "adminPort", envoyAdminPort) - return &envoyAdminAPIClient{adminPort: envoyAdminPort} + return &envoyAdminAPIClient{port: envoyAdminPort}, nil } - // Poll for the run dir and admin adminPort with a shared timeout + // Poll for the run dir and admin port with a shared timeout ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - // Discover the envoy subprocess and extract the admin address path - envoyAdminAddressPath, err := pollEnvoyAdminAddressPathFromArgs(ctx, int32(envoyParentPid)) // #nosec G115 -- PID fits in int32 + // Discover the Envoy subprocess and extract the admin address path + envoyAdminAddressPath, err := pollEnvoyAdminAddressPathFromArgs(ctx, int32(aigwPid)) // #nosec G115 -- PID fits in int32 if err != nil { - logger.Warn("Falling back to Envoy listener adminPort check", "err", err) - return &envoyListenerPortClient{port: envoyListenerPort} + return nil, err } - // Attempt to discover the admin adminPort from the admin address path + // Attempt to discover the admin port from the admin address path envoyAdminPort, err = pollPortFromEnvoyAddressPath(ctx, envoyAdminAddressPath) if err != nil { - logger.Warn("Falling back to Envoy listener adminPort check", "err", err) - return &envoyListenerPortClient{port: envoyListenerPort} + return nil, err } - logger.Info("Discovered Envoy admin adminPort", "envoyAdminPort", envoyAdminPort) - return &envoyAdminAPIClient{adminPort: envoyAdminPort} + return &envoyAdminAPIClient{port: envoyAdminPort}, nil } // envoyAdminAPIClient checks Envoy readiness via the admin API /ready endpoint. type envoyAdminAPIClient struct { - adminPort int + port int +} + +func (c *envoyAdminAPIClient) Port() int { + return c.port } func (c *envoyAdminAPIClient) IsReady(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/ready", c.adminPort), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/ready", c.port), nil) if err != nil { return err } @@ -103,24 +97,6 @@ func (c *envoyAdminAPIClient) IsReady(ctx context.Context) error { return nil } -// envoyListenerPortClient checks Envoy readiness by attempting to connect to the listener port. -type envoyListenerPortClient struct { - port int -} - -func (c *envoyListenerPortClient) IsReady(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - - dialer := net.Dialer{} - conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("127.0.0.1:%d", c.port)) - if err != nil { - return err - } - _ = conn.Close() - return nil -} - // extractAdminAddressPath parses the adminAddressPathFlag flag from command // line arguments. It validates that the path is actually a file. func extractAdminAddressPath(cmdline []string) (string, error) { @@ -150,7 +126,7 @@ func extractAdminAddressPath(cmdline []string) (string, error) { func pollEnvoyAdminAddressPathFromArgs(ctx context.Context, currentPID int32) (string, error) { currentProc, err := process.NewProcessWithContext(ctx, currentPID) if err != nil { - return "", fmt.Errorf("failed to get parent process: %w", err) + return "", fmt.Errorf("failed to get aigw process: %w", err) } ticker := time.NewTicker(50 * time.Millisecond) @@ -162,7 +138,7 @@ LOOP: for { select { case <-ctx.Done(): - return "", fmt.Errorf("timeout waiting for child processes: %w", lastErr) + return "", fmt.Errorf("timeout waiting for Envoy process: %w", lastErr) case <-ticker.C: children, childErr := currentProc.ChildrenWithContext(ctx) if childErr != nil { @@ -171,7 +147,7 @@ LOOP: } if len(children) == 0 { - lastErr = errors.New("no child process found") + lastErr = errors.New("no Envoy process found") continue } @@ -184,7 +160,7 @@ LOOP: // Get command line args envoyCmdline, err := envoyProc.CmdlineSlice() if err != nil { - return "", fmt.Errorf("failed to get command line of envoy: %w", err) + return "", fmt.Errorf("failed to get command line of Envoy: %w", err) } // Extract admin address path @@ -203,7 +179,7 @@ LOOP: for { select { case <-ctx.Done(): - return 0, fmt.Errorf("timeout waiting for %s: %w", envoyAdminAddressPath, lastErr) + return 0, fmt.Errorf("timeout waiting for Envoy admin address file %s: %w", envoyAdminAddressPath, lastErr) case <-ticker.C: data, err := os.ReadFile(envoyAdminAddressPath) if err != nil { @@ -213,7 +189,7 @@ LOOP: adminAddr = strings.TrimSpace(string(data)) if adminAddr == "" { - lastErr = fmt.Errorf("%s was empty", envoyAdminAddressPath) + lastErr = fmt.Errorf("envoy admin address file %s was empty", envoyAdminAddressPath) continue } break LOOP @@ -223,12 +199,12 @@ LOOP: // Parse as URL to extract port u, err := url.Parse("http://" + adminAddr) if err != nil { - return 0, fmt.Errorf("failed to parse admin address %q from %s: %w", adminAddr, envoyAdminAddressPath, err) + return 0, fmt.Errorf("failed to parse Envoy's admin address %q from %s: %w", adminAddr, envoyAdminAddressPath, err) } port, err := strconv.Atoi(u.Port()) if err != nil { - return 0, fmt.Errorf("failed to parse port from %q: %w", adminAddr, err) + return 0, fmt.Errorf("failed to parse Envoy's admin port from %q: %w", adminAddr, err) } return port, nil diff --git a/internal/aigw/admin_test.go b/internal/aigw/admin_test.go index 7c661c3558..29ea35abc9 100644 --- a/internal/aigw/admin_test.go +++ b/internal/aigw/admin_test.go @@ -6,11 +6,8 @@ package aigw import ( - "bytes" "context" "fmt" - "log/slog" - "net" "net/http" "net/http/httptest" "net/url" @@ -101,7 +98,7 @@ func TestPollEnvoyAdminAddressPathFromArgs(t *testing.T) { adminFile := filepath.Join(t.TempDir(), "admin-address.txt") require.NoError(t, os.WriteFile(adminFile, []byte("127.0.0.1:9901"), 0o600)) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) defer cancel() cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile) @@ -184,7 +181,7 @@ func TestEnvoyAdminAPIClient_IsReady(t *testing.T) { port, err := strconv.Atoi(u.Port()) require.NoError(t, err) - client := &envoyAdminAPIClient{adminPort: port} + client := &envoyAdminAPIClient{port: port} err = client.IsReady(t.Context()) require.NoError(t, err) }) @@ -201,7 +198,7 @@ func TestEnvoyAdminAPIClient_IsReady(t *testing.T) { port, err := strconv.Atoi(u.Port()) require.NoError(t, err) - client := &envoyAdminAPIClient{adminPort: port} + client := &envoyAdminAPIClient{port: port} err = client.IsReady(t.Context()) require.Error(t, err) }) @@ -218,7 +215,7 @@ func TestEnvoyAdminAPIClient_IsReady(t *testing.T) { port, err := strconv.Atoi(u.Port()) require.NoError(t, err) - client := &envoyAdminAPIClient{adminPort: port} + client := &envoyAdminAPIClient{port: port} err = client.IsReady(t.Context()) require.Error(t, err) }) @@ -236,7 +233,7 @@ func TestEnvoyAdminAPIClient_IsReady(t *testing.T) { port, err := strconv.Atoi(u.Port()) require.NoError(t, err) - client := &envoyAdminAPIClient{adminPort: port} + client := &envoyAdminAPIClient{port: port} ctx, cancel := context.WithTimeout(t.Context(), 10*time.Millisecond) defer cancel() @@ -246,64 +243,16 @@ func TestEnvoyAdminAPIClient_IsReady(t *testing.T) { }) } -func TestEnvoyListenerPortClient_IsReady(t *testing.T) { - t.Run("returns nil when port is open", func(t *testing.T) { - listener, err := net.Listen("tcp", "127.0.0.1:0") +func TestNewEnvoyAdminClient(t *testing.T) { + t.Run("envoyAdminPort > 0", func(t *testing.T) { + client, err := NewEnvoyAdminClient(t.Context(), os.Getpid(), 9901) require.NoError(t, err) - defer listener.Close() - port := listener.Addr().(*net.TCPAddr).Port - client := &envoyListenerPortClient{port: port} - - err = client.IsReady(t.Context()) - require.NoError(t, err) + require.Equal(t, 9901, client.Port()) }) - t.Run("returns error when port is closed", func(t *testing.T) { - client := &envoyListenerPortClient{port: 54321} - - err := client.IsReady(t.Context()) - require.Error(t, err) - }) - - t.Run("respects context cancellation", func(t *testing.T) { - client := &envoyListenerPortClient{port: 54321} - - ctx, cancel := context.WithCancel(t.Context()) - cancel() - - err := client.IsReady(ctx) + t.Run("returns error when discovery fails", func(t *testing.T) { + _, err := NewEnvoyAdminClient(t.Context(), 1, 0) require.Error(t, err) }) } - -func TestNewEnvoyAdminClient(t *testing.T) { - t.Run("returns envoyAdminAPIClient when envoyAdminPort > 0", func(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - - client := NewEnvoyAdminClient(t.Context(), logger, os.Getpid(), 9901, 1975) - - adminClient, ok := client.(*envoyAdminAPIClient) - require.True(t, ok) - require.Equal(t, 9901, adminClient.adminPort) - - logOutput := buf.String() - require.Contains(t, logOutput, "Using configured Envoy admin") - require.Contains(t, logOutput, "9901") - }) - - t.Run("returns envoyListenerPortClient when discovery fails", func(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - - client := NewEnvoyAdminClient(t.Context(), logger, 1, 0, 1975) - - listenerClient, ok := client.(*envoyListenerPortClient) - require.True(t, ok) - require.Equal(t, 1975, listenerClient.port) - - logOutput := buf.String() - require.Contains(t, logOutput, "Falling back to Envoy listener") - }) -} diff --git a/tests/e2e-aigw/aigw_test.go b/tests/e2e-aigw/aigw_test.go index 57f15ca7dd..fe58ae7b6d 100644 --- a/tests/e2e-aigw/aigw_test.go +++ b/tests/e2e-aigw/aigw_test.go @@ -121,7 +121,9 @@ func startAIGWCLI(t *testing.T, aigwBin string, arg ...string) { // Wait for health check using RequireEventuallyNoError. t.Log("Waiting for aigw to start (Envoy admin endpoint)...") - envoyAdmin := aigw.NewEnvoyAdminClientFromPort(envoyAdminPort) + envoyAdmin, err := aigw.NewEnvoyAdminClient(t.Context(), 1, envoyAdminPort) + require.NoError(t, err) + internaltesting.RequireEventuallyNoError(t, func() error { return envoyAdmin.IsReady(t.Context()) }, 180*time.Second, 2*time.Second, From 8ae40f7daf2cbf400c7878564ebfb2fba8fbe712 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 22:04:58 -0400 Subject: [PATCH 2/6] Removes arbitrary timeout Signed-off-by: Adrian Cole --- cmd/aigw/healthcheck.go | 5 +++++ cmd/aigw/main_test.go | 2 +- internal/aigw/admin.go | 4 ---- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/aigw/healthcheck.go b/cmd/aigw/healthcheck.go index 4d4c6b44ae..6cc2d9e091 100644 --- a/cmd/aigw/healthcheck.go +++ b/cmd/aigw/healthcheck.go @@ -9,6 +9,7 @@ import ( "context" "io" "log/slog" + "time" "github.com/envoyproxy/ai-gateway/internal/aigw" ) @@ -16,6 +17,10 @@ import ( // healthcheck performs looks up the Envoy subprocess, gets its admin port, // and returns no error when ready. func healthcheck(ctx context.Context, _, stderr io.Writer) error { + // Give up to 1 second for the health check + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + logger := slog.New(slog.NewTextHandler(stderr, &slog.HandlerOptions{})) // In docker, pid 1 is the aigw process return doHealthcheck(ctx, 1, logger) diff --git a/cmd/aigw/main_test.go b/cmd/aigw/main_test.go index 0d4002ce72..a663c6d5d3 100644 --- a/cmd/aigw/main_test.go +++ b/cmd/aigw/main_test.go @@ -44,7 +44,7 @@ Commands: run [] [flags] Run the AI Gateway locally for given configuration. - healthcheck [flags] + healthcheck Docker HEALTHCHECK command. Run "aigw --help" for more information on a command. diff --git a/internal/aigw/admin.go b/internal/aigw/admin.go index e290b75da7..e605141125 100644 --- a/internal/aigw/admin.go +++ b/internal/aigw/admin.go @@ -39,10 +39,6 @@ func NewEnvoyAdminClient(ctx context.Context, aigwPid int, envoyAdminPort int) ( return &envoyAdminAPIClient{port: envoyAdminPort}, nil } - // Poll for the run dir and admin port with a shared timeout - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - // Discover the Envoy subprocess and extract the admin address path envoyAdminAddressPath, err := pollEnvoyAdminAddressPathFromArgs(ctx, int32(aigwPid)) // #nosec G115 -- PID fits in int32 if err != nil { From d4aa483fbe028ae5dc31a45aedb3a12eafe37247 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 22:27:33 -0400 Subject: [PATCH 3/6] don't wait forever in test Signed-off-by: Adrian Cole --- cmd/aigw/healthcheck_test.go | 6 ++++-- internal/aigw/admin.go | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cmd/aigw/healthcheck_test.go b/cmd/aigw/healthcheck_test.go index 917c854b93..9740413c9c 100644 --- a/cmd/aigw/healthcheck_test.go +++ b/cmd/aigw/healthcheck_test.go @@ -29,8 +29,10 @@ func Test_healthcheck(t *testing.T) { t.Run("returns error when no envoy subprocess", func(t *testing.T) { var buf bytes.Buffer logger := slog.New(slog.NewTextHandler(&buf, nil)) - err := doHealthcheck(t.Context(), pid, logger) - require.EqualError(t, err, "failed to get aigw process: process does not exist") + ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) + defer cancel() + err := doHealthcheck(ctx, pid, logger) + require.EqualError(t, err, "timeout waiting for Envoy process: no Envoy process found") // Contains not Equal because there's a timestamp require.Contains(t, buf.String(), "Failed to find Envoy admin server") }) diff --git a/internal/aigw/admin.go b/internal/aigw/admin.go index e605141125..6aa6ff6a89 100644 --- a/internal/aigw/admin.go +++ b/internal/aigw/admin.go @@ -134,6 +134,9 @@ LOOP: for { select { case <-ctx.Done(): + if lastErr == nil { + return "", errors.New("timeout waiting for Envoy process") + } return "", fmt.Errorf("timeout waiting for Envoy process: %w", lastErr) case <-ticker.C: children, childErr := currentProc.ChildrenWithContext(ctx) @@ -175,6 +178,9 @@ LOOP: for { select { case <-ctx.Done(): + if lastErr == nil { + return 0, fmt.Errorf("timeout waiting for Envoy admin address file %s", envoyAdminAddressPath) + } return 0, fmt.Errorf("timeout waiting for Envoy admin address file %s: %w", envoyAdminAddressPath, lastErr) case <-ticker.C: data, err := os.ReadFile(envoyAdminAddressPath) From 2a8ec4e1ee463371b0eb1a2022c005597a185e90 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 22:39:39 -0400 Subject: [PATCH 4/6] very slow MCP server Signed-off-by: Adrian Cole --- cmd/aigw/run_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/aigw/run_test.go b/cmd/aigw/run_test.go index 23f0394d8f..c187ab9540 100644 --- a/cmd/aigw/run_test.go +++ b/cmd/aigw/run_test.go @@ -90,7 +90,7 @@ func TestRun(t *testing.T) { } } return fmt.Errorf("no content in response") - }, 30*time.Second, 2*time.Second, + }, 1*time.Minute, 2*time.Second, "chat completion never succeeded") }) @@ -108,7 +108,7 @@ func TestRun(t *testing.T) { return fmt.Errorf("status %d", resp.StatusCode) } return nil - }, 2*time.Minute, time.Second, + }, 1*time.Minute, time.Second, "metrics endpoint never became available") }) } @@ -178,7 +178,7 @@ func TestRunMCP(t *testing.T) { return fmt.Errorf("tool returned error response") } return nil - }, 1*time.Minute, 2*time.Second, + }, 2*time.Minute, 2*time.Second, "MCP tool call never succeeded") }) } From 30eade8f3331b83421faac1cd30f9275a36815d7 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 23:11:06 -0400 Subject: [PATCH 5/6] log in run_test flake Signed-off-by: Adrian Cole --- cmd/aigw/run_test.go | 10 ++-------- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/cmd/aigw/run_test.go b/cmd/aigw/run_test.go index c187ab9540..96fa41db5a 100644 --- a/cmd/aigw/run_test.go +++ b/cmd/aigw/run_test.go @@ -39,12 +39,6 @@ import ( internaltesting "github.com/envoyproxy/ai-gateway/internal/testing" ) -// Do NOT commit runDebug = true to the branch until Envoy Gateway removes -// hard-coding of func-e calls which ends up always using os.Stdout/Stderr -// Only set this to true when you are debugging. Once envoy-gateway supports -// redirection, commit this as true since debug logs only show on failure. -const runDebug = false - func TestRun(t *testing.T) { ollamaModel, err := internaltesting.GetOllamaModel(internaltesting.ChatModel) if err == nil { @@ -67,7 +61,7 @@ func TestRun(t *testing.T) { go func() { opts := runOpts{extProcLauncher: mainlib.Main} - require.NoError(t, run(ctx, cmdRun{Debug: runDebug, AdminPort: adminPort}, opts, buffers[0], buffers[1])) + require.NoError(t, run(ctx, cmdRun{Debug: true, AdminPort: adminPort}, opts, buffers[0], buffers[1])) }() client := openai.NewClient(option.WithBaseURL("http://localhost:1975/v1/")) @@ -146,7 +140,7 @@ func TestRunMCP(t *testing.T) { go func() { opts := runOpts{extProcLauncher: mainlib.Main} - require.NoError(t, run(ctx, cmdRun{Debug: runDebug, AdminPort: adminPort, mcpConfig: mcpServers}, opts, buffers[0], buffers[1])) + require.NoError(t, run(ctx, cmdRun{Debug: true, AdminPort: adminPort, mcpConfig: mcpServers}, opts, buffers[0], buffers[1])) }() url := fmt.Sprintf("http://localhost:%d/mcp", 1975) diff --git a/go.mod b/go.mod index 40fff960e5..4d265baffd 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 github.com/coreos/go-oidc/v3 v3.15.0 github.com/docker/docker v28.4.0+incompatible - github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251008155728-197fd10aeb03 + github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251009024134-4337e573e370 github.com/envoyproxy/go-control-plane v0.13.5-0.20250929230642-07d3df27ff4f github.com/envoyproxy/go-control-plane/envoy v1.35.1-0.20250929230642-07d3df27ff4f github.com/go-logr/logr v1.4.3 diff --git a/go.sum b/go.sum index f692bd08f2..80f29962be 100644 --- a/go.sum +++ b/go.sum @@ -141,8 +141,8 @@ github.com/ebitengine/purego v0.9.0 h1:mh0zpKBIXDceC63hpvPuGLiJ8ZAa3DfrFTudmfi8A github.com/ebitengine/purego v0.9.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/emicklei/go-restful/v3 v3.13.0 h1:C4Bl2xDndpU6nJ4bc1jXd+uTmYPVUwkD6bFY/oTyCes= github.com/emicklei/go-restful/v3 v3.13.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251008155728-197fd10aeb03 h1:6Yg//b4+j6oN4VLfhUXBkSpZTP6otmrtPGwmU2f0Y7w= -github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251008155728-197fd10aeb03/go.mod h1:oxohnP6irhQSCcIc5CAer+xBhwT8VjIvzVM3g0IxvNA= +github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251009024134-4337e573e370 h1:2IZ99F8C2DHa6gC5x4g4Wu3zGo2itY0H9KzxA4NozVo= +github.com/envoyproxy/gateway v0.5.0-rc.1.0.20251009024134-4337e573e370/go.mod h1:wUZX3mrY32Z6bIV5ma/iQxd6kOQeM6fF1zvzzmwpQrY= github.com/envoyproxy/go-control-plane v0.13.5-0.20250929230642-07d3df27ff4f h1:36vvJBe/wXWfD7qrTb1WnbPVPMxNFDfEygztH8wgebw= github.com/envoyproxy/go-control-plane v0.13.5-0.20250929230642-07d3df27ff4f/go.mod h1:PTY7yDlLxB4bW7rEOO7e79uTDr9yXzpuI1QGIDfxEzc= github.com/envoyproxy/go-control-plane/contrib v1.32.5-0.20250430092421-68a532e11403 h1:5wPocL1bGYhA4TtKZwcdVI5fsXo1JatkbcxPBcFQswc= From 43c4581753ed2ddc9f23924b28b8a8c3a05d714b Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 8 Oct 2025 23:33:49 -0400 Subject: [PATCH 6/6] fix race I hope Signed-off-by: Adrian Cole --- internal/aigw/admin.go | 36 +++++++++++++++++++++++++---------- internal/aigw/admin_test.go | 38 +++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/internal/aigw/admin.go b/internal/aigw/admin.go index 6aa6ff6a89..949b87beeb 100644 --- a/internal/aigw/admin.go +++ b/internal/aigw/admin.go @@ -94,7 +94,7 @@ func (c *envoyAdminAPIClient) IsReady(ctx context.Context) error { } // extractAdminAddressPath parses the adminAddressPathFlag flag from command -// line arguments. It validates that the path is actually a file. +// line arguments. func extractAdminAddressPath(cmdline []string) (string, error) { // Join cmdline into a single string and split by spaces to handle sh -c // cases (these cases are only used in tests). @@ -103,14 +103,7 @@ func extractAdminAddressPath(cmdline []string) (string, error) { for i, arg := range parts { if arg == adminAddressPathFlag && i+1 < len(parts) { - path := parts[i+1] - - // Verify it's a file - if info, err := os.Stat(path); err != nil || info.IsDir() { - return "", fmt.Errorf("envoy admin address path %q is not a file", path) - } - - return path, nil + return parts[i+1], nil } } return "", fmt.Errorf("%s not found in command line", adminAddressPathFlag) @@ -163,7 +156,30 @@ LOOP: } // Extract admin address path - return extractAdminAddressPath(envoyCmdline) + path, err := extractAdminAddressPath(envoyCmdline) + if err != nil { + return "", err + } + + // Loop until the file is valid + for { + select { + case <-ctx.Done(): + if lastErr == nil { + return "", fmt.Errorf("timeout waiting for admin address file %s", path) + } + return "", fmt.Errorf("timeout waiting for admin address file %s: %w", path, lastErr) + case <-ticker.C: + // Verify it's a file + if info, err := os.Stat(path); err != nil { + lastErr = err + continue // try later + } else if info.IsDir() { + return "", fmt.Errorf("envoy admin address path %q is not a file", path) + } + return path, nil + } + } } // pollPortFromEnvoyAddressPath polls for the admin-address.txt. diff --git a/internal/aigw/admin_test.go b/internal/aigw/admin_test.go index 29ea35abc9..6239ae0fcf 100644 --- a/internal/aigw/admin_test.go +++ b/internal/aigw/admin_test.go @@ -57,16 +57,6 @@ func TestExtractAdminAddressPath(t *testing.T) { cmdline: []string{}, expectedError: "--admin-address-path not found in command line", }, - { - name: "path is a directory not a file", - cmdline: []string{"envoy", "--admin-address-path", tmpDir}, - expectedError: fmt.Sprintf("envoy admin address path %q is not a file", tmpDir), - }, - { - name: "path does not exist", - cmdline: []string{"envoy", "--admin-address-path", "/nonexistent/path"}, - expectedError: "envoy admin address path \"/nonexistent/path\" is not a file", - }, { name: "sh -c wrapped command", cmdline: []string{"sh", "-c", "sleep 30 && echo -- --admin-address-path " + validFile}, @@ -96,7 +86,6 @@ func TestExtractAdminAddressPath(t *testing.T) { func TestPollEnvoyAdminAddressPathFromArgs(t *testing.T) { t.Run("success - finds admin address path from child process", func(t *testing.T) { adminFile := filepath.Join(t.TempDir(), "admin-address.txt") - require.NoError(t, os.WriteFile(adminFile, []byte("127.0.0.1:9901"), 0o600)) ctx, cancel := context.WithCancel(t.Context()) defer cancel() @@ -109,7 +98,11 @@ func TestPollEnvoyAdminAddressPathFromArgs(t *testing.T) { _, _ = cmd.Process.Wait() }() - time.Sleep(100 * time.Millisecond) + // write the file later + go func() { + time.Sleep(100 * time.Millisecond) + _ = os.WriteFile(adminFile, []byte("127.0.0.1:9901"), 0o600) + }() pid := os.Getpid() actual, err := pollEnvoyAdminAddressPathFromArgs(t.Context(), int32(pid)) // #nosec G115 -- PID fits in int32 @@ -121,6 +114,27 @@ func TestPollEnvoyAdminAddressPathFromArgs(t *testing.T) { _, err := pollEnvoyAdminAddressPathFromArgs(t.Context(), 1) require.Error(t, err) }) + + t.Run("failure - not a file", func(t *testing.T) { + adminFile := t.TempDir() + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile) + cmd := exec.CommandContext(ctx, "sh", "-c", cmdStr) + require.NoError(t, cmd.Start()) + defer func() { + _ = cmd.Process.Kill() + _, _ = cmd.Process.Wait() + }() + + time.Sleep(100 * time.Millisecond) + + pid := os.Getpid() + _, err := pollEnvoyAdminAddressPathFromArgs(t.Context(), int32(pid)) // #nosec G115 -- PID fits in int32 + require.EqualError(t, err, fmt.Sprintf("envoy admin address path %q is not a file", adminFile)) + }) } func TestPollPortFromEnvoyAddressPath(t *testing.T) {