From 8223bd0d769ad3bb2c47dcd4e3d74957d1a23dee Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 00:40:43 -0500 Subject: [PATCH 01/17] fix: run exec agent --- cmd/ctrlc/root/run/exec/exec.go | 42 ++++++++++++++-- cmd/ctrlc/root/run/exec/runner.go | 84 ++++++++++++++++++++++++++++++- pkg/jobagent/runner.go | 4 +- 3 files changed, 124 insertions(+), 6 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index f0873b0..e27322e 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -3,6 +3,7 @@ package exec import ( "fmt" + "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" @@ -10,22 +11,52 @@ import ( "github.com/spf13/viper" ) +type JobAgentType string + +const ( + JobAgentTypeLinux JobAgentType = "exec-linux" + JobAgentTypeWindows JobAgentType = "exec-windows" +) + func NewRunExecCmd() *cobra.Command { - return &cobra.Command{ + var name string + var jobAgentType string + + cmd := &cobra.Command{ Use: "exec", Short: "Execute commands directly when a job is received", + Example: heredoc.Doc(` + $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 + $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 --type windows + `), RunE: func(cmd *cobra.Command, args []string) error { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") + workspaceId := viper.GetString("workspace") client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } + if name == "" { + return fmt.Errorf("name is required") + } + if workspaceId == "" { + return fmt.Errorf("workspace is required") + } + validTypes := map[string]bool{ + string(JobAgentTypeLinux): true, + string(JobAgentTypeWindows): true, + } + if !validTypes[jobAgentType] { + return fmt.Errorf("invalid type: %s. Must be one of: linux, windows", jobAgentType) + } + ja, err := jobagent.NewJobAgent( client, api.UpsertJobAgentJSONRequestBody{ - Name: "exec", - Type: "exec", + Name: name, + Type: jobAgentType, + WorkspaceId: workspaceId, }, &ExecRunner{}, ) @@ -41,4 +72,9 @@ func NewRunExecCmd() *cobra.Command { return nil }, } + + cmd.Flags().StringVar(&name, "name", "", "Name of the job agent") + cmd.MarkFlagRequired("name") + cmd.Flags().StringVar(&jobAgentType, "type", "exec-linux", "Type of the job agent, defaults to linux") + return cmd } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 15cf89f..7cb2b22 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -2,6 +2,7 @@ package exec import ( "bytes" + "context" "encoding/json" "fmt" "html/template" @@ -13,6 +14,7 @@ import ( "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" + "github.com/spf13/viper" ) var _ jobagent.Runner = &ExecRunner{} @@ -24,7 +26,66 @@ type ExecConfig struct { Script string `json:"script"` } +type Resource struct { + ID string `json:"id"` + Name string `json:"name"` + Kind string `json:"kind"` + Version string `json:"version"` + Identifier string `json:"identifier"` + Config map[string]interface{} `json:"config"` + Metadata map[string]interface{} `json:"metadata"` + WorkspaceID string `json:"workspaceId"` +} + +type Release struct { + ID string `json:"id"` + Version string `json:"version"` + Config map[string]interface{} `json:"config"` + Metadata map[string]interface{} `json:"metadata"` +} + +type Environment struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type Approval struct { + Approver *struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"approver"` +} + +type Deployment struct { + ID string `json:"id"` + Name string `json:"name"` + Slug string `json:"slug"` + SystemID string `json:"systemId"` +} + +type Runbook struct { + ID string `json:"id"` + Name string `json:"name"` + SystemID string `json:"systemId"` +} + +type JobData struct { + Variables map[string]string `json:"variables"` + Resource *Resource `json:"resource"` + Release *Release `json:"release"` + Environment *Environment `json:"environment"` + Deployment *Deployment `json:"deployment"` + Runbook *Runbook `json:"runbook"` + Approval *Approval `json:"approval"` + // Add the original config for backward compatibility + Config map[string]interface{} `json:"config"` +} + func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { + if job.ExternalId == nil { + return api.JobStatusExternalRunNotFound, fmt.Sprintf("external ID is nil: %v", job.ExternalId) + } + externalId, err := strconv.Atoi(*job.ExternalId) if err != nil { return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err) @@ -67,13 +128,34 @@ func (r *ExecRunner) Start(job api.Job) (string, error) { return "", fmt.Errorf("failed to unmarshal job agent config: %w", err) } + client, err := api.NewAPIKeyClientWithResponses( + viper.GetString("url"), + viper.GetString("api-key"), + ) + if err != nil { + return "", fmt.Errorf("failed to create API client for job details: %w", err) + } + + resp, err := client.GetJobWithResponse(context.Background(), job.Id.String()) + if err != nil { + return "", fmt.Errorf("failed to get job details: %w", err) + } + + if resp.JSON200 == nil { + return "", fmt.Errorf("received empty response from job details API") + } + + var jobDetails map[string]interface{} + detailsBytes, _ := json.Marshal(resp.JSON200) + json.Unmarshal(detailsBytes, &jobDetails) + templatedScript, err := template.New("script").Parse(config.Script) if err != nil { return "", fmt.Errorf("failed to parse script template: %w", err) } buf := new(bytes.Buffer) - if err := templatedScript.Execute(buf, job); err != nil { + if err := templatedScript.Execute(buf, jobDetails); err != nil { return "", fmt.Errorf("failed to execute script template: %w", err) } script := buf.String() diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index f7640c9..766105a 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -29,10 +29,10 @@ func NewJobAgent( } ja := &JobAgent{ - client: client, - + client: client, id: agent.JSON200.Id, workspaceId: config.WorkspaceId, + runner: runner, } return ja, nil From 01bc449a71af6dbc9f86f49e349401bd207eb01e Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 00:53:07 -0500 Subject: [PATCH 02/17] remove types --- cmd/ctrlc/root/run/exec/runner.go | 55 ------------------------------- 1 file changed, 55 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 7cb2b22..d41bf94 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -26,61 +26,6 @@ type ExecConfig struct { Script string `json:"script"` } -type Resource struct { - ID string `json:"id"` - Name string `json:"name"` - Kind string `json:"kind"` - Version string `json:"version"` - Identifier string `json:"identifier"` - Config map[string]interface{} `json:"config"` - Metadata map[string]interface{} `json:"metadata"` - WorkspaceID string `json:"workspaceId"` -} - -type Release struct { - ID string `json:"id"` - Version string `json:"version"` - Config map[string]interface{} `json:"config"` - Metadata map[string]interface{} `json:"metadata"` -} - -type Environment struct { - ID string `json:"id"` - Name string `json:"name"` -} - -type Approval struct { - Approver *struct { - ID string `json:"id"` - Name string `json:"name"` - } `json:"approver"` -} - -type Deployment struct { - ID string `json:"id"` - Name string `json:"name"` - Slug string `json:"slug"` - SystemID string `json:"systemId"` -} - -type Runbook struct { - ID string `json:"id"` - Name string `json:"name"` - SystemID string `json:"systemId"` -} - -type JobData struct { - Variables map[string]string `json:"variables"` - Resource *Resource `json:"resource"` - Release *Release `json:"release"` - Environment *Environment `json:"environment"` - Deployment *Deployment `json:"deployment"` - Runbook *Runbook `json:"runbook"` - Approval *Approval `json:"approval"` - // Add the original config for backward compatibility - Config map[string]interface{} `json:"config"` -} - func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { if job.ExternalId == nil { return api.JobStatusExternalRunNotFound, fmt.Sprintf("external ID is nil: %v", job.ExternalId) From e1a6568cb0e2b2845e5b69c8aaf1ca1b46201fa5 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 01:49:11 -0500 Subject: [PATCH 03/17] move the job_details outside the runtime --- cmd/ctrlc/root/run/exec/exec.go | 24 ++++++--------------- cmd/ctrlc/root/run/exec/runner.go | 25 +--------------------- pkg/jobagent/job_details.go | 35 +++++++++++++++++++++++++++++++ pkg/jobagent/runner.go | 9 ++++++-- 4 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 pkg/jobagent/job_details.go diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index e27322e..8f6d50a 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -2,6 +2,7 @@ package exec import ( "fmt" + "runtime" "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" @@ -11,23 +12,18 @@ import ( "github.com/spf13/viper" ) -type JobAgentType string - -const ( - JobAgentTypeLinux JobAgentType = "exec-linux" - JobAgentTypeWindows JobAgentType = "exec-windows" -) - func NewRunExecCmd() *cobra.Command { var name string - var jobAgentType string + var jobAgentType = "exec-bash" + if runtime.GOOS == "windows" { + jobAgentType = "exec-powershell" + } cmd := &cobra.Command{ Use: "exec", Short: "Execute commands directly when a job is received", Example: heredoc.Doc(` - $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 - $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 --type windows + $ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000 `), RunE: func(cmd *cobra.Command, args []string) error { apiURL := viper.GetString("url") @@ -43,13 +39,6 @@ func NewRunExecCmd() *cobra.Command { if workspaceId == "" { return fmt.Errorf("workspace is required") } - validTypes := map[string]bool{ - string(JobAgentTypeLinux): true, - string(JobAgentTypeWindows): true, - } - if !validTypes[jobAgentType] { - return fmt.Errorf("invalid type: %s. Must be one of: linux, windows", jobAgentType) - } ja, err := jobagent.NewJobAgent( client, @@ -75,6 +64,5 @@ func NewRunExecCmd() *cobra.Command { cmd.Flags().StringVar(&name, "name", "", "Name of the job agent") cmd.MarkFlagRequired("name") - cmd.Flags().StringVar(&jobAgentType, "type", "exec-linux", "Type of the job agent, defaults to linux") return cmd } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index d41bf94..493c1c4 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -2,7 +2,6 @@ package exec import ( "bytes" - "context" "encoding/json" "fmt" "html/template" @@ -14,7 +13,6 @@ import ( "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" - "github.com/spf13/viper" ) var _ jobagent.Runner = &ExecRunner{} @@ -51,7 +49,7 @@ func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", externalId) } -func (r *ExecRunner) Start(job api.Job) (string, error) { +func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (string, error) { // Create temp script file ext := ".sh" if runtime.GOOS == "windows" { @@ -73,27 +71,6 @@ func (r *ExecRunner) Start(job api.Job) (string, error) { return "", fmt.Errorf("failed to unmarshal job agent config: %w", err) } - client, err := api.NewAPIKeyClientWithResponses( - viper.GetString("url"), - viper.GetString("api-key"), - ) - if err != nil { - return "", fmt.Errorf("failed to create API client for job details: %w", err) - } - - resp, err := client.GetJobWithResponse(context.Background(), job.Id.String()) - if err != nil { - return "", fmt.Errorf("failed to get job details: %w", err) - } - - if resp.JSON200 == nil { - return "", fmt.Errorf("received empty response from job details API") - } - - var jobDetails map[string]interface{} - detailsBytes, _ := json.Marshal(resp.JSON200) - json.Unmarshal(detailsBytes, &jobDetails) - templatedScript, err := template.New("script").Parse(config.Script) if err != nil { return "", fmt.Errorf("failed to parse script template: %w", err) diff --git a/pkg/jobagent/job_details.go b/pkg/jobagent/job_details.go new file mode 100644 index 0000000..18705db --- /dev/null +++ b/pkg/jobagent/job_details.go @@ -0,0 +1,35 @@ +package jobagent + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/ctrlplanedev/cli/internal/api" + "github.com/spf13/viper" +) + +func fetchJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) { + client, err := api.NewAPIKeyClientWithResponses(viper.GetString("url"), viper.GetString("api-key")) + if err != nil { + return nil, fmt.Errorf("failed to create API client for job details: %w", err) + } + + resp, err := client.GetJobWithResponse(ctx, jobID) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + if resp.JSON200 == nil { + return nil, fmt.Errorf("received empty response from job details API") + } + + var details map[string]interface{} + detailsBytes, err := json.Marshal(resp.JSON200) + if err != nil { + return nil, fmt.Errorf("failed to marshal job response: %w", err) + } + if err := json.Unmarshal(detailsBytes, &details); err != nil { + return nil, fmt.Errorf("failed to unmarshal job details: %w", err) + } + return details, nil +} diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 766105a..7931667 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -10,7 +10,7 @@ import ( ) type Runner interface { - Start(job api.Job) (string, error) + Start(job api.Job, jobDetails map[string]interface{}) (string, error) Status(job api.Job) (api.JobStatus, string) } @@ -66,9 +66,14 @@ func (a *JobAgent) RunQueuedJobs() error { var wg sync.WaitGroup for _, job := range *jobs.JSON200.Jobs { wg.Add(1) + jobDetails, err := fetchJobDetails(context.Background(), job.Id.String()) + if err != nil { + log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String()) + continue + } go func(job api.Job) { defer wg.Done() - externalId, err := a.runner.Start(job) + externalId, err := a.runner.Start(job, jobDetails) if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) From 2d98dd459d4e35d46eb49f4aabbe98c60a80929d Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 03:05:03 -0500 Subject: [PATCH 04/17] Mutex --- cmd/ctrlc/root/run/exec/exec.go | 2 +- cmd/ctrlc/root/run/exec/runner.go | 67 +++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 8f6d50a..392f178 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -47,7 +47,7 @@ func NewRunExecCmd() *cobra.Command { Type: jobAgentType, WorkspaceId: workspaceId, }, - &ExecRunner{}, + NewExecRunner(), ) if err != nil { return fmt.Errorf("failed to create job agent: %w", err) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 493c1c4..1cd6b4e 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -9,15 +9,26 @@ import ( "os/exec" "runtime" "strconv" + "sync" "syscall" + "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" ) var _ jobagent.Runner = &ExecRunner{} -type ExecRunner struct{} +type ExecRunner struct { + mu sync.Mutex + finished map[int]error +} + +func NewExecRunner() *ExecRunner { + return &ExecRunner{ + finished: make(map[int]error), + } +} type ExecConfig struct { WorkingDir string `json:"workingDir,omitempty"` @@ -29,24 +40,34 @@ func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { return api.JobStatusExternalRunNotFound, fmt.Sprintf("external ID is nil: %v", job.ExternalId) } - externalId, err := strconv.Atoi(*job.ExternalId) + pid, err := strconv.Atoi(*job.ExternalId) if err != nil { return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err) } - process, err := os.FindProcess(externalId) + // Check if we've recorded a finished result for this process + r.mu.Lock() + finishedErr, exists := r.finished[pid] + r.mu.Unlock() + if exists { + if finishedErr != nil { + return api.JobStatusFailure, fmt.Sprintf("process exited with error: %v", finishedErr) + } + return api.JobStatusSuccessful, "process completed successfully" + } + + // If not finished yet, try to check if the process is still running. + process, err := os.FindProcess(pid) if err != nil { return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err) } - - // On Unix systems, FindProcess always succeeds, so we need to send signal 0 - // to check if process exists + // On Unix, Signal 0 will error if the process is not running. err = process.Signal(syscall.Signal(0)) if err != nil { - return api.JobStatusSuccessful, fmt.Sprintf("process not running: %v", err) + // Process is not running but we haven't recorded its result. + return api.JobStatusFailure, fmt.Sprintf("process not running: %v", err) } - - return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", externalId) + return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", pid) } func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (string, error) { @@ -60,7 +81,6 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri if err != nil { return "", fmt.Errorf("failed to create temp script file: %w", err) } - defer os.Remove(tmpFile.Name()) config := ExecConfig{} jsonBytes, err := json.Marshal(job.JobAgentConfig) @@ -108,9 +128,32 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return "", fmt.Errorf("failed to execute script: %w", err) + if err := cmd.Start(); err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("failed to start process: %w", err) } + pid := cmd.Process.Pid + + // Launch a goroutine to wait for process completion and store the result. + go func(pid int, scriptPath string) { + err := cmd.Wait() + // Ensure the map is not nil; if there's any chance ExecRunner is used as a zero-value, initialize it. + r.mu.Lock() + if r.finished == nil { + r.finished = make(map[int]error) + } + r.finished[pid] = err + r.mu.Unlock() + + if err != nil { + log.Error("Process execution failed", "pid", pid, "error", err) + } else { + log.Info("Process execution succeeded", "pid", pid) + } + + os.Remove(scriptPath) + }(pid, tmpFile.Name()) + return strconv.Itoa(cmd.Process.Pid), nil } From 2d395fb30a3d05ac13662e732ba7bdd4700a4efa Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 03:14:18 -0500 Subject: [PATCH 05/17] Async process management and remove nil check --- cmd/ctrlc/root/run/exec/runner.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 1cd6b4e..52687d9 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -2,6 +2,7 @@ package exec import ( "bytes" + "context" "encoding/json" "fmt" "html/template" @@ -136,24 +137,23 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri pid := cmd.Process.Pid // Launch a goroutine to wait for process completion and store the result. - go func(pid int, scriptPath string) { + ctx, cancel := context.WithCancel(context.Background()) + go func(ctx context.Context, pid int, scriptPath string) { + defer cancel() + defer os.Remove(scriptPath) // Ensure cleanup happens in all cases + err := cmd.Wait() - // Ensure the map is not nil; if there's any chance ExecRunner is used as a zero-value, initialize it. + // Store the result r.mu.Lock() - if r.finished == nil { - r.finished = make(map[int]error) - } r.finished[pid] = err r.mu.Unlock() - + if err != nil { log.Error("Process execution failed", "pid", pid, "error", err) } else { log.Info("Process execution succeeded", "pid", pid) } - - os.Remove(scriptPath) - }(pid, tmpFile.Name()) + }(ctx, pid, tmpFile.Name()) return strconv.Itoa(cmd.Process.Pid), nil } From f01a21699461373d159f7935e7465965956b8cda Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 12:09:19 -0500 Subject: [PATCH 06/17] fix pid usage by using a in-memmory uuid map --- cmd/ctrlc/root/run/exec/exec.go | 5 +- cmd/ctrlc/root/run/exec/runner.go | 148 +++++++++++++++++++++--------- 2 files changed, 110 insertions(+), 43 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 392f178..ef8b7eb 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -40,6 +40,9 @@ func NewRunExecCmd() *cobra.Command { return fmt.Errorf("workspace is required") } + runner := NewExecRunner() + defer runner.Shutdown() + ja, err := jobagent.NewJobAgent( client, api.UpsertJobAgentJSONRequestBody{ @@ -47,7 +50,7 @@ func NewRunExecCmd() *cobra.Command { Type: jobAgentType, WorkspaceId: workspaceId, }, - NewExecRunner(), + runner, ) if err != nil { return fmt.Errorf("failed to create job agent: %w", err) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 52687d9..fe765dc 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -9,25 +9,86 @@ import ( "os" "os/exec" "runtime" - "strconv" "sync" - "syscall" + "time" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" + "github.com/google/uuid" ) var _ jobagent.Runner = &ExecRunner{} +type ProcessInfo struct { + cmd *exec.Cmd + finished error + startTime time.Time +} type ExecRunner struct { - mu sync.Mutex - finished map[int]error + mu sync.Mutex + processes map[string]*ProcessInfo + + // For graceful shutdown + ctx context.Context + cancelFunc context.CancelFunc + wg sync.WaitGroup } func NewExecRunner() *ExecRunner { - return &ExecRunner{ - finished: make(map[int]error), + ctx, cancel := context.WithCancel(context.Background()) + + runner := &ExecRunner{ + processes: make(map[string]*ProcessInfo), + ctx: ctx, + cancelFunc: cancel, + } + + runner.wg.Add(1) + go runner.startHousekeeping() + + return runner +} + +func (r *ExecRunner) Shutdown() { + if r.cancelFunc != nil { + r.cancelFunc() + r.wg.Wait() + } +} + +func (r *ExecRunner) startHousekeeping() { + defer r.wg.Done() + + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + r.cleanupOldProcesses() + case <-r.ctx.Done(): + log.Debug("Housekeeping goroutine shutting down") + return + } + } +} + +func (r *ExecRunner) cleanupOldProcesses() { + const retentionPeriod = 24 * time.Hour + now := time.Now() + + r.mu.Lock() + defer r.mu.Unlock() + + for id, proc := range r.processes { + if proc.finished != nil { + age := now.Sub(proc.startTime) + if age > retentionPeriod { + log.Debug("Cleaning up old process", "uniqueID", id, "age", age.String()) + delete(r.processes, id) + } + } } } @@ -36,43 +97,38 @@ type ExecConfig struct { Script string `json:"script"` } +// Status looks up the process using the unique ID (stored in job.ExternalId) +// rather than the PID. It returns the status based on whether the process has +// finished and if so, whether it exited with an error. func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { if job.ExternalId == nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("external ID is nil: %v", job.ExternalId) - } - - pid, err := strconv.Atoi(*job.ExternalId) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err) + return api.JobStatusExternalRunNotFound, "external ID is nil" } + uniqueID := *job.ExternalId - // Check if we've recorded a finished result for this process r.mu.Lock() - finishedErr, exists := r.finished[pid] + proc, exists := r.processes[uniqueID] r.mu.Unlock() - if exists { - if finishedErr != nil { - return api.JobStatusFailure, fmt.Sprintf("process exited with error: %v", finishedErr) + + if !exists { + return api.JobStatusExternalRunNotFound, "process info not found" + } + + if proc.cmd.ProcessState != nil { + if proc.finished != nil && proc.finished.Error() != "" { + return api.JobStatusFailure, fmt.Sprintf("process exited with error: %s", proc.finished.Error()) } return api.JobStatusSuccessful, "process completed successfully" } - // If not finished yet, try to check if the process is still running. - process, err := os.FindProcess(pid) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err) - } - // On Unix, Signal 0 will error if the process is not running. - err = process.Signal(syscall.Signal(0)) - if err != nil { - // Process is not running but we haven't recorded its result. - return api.JobStatusFailure, fmt.Sprintf("process not running: %v", err) - } - return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", pid) + // Process is still running + return api.JobStatusInProgress, fmt.Sprintf("process running with unique ID %s", uniqueID) } +// Start creates a temporary script file, starts the process, and stores a unique +// identifier along with the process handle in the runner's in-memory map. func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (string, error) { - // Create temp script file + // Determine file extension based on OS. ext := ".sh" if runtime.GOOS == "windows" { ext = ".ps1" @@ -103,7 +159,6 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri } script := buf.String() - // Write script contents if _, err := tmpFile.WriteString(script); err != nil { return "", fmt.Errorf("failed to write script file: %w", err) } @@ -111,7 +166,6 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri return "", fmt.Errorf("failed to close script file: %w", err) } - // Make executable on Unix systems if runtime.GOOS != "windows" { if err := os.Chmod(tmpFile.Name(), 0700); err != nil { return "", fmt.Errorf("failed to make script executable: %w", err) @@ -134,26 +188,36 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri return "", fmt.Errorf("failed to start process: %w", err) } - pid := cmd.Process.Pid + // Generate a unique identifier for this process. + uniqueID := uuid.New().String() + + // Store the process handle in the runner's map. + r.mu.Lock() + r.processes[uniqueID] = &ProcessInfo{ + cmd: cmd, + startTime: time.Now(), + } + r.mu.Unlock() - // Launch a goroutine to wait for process completion and store the result. ctx, cancel := context.WithCancel(context.Background()) - go func(ctx context.Context, pid int, scriptPath string) { + go func(ctx context.Context, uniqueID, scriptPath string) { defer cancel() - defer os.Remove(scriptPath) // Ensure cleanup happens in all cases + defer os.Remove(scriptPath) err := cmd.Wait() - // Store the result + r.mu.Lock() - r.finished[pid] = err + if proc, exists := r.processes[uniqueID]; exists { + proc.finished = err + } r.mu.Unlock() if err != nil { - log.Error("Process execution failed", "pid", pid, "error", err) + log.Error("Process execution failed", "uniqueID", uniqueID, "error", err) } else { - log.Info("Process execution succeeded", "pid", pid) + log.Info("Process execution succeeded", "uniqueID", uniqueID) } - }(ctx, pid, tmpFile.Name()) + }(ctx, uniqueID, tmpFile.Name()) - return strconv.Itoa(cmd.Process.Pid), nil + return uniqueID, nil } From eae19f10121379e8b542876e4fecfd26007c5f6c Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 12:31:26 -0500 Subject: [PATCH 07/17] shift to a pointer over a UUID --- cmd/ctrlc/root/run/exec/runner.go | 40 ++++++++++++++++--------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index fe765dc..efacf9e 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -15,7 +15,6 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" - "github.com/google/uuid" ) var _ jobagent.Runner = &ExecRunner{} @@ -81,12 +80,12 @@ func (r *ExecRunner) cleanupOldProcesses() { r.mu.Lock() defer r.mu.Unlock() - for id, proc := range r.processes { + for handle, proc := range r.processes { if proc.finished != nil { age := now.Sub(proc.startTime) if age > retentionPeriod { - log.Debug("Cleaning up old process", "uniqueID", id, "age", age.String()) - delete(r.processes, id) + log.Debug("Cleaning up old process", "handle", handle, "age", age.String()) + delete(r.processes, handle) } } } @@ -104,10 +103,10 @@ func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { if job.ExternalId == nil { return api.JobStatusExternalRunNotFound, "external ID is nil" } - uniqueID := *job.ExternalId + handle := *job.ExternalId r.mu.Lock() - proc, exists := r.processes[uniqueID] + proc, exists := r.processes[handle] r.mu.Unlock() if !exists { @@ -122,7 +121,7 @@ func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { } // Process is still running - return api.JobStatusInProgress, fmt.Sprintf("process running with unique ID %s", uniqueID) + return api.JobStatusInProgress, fmt.Sprintf("process running with handle %s", handle) } // Start creates a temporary script file, starts the process, and stores a unique @@ -188,36 +187,39 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri return "", fmt.Errorf("failed to start process: %w", err) } - // Generate a unique identifier for this process. - uniqueID := uuid.New().String() - - // Store the process handle in the runner's map. - r.mu.Lock() - r.processes[uniqueID] = &ProcessInfo{ + // Create the ProcessInfo object + procInfo := &ProcessInfo{ cmd: cmd, startTime: time.Now(), } + + // Use the pointer address as the handle + handle := fmt.Sprintf("%p", procInfo) + + // Store the process handle in the runner's map. + r.mu.Lock() + r.processes[handle] = procInfo r.mu.Unlock() ctx, cancel := context.WithCancel(context.Background()) - go func(ctx context.Context, uniqueID, scriptPath string) { + go func(ctx context.Context, handle, scriptPath string) { defer cancel() defer os.Remove(scriptPath) err := cmd.Wait() r.mu.Lock() - if proc, exists := r.processes[uniqueID]; exists { + if proc, exists := r.processes[handle]; exists { proc.finished = err } r.mu.Unlock() if err != nil { - log.Error("Process execution failed", "uniqueID", uniqueID, "error", err) + log.Error("Process execution failed", "handle", handle, "error", err) } else { - log.Info("Process execution succeeded", "uniqueID", uniqueID) + log.Info("Process execution succeeded", "handle", handle) } - }(ctx, uniqueID, tmpFile.Name()) + }(ctx, handle, tmpFile.Name()) - return uniqueID, nil + return handle, nil } From df80fe2fc22613ca64e8583d007f11c945205c50 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 13:40:11 -0500 Subject: [PATCH 08/17] WIP, fixing infinite loop's and the agent re-running jobs --- cmd/ctrlc/root/run/exec/runner.go | 8 ++--- pkg/jobagent/runner.go | 52 +++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index efacf9e..de0b5ea 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -59,13 +59,13 @@ func (r *ExecRunner) Shutdown() { func (r *ExecRunner) startHousekeeping() { defer r.wg.Done() - ticker := time.NewTicker(1 * time.Hour) + ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ticker.C: - r.cleanupOldProcesses() + r.cleanupOldProcesses(time.Now(), 1*time.Minute) case <-r.ctx.Done(): log.Debug("Housekeeping goroutine shutting down") return @@ -73,9 +73,7 @@ func (r *ExecRunner) startHousekeeping() { } } -func (r *ExecRunner) cleanupOldProcesses() { - const retentionPeriod = 24 * time.Hour - now := time.Now() +func (r *ExecRunner) cleanupOldProcesses(now time.Time, retentionPeriod time.Duration) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 7931667..720ef3b 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -33,6 +33,7 @@ func NewJobAgent( id: agent.JSON200.Id, workspaceId: config.WorkspaceId, runner: runner, + handleMap: make(map[string]string), } return ja, nil @@ -45,6 +46,8 @@ type JobAgent struct { id string runner Runner + handleMap map[string]string + mu sync.Mutex } // RunQueuedJobs retrieves and executes any queued jobs for this agent. For each @@ -54,6 +57,11 @@ type JobAgent struct { // updates the job with that ID. The function waits for all jobs to complete // before returning. func (a *JobAgent) RunQueuedJobs() error { + a.mu.Lock() + mapSize := len(a.handleMap) + a.mu.Unlock() + log.Debug("Starting RunQueuedJobs", "handleMap size", mapSize) + jobs, err := a.client.GetNextJobsWithResponse(context.Background(), a.id) if err != nil { return err @@ -73,7 +81,24 @@ func (a *JobAgent) RunQueuedJobs() error { } go func(job api.Job) { defer wg.Done() + + // Lock when checking the map + a.mu.Lock() + _, exists := a.handleMap[job.Id.String()] + a.mu.Unlock() + + if exists { + log.Debug("Job already running", "jobId", job.Id.String()) + return + } + externalId, err := a.runner.Start(job, jobDetails) + + // Lock when updating the map + a.mu.Lock() + a.handleMap[job.Id.String()] = externalId + a.mu.Unlock() + if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) @@ -89,11 +114,13 @@ func (a *JobAgent) RunQueuedJobs() error { return } if externalId != "" { + status := api.JobStatusInProgress a.client.UpdateJobWithResponse( context.Background(), job.Id.String(), api.UpdateJobJSONRequestBody{ ExternalId: &externalId, + Status: &status, }, ) } @@ -101,6 +128,11 @@ func (a *JobAgent) RunQueuedJobs() error { } wg.Wait() + a.mu.Lock() + mapSize = len(a.handleMap) + a.mu.Unlock() + log.Debug("Finished RunQueuedJobs", "handleMap size", mapSize) + return nil } @@ -110,6 +142,11 @@ func (a *JobAgent) RunQueuedJobs() error { // status in the API accordingly. Any errors checking job status or updating the // API are logged but do not stop other job updates from proceeding. func (a *JobAgent) UpdateRunningJobs() error { + a.mu.Lock() + mapSize := len(a.handleMap) + a.mu.Unlock() + log.Debug("Starting UpdateRunningJobs", "handleMap size", mapSize) + jobs, err := a.client.GetAgentRunningJobsWithResponse(context.Background(), a.id) if err != nil { log.Error("Failed to get job", "error", err, "status", jobs.StatusCode()) @@ -142,9 +179,24 @@ func (a *JobAgent) UpdateRunningJobs() error { if err != nil { log.Error("Failed to update job", "error", err, "jobId", job.Id.String()) } + + // If the job is no longer in progress, remove it from the handleMap + if status != api.JobStatusInProgress { + // Lock when removing from the map + a.mu.Lock() + delete(a.handleMap, job.Id.String()) + a.mu.Unlock() + + log.Debug("Removed completed job from handleMap", "jobId", job.Id.String(), "status", status) + } }(job) } wg.Wait() + a.mu.Lock() + mapSize = len(a.handleMap) + a.mu.Unlock() + log.Debug("Finished UpdateRunningJobs", "handleMap size", mapSize) + return nil } From a29b566655bca607d86f0377913f6a6e5b737478 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 15:51:41 -0500 Subject: [PATCH 09/17] remove the status func and use a go routine to return the status --- cmd/ctrlc/root/run/exec/exec.go | 9 +- cmd/ctrlc/root/run/exec/runner.go | 154 ++++------------------------- pkg/jobagent/runner.go | 159 ++++++++---------------------- 3 files changed, 63 insertions(+), 259 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index ef8b7eb..8c017c2 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -39,10 +39,6 @@ func NewRunExecCmd() *cobra.Command { if workspaceId == "" { return fmt.Errorf("workspace is required") } - - runner := NewExecRunner() - defer runner.Shutdown() - ja, err := jobagent.NewJobAgent( client, api.UpsertJobAgentJSONRequestBody{ @@ -50,7 +46,7 @@ func NewRunExecCmd() *cobra.Command { Type: jobAgentType, WorkspaceId: workspaceId, }, - runner, + &ExecRunner{}, ) if err != nil { return fmt.Errorf("failed to create job agent: %w", err) @@ -58,9 +54,6 @@ func NewRunExecCmd() *cobra.Command { if err := ja.RunQueuedJobs(); err != nil { log.Error("failed to run queued jobs", "error", err) } - if err := ja.UpdateRunningJobs(); err != nil { - log.Error("failed to check for jobs", "error", err) - } return nil }, } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index de0b5ea..60bcc2e 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -2,15 +2,12 @@ package exec import ( "bytes" - "context" "encoding/json" "fmt" "html/template" "os" "os/exec" "runtime" - "sync" - "time" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" @@ -19,112 +16,15 @@ import ( var _ jobagent.Runner = &ExecRunner{} -type ProcessInfo struct { - cmd *exec.Cmd - finished error - startTime time.Time -} -type ExecRunner struct { - mu sync.Mutex - processes map[string]*ProcessInfo - - // For graceful shutdown - ctx context.Context - cancelFunc context.CancelFunc - wg sync.WaitGroup -} - -func NewExecRunner() *ExecRunner { - ctx, cancel := context.WithCancel(context.Background()) - - runner := &ExecRunner{ - processes: make(map[string]*ProcessInfo), - ctx: ctx, - cancelFunc: cancel, - } - - runner.wg.Add(1) - go runner.startHousekeeping() - - return runner -} - -func (r *ExecRunner) Shutdown() { - if r.cancelFunc != nil { - r.cancelFunc() - r.wg.Wait() - } -} - -func (r *ExecRunner) startHousekeeping() { - defer r.wg.Done() - - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - r.cleanupOldProcesses(time.Now(), 1*time.Minute) - case <-r.ctx.Done(): - log.Debug("Housekeeping goroutine shutting down") - return - } - } -} - -func (r *ExecRunner) cleanupOldProcesses(now time.Time, retentionPeriod time.Duration) { - - r.mu.Lock() - defer r.mu.Unlock() - - for handle, proc := range r.processes { - if proc.finished != nil { - age := now.Sub(proc.startTime) - if age > retentionPeriod { - log.Debug("Cleaning up old process", "handle", handle, "age", age.String()) - delete(r.processes, handle) - } - } - } -} +type ExecRunner struct{} type ExecConfig struct { WorkingDir string `json:"workingDir,omitempty"` Script string `json:"script"` } -// Status looks up the process using the unique ID (stored in job.ExternalId) -// rather than the PID. It returns the status based on whether the process has -// finished and if so, whether it exited with an error. -func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { - if job.ExternalId == nil { - return api.JobStatusExternalRunNotFound, "external ID is nil" - } - handle := *job.ExternalId - - r.mu.Lock() - proc, exists := r.processes[handle] - r.mu.Unlock() - - if !exists { - return api.JobStatusExternalRunNotFound, "process info not found" - } - - if proc.cmd.ProcessState != nil { - if proc.finished != nil && proc.finished.Error() != "" { - return api.JobStatusFailure, fmt.Sprintf("process exited with error: %s", proc.finished.Error()) - } - return api.JobStatusSuccessful, "process completed successfully" - } - - // Process is still running - return api.JobStatusInProgress, fmt.Sprintf("process running with handle %s", handle) -} - -// Start creates a temporary script file, starts the process, and stores a unique -// identifier along with the process handle in the runner's in-memory map. -func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (string, error) { +// Start creates a temporary script file, starts the process, and updates job status when the process completes. +func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) { // Determine file extension based on OS. ext := ".sh" if runtime.GOOS == "windows" { @@ -133,39 +33,39 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri tmpFile, err := os.CreateTemp("", "script*"+ext) if err != nil { - return "", fmt.Errorf("failed to create temp script file: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err) } config := ExecConfig{} jsonBytes, err := json.Marshal(job.JobAgentConfig) if err != nil { - return "", fmt.Errorf("failed to marshal job agent config: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to marshal job agent config: %w", err) } if err := json.Unmarshal(jsonBytes, &config); err != nil { - return "", fmt.Errorf("failed to unmarshal job agent config: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to unmarshal job agent config: %w", err) } templatedScript, err := template.New("script").Parse(config.Script) if err != nil { - return "", fmt.Errorf("failed to parse script template: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to parse script template: %w", err) } buf := new(bytes.Buffer) if err := templatedScript.Execute(buf, jobDetails); err != nil { - return "", fmt.Errorf("failed to execute script template: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to execute script template: %w", err) } script := buf.String() if _, err := tmpFile.WriteString(script); err != nil { - return "", fmt.Errorf("failed to write script file: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err) } if err := tmpFile.Close(); err != nil { - return "", fmt.Errorf("failed to close script file: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err) } if runtime.GOOS != "windows" { if err := os.Chmod(tmpFile.Name(), 0700); err != nil { - return "", fmt.Errorf("failed to make script executable: %w", err) + return "", api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err) } } @@ -182,42 +82,26 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (stri if err := cmd.Start(); err != nil { os.Remove(tmpFile.Name()) - return "", fmt.Errorf("failed to start process: %w", err) - } - - // Create the ProcessInfo object - procInfo := &ProcessInfo{ - cmd: cmd, - startTime: time.Now(), + return "", api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err) } // Use the pointer address as the handle - handle := fmt.Sprintf("%p", procInfo) - - // Store the process handle in the runner's map. - r.mu.Lock() - r.processes[handle] = procInfo - r.mu.Unlock() + handle := fmt.Sprintf("%p", cmd) - ctx, cancel := context.WithCancel(context.Background()) - go func(ctx context.Context, handle, scriptPath string) { - defer cancel() + // Spawn a goroutine to wait for the process to finish and update the job status + go func(handle, scriptPath string) { defer os.Remove(scriptPath) err := cmd.Wait() - r.mu.Lock() - if proc, exists := r.processes[handle]; exists { - proc.finished = err - } - r.mu.Unlock() - if err != nil { log.Error("Process execution failed", "handle", handle, "error", err) + statusUpdateFunc(job.Id.String(), api.JobStatusFailure, err.Error()) } else { log.Info("Process execution succeeded", "handle", handle) + statusUpdateFunc(job.Id.String(), api.JobStatusSuccessful, "") } - }(ctx, handle, tmpFile.Name()) + }(handle, tmpFile.Name()) - return handle, nil + return handle, api.JobStatusInProgress, nil } diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 720ef3b..be430f2 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -9,9 +9,11 @@ import ( "github.com/ctrlplanedev/cli/internal/api" ) +// Runner defines the interface for job execution. +// Start initiates a job and returns an external ID or error. +// The implementation should handle status updates when the job completes. type Runner interface { - Start(job api.Job, jobDetails map[string]interface{}) (string, error) - Status(job api.Job) (api.JobStatus, string) + Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) } func NewJobAgent( @@ -33,7 +35,6 @@ func NewJobAgent( id: agent.JSON200.Id, workspaceId: config.WorkspaceId, runner: runner, - handleMap: make(map[string]string), } return ja, nil @@ -46,22 +47,34 @@ type JobAgent struct { id string runner Runner - handleMap map[string]string - mu sync.Mutex } -// RunQueuedJobs retrieves and executes any queued jobs for this agent. For each -// job, it starts execution using the runner's Start method in a separate -// goroutine. If starting a job fails, it updates the job status to InProgress -// with an error message. If starting succeeds and returns an external ID, it -// updates the job with that ID. The function waits for all jobs to complete -// before returning. -func (a *JobAgent) RunQueuedJobs() error { - a.mu.Lock() - mapSize := len(a.handleMap) - a.mu.Unlock() - log.Debug("Starting RunQueuedJobs", "handleMap size", mapSize) +// updateJobStatus is a helper function to update job status via the API +func (a *JobAgent) updateJobStatus(jobID string, status api.JobStatus, message string, externalID *string) { + body := api.UpdateJobJSONRequestBody{ + Status: &status, + } + if message != "" { + body.Message = &message + } + if externalID != nil { + body.ExternalId = externalID + } + + _, err := a.client.UpdateJobWithResponse( + context.Background(), + jobID, + body, + ) + if err != nil { + log.Error("Failed to update job", "error", err, "jobId", jobID) + } +} +// RunQueuedJobs retrieves and executes any queued jobs for this agent. +// For each job, it starts execution using the runner's Start method, which +// will update the job status when the job completes. +func (a *JobAgent) RunQueuedJobs() error { jobs, err := a.client.GetNextJobsWithResponse(context.Background(), a.id) if err != nil { return err @@ -73,130 +86,44 @@ func (a *JobAgent) RunQueuedJobs() error { log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs)) var wg sync.WaitGroup for _, job := range *jobs.JSON200.Jobs { - wg.Add(1) jobDetails, err := fetchJobDetails(context.Background(), job.Id.String()) if err != nil { log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String()) continue } + wg.Add(1) go func(job api.Job) { defer wg.Done() - // Lock when checking the map - a.mu.Lock() - _, exists := a.handleMap[job.Id.String()] - a.mu.Unlock() - - if exists { - log.Debug("Job already running", "jobId", job.Id.String()) - return + // Create a status update callback for this job + statusUpdateFunc := func(jobID string, status api.JobStatus, message string) { + a.updateJobStatus(jobID, status, message, nil) } - externalId, err := a.runner.Start(job, jobDetails) - - // Lock when updating the map - a.mu.Lock() - a.handleMap[job.Id.String()] = externalId - a.mu.Unlock() + externalId, status, err := a.runner.Start(job, jobDetails, statusUpdateFunc) if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) - a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - api.UpdateJobJSONRequestBody{ - Status: &status, - Message: &message, - }, - ) + a.updateJobStatus(job.Id.String(), status, message, nil) return } - if externalId != "" { - status := api.JobStatusInProgress - a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - api.UpdateJobJSONRequestBody{ - ExternalId: &externalId, - Status: &status, - }, - ) - } - }(job) - } - wg.Wait() - - a.mu.Lock() - mapSize = len(a.handleMap) - a.mu.Unlock() - log.Debug("Finished RunQueuedJobs", "handleMap size", mapSize) - - return nil -} - -// UpdateRunningJobs checks the status of all currently running jobs for this -// agent. It queries the API for running jobs, then concurrently checks the -// status of each job using the runner's Status method and updates the job -// status in the API accordingly. Any errors checking job status or updating the -// API are logged but do not stop other job updates from proceeding. -func (a *JobAgent) UpdateRunningJobs() error { - a.mu.Lock() - mapSize := len(a.handleMap) - a.mu.Unlock() - log.Debug("Starting UpdateRunningJobs", "handleMap size", mapSize) - - jobs, err := a.client.GetAgentRunningJobsWithResponse(context.Background(), a.id) - if err != nil { - log.Error("Failed to get job", "error", err, "status", jobs.StatusCode()) - return err - } - - if jobs.JSON200 == nil { - log.Error("Failed to get job", "error", err, "status", jobs.StatusCode()) - return fmt.Errorf("failed to get job") - } - var wg sync.WaitGroup - for _, job := range *jobs.JSON200 { - wg.Add(1) - go func(job api.Job) { - defer wg.Done() - status, message := a.runner.Status(job) - - body := api.UpdateJobJSONRequestBody{ - Status: &status, - } - if message != "" { - body.Message = &message - } - _, err := a.client.UpdateJobWithResponse( - context.Background(), - job.Id.String(), - body, - ) - if err != nil { - log.Error("Failed to update job", "error", err, "jobId", job.Id.String()) + if status == api.JobStatusFailure { + message := fmt.Sprintf("Failed to start job: %s", err.Error()) + log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) + a.updateJobStatus(job.Id.String(), status, message, nil) + return } - // If the job is no longer in progress, remove it from the handleMap - if status != api.JobStatusInProgress { - // Lock when removing from the map - a.mu.Lock() - delete(a.handleMap, job.Id.String()) - a.mu.Unlock() - - log.Debug("Removed completed job from handleMap", "jobId", job.Id.String(), "status", status) + if externalId != "" { + status := api.JobStatusInProgress + a.updateJobStatus(job.Id.String(), status, "", &externalId) } }(job) } wg.Wait() - a.mu.Lock() - mapSize = len(a.handleMap) - a.mu.Unlock() - log.Debug("Finished UpdateRunningJobs", "handleMap size", mapSize) - return nil } From ab92d451cbc4e9e21483183fb901b946093cd0c1 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Sat, 22 Mar 2025 15:53:12 -0500 Subject: [PATCH 10/17] Fmt --- pkg/jobagent/runner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index be430f2..11d438e 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -94,14 +94,14 @@ func (a *JobAgent) RunQueuedJobs() error { wg.Add(1) go func(job api.Job) { defer wg.Done() - + // Create a status update callback for this job statusUpdateFunc := func(jobID string, status api.JobStatus, message string) { a.updateJobStatus(jobID, status, message, nil) } - + externalId, status, err := a.runner.Start(job, jobDetails, statusUpdateFunc) - + if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) @@ -116,7 +116,7 @@ func (a *JobAgent) RunQueuedJobs() error { a.updateJobStatus(job.Id.String(), status, message, nil) return } - + if externalId != "" { status := api.JobStatusInProgress a.updateJobStatus(job.Id.String(), status, "", &externalId) From 6d1fda902f07ae4bbeb9811b28c378ae505d00a9 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 20:49:59 -0500 Subject: [PATCH 11/17] testing --- cmd/ctrlc/root/run/exec/exec.go | 80 ++++++++++++++-- cmd/ctrlc/root/run/exec/runner.go | 149 ++++++++++++++++++++++-------- internal/api/client.go | 69 ++++++++++++++ pkg/jobagent/job_details.go | 35 ------- pkg/jobagent/runner.go | 67 +++++--------- 5 files changed, 274 insertions(+), 126 deletions(-) delete mode 100644 pkg/jobagent/job_details.go diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 8c017c2..94fcd59 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -1,8 +1,13 @@ package exec import ( + "context" "fmt" + "os" + "os/signal" "runtime" + "syscall" + "time" "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" @@ -13,8 +18,11 @@ import ( ) func NewRunExecCmd() *cobra.Command { - var name string - var jobAgentType = "exec-bash" + var ( + name string + jobAgentType = "exec-bash" + ) + if runtime.GOOS == "windows" { jobAgentType = "exec-powershell" } @@ -29,36 +37,88 @@ func NewRunExecCmd() *cobra.Command { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") workspaceId := viper.GetString("workspace") + + // Get interval from the flag set by AddIntervalSupport + intervalStr, _ := cmd.Flags().GetString("interval") + + interval := 10 * time.Second + if intervalStr != "" { + duration, err := time.ParseDuration(intervalStr) + if err != nil { + return fmt.Errorf("invalid interval format: %w", err) + } + interval = duration + } + client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } + if name == "" { return fmt.Errorf("name is required") } if workspaceId == "" { return fmt.Errorf("workspace is required") } + + runner := NewExecRunner(client) + + jobAgentConfig := api.UpsertJobAgentJSONRequestBody{ + Name: name, + Type: jobAgentType, + WorkspaceId: workspaceId, + } + ja, err := jobagent.NewJobAgent( client, - api.UpsertJobAgentJSONRequestBody{ - Name: name, - Type: jobAgentType, - WorkspaceId: workspaceId, - }, - &ExecRunner{}, + jobAgentConfig, + runner, ) + if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } + + // Setup signal handling for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigCh + log.Info("Shutting down gracefully...") + runner.ExitAll(true) + cancel() + }() + + // Main polling loop + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Run initial job check if err := ja.RunQueuedJobs(); err != nil { - log.Error("failed to run queued jobs", "error", err) + log.Error("Failed to run queued jobs", "error", err) + } + + // Polling loop + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := ja.RunQueuedJobs(); err != nil { + log.Error("Failed to run queued jobs", "error", err) + } + } } - return nil }, } cmd.Flags().StringVar(&name, "name", "", "Name of the job agent") cmd.MarkFlagRequired("name") + cmd.MarkFlagRequired("workspace") return cmd } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 60bcc2e..bc5d0fe 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -1,13 +1,15 @@ package exec import ( - "bytes" - "encoding/json" + "context" "fmt" - "html/template" "os" "os/exec" + "os/signal" "runtime" + "strconv" + "sync" + "syscall" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" @@ -16,16 +18,51 @@ import ( var _ jobagent.Runner = &ExecRunner{} -type ExecRunner struct{} +type RunningJob struct { + cmd *exec.Cmd + jobID string + client *api.ClientWithResponses + exitCode int + cancelled bool +} + +type ExecRunner struct { + runningJobs map[string]*RunningJob + client *api.ClientWithResponses + mu sync.Mutex + wg sync.WaitGroup +} + +func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { + runner := &ExecRunner{ + runningJobs: make(map[string]*RunningJob), + client: client, + } -type ExecConfig struct { - WorkingDir string `json:"workingDir,omitempty"` - Script string `json:"script"` + // Set up signal handling for graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + log.Info("Received shutdown signal, terminating all jobs") + runner.ExitAll(true) + os.Exit(0) + }() + + return runner } // Start creates a temporary script file, starts the process, and updates job status when the process completes. -func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) { - // Determine file extension based on OS. +func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, + statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) { + + // Template the script using the API client + script, err := r.client.TemplateJobDetails(job, jobDetails) + if err != nil { + return "", api.JobStatusFailure, fmt.Errorf("failed to template job details: %w", err) + } + + // Determine file extension based on OS ext := ".sh" if runtime.GOOS == "windows" { ext = ".ps1" @@ -36,50 +73,37 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}, statu return "", api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err) } - config := ExecConfig{} - jsonBytes, err := json.Marshal(job.JobAgentConfig) - if err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to marshal job agent config: %w", err) - } - if err := json.Unmarshal(jsonBytes, &config); err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to unmarshal job agent config: %w", err) - } - - templatedScript, err := template.New("script").Parse(config.Script) - if err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to parse script template: %w", err) - } - - buf := new(bytes.Buffer) - if err := templatedScript.Execute(buf, jobDetails); err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to execute script template: %w", err) - } - script := buf.String() - + // Write the script to the temporary file if _, err := tmpFile.WriteString(script); err != nil { + os.Remove(tmpFile.Name()) return "", api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err) } if err := tmpFile.Close(); err != nil { + os.Remove(tmpFile.Name()) return "", api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err) } + // Make the script executable on Unix-like systems if runtime.GOOS != "windows" { if err := os.Chmod(tmpFile.Name(), 0700); err != nil { + os.Remove(tmpFile.Name()) return "", api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err) } } + // Create and configure the command var cmd *exec.Cmd if runtime.GOOS == "windows" { - cmd = exec.Command("powershell", "-File", tmpFile.Name()) + cmd = exec.CommandContext(ctx, "powershell", "-File", tmpFile.Name()) } else { - cmd = exec.Command("bash", "-c", tmpFile.Name()) + cmd = exec.CommandContext(ctx, "bash", "-c", tmpFile.Name()) } - cmd.Dir = config.WorkingDir + // Set up command environment cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + // Start the process if err := cmd.Start(); err != nil { os.Remove(tmpFile.Name()) return "", api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err) @@ -88,20 +112,71 @@ func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}, statu // Use the pointer address as the handle handle := fmt.Sprintf("%p", cmd) + // Create a running job record + runningJob := &RunningJob{ + cmd: cmd, + jobID: job.Id.String(), + client: r.client, + } + + // Register the running job + r.mu.Lock() + r.runningJobs[handle] = runningJob + r.mu.Unlock() + // Spawn a goroutine to wait for the process to finish and update the job status go func(handle, scriptPath string) { defer os.Remove(scriptPath) + defer func() { + r.mu.Lock() + delete(r.runningJobs, handle) + r.mu.Unlock() + }() err := cmd.Wait() - + exitCode := 0 if err != nil { - log.Error("Process execution failed", "handle", handle, "error", err) - statusUpdateFunc(job.Id.String(), api.JobStatusFailure, err.Error()) + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } + } + + if runningJob.cancelled { + statusUpdateFunc(runningJob.jobID, api.JobStatusCancelled, "Job was cancelled") + } else if err != nil { + statusUpdateFunc(runningJob.jobID, api.JobStatusFailure, fmt.Sprintf("Process exited with code %d: %v", exitCode, err)) } else { - log.Info("Process execution succeeded", "handle", handle) - statusUpdateFunc(job.Id.String(), api.JobStatusSuccessful, "") + statusUpdateFunc(runningJob.jobID, api.JobStatusSuccessful, "") } }(handle, tmpFile.Name()) return handle, api.JobStatusInProgress, nil } + +// ExitAll stops all currently running commands +func (r *ExecRunner) ExitAll(cancelled bool) { + r.mu.Lock() + defer r.mu.Unlock() + + for id, runningJob := range r.runningJobs { + if runningJob != nil && runningJob.cmd != nil && runningJob.cmd.Process != nil { + // Check if process is still running before attempting to kill + if err := runningJob.cmd.Process.Signal(syscall.Signal(0)); err == nil { + log.Info("Killing job", "id", id) + runningJob.cancelled = cancelled + + // Process is running, kill it and its children + if runtime.GOOS == "windows" { + exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(runningJob.cmd.Process.Pid)).Run() + } else { + // Send SIGTERM first for graceful shutdown + runningJob.cmd.Process.Signal(syscall.SIGTERM) + } + } + } + } + + if cancelled { + r.runningJobs = make(map[string]*RunningJob) + } +} \ No newline at end of file diff --git a/internal/api/client.go b/internal/api/client.go index c7169ba..46f8394 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -1,8 +1,11 @@ package api import ( + "bytes" "context" "encoding/json" + "fmt" + "html/template" "net/http" "strings" ) @@ -21,3 +24,69 @@ func NewAPIKeyClientWithResponses(server string, apiKey string) (*ClientWithResp func (v *Variable_Value) SetString(value string) { v.union = json.RawMessage("\"" + value + "\"") } + +// TemplateJobDetails applies the job details template to the script +func (c *ClientWithResponses) TemplateJobDetails(job Job, jobDetails map[string]interface{}) (string, error) { + // Extract script from JobAgentConfig + script, ok := job.JobAgentConfig["script"].(string) + if !ok { + return "", fmt.Errorf("script not found in job agent config") + } + + // Parse the script template + templatedScript, err := template.New("script").Parse(script) + if err != nil { + return "", fmt.Errorf("failed to parse script template: %w", err) + } + + // Execute the template with job details + buf := new(bytes.Buffer) + if err := templatedScript.Execute(buf, jobDetails); err != nil { + return "", fmt.Errorf("failed to execute script template: %w", err) + } + + return buf.String(), nil +} + +// UpdateJobStatus updates the job status via API +func (c *ClientWithResponses) UpdateJobStatus(jobID string, status JobStatus, message string, externalID *string) error { + body := UpdateJobJSONRequestBody{ + Status: &status, + } + if message != "" { + body.Message = &message + } + if externalID != nil { + body.ExternalId = externalID + } + + resp, err := c.UpdateJobWithResponse(context.Background(), jobID, body) + if err != nil { + return fmt.Errorf("failed to update job status: %w", err) + } + if resp.JSON200 == nil { + return fmt.Errorf("failed to update job status: received empty response") + } + return nil +} + +// GetJobDetails retrieves job details for templating +func (c *ClientWithResponses) GetJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) { + resp, err := c.GetJobWithResponse(ctx, jobID) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + if resp.JSON200 == nil { + return nil, fmt.Errorf("received empty response from job details API") + } + + var details map[string]interface{} + detailsBytes, err := json.Marshal(resp.JSON200) + if err != nil { + return nil, fmt.Errorf("failed to marshal job response: %w", err) + } + if err := json.Unmarshal(detailsBytes, &details); err != nil { + return nil, fmt.Errorf("failed to unmarshal job details: %w", err) + } + return details, nil +} \ No newline at end of file diff --git a/pkg/jobagent/job_details.go b/pkg/jobagent/job_details.go deleted file mode 100644 index 18705db..0000000 --- a/pkg/jobagent/job_details.go +++ /dev/null @@ -1,35 +0,0 @@ -package jobagent - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/ctrlplanedev/cli/internal/api" - "github.com/spf13/viper" -) - -func fetchJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) { - client, err := api.NewAPIKeyClientWithResponses(viper.GetString("url"), viper.GetString("api-key")) - if err != nil { - return nil, fmt.Errorf("failed to create API client for job details: %w", err) - } - - resp, err := client.GetJobWithResponse(ctx, jobID) - if err != nil { - return nil, fmt.Errorf("failed to get job details: %w", err) - } - if resp.JSON200 == nil { - return nil, fmt.Errorf("received empty response from job details API") - } - - var details map[string]interface{} - detailsBytes, err := json.Marshal(resp.JSON200) - if err != nil { - return nil, fmt.Errorf("failed to marshal job response: %w", err) - } - if err := json.Unmarshal(detailsBytes, &details); err != nil { - return nil, fmt.Errorf("failed to unmarshal job details: %w", err) - } - return details, nil -} diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 11d438e..4bbbdb1 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -13,7 +13,8 @@ import ( // Start initiates a job and returns an external ID or error. // The implementation should handle status updates when the job completes. type Runner interface { - Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) + Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, + statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) } func NewJobAgent( @@ -41,34 +42,10 @@ func NewJobAgent( } type JobAgent struct { - client *api.ClientWithResponses - + client *api.ClientWithResponses workspaceId string id string - - runner Runner -} - -// updateJobStatus is a helper function to update job status via the API -func (a *JobAgent) updateJobStatus(jobID string, status api.JobStatus, message string, externalID *string) { - body := api.UpdateJobJSONRequestBody{ - Status: &status, - } - if message != "" { - body.Message = &message - } - if externalID != nil { - body.ExternalId = externalID - } - - _, err := a.client.UpdateJobWithResponse( - context.Background(), - jobID, - body, - ) - if err != nil { - log.Error("Failed to update job", "error", err, "jobId", jobID) - } + runner Runner } // RunQueuedJobs retrieves and executes any queued jobs for this agent. @@ -86,44 +63,46 @@ func (a *JobAgent) RunQueuedJobs() error { log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs)) var wg sync.WaitGroup for _, job := range *jobs.JSON200.Jobs { - jobDetails, err := fetchJobDetails(context.Background(), job.Id.String()) + jobDetails, err := a.client.GetJobDetails(context.Background(), job.Id.String()) if err != nil { log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String()) continue } + if job.Status == api.JobStatusInProgress { + continue + } wg.Add(1) go func(job api.Job) { defer wg.Done() - + // Create a status update callback for this job statusUpdateFunc := func(jobID string, status api.JobStatus, message string) { - a.updateJobStatus(jobID, status, message, nil) + if err := a.client.UpdateJobStatus(jobID, status, message, nil); err != nil { + log.Error("Failed to update job status", "error", err, "jobId", jobID) + } } - - externalId, status, err := a.runner.Start(job, jobDetails, statusUpdateFunc) - + + externalId, _, err := a.runner.Start(context.Background(), job, jobDetails, statusUpdateFunc) + if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) - a.updateJobStatus(job.Id.String(), status, message, nil) + if err := a.client.UpdateJobStatus(job.Id.String(), status, message, nil); err != nil { + log.Error("Failed to update job status", "error", err, "jobId", job.Id.String()) + } return } - - if status == api.JobStatusFailure { - message := fmt.Sprintf("Failed to start job: %s", err.Error()) - log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) - a.updateJobStatus(job.Id.String(), status, message, nil) - return - } - + if externalId != "" { status := api.JobStatusInProgress - a.updateJobStatus(job.Id.String(), status, "", &externalId) + if err := a.client.UpdateJobStatus(job.Id.String(), status, "", &externalId); err != nil { + log.Error("Failed to update job status", "error", err, "jobId", job.Id.String()) + } } }(job) } wg.Wait() return nil -} +} \ No newline at end of file From 840e166a7dc5b8a5008ffb88962aa6e7f4046adb Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 20:52:49 -0500 Subject: [PATCH 12/17] format --- cmd/ctrlc/root/run/exec/exec.go | 14 ++--- cmd/ctrlc/root/run/exec/runner.go | 6 +- cmd/ctrlc/root/sync/clickhouse/clickhouse.go | 62 ++++++++++---------- cmd/ctrlc/root/sync/sync.go | 2 +- cmd/ctrlc/root/version/version.go | 2 +- internal/api/client.go | 2 +- pkg/jobagent/runner.go | 14 ++--- 7 files changed, 51 insertions(+), 51 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 94fcd59..c07281a 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -19,8 +19,8 @@ import ( func NewRunExecCmd() *cobra.Command { var ( - name string - jobAgentType = "exec-bash" + name string + jobAgentType = "exec-bash" ) if runtime.GOOS == "windows" { @@ -37,10 +37,10 @@ func NewRunExecCmd() *cobra.Command { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") workspaceId := viper.GetString("workspace") - + // Get interval from the flag set by AddIntervalSupport intervalStr, _ := cmd.Flags().GetString("interval") - + interval := 10 * time.Second if intervalStr != "" { duration, err := time.ParseDuration(intervalStr) @@ -49,12 +49,12 @@ func NewRunExecCmd() *cobra.Command { } interval = duration } - + client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } - + if name == "" { return fmt.Errorf("name is required") } @@ -79,7 +79,7 @@ func NewRunExecCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } - + // Setup signal handling for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 69bc0db..b68ff20 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -53,9 +53,9 @@ func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { } // Start creates a temporary script file, starts the process, and updates job status when the process completes. -func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, +func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) { - + // Template the script using the API client script, err := r.client.TemplateJobDetails(job, jobDetails) if err != nil { @@ -176,4 +176,4 @@ func (r *ExecRunner) ExitAll(cancelled bool) { if cancelled { r.runningJobs = make(map[string]*RunningJob) } -} \ No newline at end of file +} diff --git a/cmd/ctrlc/root/sync/clickhouse/clickhouse.go b/cmd/ctrlc/root/sync/clickhouse/clickhouse.go index c2e27fd..4e553f6 100644 --- a/cmd/ctrlc/root/sync/clickhouse/clickhouse.go +++ b/cmd/ctrlc/root/sync/clickhouse/clickhouse.go @@ -17,18 +17,18 @@ import ( ) type ClickHouseConfig struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - Region string `json:"region"` - CloudProvider string `json:"cloudProvider"` - Tier string `json:"tier"` - IdleScaling map[string]interface{} `json:"idleScaling"` - TotalDiskSize int `json:"totalDiskSize"` - TotalMemoryMB int `json:"totalMemoryMB"` - MinTotalMemory int `json:"minTotalMemory"` - MaxTotalMemory int `json:"maxTotalMemory"` - Created string `json:"created"` + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + Region string `json:"region"` + CloudProvider string `json:"cloudProvider"` + Tier string `json:"tier"` + IdleScaling map[string]interface{} `json:"idleScaling"` + TotalDiskSize int `json:"totalDiskSize"` + TotalMemoryMB int `json:"totalMemoryMB"` + MinTotalMemory int `json:"minTotalMemory"` + MaxTotalMemory int `json:"maxTotalMemory"` + Created string `json:"created"` Endpoints []map[string]interface{} `json:"endpoints"` } @@ -40,17 +40,17 @@ func (c *ClickHouseConfig) Struct() map[string]interface{} { } type ClickHouseClient struct { - httpClient *http.Client - apiUrl string - apiKey string + httpClient *http.Client + apiUrl string + apiKey string organizationID string } func NewClickHouseClient(apiUrl, apiKey, organizationID string) *ClickHouseClient { return &ClickHouseClient{ - httpClient: &http.Client{}, - apiUrl: apiUrl, - apiKey: apiKey, + httpClient: &http.Client{}, + apiUrl: apiUrl, + apiKey: apiKey, organizationID: organizationID, } } @@ -60,18 +60,18 @@ type ServiceList struct { } type Service struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - Region string `json:"region"` - CloudProvider string `json:"cloudProvider"` - Tier string `json:"tier"` - IdleScaling map[string]interface{} `json:"idleScaling"` - TotalDiskSize int `json:"totalDiskSize"` - TotalMemoryMB int `json:"totalMemoryMB"` - MinTotalMemory int `json:"minTotalMemory"` - MaxTotalMemory int `json:"maxTotalMemory"` - Created string `json:"created"` + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + Region string `json:"region"` + CloudProvider string `json:"cloudProvider"` + Tier string `json:"tier"` + IdleScaling map[string]interface{} `json:"idleScaling"` + TotalDiskSize int `json:"totalDiskSize"` + TotalMemoryMB int `json:"totalMemoryMB"` + MinTotalMemory int `json:"minTotalMemory"` + MaxTotalMemory int `json:"maxTotalMemory"` + Created string `json:"created"` Endpoints []map[string]interface{} `json:"endpoints"` } @@ -186,4 +186,4 @@ func NewSyncClickhouseCmd() *cobra.Command { cmd.MarkFlagRequired("organization-id") return cmd -} \ No newline at end of file +} diff --git a/cmd/ctrlc/root/sync/sync.go b/cmd/ctrlc/root/sync/sync.go index 9f8ace3..6934485 100644 --- a/cmd/ctrlc/root/sync/sync.go +++ b/cmd/ctrlc/root/sync/sync.go @@ -29,4 +29,4 @@ func NewSyncCmd() *cobra.Command { cmd.AddCommand(cliutil.AddIntervalSupport(clickhouse.NewSyncClickhouseCmd(), "")) return cmd -} \ No newline at end of file +} diff --git a/cmd/ctrlc/root/version/version.go b/cmd/ctrlc/root/version/version.go index 9e13958..62a02d4 100644 --- a/cmd/ctrlc/root/version/version.go +++ b/cmd/ctrlc/root/version/version.go @@ -31,4 +31,4 @@ func NewVersionCmd() *cobra.Command { cmd.Flags().String("format", "json", "Output format. Accepts 'json', 'yaml', or 'github-action'") return cmd -} \ No newline at end of file +} diff --git a/internal/api/client.go b/internal/api/client.go index 46f8394..13a3252 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -89,4 +89,4 @@ func (c *ClientWithResponses) GetJobDetails(ctx context.Context, jobID string) ( return nil, fmt.Errorf("failed to unmarshal job details: %w", err) } return details, nil -} \ No newline at end of file +} diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 4bbbdb1..34ca414 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -13,8 +13,8 @@ import ( // Start initiates a job and returns an external ID or error. // The implementation should handle status updates when the job completes. type Runner interface { - Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, - statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) + Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, + statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) } func NewJobAgent( @@ -74,16 +74,16 @@ func (a *JobAgent) RunQueuedJobs() error { wg.Add(1) go func(job api.Job) { defer wg.Done() - + // Create a status update callback for this job statusUpdateFunc := func(jobID string, status api.JobStatus, message string) { if err := a.client.UpdateJobStatus(jobID, status, message, nil); err != nil { log.Error("Failed to update job status", "error", err, "jobId", jobID) } } - + externalId, _, err := a.runner.Start(context.Background(), job, jobDetails, statusUpdateFunc) - + if err != nil { status := api.JobStatusInProgress message := fmt.Sprintf("Failed to start job: %s", err.Error()) @@ -93,7 +93,7 @@ func (a *JobAgent) RunQueuedJobs() error { } return } - + if externalId != "" { status := api.JobStatusInProgress if err := a.client.UpdateJobStatus(job.Id.String(), status, "", &externalId); err != nil { @@ -105,4 +105,4 @@ func (a *JobAgent) RunQueuedJobs() error { wg.Wait() return nil -} \ No newline at end of file +} From 36600195826ba22083a1c1f4f1cfe67b93bbcc68 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 20:54:37 -0500 Subject: [PATCH 13/17] format --- Makefile | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 0e506cb..8df5ab6 100644 --- a/Makefile +++ b/Makefile @@ -5,18 +5,19 @@ LDFLAGS = -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.Version=$(VERSIO -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.GitCommit=$(COMMIT) \ -X github.com/ctrlplanedev/cli/cmd/ctrlc/root/version.BuildDate=$(DATE) -.PHONY: build build: go build -ldflags "$(LDFLAGS)" -o bin/ctrlc ./cmd/ctrlc -.PHONY: install install: go install -ldflags "$(LDFLAGS)" ./cmd/ctrlc -.PHONY: test test: go test -v ./... -.PHONY: clean +fmt: + gofmt -w -s -e . + clean: - rm -rf bin/ \ No newline at end of file + rm -rf bin/ + +.PHONY: build install test clean \ No newline at end of file From f51d8d4cffe7b38f070e3e303f3fd054117f7c07 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 23:05:23 -0500 Subject: [PATCH 14/17] refactor --- cmd/ctrlc/root/run/exec/exec.go | 57 +++----------- cmd/ctrlc/root/run/exec/runner.go | 120 ++++++++++++++++++++++-------- internal/api/client.go | 69 ----------------- internal/api/job.go | 105 ++++++++++++++++++++++++++ pkg/jobagent/runner.go | 50 ++++++------- 5 files changed, 227 insertions(+), 174 deletions(-) create mode 100644 internal/api/job.go diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index c07281a..9d3c8b2 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -1,13 +1,11 @@ package exec import ( - "context" "fmt" "os" "os/signal" "runtime" "syscall" - "time" "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" @@ -37,24 +35,10 @@ func NewRunExecCmd() *cobra.Command { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") workspaceId := viper.GetString("workspace") - - // Get interval from the flag set by AddIntervalSupport - intervalStr, _ := cmd.Flags().GetString("interval") - - interval := 10 * time.Second - if intervalStr != "" { - duration, err := time.ParseDuration(intervalStr) - if err != nil { - return fmt.Errorf("invalid interval format: %w", err) - } - interval = duration - } - client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } - if name == "" { return fmt.Errorf("name is required") } @@ -63,57 +47,36 @@ func NewRunExecCmd() *cobra.Command { } runner := NewExecRunner(client) - jobAgentConfig := api.UpsertJobAgentJSONRequestBody{ Name: name, Type: jobAgentType, WorkspaceId: workspaceId, } - ja, err := jobagent.NewJobAgent( client, jobAgentConfig, runner, ) - if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } - - // Setup signal handling for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - + + // Set up a simple shutdown handler for non-interval mode + // When used with AddIntervalSupport, this would only affect a single iteration + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { - <-sigCh + <-c log.Info("Shutting down gracefully...") runner.ExitAll(true) - cancel() }() - // Main polling loop - ticker := time.NewTicker(interval) - defer ticker.Stop() - - // Run initial job check + // Run job check - AddIntervalSupport will handle repeated execution if err := ja.RunQueuedJobs(); err != nil { - log.Error("Failed to run queued jobs", "error", err) - } - - // Polling loop - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - if err := ja.RunQueuedJobs(); err != nil { - log.Error("Failed to run queued jobs", "error", err) - } - } + return fmt.Errorf("failed to run queued jobs: %w", err) } + + return nil }, } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index b68ff20..1cca880 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -10,6 +10,7 @@ import ( "strconv" "sync" "syscall" + "time" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" @@ -21,8 +22,7 @@ var _ jobagent.Runner = &ExecRunner{} type RunningJob struct { cmd *exec.Cmd jobID string - client *api.ClientWithResponses - exitCode int + job *api.JobWithDetails cancelled bool } @@ -30,7 +30,13 @@ type ExecRunner struct { runningJobs map[string]*RunningJob client *api.ClientWithResponses mu sync.Mutex - wg sync.WaitGroup +} + +// Helper function to update job status and handle error logging +func updateJobStatus(job *api.JobWithDetails, status api.JobStatus, message string, jobID string) { + if err := job.UpdateStatus(status, message); err != nil { + log.Error("Failed to update job status", "error", err, "jobId", jobID) + } } func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { @@ -43,9 +49,24 @@ func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { - <-c - log.Info("Received shutdown signal, terminating all jobs") + sig := <-c + log.Info("Received shutdown signal, terminating all jobs", "signal", sig) + + // Update all job statuses before exiting + runner.mu.Lock() + for _, runningJob := range runner.runningJobs { + if runningJob.job != nil { + log.Info("Marking job as cancelled due to shutdown", "id", runningJob.jobID) + runningJob.cancelled = true + // Update the job status to failed with a specific message + updateJobStatus(runningJob.job, api.JobStatusFailure, fmt.Sprintf("Job terminated due to signal: %v", sig), runningJob.jobID) + } + } + runner.mu.Unlock() + + // Now terminate all processes runner.ExitAll(true) + log.Info("Shutdown complete, exiting") os.Exit(0) }() @@ -53,13 +74,11 @@ func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { } // Start creates a temporary script file, starts the process, and updates job status when the process completes. -func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, - statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) { - - // Template the script using the API client - script, err := r.client.TemplateJobDetails(job, jobDetails) +func (r *ExecRunner) Start(ctx context.Context, job *api.JobWithDetails) (api.JobStatus, error) { + // Template the script using the job + script, err := job.TemplateJobDetails() if err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to template job details: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to template job details: %w", err) } // Determine file extension based on OS @@ -70,30 +89,33 @@ func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[stri tmpFile, err := os.CreateTemp("", "script*"+ext) if err != nil { - return "", api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err) } // Write the script to the temporary file if _, err := tmpFile.WriteString(script); err != nil { os.Remove(tmpFile.Name()) - return "", api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err) } if err := tmpFile.Close(); err != nil { os.Remove(tmpFile.Name()) - return "", api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err) } // Make the script executable on Unix-like systems if runtime.GOOS != "windows" { if err := os.Chmod(tmpFile.Name(), 0700); err != nil { os.Remove(tmpFile.Name()) - return "", api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err) } } cmd := exec.CommandContext(ctx, "bash", "-c", tmpFile.Name()) if runtime.GOOS == "windows" { cmd = exec.CommandContext(ctx, "powershell", "-File", tmpFile.Name()) + } else { + // On Unix-like systems, create a new process group so we can terminate all child processes + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } // Set up command environment @@ -103,17 +125,19 @@ func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[stri // Start the process if err := cmd.Start(); err != nil { os.Remove(tmpFile.Name()) - return "", api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err) + return api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err) } // Use the pointer address as the handle handle := fmt.Sprintf("%p", cmd) + job.SetExternalID(handle) // Create a running job record runningJob := &RunningJob{ - cmd: cmd, - jobID: job.Id.String(), - client: r.client, + cmd: cmd, + jobID: job.Id.String(), + job: job, + cancelled: false, } // Register the running job @@ -128,8 +152,10 @@ func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[stri r.mu.Lock() delete(r.runningJobs, handle) r.mu.Unlock() + log.Debug("Job cleanup complete", "id", runningJob.jobID) }() + log.Debug("Waiting for command to complete", "id", runningJob.jobID, "handle", handle) err := cmd.Wait() exitCode := 0 if err != nil { @@ -137,17 +163,20 @@ func (r *ExecRunner) Start(ctx context.Context, job api.Job, jobDetails map[stri exitCode = exitErr.ExitCode() } } + log.Debug("Command completed", "id", runningJob.jobID, "exitCode", exitCode, "error", err != nil) if runningJob.cancelled { - statusUpdateFunc(runningJob.jobID, api.JobStatusCancelled, "Job was cancelled") + updateJobStatus(job, api.JobStatusCancelled, "Job was cancelled", runningJob.jobID) } else if err != nil { - statusUpdateFunc(runningJob.jobID, api.JobStatusFailure, fmt.Sprintf("Process exited with code %d: %v", exitCode, err)) + updateJobStatus(job, api.JobStatusFailure, + fmt.Sprintf("Process exited with code %d: %v", exitCode, err), + runningJob.jobID) } else { - statusUpdateFunc(runningJob.jobID, api.JobStatusSuccessful, "") + updateJobStatus(job, api.JobStatusSuccessful, "", runningJob.jobID) } }(handle, tmpFile.Name()) - return handle, api.JobStatusInProgress, nil + return api.JobStatusInProgress, nil } // ExitAll stops all currently running commands @@ -162,13 +191,18 @@ func (r *ExecRunner) ExitAll(cancelled bool) { log.Info("Killing job", "id", id) runningJob.cancelled = cancelled - // Process is running, kill it and its children - if runtime.GOOS == "windows" { - exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(runningJob.cmd.Process.Pid)).Run() - } else { - // Send SIGTERM first for graceful shutdown - runningJob.cmd.Process.Signal(syscall.SIGTERM) + // Update job status if cancellation requested and we have a job reference + if cancelled && runningJob.job != nil { + // Use JobStatusCancelled if it's an explicit cancellation, or JobStatusFailure if it's due to external termination + status := api.JobStatusCancelled + message := "Job was cancelled by user" + + // Update the status + updateJobStatus(runningJob.job, status, message, runningJob.jobID) } + + // Kill the process + killProcess(runningJob.cmd, runningJob.jobID) } } } @@ -177,3 +211,31 @@ func (r *ExecRunner) ExitAll(cancelled bool) { r.runningJobs = make(map[string]*RunningJob) } } + +// killProcess terminates a process and its children in a cross-platform way without exposing PIDs +func killProcess(cmd *exec.Cmd, jobID string) { + if runtime.GOOS == "windows" { + pid := cmd.Process.Pid + if err := exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(pid)).Run(); err != nil { + log.Error("Failed to kill process tree", "error", err, "jobId", jobID) + } + } else { + pgid, err := syscall.Getpgid(cmd.Process.Pid) + if err == nil && pgid > 0 { + if err = syscall.Kill(-pgid, syscall.SIGTERM); err != nil { + log.Error("Failed to terminate process group gracefully", "error", err, "jobId", jobID) + } + } else { + if err = cmd.Process.Signal(syscall.SIGTERM); err != nil { + log.Error("Failed to terminate process gracefully", "error", err, "jobId", jobID) + } + } + + go func() { + time.Sleep(2 * time.Second) + if err := cmd.Process.Kill(); err != nil { + log.Error("Failed to kill process", "error", err, "jobId", jobID) + } + }() + } +} diff --git a/internal/api/client.go b/internal/api/client.go index 13a3252..c7169ba 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -1,11 +1,8 @@ package api import ( - "bytes" "context" "encoding/json" - "fmt" - "html/template" "net/http" "strings" ) @@ -24,69 +21,3 @@ func NewAPIKeyClientWithResponses(server string, apiKey string) (*ClientWithResp func (v *Variable_Value) SetString(value string) { v.union = json.RawMessage("\"" + value + "\"") } - -// TemplateJobDetails applies the job details template to the script -func (c *ClientWithResponses) TemplateJobDetails(job Job, jobDetails map[string]interface{}) (string, error) { - // Extract script from JobAgentConfig - script, ok := job.JobAgentConfig["script"].(string) - if !ok { - return "", fmt.Errorf("script not found in job agent config") - } - - // Parse the script template - templatedScript, err := template.New("script").Parse(script) - if err != nil { - return "", fmt.Errorf("failed to parse script template: %w", err) - } - - // Execute the template with job details - buf := new(bytes.Buffer) - if err := templatedScript.Execute(buf, jobDetails); err != nil { - return "", fmt.Errorf("failed to execute script template: %w", err) - } - - return buf.String(), nil -} - -// UpdateJobStatus updates the job status via API -func (c *ClientWithResponses) UpdateJobStatus(jobID string, status JobStatus, message string, externalID *string) error { - body := UpdateJobJSONRequestBody{ - Status: &status, - } - if message != "" { - body.Message = &message - } - if externalID != nil { - body.ExternalId = externalID - } - - resp, err := c.UpdateJobWithResponse(context.Background(), jobID, body) - if err != nil { - return fmt.Errorf("failed to update job status: %w", err) - } - if resp.JSON200 == nil { - return fmt.Errorf("failed to update job status: received empty response") - } - return nil -} - -// GetJobDetails retrieves job details for templating -func (c *ClientWithResponses) GetJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) { - resp, err := c.GetJobWithResponse(ctx, jobID) - if err != nil { - return nil, fmt.Errorf("failed to get job details: %w", err) - } - if resp.JSON200 == nil { - return nil, fmt.Errorf("received empty response from job details API") - } - - var details map[string]interface{} - detailsBytes, err := json.Marshal(resp.JSON200) - if err != nil { - return nil, fmt.Errorf("failed to marshal job response: %w", err) - } - if err := json.Unmarshal(detailsBytes, &details); err != nil { - return nil, fmt.Errorf("failed to unmarshal job details: %w", err) - } - return details, nil -} diff --git a/internal/api/job.go b/internal/api/job.go new file mode 100644 index 0000000..ee97ecf --- /dev/null +++ b/internal/api/job.go @@ -0,0 +1,105 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "html/template" +) + +// JobWithDetails extends the Job type with additional fields and methods +type JobWithDetails struct { + Job + Details map[string]interface{} + Client *ClientWithResponses + ExternalID string +} + +// NewJobWithDetails creates a new JobWithDetails with the client and job data +func NewJobWithDetails(client *ClientWithResponses, job Job) (*JobWithDetails, error) { + j := &JobWithDetails{ + Job: job, + Client: client, + } + + // Fetch job details + var err error + j.Details, err = j.GetJobDetails(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + + return j, nil +} + +// GetJobDetails retrieves job details for templating +func (j *JobWithDetails) GetJobDetails(ctx context.Context) (map[string]interface{}, error) { + resp, err := j.Client.GetJobWithResponse(ctx, j.Id.String()) + if err != nil { + return nil, fmt.Errorf("failed to get job details: %w", err) + } + if resp.JSON200 == nil { + return nil, fmt.Errorf("received empty response from job details API") + } + + var details map[string]interface{} + detailsBytes, err := json.Marshal(resp.JSON200) + if err != nil { + return nil, fmt.Errorf("failed to marshal job response: %w", err) + } + if err := json.Unmarshal(detailsBytes, &details); err != nil { + return nil, fmt.Errorf("failed to unmarshal job details: %w", err) + } + return details, nil +} + +// TemplateJobDetails applies the job details template to the script +func (j *JobWithDetails) TemplateJobDetails() (string, error) { + // Extract script from JobAgentConfig + script, ok := j.JobAgentConfig["script"].(string) + if !ok { + return "", fmt.Errorf("script not found in job agent config") + } + + // Parse the script template + templatedScript, err := template.New("script").Parse(script) + if err != nil { + return "", fmt.Errorf("failed to parse script template: %w", err) + } + + // Execute the template with job details + buf := new(bytes.Buffer) + if err := templatedScript.Execute(buf, j.Details); err != nil { + return "", fmt.Errorf("failed to execute script template: %w", err) + } + + return buf.String(), nil +} + +// UpdateStatus updates the job status via API +func (j *JobWithDetails) UpdateStatus(status JobStatus, message string) error { + body := UpdateJobJSONRequestBody{ + Status: &status, + } + if message != "" { + body.Message = &message + } + if j.ExternalID != "" { + body.ExternalId = &j.ExternalID + } + + resp, err := j.Client.UpdateJobWithResponse(context.Background(), j.Id.String(), body) + if err != nil { + return fmt.Errorf("failed to update job status: %w", err) + } + if resp.JSON200 == nil { + return fmt.Errorf("failed to update job status: received empty response") + } + return nil +} + +// SetExternalID sets the external ID for the job +func (j *JobWithDetails) SetExternalID(externalID string) { + j.ExternalID = externalID +} diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 34ca414..dcc71a4 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -10,11 +10,9 @@ import ( ) // Runner defines the interface for job execution. -// Start initiates a job and returns an external ID or error. -// The implementation should handle status updates when the job completes. +// Start initiates a job and returns a status or error. type Runner interface { - Start(ctx context.Context, job api.Job, jobDetails map[string]interface{}, - statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) + Start(ctx context.Context, job *api.JobWithDetails) (api.JobStatus, error) } func NewJobAgent( @@ -34,7 +32,6 @@ func NewJobAgent( ja := &JobAgent{ client: client, id: agent.JSON200.Id, - workspaceId: config.WorkspaceId, runner: runner, } @@ -43,7 +40,6 @@ func NewJobAgent( type JobAgent struct { client *api.ClientWithResponses - workspaceId string id string runner Runner } @@ -62,42 +58,38 @@ func (a *JobAgent) RunQueuedJobs() error { log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs)) var wg sync.WaitGroup - for _, job := range *jobs.JSON200.Jobs { - jobDetails, err := a.client.GetJobDetails(context.Background(), job.Id.String()) + for _, apiJob := range *jobs.JSON200.Jobs { + + // TODO: If we can start jobs in pending, then this is valid and will prevent us from starting the same job twice. + // if apiJob.Status == api.JobStatusInProgress { + // continue + // } + + job, err := api.NewJobWithDetails(a.client, apiJob) if err != nil { - log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String()) - continue - } - if job.Status == api.JobStatusInProgress { + log.Error("Failed to create job with details", "error", err, "jobId", apiJob.Id.String()) continue } + wg.Add(1) - go func(job api.Job) { + go func(job *api.JobWithDetails) { defer wg.Done() - // Create a status update callback for this job - statusUpdateFunc := func(jobID string, status api.JobStatus, message string) { - if err := a.client.UpdateJobStatus(jobID, status, message, nil); err != nil { - log.Error("Failed to update job status", "error", err, "jobId", jobID) - } - } - - externalId, _, err := a.runner.Start(context.Background(), job, jobDetails, statusUpdateFunc) + // Start the job - status updates happen inside Start + status, err := a.runner.Start(context.Background(), job) if err != nil { - status := api.JobStatusInProgress - message := fmt.Sprintf("Failed to start job: %s", err.Error()) log.Error("Failed to start job", "error", err, "jobId", job.Id.String()) - if err := a.client.UpdateJobStatus(job.Id.String(), status, message, nil); err != nil { - log.Error("Failed to update job status", "error", err, "jobId", job.Id.String()) + if updErr := job.UpdateStatus(api.JobStatusFailure, fmt.Sprintf("Failed to start job: %s", err.Error())); updErr != nil { + log.Error("Failed to update job status", "error", updErr, "jobId", job.Id.String()) } return } - if externalId != "" { - status := api.JobStatusInProgress - if err := a.client.UpdateJobStatus(job.Id.String(), status, "", &externalId); err != nil { - log.Error("Failed to update job status", "error", err, "jobId", job.Id.String()) + // If we got a status that's not InProgress, update it + if status != api.JobStatusInProgress { + if updErr := job.UpdateStatus(status, ""); updErr != nil { + log.Error("Failed to update job status", "error", updErr, "jobId", job.Id.String()) } } }(job) From 51605552ace4346aa4409f0b7c90a7125589e9b5 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 23:06:00 -0500 Subject: [PATCH 15/17] format --- cmd/ctrlc/root/run/exec/exec.go | 4 ++-- cmd/ctrlc/root/run/exec/runner.go | 4 ++-- pkg/jobagent/runner.go | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 9d3c8b2..49bb47a 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -60,7 +60,7 @@ func NewRunExecCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } - + // Set up a simple shutdown handler for non-interval mode // When used with AddIntervalSupport, this would only affect a single iteration c := make(chan os.Signal, 1) @@ -75,7 +75,7 @@ func NewRunExecCmd() *cobra.Command { if err := ja.RunQueuedJobs(); err != nil { return fmt.Errorf("failed to run queued jobs: %w", err) } - + return nil }, } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 1cca880..e918fee 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -168,8 +168,8 @@ func (r *ExecRunner) Start(ctx context.Context, job *api.JobWithDetails) (api.Jo if runningJob.cancelled { updateJobStatus(job, api.JobStatusCancelled, "Job was cancelled", runningJob.jobID) } else if err != nil { - updateJobStatus(job, api.JobStatusFailure, - fmt.Sprintf("Process exited with code %d: %v", exitCode, err), + updateJobStatus(job, api.JobStatusFailure, + fmt.Sprintf("Process exited with code %d: %v", exitCode, err), runningJob.jobID) } else { updateJobStatus(job, api.JobStatusSuccessful, "", runningJob.jobID) diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index dcc71a4..0a34583 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -30,18 +30,18 @@ func NewJobAgent( } ja := &JobAgent{ - client: client, - id: agent.JSON200.Id, - runner: runner, + client: client, + id: agent.JSON200.Id, + runner: runner, } return ja, nil } type JobAgent struct { - client *api.ClientWithResponses - id string - runner Runner + client *api.ClientWithResponses + id string + runner Runner } // RunQueuedJobs retrieves and executes any queued jobs for this agent. From c6fdca4af22804a155bd56eacc301a8492785b36 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Tue, 25 Mar 2025 23:51:53 -0500 Subject: [PATCH 16/17] default interval --- cmd/ctrlc/root/run/exec/exec.go | 4 ++++ cmd/ctrlc/root/run/run.go | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 49bb47a..8cf70f1 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -45,6 +45,10 @@ func NewRunExecCmd() *cobra.Command { if workspaceId == "" { return fmt.Errorf("workspace is required") } + interval := viper.GetString("interval") + if interval == "" { + interval = "10s" + } runner := NewExecRunner(client) jobAgentConfig := api.UpsertJobAgentJSONRequestBody{ diff --git a/cmd/ctrlc/root/run/run.go b/cmd/ctrlc/root/run/run.go index 440f966..b765912 100644 --- a/cmd/ctrlc/root/run/run.go +++ b/cmd/ctrlc/root/run/run.go @@ -4,6 +4,7 @@ import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/run/exec" "github.com/ctrlplanedev/cli/internal/cliutil" "github.com/spf13/cobra" + "github.com/spf13/viper" ) func NewRunCmd() *cobra.Command { @@ -15,7 +16,12 @@ func NewRunCmd() *cobra.Command { }, } - cmd.AddCommand(cliutil.AddIntervalSupport(exec.NewRunExecCmd(), "")) + interval := viper.GetString("interval") + if interval == "" { + interval = "10s" + } + + cmd.AddCommand(cliutil.AddIntervalSupport(exec.NewRunExecCmd(), interval)) return cmd } From 33ea1ad60a0df6d7a4bc6cb48146be3b31fd24d1 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Thu, 3 Apr 2025 00:58:54 -0500 Subject: [PATCH 17/17] fix the job status logic for pending vs in-progress --- Makefile | 3 +++ cmd/ctrlc/root/run/exec/exec.go | 11 +++-------- pkg/jobagent/runner.go | 14 +++++++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Makefile b/Makefile index 8df5ab6..4038827 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ test: fmt: gofmt -w -s -e . +lint: + golangci-lint run + clean: rm -rf bin/ diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index 8cf70f1..2401a0f 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -35,21 +35,16 @@ func NewRunExecCmd() *cobra.Command { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") workspaceId := viper.GetString("workspace") - client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) - if err != nil { - return fmt.Errorf("failed to create API client: %w", err) - } if name == "" { return fmt.Errorf("name is required") } if workspaceId == "" { return fmt.Errorf("workspace is required") } - interval := viper.GetString("interval") - if interval == "" { - interval = "10s" + client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) + if err != nil { + return fmt.Errorf("failed to create API client: %w", err) } - runner := NewExecRunner(client) jobAgentConfig := api.UpsertJobAgentJSONRequestBody{ Name: name, diff --git a/pkg/jobagent/runner.go b/pkg/jobagent/runner.go index 0a34583..d5f6bcd 100644 --- a/pkg/jobagent/runner.go +++ b/pkg/jobagent/runner.go @@ -59,11 +59,9 @@ func (a *JobAgent) RunQueuedJobs() error { log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs)) var wg sync.WaitGroup for _, apiJob := range *jobs.JSON200.Jobs { - - // TODO: If we can start jobs in pending, then this is valid and will prevent us from starting the same job twice. - // if apiJob.Status == api.JobStatusInProgress { - // continue - // } + if apiJob.Status == api.JobStatusInProgress { + continue + } job, err := api.NewJobWithDetails(a.client, apiJob) if err != nil { @@ -71,6 +69,12 @@ func (a *JobAgent) RunQueuedJobs() error { continue } + // Update job status to InProgress before starting execution + if err := job.UpdateStatus(api.JobStatusInProgress, "Job execution started"); err != nil { + log.Error("Failed to update job status to InProgress", "error", err, "jobId", job.Id.String()) + // Continue anyway + } + wg.Add(1) go func(job *api.JobWithDetails) { defer wg.Done()