
Commit ca3421d

Merge pull request #47 from Flowpack/onError_handler_in_pipeline
On error handler in pipeline
2 parents: f855dc6 + 3265df3

7 files changed: +429 -20 lines

README.md

Lines changed: 37 additions & 0 deletions

````diff
@@ -281,6 +281,43 @@ pipelines:
     tasks: # as usual
 ```
 
+### Error handling with on_error
+
+When a pipeline fails due to a task error, you can optionally configure an `on_error` task that will be executed
+to handle the failure. This is useful for sending notifications, cleanup operations, or logging failure details.
+
+```yaml
+pipelines:
+  deployment:
+    tasks:
+      build:
+        script:
+          - npm run build
+      deploy:
+        script:
+          - ./deploy.sh
+        depends_on:
+          - build
+    on_error:
+      script:
+        - echo "Deployment failed! Notifying team..."
+        - curl -d "Deployment of {{.failedTaskName}} failed" ntfy.sh/mytopic
+```
+
+The `on_error` task has access to special variables containing information about the failed task:
+
+* `failedTaskName`: Name of the task that failed (key from `pipelines`)
+* `failedTaskExitCode`: Exit code of the failed task
+* `failedTaskError`: Error message of the failed task
+* `failedTaskStdout`: Standard output of the failed task
+* `failedTaskStderr`: Standard error output of the failed task
+
+#### Important notes
+
+* The `on_error` task only runs when a "normal" task in the pipeline fails
+* If the `on_error` task itself fails, the error will be logged but won't trigger another error handler
+* The `on_error` task has access to all the same job variables as regular tasks
+* Environment variables can be configured for the `on_error` task just like for regular tasks (see the sketch below)
 
 ### Configuring retention period
````
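A minimal sketch of that last point, an `on_error` task carrying its own `env` block; the `NTFY_TOPIC` variable name and value are illustrative (not part of this commit), and the sketch assumes the script shell expands environment variables as it does for regular tasks:

```yaml
pipelines:
  deployment:
    tasks:
      deploy:
        script:
          - ./deploy.sh
    on_error:
      env:
        # Hypothetical variable; per the Go struct, on_error env takes precedence over the pipeline env
        NTFY_TOPIC: mytopic
      script:
        - curl -d "Deployment of {{.failedTaskName}} failed" ntfy.sh/$NTFY_TOPIC
```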

definition/pipelines.go

Lines changed: 20 additions & 0 deletions

```diff
@@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
 	return true
 }
 
+// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
+type OnErrorTaskDef struct {
+	// Script is a list of shell commands that are executed if an error occurs in a "normal" task
+	Script []string `yaml:"script"`
+
+	// Env sets/overrides environment variables for this task (takes precedence over pipeline environment)
+	Env map[string]string `yaml:"env"`
+}
+
 type PipelineDef struct {
 	// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
 	Concurrency int `yaml:"concurrency"`
@@ -60,8 +69,19 @@ type PipelineDef struct {
 	// Env sets/overrides environment variables for all tasks (takes precedence over process environment)
 	Env map[string]string `yaml:"env"`
 
+	// Tasks is a map of task names to task definitions
 	Tasks map[string]TaskDef `yaml:"tasks"`
 
+	// OnError is a task to be added and executed if this pipeline fails, e.g. for notifications.
+	//
+	// In this task, you have the following variables set:
+	// - failedTaskName: Name of the failed task (key from pipelines.yml)
+	// - failedTaskExitCode: Exit code of the failed task
+	// - failedTaskError: Error message of the failed task
+	// - failedTaskStdout: Stdout of the failed task
+	// - failedTaskStderr: Stderr of the failed task
+	OnError *OnErrorTaskDef `yaml:"on_error"`
+
 	// SourcePath stores the source path where the pipeline was defined
 	SourcePath string
 }
```
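For illustration, a self-contained sketch of how the new `on_error` key maps onto the `OnError` field during YAML unmarshalling. It uses trimmed-down copies of the structs above and assumes a gopkg.in/yaml.v2-compatible parser (prunner's actual YAML dependency may differ):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2" // assumed parser; struct tags above are plain `yaml:"..."` tags
)

// Trimmed-down copies of the definitions above.
type OnErrorTaskDef struct {
	Script []string          `yaml:"script"`
	Env    map[string]string `yaml:"env"`
}

type TaskDef struct {
	Script []string `yaml:"script"`
}

type PipelineDef struct {
	Tasks   map[string]TaskDef `yaml:"tasks"`
	OnError *OnErrorTaskDef    `yaml:"on_error"`
}

func main() {
	src := []byte(`
tasks:
  build:
    script:
      - npm run build
on_error:
  script:
    - echo "build failed"
  env:
    NOTIFY_TOPIC: mytopic
`)
	var def PipelineDef
	if err := yaml.Unmarshal(src, &def); err != nil {
		panic(err)
	}
	// OnError stays nil when the key is absent, which is how
	// buildErrorGraph (below) detects "no error handling configured".
	fmt.Println(def.OnError != nil, def.OnError.Env["NOTIFY_TOPIC"])
}
```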

examples/pipelines.yml

Lines changed: 3 additions & 0 deletions

```diff
@@ -60,6 +60,9 @@ pipelines:
       no-work:
         script:
           - go for a walk
+    on_error:
+      script:
+        - echo "Something went wrong, let's handle the error from {{.failedTaskName}}"
 
   queue_it:
     concurrency: 2
```

prunner.go

Lines changed: 165 additions & 18 deletions

```diff
@@ -3,6 +3,7 @@ package prunner
 import (
 	"context"
 	"fmt"
+	"io"
 	"sort"
 	"sync"
 	"time"
@@ -40,22 +41,27 @@ type PipelineRunner struct {
 	// persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see NewPipelineRunner)
 	// externally, call requestPersist()
 	persistRequests chan struct{}
+	persistLoopDone chan struct{}
 
-	// Mutex for reading or writing jobs and job state
+	// Mutex for reading or writing pipeline definitions (defs), jobs and job state
 	mx sync.RWMutex
 	createTaskRunner func(j *PipelineJob) taskctl.Runner
 
 	// Wait group for waiting for asynchronous operations like job.Cancel
 	wg sync.WaitGroup
 	// Flag if the runner is shutting down
 	isShuttingDown bool
+	// shutdownCancel is the cancel function for the shutdown context (will stop the persist loop)
+	shutdownCancel context.CancelFunc
 
 	// Poll interval for completed jobs for graceful shutdown
 	ShutdownPollInterval time.Duration
 }
 
 // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running
 func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, createTaskRunner func(j *PipelineJob) taskctl.Runner, store store.DataStore, outputStore taskctl.OutputStore) (*PipelineRunner, error) {
+	ctx, cancel := context.WithCancel(ctx)
+
 	pRunner := &PipelineRunner{
 		defs: defs,
 		// jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled.
@@ -68,6 +74,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 		outputStore: outputStore,
 		// Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking
 		persistRequests: make(chan struct{}, 1),
+		persistLoopDone: make(chan struct{}),
+		shutdownCancel: cancel,
 		createTaskRunner: createTaskRunner,
 		ShutdownPollInterval: 3 * time.Second,
 	}
@@ -79,6 +87,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 	}
 
 	go func() {
+		defer close(pRunner.persistLoopDone) // Signal that the persist loop is done on shutdown
+
 		for {
 			select {
 			case <-ctx.Done():
@@ -103,7 +113,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 // It can be scheduled (in the waitListByPipeline of PipelineRunner),
 // or currently running (jobsByID / jobsByPipeline in PipelineRunner).
 type PipelineJob struct {
-	ID uuid.UUID
+	ID uuid.UUID
+	// Identifier of the pipeline (from the YAML file)
 	Pipeline string
 	Env map[string]string
 	Variables map[string]interface{}
@@ -121,6 +132,8 @@ type PipelineJob struct {
 	// Tasks is an in-memory representation with state of tasks, sorted by dependencies
 	Tasks jobTasks
 	LastError error
+	// firstFailedTask is a reference to the first task that failed in this job
+	firstFailedTask *jobTask
 
 	sched *taskctl.Scheduler
 	taskRunner runner.Runner
@@ -194,6 +207,10 @@ var ErrJobNotFound = errors.New("job not found")
 var errJobAlreadyCompleted = errors.New("job is already completed")
 var ErrShuttingDown = errors.New("runner is shutting down")
 
+// ScheduleAsync schedules a pipeline execution, if the pipeline concurrency config allows for it.
+// "pipeline" is the pipeline ID from the YAML file.
+//
+// The returned PipelineJob is the individual execution context.
 func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
 	r.mx.Lock()
 	defer r.mx.Unlock()
```
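The `persistLoopDone` / `shutdownCancel` pair introduced above is a standard Go shutdown idiom: cancel a context to stop a background loop, then receive on a done-channel to know the loop has fully exited before doing a final save. A self-contained sketch of the same idiom, with simplified names (not prunner's actual code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runner mirrors just the persist-loop plumbing of PipelineRunner.
type runner struct {
	persistRequests chan struct{}
	persistLoopDone chan struct{}
	shutdownCancel  context.CancelFunc
}

func newRunner(ctx context.Context) *runner {
	ctx, cancel := context.WithCancel(ctx)
	r := &runner{
		// One buffered slot, so a save request can be kept while a save runs (as in NewPipelineRunner)
		persistRequests: make(chan struct{}, 1),
		persistLoopDone: make(chan struct{}),
		shutdownCancel:  cancel,
	}
	go func() {
		// Closing the channel signals that the loop has fully exited
		defer close(r.persistLoopDone)
		for {
			select {
			case <-ctx.Done():
				return
			case <-r.persistRequests:
				fmt.Println("saving state...") // stands in for SaveToStore
			}
		}
	}()
	return r
}

func (r *runner) shutdown() {
	r.shutdownCancel()  // stop the persist loop
	<-r.persistLoopDone // wait until it has really returned
	// Only now is a final save guaranteed not to race with the loop
	fmt.Println("final save")
}

func main() {
	r := newRunner(context.Background())
	r.persistRequests <- struct{}{}
	time.Sleep(50 * time.Millisecond) // give the loop a moment to drain the request
	r.shutdown()
}
```

The prunner.go diff continues with the new error-handling functions: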
```diff
@@ -395,14 +412,142 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {
 
 	// Run graph asynchronously
 	r.wg.Add(1)
-	go func() {
+	go func(sched *taskctl.Scheduler) {
 		defer r.wg.Done()
-		lastErr := job.sched.Schedule(graph)
-		r.JobCompleted(job.ID, lastErr)
-	}()
+		lastErr := sched.Schedule(graph)
+		if lastErr != nil {
+			r.RunJobErrorHandler(job)
+		}
+		r.JobCompleted(job, lastErr)
+	}(job.sched)
+}
+
+func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {
+	r.mx.Lock()
+	errorGraph, err := r.buildErrorGraph(job)
+	r.mx.Unlock()
+	if err != nil {
+		log.
+			WithError(err).
+			WithField("jobID", job.ID).
+			WithField("pipeline", job.Pipeline).
+			Error("Failed to build error pipeline graph")
+		// At this point, the error handling itself failed -
+		// there is nothing more we can do.
+		return
+	}
+
+	// If errorGraph is nil (and err is nil), no error handling is configured for this pipeline.
+	if errorGraph == nil {
+		return
+	}
+
+	// Re-init the scheduler, as we need a new one to schedule the error handler on (the old one
+	// is already shut down if ContinueRunningTasksAfterFailure == false).
+	r.mx.Lock()
+	r.initScheduler(job)
+	r.mx.Unlock()
+
+	err = job.sched.Schedule(errorGraph)
+
+	if err != nil {
+		log.
+			WithError(err).
+			WithField("jobID", job.ID).
+			WithField("pipeline", job.Pipeline).
+			Error("Failed to run error handling for job")
+	} else {
+		log.
+			WithField("jobID", job.ID).
+			WithField("pipeline", job.Pipeline).
+			Info("Error handling completed")
+	}
+}
+
+const OnErrorTaskName = "on_error"
+
+func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
+	pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline]
+	if !pipelineDefExists {
+		return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline)
+	}
+	onErrorTaskDef := pipelineDef.OnError
+	if onErrorTaskDef == nil {
+		// No error, but also no error handling configured
+		return nil, nil
+	}
+
+	failedTask := job.firstFailedTask
+
+	failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout")
+	failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr")
+
+	onErrorVariables := make(map[string]interface{})
+	for key, value := range job.Variables {
+		onErrorVariables[key] = value
+	}
+
+	if failedTask != nil {
+		onErrorVariables["failedTaskName"] = failedTask.Name
+		onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode
+		onErrorVariables["failedTaskError"] = failedTask.Error
+		onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
+		onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)
+	}
+
+	onErrorJobTask := jobTask{
+		TaskDef: definition.TaskDef{
+			Script: onErrorTaskDef.Script,
+			// AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log)
+			AllowFailure: false,
+			Env: onErrorTaskDef.Env,
+		},
+		Name: OnErrorTaskName,
+		Status: toStatus(scheduler.StatusWaiting),
+	}
+	job.Tasks = append(job.Tasks, onErrorJobTask)
+
+	return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables)
 }
 
-// HandleTaskChange will be called when the task state changes in the task runner
+func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte {
+	if task == nil || job == nil {
+		return []byte(nil)
+	}
+
+	rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName)
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", job.ID.String()).
+			WithField("pipeline", job.Pipeline).
+			WithField("failedTaskName", task.Name).
+			WithField("outputName", outputName).
+			WithError(err).
+			Debug("Could not create output reader for failed task")
+		return []byte(nil)
+	}
+	defer func(rc io.ReadCloser) {
+		_ = rc.Close()
+	}(rc)
+
+	outputAsBytes, err := io.ReadAll(rc)
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", job.ID.String()).
+			WithField("pipeline", job.Pipeline).
+			WithField("failedTaskName", task.Name).
+			WithField("outputName", outputName).
+			WithError(err).
+			Debug("Could not read output of task")
+	}
+
+	return outputAsBytes
+}
+
+// HandleTaskChange will be called when the task state changes in the task runner (taskctl).
+// It is short-lived and updates our JobTask state accordingly.
 func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 	r.mx.Lock()
 	defer r.mx.Unlock()
@@ -437,11 +582,15 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 		jt.Error = t.Error
 	}
 
-	// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
+	// If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false),
 	// then we directly abort all other tasks of the job.
 	// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
 	// if one task failed, and we want to kill the other tasks.
 	if jt.Errored {
+		if j.firstFailedTask == nil {
+			// Remember the first failed task for later use in the error handling
+			j.firstFailedTask = jt
+		}
 		pipelineDef, found := r.defs.Pipelines[j.Pipeline]
 		if found && !pipelineDef.ContinueRunningTasksAfterFailure {
 			log.
@@ -484,15 +633,10 @@ func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) {
 	r.requestPersist()
 }
 
-func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) {
+func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) {
 	r.mx.Lock()
 	defer r.mx.Unlock()
 
-	job := r.jobsByID[id]
-	if job == nil {
-		return
-	}
-
 	job.deinitScheduler()
 
 	job.Completed = true
@@ -508,7 +652,7 @@ func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) {
 	pipeline := job.Pipeline
 	log.
 		WithField("component", "runner").
-		WithField("jobID", id).
+		WithField("jobID", job.ID).
 		WithField("pipeline", pipeline).
 		Debug("Job completed")
 
@@ -710,9 +854,6 @@ func (r *PipelineRunner) initialLoadFromStore() error {
 }
 
 func (r *PipelineRunner) SaveToStore() {
-	r.wg.Add(1)
-	defer r.wg.Done()
-
 	log.
 		WithField("component", "runner").
 		Debugf("Saving job state to data store")
@@ -807,15 +948,21 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error {
 		log.
 			WithField("component", "runner").
 			Debugf("Shutting down, waiting for pending operations...")
+
 		// Wait for all running jobs to have called JobCompleted
 		r.wg.Wait()
 
+		// Wait until the persist loop is done
+		<-r.persistLoopDone
+
 		// Do a final save to include the state of recently completed jobs
 		r.SaveToStore()
 	}()
 
 	r.mx.Lock()
 	r.isShuttingDown = true
+	r.shutdownCancel()
+
 	// Cancel all jobs on wait list
 	for pipelineName, jobs := range r.waitListByPipeline {
 		for _, job := range jobs {
```
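The `failedTask*` entries that `buildErrorGraph` puts into `onErrorVariables` are what `on_error` scripts reference as `{{.failedTaskName}}` and friends. The interpolation itself happens in the taskctl layer, not in this diff; the following sketch only illustrates the Go text/template semantics those placeholders suggest, with hypothetical values:

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// Hypothetical values, shaped like the failedTask* variables set in buildErrorGraph
	vars := map[string]interface{}{
		"failedTaskName":     "deploy",
		"failedTaskExitCode": 1,
		"failedTaskError":    "exit status 1",
	}
	// A script line as it could appear under on_error in pipelines.yml
	script := `curl -d "Deployment of {{.failedTaskName}} failed (exit {{.failedTaskExitCode}}): {{.failedTaskError}}" ntfy.sh/mytopic`
	tpl := template.Must(template.New("script").Parse(script))
	// Prints the script with all placeholders substituted
	if err := tpl.Execute(os.Stdout, vars); err != nil {
		panic(err)
	}
}
```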
