Skip to content

Commit a6b5afe

Browse files
feat: cleanly implement HEALTHCHECK for aigw docker (envoyproxy#1314)
Signed-off-by: Hrushikesh Patil <[email protected]>
1 parent 5b326a1 commit a6b5afe

File tree

11 files changed

+206
-264
lines changed

11 files changed

+206
-264
lines changed

cmd/aigw/healthcheck.go

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,44 +7,33 @@ package main
77

88
import (
99
"context"
10-
"fmt"
1110
"io"
12-
"net/http"
11+
"log/slog"
1312
"time"
14-
)
15-
16-
// healthcheck performs an HTTP GET request to the admin server health endpoint.
17-
// This is used by Docker HEALTHCHECK to verify the aigw admin server is responsive.
18-
// It exits with code 0 on success (healthy) or 1 on failure (unhealthy).
19-
func healthcheck(ctx context.Context, port int, stdout, _ io.Writer) error {
20-
url := fmt.Sprintf("http://localhost:%d/health", port)
21-
22-
client := &http.Client{
23-
Timeout: 5 * time.Second,
24-
}
2513

26-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
27-
if err != nil {
28-
return fmt.Errorf("failed to create request: %w", err)
29-
}
14+
"github.com/envoyproxy/ai-gateway/internal/aigw"
15+
)
3016

31-
resp, err := client.Do(req)
32-
if err != nil {
33-
return fmt.Errorf("failed to connect to admin server")
34-
}
35-
defer resp.Body.Close()
17+
// healthcheck performs looks up the Envoy subprocess, gets its admin port,
18+
// and returns no error when ready.
19+
func healthcheck(ctx context.Context, _, stderr io.Writer) error {
20+
// Give up to 1 second for the health check
21+
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
22+
defer cancel()
3623

37-
if resp.StatusCode != http.StatusOK {
38-
body, _ := io.ReadAll(resp.Body)
39-
return fmt.Errorf("unhealthy: status %d, body: %s", resp.StatusCode, string(body))
40-
}
24+
logger := slog.New(slog.NewTextHandler(stderr, &slog.HandlerOptions{}))
25+
// In docker, pid 1 is the aigw process
26+
return doHealthcheck(ctx, 1, logger)
27+
}
4128

42-
// Optionally read and print the response for debugging
43-
body, err := io.ReadAll(resp.Body)
29+
func doHealthcheck(ctx context.Context, aigwPid int, logger *slog.Logger) error {
30+
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, aigwPid, 0)
4431
if err != nil {
45-
return fmt.Errorf("failed to read response: %w", err)
32+
logger.Error("Failed to find Envoy admin server", "error", err)
33+
return err
34+
} else if err = envoyAdmin.IsReady(ctx); err != nil {
35+
logger.Error("Envoy admin server is not ready", "adminPort", envoyAdmin.Port(), "error", err)
36+
return err
4637
}
47-
48-
_, _ = fmt.Fprintf(stdout, "%s", body)
49-
return nil
38+
return err
5039
}

cmd/aigw/healthcheck_test.go

Lines changed: 51 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,75 +7,69 @@ package main
77

88
import (
99
"bytes"
10+
"context"
11+
"fmt"
12+
"log/slog"
1013
"net/http"
1114
"net/http/httptest"
1215
"net/url"
16+
"os"
17+
"os/exec"
18+
"path/filepath"
1319
"strconv"
1420
"testing"
21+
"time"
1522

1623
"github.com/stretchr/testify/require"
1724
)
1825

1926
func Test_healthcheck(t *testing.T) {
20-
tests := []struct {
21-
name string
22-
closeServer bool
23-
statusCode int
24-
respBody string
25-
expOut string
26-
expErr string
27-
}{
28-
{
29-
name: "success",
30-
statusCode: http.StatusOK,
31-
respBody: "OK",
32-
expOut: "OK",
33-
},
34-
{
35-
name: "unhealthy status",
36-
statusCode: http.StatusServiceUnavailable,
37-
respBody: "not ready",
38-
expErr: "unhealthy: status 503, body: not ready",
39-
},
40-
{
41-
name: "internal error",
42-
statusCode: http.StatusInternalServerError,
43-
respBody: "server error",
44-
expErr: "unhealthy: status 500, body: server error",
45-
},
46-
{
47-
name: "connection failure",
48-
closeServer: true,
49-
expErr: "failed to connect to admin server",
50-
},
51-
}
52-
for _, tt := range tests {
53-
t.Run(tt.name, func(t *testing.T) {
54-
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
55-
w.WriteHeader(tt.statusCode)
56-
_, _ = w.Write([]byte(tt.respBody))
57-
}))
58-
t.Cleanup(s.Close)
27+
pid := os.Getpid()
5928

60-
u, err := url.Parse(s.URL)
61-
require.NoError(t, err)
62-
port, err := strconv.Atoi(u.Port())
63-
require.NoError(t, err)
29+
t.Run("returns error when no envoy subprocess", func(t *testing.T) {
30+
var buf bytes.Buffer
31+
logger := slog.New(slog.NewTextHandler(&buf, nil))
32+
ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
33+
defer cancel()
34+
err := doHealthcheck(ctx, pid, logger)
35+
require.EqualError(t, err, "timeout waiting for Envoy process: no Envoy process found")
36+
// Contains not Equal because there's a timestamp
37+
require.Contains(t, buf.String(), "Failed to find Envoy admin server")
38+
})
6439

65-
if tt.closeServer {
66-
s.Close()
67-
}
40+
t.Run("returns nil when ready", func(t *testing.T) {
41+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
42+
require.Equal(t, "/ready", r.URL.Path)
43+
w.WriteHeader(http.StatusOK)
44+
_, _ = w.Write([]byte("live"))
45+
}))
46+
defer server.Close()
6847

69-
stdout := &bytes.Buffer{}
70-
err = healthcheck(t.Context(), port, stdout, nil)
48+
u, err := url.Parse(server.URL)
49+
require.NoError(t, err)
50+
port, err := strconv.Atoi(u.Port())
51+
require.NoError(t, err)
7152

72-
if tt.expErr != "" {
73-
require.Equal(t, tt.expErr, err.Error())
74-
require.Empty(t, stdout.String())
75-
} else {
76-
require.NoError(t, err)
77-
require.Equal(t, tt.expOut, stdout.String())
78-
}
79-
})
80-
}
53+
adminFile := filepath.Join(t.TempDir(), "admin-address.txt")
54+
require.NoError(t, os.WriteFile(adminFile, []byte(fmt.Sprintf("127.0.0.1:%d", port)), 0o600))
55+
56+
ctx, cancel := context.WithCancel(t.Context())
57+
defer cancel()
58+
59+
cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile)
60+
cmd := exec.CommandContext(ctx, "sh", "-c", cmdStr)
61+
require.NoError(t, cmd.Start())
62+
defer func() {
63+
_ = cmd.Process.Kill()
64+
_, _ = cmd.Process.Wait()
65+
}()
66+
67+
time.Sleep(100 * time.Millisecond)
68+
69+
var buf bytes.Buffer
70+
logger := slog.New(slog.NewTextHandler(&buf, nil))
71+
err = doHealthcheck(t.Context(), pid, logger)
72+
require.NoError(t, err)
73+
require.Empty(t, buf)
74+
})
8175
}

cmd/aigw/main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ type (
4141
mcpConfig *autoconfig.MCPServers `kong:"-"` // Internal field: normalized MCP JSON data
4242
}
4343
// cmdHealthcheck corresponds to `aigw healthcheck` command.
44-
cmdHealthcheck struct {
45-
AdminPort int `help:"HTTP port for the admin server (serves /metrics and /health endpoints)." default:"1064"`
46-
}
44+
cmdHealthcheck struct{}
4745
)
4846

4947
// Validate is called by Kong after parsing to validate the cmdRun arguments.
@@ -78,7 +76,7 @@ func (c *cmdRun) Validate() error {
7876

7977
type (
8078
runFn func(context.Context, cmdRun, runOpts, io.Writer, io.Writer) error
81-
healthcheckFn func(context.Context, int, io.Writer, io.Writer) error
79+
healthcheckFn func(context.Context, io.Writer, io.Writer) error
8280
)
8381

8482
func main() {
@@ -117,7 +115,7 @@ func doMain(ctx context.Context, stdout, stderr io.Writer, args []string, exitFn
117115
log.Fatalf("Error running: %v", err)
118116
}
119117
case "healthcheck":
120-
err = hf(ctx, c.Healthcheck.AdminPort, stdout, stderr)
118+
err = hf(ctx, stdout, stderr)
121119
if err != nil {
122120
log.Fatalf("Health check failed: %v", err)
123121
}

cmd/aigw/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Commands:
4444
run [<path>] [flags]
4545
Run the AI Gateway locally for given configuration.
4646
47-
healthcheck [flags]
47+
healthcheck
4848
Docker HEALTHCHECK command.
4949
5050
Run "aigw <command> --help" for more information on a command.

cmd/aigw/run.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
152152
if err != nil {
153153
return err
154154
}
155-
fakeClient, extProxDone, envoyAdminPort, envoyPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
155+
fakeClient, extProxDone, envoyAdminPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
156156
if err != nil {
157157
return fmt.Errorf("failed to write envoy resources and run extproc: %w", err)
158158
}
@@ -208,7 +208,13 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
208208
// Start a monitoring goroutine to poll Envoy's readiness. This starts
209209
// before the server to ensure we don't miss the readiness window.
210210
go func() {
211-
envoyAdmin := aigw.NewEnvoyAdminClient(ctx, stderrLogger, os.Getpid(), envoyAdminPort, envoyPort)
211+
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), envoyAdminPort)
212+
if err != nil {
213+
stderrLogger.Error("Failed to find Envoy admin server", "error", err)
214+
serverCancel() // Likely a crashed envoy process
215+
return
216+
}
217+
stderrLogger.Info("Found Envoy admin server", "adminPort", envoyAdmin.Port())
212218
pollEnvoyReady(ctx, stderrLogger, envoyAdmin, 2*time.Second)
213219
}()
214220

@@ -254,33 +260,33 @@ func recreateDir(path string) error {
254260

255261
// writeEnvoyResourcesAndRunExtProc reads all resources from the given string, writes them to the output file, and runs
256262
// external processes for EnvoyExtensionPolicy resources.
257-
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, int, error) {
263+
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, error) {
258264
aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, envoyProxies, err := collectObjects(original, runCtx.envoyGatewayResourcesOut, runCtx.stderrLogger)
259265
if err != nil {
260-
return nil, nil, 0, 0, fmt.Errorf("error collecting: %w", err)
266+
return nil, nil, 0, fmt.Errorf("error collecting: %w", err)
261267
}
262268
if len(gateways) > 1 {
263-
return nil, nil, 0, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
269+
return nil, nil, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
264270
}
265271
for _, bsp := range backendSecurityPolicies {
266272
spec := bsp.Spec
267273
if spec.AWSCredentials != nil && spec.AWSCredentials.OIDCExchangeToken != nil {
268274
// TODO: We can make it work by generalizing the rotation logic.
269-
return nil, nil, 0, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
275+
return nil, nil, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
270276
}
271277
}
272278

273279
// Do the substitution for the secrets.
274280
for _, s := range secrets {
275281
if err = runCtx.rewriteSecretWithAnnotatedLocation(s); err != nil {
276-
return nil, nil, 0, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
282+
return nil, nil, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
277283
}
278284
}
279285

280286
var secretList *corev1.SecretList
281287
fakeClient, _fakeClientSet, httpRoutes, eps, httpRouteFilters, backends, secretList, backendTrafficPolicies, securityPolicies, err := translateCustomResourceObjects(ctx, aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, runCtx.stderrLogger)
282288
if err != nil {
283-
return nil, nil, 0, 0, fmt.Errorf("error translating: %w", err)
289+
return nil, nil, 0, fmt.Errorf("error translating: %w", err)
284290
}
285291
runCtx.fakeClientSet = _fakeClientSet
286292

@@ -304,7 +310,7 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
304310
}
305311
gw := gateways[0]
306312
if len(gw.Spec.Listeners) == 0 {
307-
return nil, nil, 0, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
313+
return nil, nil, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
308314
}
309315
runCtx.mustClearSetOwnerReferencesAndStatusAndWriteObj(&gw.TypeMeta, gw)
310316
for _, ep := range eps.Items {
@@ -315,20 +321,20 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
315321
Secrets("").Get(ctx,
316322
controller.FilterConfigSecretPerGatewayName(gw.Name, gw.Namespace), metav1.GetOptions{})
317323
if err != nil {
318-
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config secret: %w", err)
324+
return nil, nil, 0, fmt.Errorf("failed to get filter config secret: %w", err)
319325
}
320326

321327
rawConfig, ok := filterConfigSecret.StringData[controller.FilterConfigKeyInSecret]
322328
if !ok {
323-
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
329+
return nil, nil, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
324330
}
325331
var fc filterapi.Config
326332
if err = yaml.Unmarshal([]byte(rawConfig), &fc); err != nil {
327-
return nil, nil, 0, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
333+
return nil, nil, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
328334
}
329335
runCtx.stderrLogger.Info("Running external process", "config", fc)
330336
done := runCtx.mustStartExtProc(ctx, &fc)
331-
return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), runCtx.tryFindEnvoyListenerPort(gw), nil
337+
return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), nil
332338
}
333339

334340
// mustStartExtProc starts the external process with the given working directory, port, and filter configuration.

0 commit comments

Comments
 (0)