Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/envoy-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func main() {
if err := root.GetRootCommand().ExecuteContext(ctrl.SetupSignalHandler()); err != nil {
if err := root.GetRootCommand(nil).ExecuteContext(ctrl.SetupSignalHandler()); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
Expand Down
30 changes: 26 additions & 4 deletions cmd/envoy-gateway/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,38 @@ import (
"github.com/envoyproxy/gateway/internal/cmd"
)

// GetRootCommand returns the root cobra command to be executed
// by main.
func GetRootCommand() *cobra.Command {
// GetRootCommand returns the root cobra command to be executed by main.
// This command receives an async error handler to let the main process decide how to
// handle critical errors that may happen in the runners that may prevent Envoy Gateway from
// functioning properly.
// The Envoy AI Gateway CLI is an example use case of this function, where it needs to terminate
// if the infra runner fails to start the Envoy process.
func GetRootCommand(asyncErrHandler func(error)) *cobra.Command {
errChan := make(chan error)
if asyncErrHandler == nil {
asyncErrHandler = func(error) {}
}

c := &cobra.Command{
Use: "envoy-gateway",
Short: "Envoy Gateway",
Long: "Manages Envoy Proxy as a standalone or Kubernetes-based application gateway",
// PersistentPreRun starts a background goroutine that forwards every error received on
// errChan to asyncErrHandler until the command's context is done.
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
go func() {
for {
select {
case <-cmd.Context().Done():
// NOTE(review): errChan is closed here by the receiving side, while its send side is
// handed to GetServerCommand. A send on a closed channel panics, so this is only safe if
// no runner can still notify after the context is cancelled — TODO confirm.
close(errChan)
return
case err := <-errChan:
// Hand the error to the caller-supplied handler (a no-op when nil was passed in).
asyncErrHandler(err)
}
}
}()
},
}

c.AddCommand(cmd.GetServerCommand())
c.AddCommand(cmd.GetServerCommand(errChan))
c.AddCommand(cmd.GetEnvoyCommand())
c.AddCommand(cmd.GetVersionCommand())
c.AddCommand(cmd.GetCertGenCommand())
Expand Down
2 changes: 1 addition & 1 deletion cmd/envoy-gateway/root/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ import (
)

// TestGetRootCommand checks that the root command reports "envoy-gateway" as its Use string.
func TestGetRootCommand(t *testing.T) {
got := GetRootCommand()
got := GetRootCommand(nil)
assert.Equal(t, "envoy-gateway", got.Use)
}
2 changes: 1 addition & 1 deletion internal/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func New(cfg *Config) *Runner {
}
}

func (r *Runner) Start(ctx context.Context) error {
func (r *Runner) Start(ctx context.Context, _ chan<- error) error {
if r.cfg.EnvoyGateway.GetEnvoyGatewayAdmin().EnableDumpConfig {
spewConfig := spew.NewDefaultConfig()
spewConfig.DisableMethods = true
Expand Down
3 changes: 2 additions & 1 deletion internal/admin/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/utils/test"
)

func TestInitAdminServer(t *testing.T) {
Expand All @@ -28,7 +29,7 @@ func TestInitAdminServer(t *testing.T) {
runner := New(&Config{
Server: *svrConfig,
})
err := runner.Start(context.Background())
err := runner.Start(context.Background(), test.RunnerErrorsChan(t))
require.NoError(t, err)

// Clean up
Expand Down
22 changes: 11 additions & 11 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

type Runner interface {
Start(context.Context) error
// Start the runner. This receives a channel where the runner can notify asynchronous errors.
Start(context.Context, chan<- error) error
Name() string
// Close closes the runner when the server is shutting down.
// This is called after all the subscriptions are closed at the very end of the server shutdown.
Expand All @@ -39,23 +40,22 @@ type Runner interface {
var cfgPath string

// GetServerCommand returns the "server" cobra command (alias "serve") to be executed.
// Its RunE calls server with the command's context and its stdout/stderr writers.
// The errChan argument, where present, is passed through to server unchanged.
func GetServerCommand() *cobra.Command {
func GetServerCommand(errChan chan<- error) *cobra.Command {
cmd := &cobra.Command{
Use: "server",
Aliases: []string{"serve"},
Short: "Serve Envoy Gateway",
RunE: func(cmd *cobra.Command, args []string) error {
return server(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr())
return server(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(), errChan)
},
}
// Register --config-path / -c; the value is stored in the package-level cfgPath variable.
cmd.PersistentFlags().StringVarP(&cfgPath, "config-path", "c", "",
"The path to the configuration file.")

return cmd
}

// server serves Envoy Gateway.
func server(ctx context.Context, stdout, stderr io.Writer) error {
func server(ctx context.Context, stdout, stderr io.Writer, errChan chan<- error) error {
cfg, err := getConfig(stdout, stderr)
if err != nil {
return err
Expand All @@ -64,7 +64,7 @@ func server(ctx context.Context, stdout, stderr io.Writer) error {
runnersDone := make(chan struct{})
hook := func(c context.Context, cfg *config.Server) error {
cfg.Logger.Info("Start runners")
if err := startRunners(c, cfg); err != nil {
if err := startRunners(c, cfg, errChan); err != nil {
cfg.Logger.Error(err, "failed to start runners")
return err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func getConfigByPath(stdout, stderr io.Writer, cfgPath string) (*config.Server,
//
// This will block until the context is done, and returns after synchronously
// closing all the runners.
func startRunners(ctx context.Context, cfg *config.Server) (err error) {
func startRunners(ctx context.Context, cfg *config.Server, errChan chan<- error) (err error) {
channels := struct {
pResources *message.ProviderResources
xdsIR *message.XdsIR
Expand Down Expand Up @@ -217,7 +217,7 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) {

// Start all runners
for _, r := range runners {
if err = startRunner(ctx, cfg, r.runner); err != nil {
if err = startRunner(ctx, cfg, r.runner, errChan); err != nil {
return err
}
}
Expand All @@ -229,7 +229,7 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) {
Server: *cfg,
XdsIR: channels.xdsIR,
})
if err = startRunner(ctx, cfg, rateLimitRunner); err != nil {
if err = startRunner(ctx, cfg, rateLimitRunner, errChan); err != nil {
return err
}
}
Expand Down Expand Up @@ -258,9 +258,9 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) {
return nil
}

func startRunner(ctx context.Context, cfg *config.Server, runner Runner) error {
func startRunner(ctx context.Context, cfg *config.Server, runner Runner, errors chan<- error) error {
cfg.Logger.Info("Starting runner", "name", runner.Name())
if err := runner.Start(ctx); err != nil {
if err := runner.Start(ctx, errors); err != nil {
cfg.Logger.Error(err, "Failed to start runner", "name", runner.Name())
return err
}
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/envoyproxy/gateway/internal/utils/test"
)

var (
Expand All @@ -32,7 +34,7 @@ gateway: {}
)

// TestGetServerCommand checks that the server command reports "server" as its Use string.
func TestGetServerCommand(t *testing.T) {
got := GetServerCommand()
got := GetServerCommand(test.RunnerErrorsChan(t))
assert.Equal(t, "server", got.Use)
}

Expand Down
10 changes: 7 additions & 3 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ func (r *Runner) Name() string {
}

// Start starts the gateway-api translator runner.
// It launches the wasm cache and the subscribe-and-translate loop in background goroutines
// and returns without waiting for them. The named result err is never assigned in this
// function, so the naked return always yields nil.
func (r *Runner) Start(ctx context.Context) (err error) {
func (r *Runner) Start(ctx context.Context, errChan chan<- error) (err error) {
// Scope the logger to this runner's name.
r.Logger = r.Logger.WithName(r.Name()).WithValues("runner", r.Name())

go r.startWasmCache(ctx)
// Do not call .Subscribe() inside Goroutine since it is supposed to be called from the same
// Goroutine where Close() is called.
c := r.ProviderResources.GatewayAPIResources.Subscribe(ctx)

go r.subscribeAndTranslate(c)
// Build the notifier from this runner's name and errChan; it is handed to the translate
// goroutine, which calls Notify on it when translation reports errors.
errNotifier := message.RunnerErrorNotifier(r.Name(), errChan)
go r.subscribeAndTranslate(c, errNotifier)
r.Logger.Info("started")
return
}
Expand Down Expand Up @@ -128,7 +129,7 @@ func (r *Runner) startWasmCache(ctx context.Context) {
r.wasmCache.Start(ctx)
}

func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResources]) {
func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResources], errors message.ErrorNotifier) {
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub,
func(update message.Update[string, *resource.ControllerResources], errChan chan error) {
r.Logger.Info("received an update")
Expand Down Expand Up @@ -182,6 +183,9 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
if err != nil {
// Currently all errors that Translate returns should just be logged
r.Logger.Error(err, "errors detected during translation", "gateway-class", resources.GatewayClass.Name)
// Notify the main control loop about translation errors. This may be a critical error in standalone mode, so
// notify the control loop in case this needs to be handled.
errors.Notify(err)
}

// Publish the IRs.
Expand Down
3 changes: 2 additions & 1 deletion internal/gatewayapi/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/envoyproxy/gateway/internal/extension/registry"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/utils/test"
pb "github.com/envoyproxy/gateway/proto/extension"
)

Expand All @@ -44,7 +45,7 @@ func TestRunner(t *testing.T) {
})
ctx := context.Background()
// Start
err = r.Start(ctx)
err = r.Start(ctx, test.RunnerErrorsChan(t))
require.NoError(t, err)

// IR is nil at start
Expand Down
2 changes: 1 addition & 1 deletion internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(cfg *Config) *Runner {
}

// Start starts the global rate limit runner
func (r *Runner) Start(ctx context.Context) (err error) {
func (r *Runner) Start(ctx context.Context, _ chan<- error) (err error) {
r.Logger = r.Logger.WithName(r.Name()).WithValues("runner", r.Name())

// Set up the gRPC server and register the xDS handler.
Expand Down
7 changes: 6 additions & 1 deletion internal/infrastructure/host/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/infrastructure/common"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/utils/file"
)

Expand Down Expand Up @@ -57,9 +58,12 @@ type Infra struct {
Stdout io.Writer
// Stderr is the writer for error output (for Envoy stderr).
Stderr io.Writer

// errors is the notifier used to send async errors to the main control loop.
errors message.ErrorNotifier
}

func NewInfra(runnerCtx context.Context, cfg *config.Server, logger logging.Logger) (*Infra, error) {
func NewInfra(runnerCtx context.Context, cfg *config.Server, logger logging.Logger, errors message.ErrorNotifier) (*Infra, error) {
// Ensure the home directory exists.
if err := os.MkdirAll(defaultHomeDir, 0o750); err != nil {
return nil, fmt.Errorf("failed to create dir: %w", err)
Expand All @@ -84,6 +88,7 @@ func NewInfra(runnerCtx context.Context, cfg *config.Server, logger logging.Logg
defaultEnvoyImage: egv1a1.DefaultEnvoyProxyImage,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
errors: errors,
}
return infra, nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/infrastructure/host/proxy_infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (i *Infra) runEnvoy(ctx context.Context, envoyVersion, name string, args []
api.EnvoyVersion(envoyVersion))
if err != nil {
i.Logger.Error(err, "failed to run envoy")
// If the Envoy process fails to start, notify an unrecoverable error so that the main control
// loop can properly handle it.
i.errors.Notify(err)
}
}()
}
Expand Down
6 changes: 6 additions & 0 deletions internal/infrastructure/host/proxy_infra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/envoyproxy/gateway/internal/infrastructure/common"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/utils/file"
"github.com/envoyproxy/gateway/internal/utils/test"
"github.com/envoyproxy/gateway/internal/xds/bootstrap"
"github.com/envoyproxy/gateway/test/utils"
)
Expand Down Expand Up @@ -54,7 +56,9 @@ func newMockInfra(t *testing.T, cfg *config.Server) *Infra {
sdsConfigPath: proxyDir,
Stdout: io.Discard,
Stderr: io.Discard,
errors: message.RunnerErrorNotifier(t.Name(), test.RunnerErrorsChan(t)),
}

return infra
}

Expand Down Expand Up @@ -109,6 +113,7 @@ func TestInfra_runEnvoy_stopEnvoy(t *testing.T) {
Logger: logging.DefaultLogger(stdout, egv1a1.LogLevelInfo),
Stdout: stdout,
Stderr: stderr,
errors: message.RunnerErrorNotifier(t.Name(), test.RunnerErrorsChan(t)),
}
// Ensures that run -> stop will successfully stop the envoy and we can
// run it again without any issues.
Expand Down Expand Up @@ -172,6 +177,7 @@ func TestInfra_runEnvoy_OutputRedirection(t *testing.T) {
Logger: logging.DefaultLogger(stdout, egv1a1.LogLevelInfo),
Stdout: stdout,
Stderr: stderr,
errors: message.RunnerErrorNotifier(t.Name(), test.RunnerErrorsChan(t)),
}

// Run envoy with an invalid config to force it to write to stderr and exit quickly
Expand Down
7 changes: 6 additions & 1 deletion internal/infrastructure/kubernetes/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
)

var (
Expand Down Expand Up @@ -59,10 +60,13 @@ type Infra struct {
Client *InfraClient

logger logging.Logger

// errors is the notifier used to send async errors to the main control loop.
errors message.ErrorNotifier
}

// NewInfra returns a new Infra.
func NewInfra(cli client.Client, cfg *config.Server) *Infra {
func NewInfra(cli client.Client, cfg *config.Server, errors message.ErrorNotifier) *Infra {
return &Infra{
// Always set infra namespace to cfg.ControllerNamespace;
// otherwise the RateLimit resource provider will fail to create/delete.
Expand All @@ -71,6 +75,7 @@ func NewInfra(cli client.Client, cfg *config.Server) *Infra {
EnvoyGateway: cfg.EnvoyGateway,
Client: New(cli),
logger: cfg.Logger.WithName(string(egv1a1.LogComponentInfrastructureRunner)),
errors: errors,
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/infrastructure/kubernetes/proxy_configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/envoyproxy/gateway/internal/infrastructure/common"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
testutil "github.com/envoyproxy/gateway/internal/utils/test"
)

Expand Down Expand Up @@ -221,7 +222,7 @@ func TestCreateOrUpdateProxyConfigMap(t *testing.T) {
WithInterceptorFuncs(interceptorFunc).
Build()
}
kube := NewInfra(cli, cfg)
kube := NewInfra(cli, cfg, message.RunnerErrorNotifier(t.Name(), testutil.RunnerErrorsChan(t)))
require.NoError(t, setupOwnerReferenceResources(ctx, kube.Client))
if tc.gatewayNamespaceMode {
kube.EnvoyGateway.Provider.Kubernetes.Deploy = &egv1a1.KubernetesDeployMode{
Expand Down Expand Up @@ -284,7 +285,7 @@ func TestDeleteConfigProxyMap(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cli := fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects(tc.current).Build()
kube := NewInfra(cli, cfg)
kube := NewInfra(cli, cfg, message.RunnerErrorNotifier(t.Name(), testutil.RunnerErrorsChan(t)))
require.NoError(t, setupOwnerReferenceResources(ctx, kube.Client))

infra.Proxy.GetProxyMetadata().Labels[gatewayapi.OwningGatewayNamespaceLabel] = "default"
Expand Down
Loading
Loading