diff --git a/Makefile b/Makefile index 0e506cb..4038827 100644 --- a/Makefile +++ b/Makefile @@ -5,18 +5,22 @@ LDFLAGS = -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.Version=$(VERSIO -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.GitCommit=$(COMMIT) \ -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.BuildDate=$(DATE) -.PHONY: build build: go build -ldflags "$(LDFLAGS)" -o bin/ctrlc ./cmd/ctrlc -.PHONY: install install: go install -ldflags "$(LDFLAGS)" ./cmd/ctrlc -.PHONY: test test: go test -v ./... -.PHONY: clean +fmt: + gofmt -w -s -e . + +lint: + golangci-lint run + clean: - rm -rf bin/ \ No newline at end of file + rm -rf bin/ + +.PHONY: build install test clean \ No newline at end of file diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index f0873b0..2401a0f 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -2,7 +2,12 @@ package exec import ( "fmt" + "os" + "os/signal" + "runtime" + "syscall" + "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" @@ -11,34 +16,71 @@ import ( ) func NewRunExecCmd() *cobra.Command { - return &cobra.Command{ + var ( + name string + jobAgentType = "exec-bash" + ) + + if runtime.GOOS == "windows" { + jobAgentType = "exec-powershell" + } + + cmd := &cobra.Command{ Use: "exec", Short: "Execute commands directly when a job is received", + Example: heredoc.Doc(` + $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 + `), RunE: func(cmd *cobra.Command, args []string) error { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") + workspaceId := viper.GetString("workspace") + if name == "" { + return fmt.Errorf("name is required") + } + if workspaceId == "" { + return fmt.Errorf("workspace is required") + } client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } + runner := NewExecRunner(client) + jobAgentConfig := api.UpsertJobAgentJSONRequestBody{ + Name: name, + Type: jobAgentType, + WorkspaceId: workspaceId, + } ja, err := jobagent.NewJobAgent( client, - api.UpsertJobAgentJSONRequestBody{ - Name: "exec", - Type: "exec", - }, - &ExecRunner{}, + jobAgentConfig, + runner, ) if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } + + // Set up a simple shutdown handler for non-interval mode + // When used with AddIntervalSupport, this would only affect a single iteration + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + log.Info("Shutting down gracefully...") + runner.ExitAll(true) + }() + + // Run job check - AddIntervalSupport will handle repeated execution if err := ja.RunQueuedJobs(); err != nil { - log.Error("failed to run queued jobs", "error", err) - } - if err := ja.UpdateRunningJobs(); err != nil { - log.Error("failed to check for jobs", "error", err) + return fmt.Errorf("failed to run queued jobs: %w", err) } + return nil }, } + + cmd.Flags().StringVar(&name, "name", "", "Name of the job agent") + cmd.MarkFlagRequired("name") + cmd.MarkFlagRequired("workspace") + return cmd } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 50ffa0b..e918fee 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -1,52 +1,87 @@ package exec import ( - "bytes" - "encoding/json" + "context" "fmt" - "html/template" "os" "os/exec" + "os/signal" "runtime" "strconv" + "sync" "syscall" + "time" + "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" ) var _ jobagent.Runner = &ExecRunner{} -type ExecRunner struct{} +type RunningJob struct { + cmd *exec.Cmd + jobID string + job *api.JobWithDetails + cancelled bool +} -type ExecConfig struct { - WorkingDir string `json:"workingDir,omitempty"` - Script string `json:"script"` +type ExecRunner struct { + runningJobs map[string]*RunningJob + client *api.ClientWithResponses + mu sync.Mutex } -func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { - externalId, err := strconv.Atoi(*job.ExternalId) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err) +// Helper function to update job status and handle error logging +func updateJobStatus(job *api.JobWithDetails, status api.JobStatus, message string, jobID string) { + if err := job.UpdateStatus(status, message); err != nil { + log.Error("Failed to update job status", "error", err, "jobId", jobID) } +} - process, err := os.FindProcess(externalId) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err) +func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { + runner := &ExecRunner{ + runningJobs: make(map[string]*RunningJob), + client: client, } - // On Unix systems, FindProcess always succeeds, so we need to send signal 0 - // to check if process exists - err = process.Signal(syscall.Signal(0)) - if err != nil { - return api.JobStatusFailure, fmt.Sprintf("process not running: %v", err) - } + // Set up signal handling for graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + sig := <-c + log.Info("Received shutdown signal, terminating all jobs", "signal", sig) + + // Update all job statuses before exiting + runner.mu.Lock() + for _, runningJob := range runner.runningJobs { + if runningJob.job != nil { + log.Info("Marking job as cancelled due to shutdown", "id", runningJob.jobID) + runningJob.cancelled = true + // Update the job status to failed with a specific message + updateJobStatus(runningJob.job, api.JobStatusFailure, fmt.Sprintf("Job terminated due to signal: %v", sig), runningJob.jobID) + } + } + runner.mu.Unlock() + + // Now terminate all processes + runner.ExitAll(true) + log.Info("Shutdown complete, exiting") + os.Exit(0) + }() - return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", externalId) + return runner } -func (r *ExecRunner) Start(job api.Job) (string, error) { - // Create temp script file +// Start creates a temporary script file, starts the process, and updates job status when the process completes. +func (r *ExecRunner) Start(ctx context.Context, job *api.JobWithDetails) (api.JobStatus, error) { + // Template the script using the job + script, err := job.TemplateJobDetails() + if err != nil { + return api.JobStatusFailure, fmt.Errorf("failed to template job details: %w", err) + } + + // Determine file extension based on OS ext := ".sh" if runtime.GOOS == "windows" { ext = ".ps1" @@ -54,57 +89,153 @@ func (r *ExecRunner) Start(job api.Job) (string, error) { tmpFile, err := os.CreateTemp("", "script*"+ext) if err != nil { - return "", fmt.Errorf("failed to create temp script file: %w", err) - } - defer os.Remove(tmpFile.Name()) - - config := ExecConfig{} - jsonBytes, err := json.Marshal(job.JobAgentConfig) - if err != nil { - return "", fmt.Errorf("failed to marshal job agent config: %w", err) - } - if err := json.Unmarshal(jsonBytes, &config); err != nil { - return "", fmt.Errorf("failed to unmarshal job agent config: %w", err) - } - - templatedScript, err := template.New("script").Parse(config.Script) - if err != nil { - return "", fmt.Errorf("failed to parse script template: %w", err) - } - - buf := new(bytes.Buffer) - if err := templatedScript.Execute(buf, job); err != nil { - return "", fmt.Errorf("failed to execute script template: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err) } - script := buf.String() - // Write script contents + // Write the script to the temporary file if _, err := tmpFile.WriteString(script); err != nil { - return "", fmt.Errorf("failed to write script file: %w", err) + os.Remove(tmpFile.Name()) + return api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err) } if err := tmpFile.Close(); err != nil { - return "", fmt.Errorf("failed to close script file: %w", err) + os.Remove(tmpFile.Name()) + return api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err) } - // Make executable on Unix systems + // Make the script executable on Unix-like systems if runtime.GOOS != "windows" { if err := os.Chmod(tmpFile.Name(), 0700); err != nil { - return "", fmt.Errorf("failed to make script executable: %w", err) + os.Remove(tmpFile.Name()) + return api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err) } } - cmd := exec.Command("bash", "-c", tmpFile.Name()) + cmd := exec.CommandContext(ctx, "bash", "-c", tmpFile.Name()) if runtime.GOOS == "windows" { - cmd = exec.Command("powershell", "-File", tmpFile.Name()) + cmd = exec.CommandContext(ctx, "powershell", "-File", tmpFile.Name()) + } else { + // On Unix-like systems, create a new process group so we can terminate all child processes + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } - cmd.Dir = config.WorkingDir + // Set up command environment cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return "", fmt.Errorf("failed to execute script: %w", err) + // Start the process + if err := cmd.Start(); err != nil { + os.Remove(tmpFile.Name()) + return api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err) } - return strconv.Itoa(cmd.Process.Pid), nil + // Use the pointer address as the handle + handle := fmt.Sprintf("%p", cmd) + job.SetExternalID(handle) + + // Create a running job record + runningJob := &RunningJob{ + cmd: cmd, + jobID: job.Id.String(), + job: job, + cancelled: false, + } + + // Register the running job + r.mu.Lock() + r.runningJobs[handle] = runningJob + r.mu.Unlock() + + // Spawn a goroutine to wait for the process to finish and update the job status + go func(handle, scriptPath string) { + defer os.Remove(scriptPath) + defer func() { + r.mu.Lock() + delete(r.runningJobs, handle) + r.mu.Unlock() + log.Debug("Job cleanup complete", "id", runningJob.jobID) + }() + + log.Debug("Waiting for command to complete", "id", runningJob.jobID, "handle", handle) + err := cmd.Wait() + exitCode := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } + } + log.Debug("Command completed", "id", runningJob.jobID, "exitCode", exitCode, "error", err != nil) + + if runningJob.cancelled { + updateJobStatus(job, api.JobStatusCancelled, "Job was cancelled", runningJob.jobID) + } else if err != nil { + updateJobStatus(job, api.JobStatusFailure, + fmt.Sprintf("Process exited with code %d: %v", exitCode, err), + runningJob.jobID) + } else { + updateJobStatus(job, api.JobStatusSuccessful, "", runningJob.jobID) + } + }(handle, tmpFile.Name()) + + return api.JobStatusInProgress, nil +} + +// ExitAll stops all currently running commands +func (r *ExecRunner) ExitAll(cancelled bool) { + r.mu.Lock() + defer r.mu.Unlock() + + for id, runningJob := range r.runningJobs { + if runningJob != nil && runningJob.cmd != nil && runningJob.cmd.Process != nil { + // Check if process is still running before attempting to kill + if err := runningJob.cmd.Process.Signal(syscall.Signal(0)); err == nil { + log.Info("Killing job", "id", id) + runningJob.cancelled = cancelled + + // Update job status if cancellation requested and we have a job reference + if cancelled && runningJob.job != nil { + // Use JobStatusCancelled if it's an explicit cancellation, or JobStatusFailure if it's due to external termination + status := api.JobStatusCancelled + message := "Job was cancelled by user" + + // Update the status + updateJobStatus(runningJob.job, status, message, runningJob.jobID) + } + + // Kill the process + killProcess(runningJob.cmd, runningJob.jobID) + } + } + } + + if cancelled { + r.runningJobs = make(map[string]*RunningJob) + } +} + +// killProcess terminates a process and its children in a cross-platform way without exposing PIDs +func killProcess(cmd *exec.Cmd, jobID string) { + if runtime.GOOS == "windows" { + pid := cmd.Process.Pid + if err := exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(pid)).Run(); err != nil { + log.Error("Failed to kill process tree", "error", err, "jobId", jobID) + } + } else { + pgid, err := syscall.Getpgid(cmd.Process.Pid) + if err == nil && pgid > 0 { + if err = syscall.Kill(-pgid, syscall.SIGTERM); err != nil { + log.Error("Failed to terminate process group gracefully", "error", err, "jobId", jobID) + } + } else { + if err = cmd.Process.Signal(syscall.SIGTERM); err != nil { + log.Error("Failed to terminate process gracefully", "error", err, "jobId", jobID) + } + } + + go func() { + time.Sleep(2 * time.Second) + if err := cmd.Process.Kill(); err != nil { + log.Error("Failed to kill process", "error", err, "jobId", jobID) + } + }() + } } diff --git a/cmd/ctrlc/root/run/run.go b/cmd/ctrlc/root/run/run.go index 440f966..b765912 100644 --- a/cmd/ctrlc/root/run/run.go +++ b/cmd/ctrlc/root/run/run.go @@ -4,6 +4,7 @@ import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/run/exec" "github.com/ctrlplanedev/cli/internal/cliutil" "github.com/spf13/cobra" + "github.com/spf13/viper" ) func NewRunCmd() *cobra.Command { @@ -15,7 +16,12 @@ func NewRunCmd() *cobra.Command { }, } - cmd.AddCommand(cliutil.AddIntervalSupport(exec.NewRunExecCmd(), "")) + interval := viper.GetString("interval") + if interval == "" { + interval = "10s" + } + + cmd.AddCommand(cliutil.AddIntervalSupport(exec.NewRunExecCmd(), interval)) return cmd } diff --git a/cmd/ctrlc/root/sync/clickhouse/clickhouse.go b/cmd/ctrlc/root/sync/clickhouse/clickhouse.go index c2e27fd..4e553f6 100644 --- a/cmd/ctrlc/root/sync/clickhouse/clickhouse.go +++ b/cmd/ctrlc/root/sync/clickhouse/clickhouse.go @@ -17,18 +17,18 @@ import ( ) type ClickHouseConfig struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - Region string `json:"region"` - CloudProvider string `json:"cloudProvider"` - Tier string `json:"tier"` - IdleScaling map[string]interface{} `json:"idleScaling"` - TotalDiskSize int `json:"totalDiskSize"` - TotalMemoryMB int `json:"totalMemoryMB"` - MinTotalMemory int `json:"minTotalMemory"` - MaxTotalMemory int `json:"maxTotalMemory"` - Created string `json:"created"` + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + Region string `json:"region"` + CloudProvider string `json:"cloudProvider"` + Tier string `json:"tier"` + IdleScaling map[string]interface{} `json:"idleScaling"` + TotalDiskSize int `json:"totalDiskSize"` + TotalMemoryMB int `json:"totalMemoryMB"` + MinTotalMemory int `json:"minTotalMemory"` + MaxTotalMemory int `json:"maxTotalMemory"` + Created string `json:"created"` Endpoints []map[string]interface{} `json:"endpoints"` } @@ -40,17 +40,17 @@ func (c *ClickHouseConfig) Struct() map[string]interface{} { } type ClickHouseClient struct { - httpClient *http.Client - apiUrl string - apiKey string + httpClient *http.Client + apiUrl string + apiKey string organizationID string } func NewClickHouseClient(apiUrl, apiKey, organizationID string) *ClickHouseClient { return &ClickHouseClient{ - httpClient: &http.Client{}, - apiUrl: apiUrl, - apiKey: apiKey, + httpClient: &http.Client{}, + apiUrl: apiUrl, + apiKey: apiKey, organizationID: organizationID, } } @@ -60,18 +60,18 @@ type ServiceList struct { } type Service struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - Region string `json:"region"` - CloudProvider string `json:"cloudProvider"` - Tier string `json:"tier"` - IdleScaling map[string]interface{} `json:"idleScaling"` - TotalDiskSize int `json:"totalDiskSize"` - TotalMemoryMB int `json:"totalMemoryMB"` - MinTotalMemory int `json:"minTotalMemory"` - MaxTotalMemory int `json:"maxTotalMemory"` - Created string `json:"created"` + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + Region string `json:"region"` + CloudProvider string `json:"cloudProvider"` + Tier string `json:"tier"` + IdleScaling map[string]interface{} `json:"idleScaling"` + TotalDiskSize int `json:"totalDiskSize"` + TotalMemoryMB int `json:"totalMemoryMB"` + MinTotalMemory int `json:"minTotalMemory"` + MaxTotalMemory int `json:"maxTotalMemory"` + Created string `json:"created"` Endpoints []map[string]interface{} `json:"endpoints"` } @@ -186,4 +186,4 @@ func NewSyncClickhouseCmd() *cobra.Command { cmd.MarkFlagRequired("organization-id") return cmd -} \ No newline at end of file +} diff --git a/cmd/ctrlc/root/sync/sync.go b/cmd/ctrlc/root/sync/sync.go index 9f8ace3..6934485 100644 --- a/cmd/ctrlc/root/sync/sync.go +++ b/cmd/ctrlc/root/sync/sync.go @@ -29,4 +29,4 @@ func NewSyncCmd() *cobra.Command { cmd.AddCommand(cliutil.AddIntervalSupport(clickhouse.NewSyncClickhouseCmd(), "")) return cmd -} \ No newline at end of file +} diff --git a/cmd/ctrlc/root/version/version.go b/cmd/ctrlc/root/version/version.go index 9e13958..62a02d4 100644 --- a/cmd/ctrlc/root/version/version.go +++ b/cmd/ctrlc/root/version/version.go @@ -31,4 +31,4 @@ func NewVersionCmd() *cobra.Command { cmd.Flags().String("format", "json", "Output format. Accepts 'json', 'yaml', or 'github-action'") return cmd -} \ No newline at end of file +} diff --git a/internal/api/job.go b/internal/api/job.go new file mode 100644 index 0000000..ee97ecf --- /dev/null +++ b/internal/api/job.go @@ -0,0 +1,105 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "html/template" +) + +// JobWithDetails extends the Job type with additional fields and methods +type JobWithDetails struct { + Job + Details map[string]interface{} + Client *ClientWithResponses + ExternalID string +} + +// NewJobWithDetails creates a new JobWithDetails with the client and job data +func NewJobWithDetails(client *ClientWithResponses, job Job) (*JobWithDetails, error) { + j := &JobWithDetails{ + Job: job, + Client: client, + } + + // Fetch job details + var err error + j.Details, err = j.GetJobDetails(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + + return j, nil +} + +// GetJobDetails retrieves job details for templating +func (j *JobWithDetails) GetJobDetails(ctx context.Context) (map[string]interface{}, error) { + resp, err := j.Client.GetJobWithResponse(ctx, j.Id.String()) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + if resp.JSON200 == nil { + return nil, fmt.Errorf("received empty response from job details API") + } + + var details map[string]interface{} + detailsBytes, err := json.Marshal(resp.JSON200) + if err != nil { + return nil, fmt.Errorf("failed to marshal job response: %w", err) + } + if err := json.Unmarshal(detailsBytes, &details); err != nil { + return nil, fmt.Errorf("failed to unmarshal job details: %w", err) + } + return details, nil +} + +// TemplateJobDetails applies the job details template to the script +func (j *JobWithDetails) TemplateJobDetails() (string, error) { + // Extract script from JobAgentConfig + script, ok := j.JobAgentConfig["script"].(string) + if !ok { + return "", fmt.Errorf("script not found in job agent config") + } + + // Parse the script template + templatedScript, err := template.New("script").Parse(script) + if err != nil { + return "", fmt.Errorf("failed to parse script template: %w", err) + } + + // Execute the template with job details + buf := new(bytes.Buffer) + if err := templatedScript.Execute(buf, j.Details); err != nil { + return "", fmt.Errorf("failed to execute script template: %w", err) + } + + return buf.String(), nil +} + +// UpdateStatus updates the job status via API +func (j *JobWithDetails) UpdateStatus(status JobStatus, message string) error { + body := UpdateJobJSONRequestBody{ + Status: &status, + } + if message != "" { + body.Message = &message + } + if j.ExternalID != "" { + body.ExternalId = &j.ExternalID + } + + resp, err := j.Client.UpdateJobWithResponse(context.Background(), j.Id.String(), body) + if err != nil { + return fmt.Errorf("failed to update job status: %w", err) + } + if resp.JSON200 == nil { + return fmt.Errorf("failed to update job status: received empty response") + } + return nil +} + +// SetExternalID sets the external ID for the job +func (j *JobWithDetails) SetExternalID(externalID string) { + j.ExternalID = externalID +} diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index f7640c9..d5f6bcd 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -9,9 +9,10 @@ import ( "github.com/ctrlplanedev/cli/internal/api" ) +// Runner defines the interface for job execution. +// Start initiates a job and returns a status or error. type Runner interface { - Start(job api.Job) (string, error) - Status(job api.Job) (api.JobStatus, string) + Start(ctx context.Context, job *api.JobWithDetails) (api.JobStatus, error) } func NewJobAgent( @@ -30,9 +31,8 @@ func NewJobAgent( ja := &JobAgent{ client: client, - - id: agent.JSON200.Id, - workspaceId: config.WorkspaceId, + id: agent.JSON200.Id, + runner: runner, } return ja, nil @@ -40,19 +40,13 @@ func NewJobAgent( type JobAgent struct { client *api.ClientWithResponses - - workspaceId string - id string - + id string runner Runner } -// RunQueuedJobs retrieves and executes any queued jobs for this agent. For each -// job, it starts execution using the runner's Start method in a separate -// goroutine. If starting a job fails, it updates the job status to InProgress -// with an error message. If starting succeeds and returns an external ID, it -// updates the job with that ID. The function waits for all jobs to complete -// before returning. +// RunQueuedJobs retrieves and executes any queued jobs for this agent. +// For each job, it starts execution using the runner's Start method, which +// will update the job status when the job completes. func (a *JobAgent) RunQueuedJobs() error { jobs, err := a.client.GetNextJobsWithResponse(context.Background(), a.id) if err != nil { @@ -64,78 +58,43 @@ func (a *JobAgent) RunQueuedJobs() error { log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs)) var wg sync.WaitGroup - for _, job := range *jobs.JSON200.Jobs { + for _, apiJob := range *jobs.JSON200.Jobs { + if apiJob.Status == api.JobStatusInProgress { + continue + } + + job, err := api.NewJobWithDetails(a.client, apiJob) + if err != nil { + log.Error("Failed to create job with details", "error", err, "jobId", apiJob.Id.String()) + continue + } + + // Update job status to InProgress before starting execution + if err := job.UpdateStatus(api.JobStatusInProgress, "Job execution started"); err != nil { + log.Error("Failed to update job status to InProgress", "error", err, "jobId", job.Id.String()) + // Continue anyway + } + wg.Add(1) - go func(job api.Job) { + go func(job *api.JobWithDetails) { defer wg.Done() - externalId, err := a.runner.Start(job) + + // Start the job - status updates happen inside Start + status, err := a.runner.Start(context.Background(), job) + if err != nil { - status := api.JobStatusInProgress - message := fmt.Sprintf("Failed to start job: %s", err.Error()) log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) - a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - api.UpdateJobJSONRequestBody{ - Status: &status, - Message: &message, - }, - ) + if updErr := job.UpdateStatus(api.JobStatusFailure, fmt.Sprintf("Failed to start job: %s", err.Error())); updErr != nil { + log.Error("Failed to update job status", "error", updErr, "jobId", job.Id.String()) + } return } - if externalId != "" { - a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - api.UpdateJobJSONRequestBody{ - ExternalId: &externalId, - }, - ) - } - }(job) - } - wg.Wait() - - return nil -} - -// UpdateRunningJobs checks the status of all currently running jobs for this -// agent. It queries the API for running jobs, then concurrently checks the -// status of each job using the runner's Status method and updates the job -// status in the API accordingly. Any errors checking job status or updating the -// API are logged but do not stop other job updates from proceeding. -func (a *JobAgent) UpdateRunningJobs() error { - jobs, err := a.client.GetAgentRunningJobsWithResponse(context.Background(), a.id) - if err != nil { - log.Error("Failed to get job", "error", err, "status", jobs.StatusCode()) - return err - } - - if jobs.JSON200 == nil { - log.Error("Failed to get job", "error", err, "status", jobs.StatusCode()) - return fmt.Errorf("failed to get job") - } - var wg sync.WaitGroup - for _, job := range *jobs.JSON200 { - wg.Add(1) - go func(job api.Job) { - defer wg.Done() - status, message := a.runner.Status(job) - - body := api.UpdateJobJSONRequestBody{ - Status: &status, - } - if message != "" { - body.Message = &message - } - _, err := a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - body, - ) - if err != nil { - log.Error("Failed to update job", "error", err, "jobId", job.Id.String()) + // If we got a status that's not InProgress, update it + if status != api.JobStatusInProgress { + if updErr := job.UpdateStatus(status, ""); updErr != nil { + log.Error("Failed to update job status", "error", updErr, "jobId", job.Id.String()) + } } }(job) }