Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 21 additions & 32 deletions cmd/aigw/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,33 @@ package main

import (
"context"
"fmt"
"io"
"net/http"
"log/slog"
"time"
)

// healthcheck performs an HTTP GET request to the admin server health endpoint.
// This is used by Docker HEALTHCHECK to verify the aigw admin server is responsive.
// It exits with code 0 on success (healthy) or 1 on failure (unhealthy).
func healthcheck(ctx context.Context, port int, stdout, _ io.Writer) error {
url := fmt.Sprintf("http://localhost:%d/health", port)

client := &http.Client{
Timeout: 5 * time.Second,
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
"github.com/envoyproxy/ai-gateway/internal/aigw"
)

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to admin server")
}
defer resp.Body.Close()
// healthcheck looks up the Envoy subprocess, gets its admin port,
// and returns no error when Envoy is ready.
func healthcheck(ctx context.Context, _, stderr io.Writer) error {
// Give up to 1 second for the health check
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unhealthy: status %d, body: %s", resp.StatusCode, string(body))
}
logger := slog.New(slog.NewTextHandler(stderr, &slog.HandlerOptions{}))
// In docker, pid 1 is the aigw process
return doHealthcheck(ctx, 1, logger)
}

// Optionally read and print the response for debugging
body, err := io.ReadAll(resp.Body)
func doHealthcheck(ctx context.Context, aigwPid int, logger *slog.Logger) error {
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, aigwPid, 0)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
logger.Error("Failed to find Envoy admin server", "error", err)
return err
} else if err = envoyAdmin.IsReady(ctx); err != nil {
logger.Error("Envoy admin server is not ready", "adminPort", envoyAdmin.Port(), "error", err)
return err
}

_, _ = fmt.Fprintf(stdout, "%s", body)
return nil
return err
}
106 changes: 49 additions & 57 deletions cmd/aigw/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,75 +7,67 @@ package main

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_healthcheck(t *testing.T) {
tests := []struct {
name string
closeServer bool
statusCode int
respBody string
expOut string
expErr string
}{
{
name: "success",
statusCode: http.StatusOK,
respBody: "OK",
expOut: "OK",
},
{
name: "unhealthy status",
statusCode: http.StatusServiceUnavailable,
respBody: "not ready",
expErr: "unhealthy: status 503, body: not ready",
},
{
name: "internal error",
statusCode: http.StatusInternalServerError,
respBody: "server error",
expErr: "unhealthy: status 500, body: server error",
},
{
name: "connection failure",
closeServer: true,
expErr: "failed to connect to admin server",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(tt.statusCode)
_, _ = w.Write([]byte(tt.respBody))
}))
t.Cleanup(s.Close)
pid := os.Getpid()

u, err := url.Parse(s.URL)
require.NoError(t, err)
port, err := strconv.Atoi(u.Port())
require.NoError(t, err)
t.Run("returns error when no envoy subprocess", func(t *testing.T) {
var buf bytes.Buffer
logger := slog.New(slog.NewTextHandler(&buf, nil))
err := doHealthcheck(t.Context(), pid, logger)
require.EqualError(t, err, "failed to get aigw process: process does not exist")
// Use Contains rather than Equal because the log line includes a timestamp.
require.Contains(t, buf.String(), "Failed to find Envoy admin server")
})

if tt.closeServer {
s.Close()
}
t.Run("returns nil when ready", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/ready", r.URL.Path)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("live"))
}))
defer server.Close()

stdout := &bytes.Buffer{}
err = healthcheck(t.Context(), port, stdout, nil)
u, err := url.Parse(server.URL)
require.NoError(t, err)
port, err := strconv.Atoi(u.Port())
require.NoError(t, err)

if tt.expErr != "" {
require.Equal(t, tt.expErr, err.Error())
require.Empty(t, stdout.String())
} else {
require.NoError(t, err)
require.Equal(t, tt.expOut, stdout.String())
}
})
}
adminFile := filepath.Join(t.TempDir(), "admin-address.txt")
require.NoError(t, os.WriteFile(adminFile, []byte(fmt.Sprintf("127.0.0.1:%d", port)), 0o600))

ctx, cancel := context.WithCancel(t.Context())
defer cancel()

cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile)
cmd := exec.CommandContext(ctx, "sh", "-c", cmdStr)
require.NoError(t, cmd.Start())
defer func() {
_ = cmd.Process.Kill()
_, _ = cmd.Process.Wait()
}()

time.Sleep(100 * time.Millisecond)

var buf bytes.Buffer
logger := slog.New(slog.NewTextHandler(&buf, nil))
err = doHealthcheck(t.Context(), pid, logger)
require.NoError(t, err)
require.Empty(t, buf)
})
}
8 changes: 3 additions & 5 deletions cmd/aigw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ type (
mcpConfig *autoconfig.MCPServers `kong:"-"` // Internal field: normalized MCP JSON data
}
// cmdHealthcheck corresponds to `aigw healthcheck` command.
cmdHealthcheck struct {
AdminPort int `help:"HTTP port for the admin server (serves /metrics and /health endpoints)." default:"1064"`
}
cmdHealthcheck struct{}
)

// Validate is called by Kong after parsing to validate the cmdRun arguments.
Expand Down Expand Up @@ -78,7 +76,7 @@ func (c *cmdRun) Validate() error {

type (
runFn func(context.Context, cmdRun, runOpts, io.Writer, io.Writer) error
healthcheckFn func(context.Context, int, io.Writer, io.Writer) error
healthcheckFn func(context.Context, io.Writer, io.Writer) error
)

func main() {
Expand Down Expand Up @@ -117,7 +115,7 @@ func doMain(ctx context.Context, stdout, stderr io.Writer, args []string, exitFn
log.Fatalf("Error running: %v", err)
}
case "healthcheck":
err = hf(ctx, c.Healthcheck.AdminPort, stdout, stderr)
err = hf(ctx, stdout, stderr)
if err != nil {
log.Fatalf("Health check failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aigw/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Commands:
run [<path>] [flags]
Run the AI Gateway locally for given configuration.

healthcheck [flags]
healthcheck
Docker HEALTHCHECK command.

Run "aigw <command> --help" for more information on a command.
Expand Down
32 changes: 19 additions & 13 deletions cmd/aigw/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
if err != nil {
return err
}
fakeClient, extProxDone, envoyAdminPort, envoyPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
fakeClient, extProxDone, envoyAdminPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
if err != nil {
return fmt.Errorf("failed to write envoy resources and run extproc: %w", err)
}
Expand Down Expand Up @@ -208,7 +208,13 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
// Start a monitoring goroutine to poll Envoy's readiness. This starts
// before the server to ensure we don't miss the readiness window.
go func() {
envoyAdmin := aigw.NewEnvoyAdminClient(ctx, stderrLogger, os.Getpid(), envoyAdminPort, envoyPort)
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), envoyAdminPort)
if err != nil {
stderrLogger.Error("Failed to find Envoy admin server", "error", err)
serverCancel() // Likely a crashed envoy process
Copy link
Contributor Author

@codefromthecrypt codefromthecrypt Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this cancel only occurs if envoyAdminPort was zero (i.e., the port needs to be looked up), so a crashed Envoy still goes unnoticed here if you supplied YAML with a hard-coded admin server in it (i.e., didn't use env config).

return
}
stderrLogger.Info("Found Envoy admin server", "adminPort", envoyAdmin.Port())
pollEnvoyReady(ctx, stderrLogger, envoyAdmin, 2*time.Second)
}()

Expand Down Expand Up @@ -254,33 +260,33 @@ func recreateDir(path string) error {

// writeEnvoyResourcesAndRunExtProc reads all resources from the given string, writes them to the output file, and runs
// external processes for EnvoyExtensionPolicy resources.
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, int, error) {
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, error) {
aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, envoyProxies, err := collectObjects(original, runCtx.envoyGatewayResourcesOut, runCtx.stderrLogger)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("error collecting: %w", err)
return nil, nil, 0, fmt.Errorf("error collecting: %w", err)
}
if len(gateways) > 1 {
return nil, nil, 0, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
return nil, nil, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
}
for _, bsp := range backendSecurityPolicies {
spec := bsp.Spec
if spec.AWSCredentials != nil && spec.AWSCredentials.OIDCExchangeToken != nil {
// TODO: We can make it work by generalizing the rotation logic.
return nil, nil, 0, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
return nil, nil, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
}
}

// Do the substitution for the secrets.
for _, s := range secrets {
if err = runCtx.rewriteSecretWithAnnotatedLocation(s); err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
return nil, nil, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
}
}

var secretList *corev1.SecretList
fakeClient, _fakeClientSet, httpRoutes, eps, httpRouteFilters, backends, secretList, backendTrafficPolicies, securityPolicies, err := translateCustomResourceObjects(ctx, aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, runCtx.stderrLogger)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("error translating: %w", err)
return nil, nil, 0, fmt.Errorf("error translating: %w", err)
}
runCtx.fakeClientSet = _fakeClientSet

Expand All @@ -304,7 +310,7 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
}
gw := gateways[0]
if len(gw.Spec.Listeners) == 0 {
return nil, nil, 0, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
return nil, nil, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
}
runCtx.mustClearSetOwnerReferencesAndStatusAndWriteObj(&gw.TypeMeta, gw)
for _, ep := range eps.Items {
Expand All @@ -315,20 +321,20 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
Secrets("").Get(ctx,
controller.FilterConfigSecretPerGatewayName(gw.Name, gw.Namespace), metav1.GetOptions{})
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config secret: %w", err)
return nil, nil, 0, fmt.Errorf("failed to get filter config secret: %w", err)
}

rawConfig, ok := filterConfigSecret.StringData[controller.FilterConfigKeyInSecret]
if !ok {
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
return nil, nil, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
}
var fc filterapi.Config
if err = yaml.Unmarshal([]byte(rawConfig), &fc); err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
return nil, nil, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
}
runCtx.stderrLogger.Info("Running external process", "config", fc)
done := runCtx.mustStartExtProc(ctx, &fc)
return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), runCtx.tryFindEnvoyListenerPort(gw), nil
return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), nil
}

// mustStartExtProc starts the external process with the given working directory, port, and filter configuration.
Expand Down
10 changes: 6 additions & 4 deletions cmd/aigw/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestRunCmdContext_writeEnvoyResourcesAndRunExtProc(t *testing.T) {
}
config := readFileFromProjectRoot(t, "examples/aigw/ollama.yaml")
ctx, cancel := context.WithCancel(t.Context())
_, done, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, config)
_, done, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, config)
require.NoError(t, err)
time.Sleep(time.Second)
cancel()
Expand All @@ -251,7 +251,7 @@ func TestRunCmdContext_writeEnvoyResourcesAndRunExtProc_noListeners(t *testing.T
adminPort: adminPort,
}

_, _, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(t.Context(), gatewayNoListenersConfig)
_, _, _, err := runCtx.writeEnvoyResourcesAndRunExtProc(t.Context(), gatewayNoListenersConfig)
require.EqualError(t, err, "gateway aigw-run has no listeners configured")
}

Expand Down Expand Up @@ -449,7 +449,8 @@ func TestPollEnvoyReady(t *testing.T) {

t.Run("ready", func(t *testing.T) {
t.Cleanup(func() { callCount = 0 })
envoyAdmin := aigw.NewEnvoyAdminClientFromPort(adminPort)
envoyAdmin, err := aigw.NewEnvoyAdminClient(t.Context(), os.Getpid(), adminPort)
require.NoError(t, err)
pollEnvoyReady(t.Context(), l, envoyAdmin, 50*time.Millisecond)
require.Equal(t, successAt, callCount)
})
Expand All @@ -458,7 +459,8 @@ func TestPollEnvoyReady(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
t.Cleanup(cancel)
t.Cleanup(func() { callCount = 0 })
envoyAdmin := aigw.NewEnvoyAdminClientFromPort(adminPort)
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), adminPort)
require.NoError(t, err)
pollEnvoyReady(ctx, l, envoyAdmin, 50*time.Millisecond)
require.Less(t, callCount, successAt)
})
Expand Down
Loading
Loading