Skip to content

CP/DP Split: optimize configuration events #3320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 35 additions & 75 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,21 @@
// (3) Updating control plane configuration.
// (4) Tracks the NGINX Plus usage reporting Secret (if applicable).
type eventHandlerImpl struct {
// latestConfiguration is the latest Configuration generation.
latestConfiguration *dataplane.Configuration
// latestConfigurations are the latest Configuration generation for each Gateway tree.
latestConfigurations map[types.NamespacedName]*dataplane.Configuration

// objectFilters contains all created objectFilters, with the key being a filterKey
objectFilters map[filterKey]objectFilter

cfg eventHandlerConfig
lock sync.Mutex

// version is the current version number of the nginx config.
version int
}

// newEventHandlerImpl creates a new eventHandlerImpl.
func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl {
handler := &eventHandlerImpl{
cfg: cfg,
cfg: cfg,
latestConfigurations: make(map[types.NamespacedName]*dataplane.Configuration),
}

handler.objectFilters = map[filterKey]objectFilter{
Expand Down Expand Up @@ -158,28 +156,23 @@
h.parseAndCaptureEvent(ctx, logger, event)
}

changeType, gr := h.cfg.processor.Process()
gr := h.cfg.processor.Process()

// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
if !h.cfg.graphBuiltHealthChecker.ready {
h.cfg.graphBuiltHealthChecker.setAsReady()
}

h.sendNginxConfig(ctx, logger, gr, changeType)
h.sendNginxConfig(ctx, logger, gr)
}

// enable is called when the pod becomes leader to ensure the provisioner has
// the latest configuration.
func (h *eventHandlerImpl) enable(ctx context.Context) {
h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange)
h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph())

Check warning on line 172 in internal/mode/static/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/handler.go#L172

Added line #L172 was not covered by tests
}

func (h *eventHandlerImpl) sendNginxConfig(
ctx context.Context,
logger logr.Logger,
gr *graph.Graph,
changeType state.ChangeType,
) {
func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logger, gr *graph.Graph) {
if gr == nil {
return
}
Expand Down Expand Up @@ -215,68 +208,30 @@
panic("expected deployment, got nil")
}

configApplied := h.processStateAndBuildConfig(ctx, logger, gr, gw, changeType, deployment)

configErr := deployment.GetLatestConfigError()
upstreamErr := deployment.GetLatestUpstreamError()
err := errors.Join(configErr, upstreamErr)

if configApplied || err != nil {
obj := &status.QueueObject{
UpdateType: status.UpdateAll,
Error: err,
Deployment: gw.DeploymentName,
}
h.cfg.statusQueue.Enqueue(obj)
}
}
}

func (h *eventHandlerImpl) processStateAndBuildConfig(
ctx context.Context,
logger logr.Logger,
gr *graph.Graph,
currentGateway *graph.Gateway,
changeType state.ChangeType,
deployment *agent.Deployment,
) bool {
var configApplied bool
switch changeType {
case state.EndpointsOnlyChange:
h.version++
cfg := dataplane.BuildConfiguration(ctx, gr, currentGateway, h.cfg.serviceResolver, h.version, h.cfg.plus)
cfg := dataplane.BuildConfiguration(ctx, gr, gw, h.cfg.serviceResolver, h.cfg.plus)
depCtx, getErr := h.getDeploymentContext(ctx)
if getErr != nil {
logger.Error(getErr, "error getting deployment context for usage reporting")
}
cfg.DeploymentContext = depCtx

h.setLatestConfiguration(&cfg)
h.setLatestConfiguration(gw, &cfg)

deployment.FileLock.Lock()
if h.cfg.plus {
configApplied = h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, cfg)
} else {
configApplied = h.updateNginxConf(deployment, cfg)
}
h.updateNginxConf(deployment, cfg)
deployment.FileLock.Unlock()
case state.ClusterStateChange:
h.version++
cfg := dataplane.BuildConfiguration(ctx, gr, currentGateway, h.cfg.serviceResolver, h.version, h.cfg.plus)
depCtx, getErr := h.getDeploymentContext(ctx)
if getErr != nil {
logger.Error(getErr, "error getting deployment context for usage reporting")
}
cfg.DeploymentContext = depCtx

h.setLatestConfiguration(&cfg)
configErr := deployment.GetLatestConfigError()
upstreamErr := deployment.GetLatestUpstreamError()
err := errors.Join(configErr, upstreamErr)

deployment.FileLock.Lock()
configApplied = h.updateNginxConf(deployment, cfg)
deployment.FileLock.Unlock()
obj := &status.QueueObject{
UpdateType: status.UpdateAll,
Error: err,
Deployment: gw.DeploymentName,
}
h.cfg.statusQueue.Enqueue(obj)
}

return configApplied
}

func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) {
Expand Down Expand Up @@ -451,16 +406,14 @@
func (h *eventHandlerImpl) updateNginxConf(
deployment *agent.Deployment,
conf dataplane.Configuration,
) bool {
) {
files := h.cfg.generator.Generate(conf)
applied := h.cfg.nginxUpdater.UpdateConfig(deployment, files)
h.cfg.nginxUpdater.UpdateConfig(deployment, files)

// If using NGINX Plus, update upstream servers using the API.
if h.cfg.plus {
h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, conf)
}

return applied
}

// updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status
Expand Down Expand Up @@ -570,21 +523,28 @@
}

// GetLatestConfiguration gets the latest configuration.
func (h *eventHandlerImpl) GetLatestConfiguration() *dataplane.Configuration {
func (h *eventHandlerImpl) GetLatestConfiguration() []*dataplane.Configuration {
h.lock.Lock()
defer h.lock.Unlock()

return h.latestConfiguration
configs := make([]*dataplane.Configuration, 0, len(h.latestConfigurations))
for _, cfg := range h.latestConfigurations {
configs = append(configs, cfg)
}

return configs
}

// setLatestConfiguration sets the latest configuration.
// TODO(sberman): once we support multiple Gateways, this will likely have to be a map
// of all configurations.
func (h *eventHandlerImpl) setLatestConfiguration(cfg *dataplane.Configuration) {
func (h *eventHandlerImpl) setLatestConfiguration(gateway *graph.Gateway, cfg *dataplane.Configuration) {
if gateway == nil || gateway.Source == nil {
return
}

Check warning on line 542 in internal/mode/static/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/handler.go#L541-L542

Added lines #L541 - L542 were not covered by tests

h.lock.Lock()
defer h.lock.Unlock()

h.latestConfiguration = cfg
h.latestConfigurations[client.ObjectKeyFromObject(gateway.Source)] = cfg
}

func objectFilterKey(obj client.Object, nsName types.NamespacedName) filterKey {
Expand Down
Loading
Loading