Skip to content

fix: Run exec agent #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions cmd/ctrlc/root/run/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package exec

import (
"fmt"
"runtime"

"github.com/MakeNowJust/heredoc/v2"
"github.com/charmbracelet/log"
"github.com/ctrlplanedev/cli/internal/api"
"github.com/ctrlplanedev/cli/pkg/jobagent"
Expand All @@ -11,21 +13,38 @@ import (
)

func NewRunExecCmd() *cobra.Command {
return &cobra.Command{
var name string
var jobAgentType = "exec-bash"
if runtime.GOOS == "windows" {
jobAgentType = "exec-powershell"
}

cmd := &cobra.Command{
Use: "exec",
Short: "Execute commands directly when a job is received",
Example: heredoc.Doc(`
$ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000
`),
RunE: func(cmd *cobra.Command, args []string) error {
apiURL := viper.GetString("url")
apiKey := viper.GetString("api-key")
workspaceId := viper.GetString("workspace")
client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey)
if err != nil {
return fmt.Errorf("failed to create API client: %w", err)
}
if name == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the input validation above the client creation?

return fmt.Errorf("name is required")
}
if workspaceId == "" {
return fmt.Errorf("workspace is required")
}
ja, err := jobagent.NewJobAgent(
client,
api.UpsertJobAgentJSONRequestBody{
Name: "exec",
Type: "exec",
Name: name,
Type: jobAgentType,
WorkspaceId: workspaceId,
},
&ExecRunner{},
)
Expand All @@ -35,10 +54,11 @@ func NewRunExecCmd() *cobra.Command {
if err := ja.RunQueuedJobs(); err != nil {
log.Error("failed to run queued jobs", "error", err)
}
if err := ja.UpdateRunningJobs(); err != nil {
log.Error("failed to check for jobs", "error", err)
}
return nil
},
}

cmd.Flags().StringVar(&name, "name", "", "Name of the job agent")
cmd.MarkFlagRequired("name")
return cmd
}
75 changes: 35 additions & 40 deletions cmd/ctrlc/root/run/exec/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"os"
"os/exec"
"runtime"
"strconv"
"syscall"

"github.com/charmbracelet/log"
"github.com/ctrlplanedev/cli/internal/api"
"github.com/ctrlplanedev/cli/pkg/jobagent"
)
Expand All @@ -24,72 +23,49 @@ type ExecConfig struct {
Script string `json:"script"`
}

// Status reports the state of the process whose PID is stored in the job's
// external ID.
//
// It returns JobStatusExternalRunNotFound when the external ID is missing, is
// not an integer, or the process lookup fails; JobStatusSuccessful when the
// signal-0 probe fails; and JobStatusInProgress otherwise. The second return
// value is a human-readable explanation of the result.
func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) {
	// ExternalId is a pointer: a job that never recorded one would panic on
	// the dereference below, so treat it as "run not found" instead.
	if job.ExternalId == nil {
		return api.JobStatusExternalRunNotFound, "job has no external id"
	}

	pid, err := strconv.Atoi(*job.ExternalId)
	if err != nil {
		return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err)
	}

	process, err := os.FindProcess(pid)
	if err != nil {
		return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err)
	}

	// On Unix systems, FindProcess always succeeds, so we need to send signal 0
	// to check if process exists.
	// NOTE(review): any probe failure is reported as success; the exit code of
	// the process is not available here, so a crashed script is indistinguishable
	// from one that finished cleanly.
	if err := process.Signal(syscall.Signal(0)); err != nil {
		return api.JobStatusSuccessful, fmt.Sprintf("process not running: %v", err)
	}

	return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", pid)
}

func (r *ExecRunner) Start(job api.Job) (string, error) {
// Create temp script file
// Start creates a temporary script file, starts the process, and updates job status when the process completes.
func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error) {
// Determine file extension based on OS.
ext := ".sh"
if runtime.GOOS == "windows" {
ext = ".ps1"
}

tmpFile, err := os.CreateTemp("", "script*"+ext)
if err != nil {
return "", fmt.Errorf("failed to create temp script file: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to create temp script file: %w", err)
}
defer os.Remove(tmpFile.Name())

config := ExecConfig{}
jsonBytes, err := json.Marshal(job.JobAgentConfig)
if err != nil {
return "", fmt.Errorf("failed to marshal job agent config: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to marshal job agent config: %w", err)
}
if err := json.Unmarshal(jsonBytes, &config); err != nil {
return "", fmt.Errorf("failed to unmarshal job agent config: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to unmarshal job agent config: %w", err)
}

templatedScript, err := template.New("script").Parse(config.Script)
if err != nil {
return "", fmt.Errorf("failed to parse script template: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to parse script template: %w", err)
}

buf := new(bytes.Buffer)
if err := templatedScript.Execute(buf, job); err != nil {
return "", fmt.Errorf("failed to execute script template: %w", err)
if err := templatedScript.Execute(buf, jobDetails); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential code injection in the script.
Passing jobDetails directly into the template and executing as a script can result in code injection if jobDetails includes malicious input. Consider sanitizing or validating fields in jobDetails and config.Script to reduce security risks.

return "", api.JobStatusFailure, fmt.Errorf("failed to execute script template: %w", err)
}
script := buf.String()

// Write script contents
if _, err := tmpFile.WriteString(script); err != nil {
return "", fmt.Errorf("failed to write script file: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to write script file: %w", err)
}
if err := tmpFile.Close(); err != nil {
return "", fmt.Errorf("failed to close script file: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to close script file: %w", err)
}

// Make executable on Unix systems
if runtime.GOOS != "windows" {
if err := os.Chmod(tmpFile.Name(), 0700); err != nil {
return "", fmt.Errorf("failed to make script executable: %w", err)
return "", api.JobStatusFailure, fmt.Errorf("failed to make script executable: %w", err)
}
}

Expand All @@ -104,9 +80,28 @@ func (r *ExecRunner) Start(job api.Job) (string, error) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Run(); err != nil {
return "", fmt.Errorf("failed to execute script: %w", err)
if err := cmd.Start(); err != nil {
os.Remove(tmpFile.Name())
return "", api.JobStatusFailure, fmt.Errorf("failed to start process: %w", err)
}

return strconv.Itoa(cmd.Process.Pid), nil
// Use the pointer address as the handle
handle := fmt.Sprintf("%p", cmd)

// Spawn a goroutine to wait for the process to finish and update the job status
go func(handle, scriptPath string) {
defer os.Remove(scriptPath)

err := cmd.Wait()

if err != nil {
log.Error("Process execution failed", "handle", handle, "error", err)
statusUpdateFunc(job.Id.String(), api.JobStatusFailure, err.Error())
} else {
log.Info("Process execution succeeded", "handle", handle)
statusUpdateFunc(job.Id.String(), api.JobStatusSuccessful, "")
}
}(handle, tmpFile.Name())

return handle, api.JobStatusInProgress, nil
}
35 changes: 35 additions & 0 deletions pkg/jobagent/job_details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package jobagent

import (
"context"
"encoding/json"
"fmt"

"github.com/ctrlplanedev/cli/internal/api"
"github.com/spf13/viper"
)

// fetchJobDetails retrieves the job identified by jobID from the API and
// returns it as a generic map.
//
// The API URL and key are read from viper ("url" and "api-key") and a new
// client is built on every call. The typed 200 response is converted to a map
// by a JSON round trip, so the map keys are the response's JSON field names.
//
// An error is returned when the client cannot be created, the request fails,
// the API does not answer with a 200 payload, or the JSON conversion fails.
func fetchJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) {
	client, err := api.NewAPIKeyClientWithResponses(viper.GetString("url"), viper.GetString("api-key"))
	if err != nil {
		return nil, fmt.Errorf("failed to create API client for job details: %w", err)
	}

	resp, err := client.GetJobWithResponse(ctx, jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to get job details: %w", err)
	}
	if resp.JSON200 == nil {
		// Include the HTTP status so auth failures, missing jobs and server
		// errors can be told apart from the error message alone.
		return nil, fmt.Errorf("received empty response from job details API (status %d)", resp.StatusCode())
	}

	// Round-trip through JSON to turn the typed response into a generic map.
	detailsBytes, err := json.Marshal(resp.JSON200)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal job response: %w", err)
	}

	var details map[string]interface{}
	if err := json.Unmarshal(detailsBytes, &details); err != nil {
		return nil, fmt.Errorf("failed to unmarshal job details: %w", err)
	}
	return details, nil
}
124 changes: 54 additions & 70 deletions pkg/jobagent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/ctrlplanedev/cli/internal/api"
)

// Runner defines the interface for job execution.
// Start initiates a job and returns an external ID, the job's initial status, and an error.
// The implementation should handle status updates when the job completes.
type Runner interface {
Start(job api.Job) (string, error)
Status(job api.Job) (api.JobStatus, string)
Start(job api.Job, jobDetails map[string]interface{}, statusUpdateFunc func(jobID string, status api.JobStatus, message string)) (string, api.JobStatus, error)
}

func NewJobAgent(
Expand All @@ -29,10 +31,10 @@ func NewJobAgent(
}

ja := &JobAgent{
client: client,

client: client,
id: agent.JSON200.Id,
workspaceId: config.WorkspaceId,
runner: runner,
}

return ja, nil
Expand All @@ -47,12 +49,31 @@ type JobAgent struct {
runner Runner
}

// RunQueuedJobs retrieves and executes any queued jobs for this agent. For each
// job, it starts execution using the runner's Start method in a separate
// goroutine. If starting a job fails, it updates the job status to InProgress
// with an error message. If starting succeeds and returns an external ID, it
// updates the job with that ID. The function waits for all jobs to complete
// before returning.
// updateJobStatus sends a job update to the API for the job identified by
// jobID.
//
// status is always sent. message is sent only when non-empty, and externalID
// only when non-nil. Failures are logged rather than returned, so callers get
// no signal when the update does not go through.
func (a *JobAgent) updateJobStatus(jobID string, status api.JobStatus, message string, externalID *string) {
	body := api.UpdateJobJSONRequestBody{
		Status: &status,
	}
	if message != "" {
		body.Message = &message
	}
	if externalID != nil {
		body.ExternalId = externalID
	}

	resp, err := a.client.UpdateJobWithResponse(
		context.Background(),
		jobID,
		body,
	)
	if err != nil {
		log.Error("Failed to update job", "error", err, "jobId", jobID)
		return
	}
	// A transport-level success can still carry a rejection from the API;
	// surface any non-2xx status instead of silently dropping it.
	if code := resp.StatusCode(); code < 200 || code > 299 {
		log.Error("Failed to update job", "status", code, "jobId", jobID)
	}
}

// RunQueuedJobs retrieves and executes any queued jobs for this agent.
// For each job, it starts execution using the runner's Start method, which
// will update the job status when the job completes.
func (a *JobAgent) RunQueuedJobs() error {
jobs, err := a.client.GetNextJobsWithResponse(context.Background(), a.id)
if err != nil {
Expand All @@ -65,77 +86,40 @@ func (a *JobAgent) RunQueuedJobs() error {
log.Debug("Got jobs", "count", len(*jobs.JSON200.Jobs))
var wg sync.WaitGroup
for _, job := range *jobs.JSON200.Jobs {
jobDetails, err := fetchJobDetails(context.Background(), job.Id.String())
if err != nil {
log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String())
continue
}
wg.Add(1)
go func(job api.Job) {
defer wg.Done()
externalId, err := a.runner.Start(job)

// Create a status update callback for this job
statusUpdateFunc := func(jobID string, status api.JobStatus, message string) {
a.updateJobStatus(jobID, status, message, nil)
}

externalId, status, err := a.runner.Start(job, jobDetails, statusUpdateFunc)

if err != nil {
status := api.JobStatusInProgress
message := fmt.Sprintf("Failed to start job: %s", err.Error())
log.Error("Failed to start job", "error", err, "jobId", job.Id.String())
a.client.UpdateJobWithResponse(
context.Background(),
job.Id.String(),
api.UpdateJobJSONRequestBody{
Status: &status,
Message: &message,
},
)
a.updateJobStatus(job.Id.String(), status, message, nil)
return
}
if externalId != "" {
a.client.UpdateJobWithResponse(
context.Background(),
job.Id.String(),
api.UpdateJobJSONRequestBody{
ExternalId: &externalId,
},
)
}
}(job)
}
wg.Wait()

return nil
}

// UpdateRunningJobs checks the status of all currently running jobs for this
// agent. It queries the API for running jobs, then concurrently checks the
// status of each job using the runner's Status method and updates the job
// status in the API accordingly. Any errors checking job status or updating the
// API are logged but do not stop other job updates from proceeding.
func (a *JobAgent) UpdateRunningJobs() error {
jobs, err := a.client.GetAgentRunningJobsWithResponse(context.Background(), a.id)
if err != nil {
log.Error("Failed to get job", "error", err, "status", jobs.StatusCode())
return err
}

if jobs.JSON200 == nil {
log.Error("Failed to get job", "error", err, "status", jobs.StatusCode())
return fmt.Errorf("failed to get job")
}

var wg sync.WaitGroup
for _, job := range *jobs.JSON200 {
wg.Add(1)
go func(job api.Job) {
defer wg.Done()
status, message := a.runner.Status(job)

body := api.UpdateJobJSONRequestBody{
Status: &status,
}
if message != "" {
body.Message = &message
if status == api.JobStatusFailure {
message := fmt.Sprintf("Failed to start job: %s", err.Error())
log.Error("Failed to start job", "error", err, "jobId", job.Id.String())
a.updateJobStatus(job.Id.String(), status, message, nil)
return
}
_, err := a.client.UpdateJobWithResponse(
context.Background(),
job.Id.String(),
body,
)
if err != nil {
log.Error("Failed to update job", "error", err, "jobId", job.Id.String())

if externalId != "" {
status := api.JobStatusInProgress
a.updateJobStatus(job.Id.String(), status, "", &externalId)
}
}(job)
}
Expand Down