diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 664c373753..f8ca784757 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -51,6 +51,7 @@ type AgentConfiguration struct { TimestampLines bool HealthCheckAddr string DisconnectAfterJob bool + PauseAfterJob bool DisconnectAfterIdleTimeout time.Duration DisconnectAfterUptime time.Duration CancelGracePeriod int diff --git a/agent/agent_worker_action.go b/agent/agent_worker_action.go index e24ef057df..a917558211 100644 --- a/agent/agent_worker_action.go +++ b/agent/agent_worker_action.go @@ -136,6 +136,13 @@ func (a *AgentWorker) runActionLoop(ctx context.Context, idleMon *idleMonitor, f // We're not paused any more! Log a helpful message. a.logger.Info("Agent has resumed after being paused") paused = false + // Reset ranJob so that pause-after-job agents can accept + // the next job after being resumed. Only reset for + // pause-after-job; disconnect-after-job agents should still + // disconnect after resume. + if a.agentConfiguration.PauseAfterJob { + ranJob = false + } } // For acquire-job agents, registration sets ignore-in-dispatches=true, @@ -158,6 +165,26 @@ func (a *AgentWorker) runActionLoop(ctx context.Context, idleMon *idleMonitor, f return nil } + // In pause-after-job mode, finishing the job sets + // ignore-in-dispatches=true. Pause the agent so it remains connected + // but won't accept new jobs until resumed via the API. + if ranJob && a.agentConfiguration.PauseAfterJob && !paused { + if msg.jobID != "" { + a.logger.Error("Agent ping dispatched a job (id %q) but agent is in pause-after-job mode (and already ran a job)! Ignoring the new job", msg.jobID) + } + a.logger.Info("Job finished, pausing agent (pause-after-job)") + if _, err := a.apiClient.Pause(ctx, &api.AgentPauseRequest{ + Note: "pause-after-job", + }); err != nil { + a.logger.Error("Failed to pause agent: %v. Disconnecting instead.", err) + return nil + } + paused = true + idleMon.MarkBusy(a) + close(msg.errCh) + continue + } + // If the jobID is empty, then it's an idle message if msg.jobID == "" { // This ensures agents that never receive a job are still tracked @@ -217,10 +244,10 @@ func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, jobID string, idleMon return fmt.Errorf("Failed to accept job: %w", err) } - // If we're disconnecting-after-job, signal back to Buildkite that we're not - // interested in jobs after this one. + // If we're disconnecting or pausing after the job, signal back to Buildkite + // that we're not interested in jobs after this one. var ignoreAgentInDispatches *bool - if a.agentConfiguration.DisconnectAfterJob { + if a.agentConfiguration.DisconnectAfterJob || a.agentConfiguration.PauseAfterJob { ignoreAgentInDispatches = ptr.To(true) } diff --git a/agent/agent_worker_test.go b/agent/agent_worker_test.go index e6d4ddcd7a..e2a7d3c1bb 100644 --- a/agent/agent_worker_test.go +++ b/agent/agent_worker_test.go @@ -488,6 +488,121 @@ func TestAgentWorker_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) { } } +// TestAgentWorker_PauseAfterJob tests that with PauseAfterJob enabled, the +// agent self-pauses via the API after completing a job, then resumes and +// accepts a new job when unpaused (unlike DisconnectAfterJob which exits). +func TestAgentWorker_PauseAfterJob(t *testing.T) { + t.Parallel() + + buildPath := filepath.Join(os.TempDir(), t.Name(), "build") + hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") + if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { + t.Fatalf("Couldn't create directories: %v", err) + } + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), t.Name())) //nolint:errcheck // Best-effort cleanup + }) + + server := NewFakeAPIServer() + defer server.Close() + + job := server.AddJob(map[string]string{ + "BUILDKITE_COMMAND": "echo hello", + }) + + // Pre-register the agent. + const agentSessionToken = "alpacas" + agent := server.AddAgent(agentSessionToken) + + // Ping sequence: + // 0: dispatch job + // 1: empty → agent self-pauses (calls POST /pause) because ranJob=true + // 2: empty → agent resumes (not a "pause" action), resets ranJob + // 3: disconnect → agent exits cleanly + agent.PingHandler = func(*http.Request) (api.Ping, error) { + switch agent.Pings { + case 0: + return api.Ping{ + Job: job.Job, + }, nil + + case 1: + // Agent finished the job and will self-pause on the next + // empty ping (the pause-after-job code path). + return api.Ping{}, nil + + case 2: + // Agent is paused. Send an empty (non-"pause") action to + // resume it. This tests that ranJob is reset on resume. + return api.Ping{}, nil + + case 3: + // The agent is now idle with ranJob=false (was reset on resume). + // Disconnect to end the test. + return api.Ping{ + Action: "disconnect", + }, nil + + default: + return api.Ping{}, errors.New("too many pings") + } + } + + server.Assign(agent, job) + + apiClient := api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }) + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: "alpacas", + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 1, + HeartbeatInterval: 10, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + apiClient, + AgentWorkerConfig{ + SpawnIndex: 1, + AgentConfiguration: AgentConfiguration{ + BootstrapScript: dummyBootstrap, + BuildPath: buildPath, + HooksPath: hooksPath, + PauseAfterJob: true, + }, + }, + ) + worker.noWaitBetweenPingsForTesting = true + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() = %v", err) + } + + // Verify the agent went through all expected pings. + if got, want := agent.Pings, 4; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } + // Verify the agent called POST /pause exactly once (after the job). + if got, want := agent.PauseCalls, 1; got != want { + t.Errorf("agent.PauseCalls = %d, want %d", got, want) + } + // Verify ignoreAgentInDispatches was set during the job. + if got, want := agent.IgnoreInDispatches, true; got != want { + t.Errorf("agent.IgnoreInDispatches = %t, want %t", got, want) + } + if got, want := job.State, JobStateFinished; got != want { + t.Errorf("job.State = %q, want %q", got, want) + } +} + func TestAgentWorker_DisconnectAfterUptime(t *testing.T) { t.Parallel() diff --git a/agent/fake_api_server_test.go b/agent/fake_api_server_test.go index 29e574d4ce..d8424d7851 100644 --- a/agent/fake_api_server_test.go +++ b/agent/fake_api_server_test.go @@ -47,6 +47,7 @@ type FakeAgent struct { Stop bool Pings int Heartbeats int + PauseCalls int IgnoreInDispatches bool PingHandler func(*http.Request) (api.Ping, error) @@ -99,6 +100,7 @@ func NewFakeAPIServer(opts ...fakeAPIServerOption) *FakeAPIServer { mux.HandleFunc("PUT /jobs/{job_uuid}/finish", fs.handleJobFinish) mux.HandleFunc("POST /jobs/{job_uuid}/chunks", fs.handleJobChunks) mux.HandleFunc("GET /ping", fs.handlePing) + mux.HandleFunc("POST /pause", fs.handlePause) mux.HandleFunc("POST /heartbeat", fs.handleHeartbeat) mux.HandleFunc("POST /register", fs.handleRegister) fs.Server = httptest.NewServer(mux) @@ -416,6 +418,22 @@ func (fs *FakeAPIServer) handlePing(rw http.ResponseWriter, req *http.Request) { rw.Write(out) //nolint:errcheck // Test should fail on incomplete response. } +func (fs *FakeAPIServer) handlePause(rw http.ResponseWriter, req *http.Request) { + fs.mu.Lock() + defer fs.mu.Unlock() + + auth := req.Header.Get("Authorization") + agent := fs.agentForAuth(auth) + if agent == nil { + http.Error(rw, encodeMsgf("invalid Authorization header value %q", auth), http.StatusUnauthorized) + return + } + + agent.PauseCalls++ + agent.Paused = true + rw.Write([]byte("{}")) //nolint:errcheck // Test should fail on incomplete response. +} + func (fs *FakeAPIServer) handleHeartbeat(rw http.ResponseWriter, req *http.Request) { fs.mu.Lock() defer fs.mu.Unlock() diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index baf490d186..068e7ba0c9 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -109,6 +109,7 @@ type AgentStartConfig struct { AcquireJob string `cli:"acquire-job"` DisconnectAfterJob bool `cli:"disconnect-after-job"` + PauseAfterJob bool `cli:"pause-after-job"` DisconnectAfterIdleTimeout int `cli:"disconnect-after-idle-timeout"` DisconnectAfterUptime int `cli:"disconnect-after-uptime"` CancelGracePeriod int `cli:"cancel-grace-period"` @@ -245,6 +246,10 @@ func (asc AgentStartConfig) Features(ctx context.Context) []string { features = append(features, "disconnect-after-job") } + if asc.PauseAfterJob { + features = append(features, "pause-after-job") + } + if asc.DisconnectAfterIdleTimeout != 0 { features = append(features, "disconnect-after-idle") } @@ -387,6 +392,11 @@ var AgentStartCommand = cli.Command{ Usage: "Disconnect the agent after running exactly one job. When used in conjunction with the ′--spawn′ flag, each worker booted will run exactly one job (default: false)", EnvVar: "BUILDKITE_AGENT_DISCONNECT_AFTER_JOB", }, + cli.BoolFlag{ + Name: "pause-after-job", + Usage: "Pause the agent after running a job, instead of disconnecting. The agent remains connected and can be resumed via the API. Useful for running health checks between jobs without losing agent identity (default: false)", + EnvVar: "BUILDKITE_AGENT_PAUSE_AFTER_JOB", + }, cli.IntFlag{ Name: "disconnect-after-idle-timeout", Value: 0, @@ -922,6 +932,10 @@ var AgentStartCommand = cli.Command{ cfg.NoPlugins = true } + if cfg.DisconnectAfterJob && cfg.PauseAfterJob { + l.Fatal("disconnect-after-job and pause-after-job are mutually exclusive") + } + // Guess the shell if none is provided if cfg.Shell == "" { cfg.Shell = DefaultShell() @@ -1094,6 +1108,7 @@ var AgentStartCommand = cli.Command{ ANSITimestamps: !cfg.NoANSITimestamps, TimestampLines: cfg.TimestampLines, DisconnectAfterJob: cfg.DisconnectAfterJob, + PauseAfterJob: cfg.PauseAfterJob, DisconnectAfterIdleTimeout: time.Duration(cfg.DisconnectAfterIdleTimeout) * time.Second, DisconnectAfterUptime: time.Duration(cfg.DisconnectAfterUptime) * time.Second, CancelGracePeriod: cfg.CancelGracePeriod,