Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type AgentConfiguration struct {
TimestampLines bool
HealthCheckAddr string
DisconnectAfterJob bool
PauseAfterJob bool
DisconnectAfterIdleTimeout time.Duration
DisconnectAfterUptime time.Duration
CancelGracePeriod int
Expand Down
33 changes: 30 additions & 3 deletions agent/agent_worker_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ func (a *AgentWorker) runActionLoop(ctx context.Context, idleMon *idleMonitor, f
// We're not paused any more! Log a helpful message.
a.logger.Info("Agent has resumed after being paused")
paused = false
// Reset ranJob so that pause-after-job agents can accept
// the next job after being resumed. Only reset for
// pause-after-job; disconnect-after-job agents should still
// disconnect after resume.
if a.agentConfiguration.PauseAfterJob {
ranJob = false
}
}

// For acquire-job agents, registration sets ignore-in-dispatches=true,
Expand All @@ -158,6 +165,26 @@ func (a *AgentWorker) runActionLoop(ctx context.Context, idleMon *idleMonitor, f
return nil
}

// In pause-after-job mode, finishing the job sets
// ignore-in-dispatches=true. Pause the agent so it remains connected
// but won't accept new jobs until resumed via the API.
if ranJob && a.agentConfiguration.PauseAfterJob && !paused {
if msg.jobID != "" {
a.logger.Error("Agent ping dispatched a job (id %q) but agent is in pause-after-job mode (and already ran a job)! Ignoring the new job", msg.jobID)
}
a.logger.Info("Job finished, pausing agent (pause-after-job)")
if _, err := a.apiClient.Pause(ctx, &api.AgentPauseRequest{
Note: "pause-after-job",
}); err != nil {
a.logger.Error("Failed to pause agent: %v. Disconnecting instead.", err)
return nil
}
paused = true
idleMon.MarkBusy(a)
close(msg.errCh)
continue
}

// If the jobID is empty, then it's an idle message
if msg.jobID == "" {
// This ensures agents that never receive a job are still tracked
Expand Down Expand Up @@ -217,10 +244,10 @@ func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, jobID string, idleMon
return fmt.Errorf("Failed to accept job: %w", err)
}

// If we're disconnecting-after-job, signal back to Buildkite that we're not
// interested in jobs after this one.
// If we're disconnecting or pausing after the job, signal back to Buildkite
// that we're not interested in jobs after this one.
var ignoreAgentInDispatches *bool
if a.agentConfiguration.DisconnectAfterJob {
if a.agentConfiguration.DisconnectAfterJob || a.agentConfiguration.PauseAfterJob {
ignoreAgentInDispatches = ptr.To(true)
}

Expand Down
115 changes: 115 additions & 0 deletions agent/agent_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,121 @@ func TestAgentWorker_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) {
}
}

// TestAgentWorker_PauseAfterJob exercises the PauseAfterJob option end to end
// against the fake API server. The worker is handed a single job, is expected
// to pause itself through the API once that job is done, and must then keep
// pinging (rather than exiting, as DisconnectAfterJob would) until the server
// sends a disconnect action.
func TestAgentWorker_PauseAfterJob(t *testing.T) {
	t.Parallel()

	// Per-test scratch space: <tmp>/<test name>/{build,hooks}.
	scratchDir := filepath.Join(os.TempDir(), t.Name())
	buildPath := filepath.Join(scratchDir, "build")
	hooksPath := filepath.Join(scratchDir, "hooks")
	mkdirErr := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777))
	if mkdirErr != nil {
		t.Fatalf("Couldn't create directories: %v", mkdirErr)
	}
	t.Cleanup(func() {
		os.RemoveAll(scratchDir) //nolint:errcheck // Best-effort cleanup
	})

	server := NewFakeAPIServer()
	defer server.Close()

	job := server.AddJob(map[string]string{
		"BUILDKITE_COMMAND": "echo hello",
	})

	// The agent is registered with the fake server up front.
	const agentSessionToken = "alpacas"
	agent := server.AddAgent(agentSessionToken)

	// Scripted ping responses, keyed by how many pings have been seen so far:
	//
	//	0    - hand out the job
	//	1    - empty: the job is done, so the agent pauses itself (POST /pause)
	//	2    - empty: no "pause" action, so the agent resumes and resets ranJob
	//	3    - "disconnect": the agent exits cleanly, ending the test
	//	else - error, the agent pinged more often than expected
	agent.PingHandler = func(*http.Request) (api.Ping, error) {
		switch agent.Pings {
		case 0:
			return api.Ping{Job: job.Job}, nil

		case 1, 2:
			// Both the self-pause ping and the resume ping are answered
			// with an empty response.
			return api.Ping{}, nil

		case 3:
			return api.Ping{Action: "disconnect"}, nil
		}
		return api.Ping{}, errors.New("too many pings")
	}

	server.Assign(agent, job)

	apiClient := api.NewClient(logger.Discard, api.Config{
		Endpoint: server.URL,
		Token:    "llamas",
	})

	testLogger := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {})

	registration := &api.AgentRegisterResponse{
		UUID:              uuid.New().String(),
		Name:              "agent-1",
		AccessToken:       "alpacas",
		Endpoint:          server.URL,
		PingInterval:      1,
		JobStatusInterval: 1,
		HeartbeatInterval: 10,
	}

	workerConfig := AgentWorkerConfig{
		SpawnIndex: 1,
		AgentConfiguration: AgentConfiguration{
			BootstrapScript: dummyBootstrap,
			BuildPath:       buildPath,
			HooksPath:       hooksPath,
			PauseAfterJob:   true,
		},
	}

	worker := NewAgentWorker(
		testLogger,
		registration,
		metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}),
		apiClient,
		workerConfig,
	)
	worker.noWaitBetweenPingsForTesting = true

	startErr := worker.Start(t.Context(), nil)
	if startErr != nil {
		t.Errorf("worker.Start() = %v", startErr)
	}

	// Every scripted ping must have been consumed.
	if agent.Pings != 4 {
		t.Errorf("agent.Pings = %d, want %d", agent.Pings, 4)
	}
	// POST /pause must have been hit exactly once, after the job.
	if agent.PauseCalls != 1 {
		t.Errorf("agent.PauseCalls = %d, want %d", agent.PauseCalls, 1)
	}
	// The fake agent must have recorded ignore-in-dispatches as set.
	if agent.IgnoreInDispatches != true {
		t.Errorf("agent.IgnoreInDispatches = %t, want %t", agent.IgnoreInDispatches, true)
	}
	// The job itself must have run to completion.
	if job.State != JobStateFinished {
		t.Errorf("job.State = %q, want %q", job.State, JobStateFinished)
	}
}

func TestAgentWorker_DisconnectAfterUptime(t *testing.T) {
t.Parallel()

Expand Down
18 changes: 18 additions & 0 deletions agent/fake_api_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type FakeAgent struct {
Stop bool
Pings int
Heartbeats int
PauseCalls int
IgnoreInDispatches bool

PingHandler func(*http.Request) (api.Ping, error)
Expand Down Expand Up @@ -99,6 +100,7 @@ func NewFakeAPIServer(opts ...fakeAPIServerOption) *FakeAPIServer {
mux.HandleFunc("PUT /jobs/{job_uuid}/finish", fs.handleJobFinish)
mux.HandleFunc("POST /jobs/{job_uuid}/chunks", fs.handleJobChunks)
mux.HandleFunc("GET /ping", fs.handlePing)
mux.HandleFunc("POST /pause", fs.handlePause)
mux.HandleFunc("POST /heartbeat", fs.handleHeartbeat)
mux.HandleFunc("POST /register", fs.handleRegister)
fs.Server = httptest.NewServer(mux)
Expand Down Expand Up @@ -416,6 +418,22 @@ func (fs *FakeAPIServer) handlePing(rw http.ResponseWriter, req *http.Request) {
rw.Write(out) //nolint:errcheck // Test should fail on incomplete response.
}

// handlePause is the fake server's handler for the pause endpoint. It looks up
// the calling agent from the Authorization header, marks that agent as paused,
// counts the call in PauseCalls, and replies with an empty JSON object.
// Requests whose Authorization header matches no agent get a 401 response.
// The server mutex is held for the duration of the handler.
func (fs *FakeAPIServer) handlePause(rw http.ResponseWriter, req *http.Request) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	authHeader := req.Header.Get("Authorization")
	caller := fs.agentForAuth(authHeader)
	if caller == nil {
		http.Error(rw, encodeMsgf("invalid Authorization header value %q", authHeader), http.StatusUnauthorized)
		return
	}

	// Record the pause on the agent so tests can assert on it afterwards.
	caller.Paused = true
	caller.PauseCalls++

	rw.Write([]byte("{}")) //nolint:errcheck // Test should fail on incomplete response.
}

func (fs *FakeAPIServer) handleHeartbeat(rw http.ResponseWriter, req *http.Request) {
fs.mu.Lock()
defer fs.mu.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions clicommand/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type AgentStartConfig struct {

AcquireJob string `cli:"acquire-job"`
DisconnectAfterJob bool `cli:"disconnect-after-job"`
PauseAfterJob bool `cli:"pause-after-job"`
DisconnectAfterIdleTimeout int `cli:"disconnect-after-idle-timeout"`
DisconnectAfterUptime int `cli:"disconnect-after-uptime"`
CancelGracePeriod int `cli:"cancel-grace-period"`
Expand Down Expand Up @@ -245,6 +246,10 @@ func (asc AgentStartConfig) Features(ctx context.Context) []string {
features = append(features, "disconnect-after-job")
}

if asc.PauseAfterJob {
features = append(features, "pause-after-job")
}

if asc.DisconnectAfterIdleTimeout != 0 {
features = append(features, "disconnect-after-idle")
}
Expand Down Expand Up @@ -387,6 +392,11 @@ var AgentStartCommand = cli.Command{
Usage: "Disconnect the agent after running exactly one job. When used in conjunction with the ′--spawn′ flag, each worker booted will run exactly one job (default: false)",
EnvVar: "BUILDKITE_AGENT_DISCONNECT_AFTER_JOB",
},
cli.BoolFlag{
Name: "pause-after-job",
Usage: "Pause the agent after running a job, instead of disconnecting. The agent remains connected and can be resumed via the API. Useful for running health checks between jobs without losing agent identity (default: false)",
EnvVar: "BUILDKITE_AGENT_PAUSE_AFTER_JOB",
},
cli.IntFlag{
Name: "disconnect-after-idle-timeout",
Value: 0,
Expand Down Expand Up @@ -922,6 +932,10 @@ var AgentStartCommand = cli.Command{
cfg.NoPlugins = true
}

if cfg.DisconnectAfterJob && cfg.PauseAfterJob {
l.Fatal("disconnect-after-job and pause-after-job are mutually exclusive")
}

// Guess the shell if none is provided
if cfg.Shell == "" {
cfg.Shell = DefaultShell()
Expand Down Expand Up @@ -1094,6 +1108,7 @@ var AgentStartCommand = cli.Command{
ANSITimestamps: !cfg.NoANSITimestamps,
TimestampLines: cfg.TimestampLines,
DisconnectAfterJob: cfg.DisconnectAfterJob,
PauseAfterJob: cfg.PauseAfterJob,
DisconnectAfterIdleTimeout: time.Duration(cfg.DisconnectAfterIdleTimeout) * time.Second,
DisconnectAfterUptime: time.Duration(cfg.DisconnectAfterUptime) * time.Second,
CancelGracePeriod: cfg.CancelGracePeriod,
Expand Down