diff --git a/command/monitor.go b/command/monitor.go
new file mode 100644
index 000000000..5a04bfc22
--- /dev/null
+++ b/command/monitor.go
@@ -0,0 +1,90 @@
+package command
+
+import (
+    "strings"
+
+    "github.com/jrasell/levant/levant"
+    "github.com/jrasell/levant/levant/structs"
+    "github.com/jrasell/levant/logging"
+)
+
+// MonitorCommand is the command implementation that allows users to monitor
+// a job until it completes.
+type MonitorCommand struct {
+    Meta
+}
+
+// Help provides the help information for the monitor command.
+func (c *MonitorCommand) Help() string {
+    helpText := `
+Usage: levant monitor [options] [EVAL_ID]
+
+  Monitor a Nomad job until it completes.
+
+Arguments:
+
+  EVAL_ID  The ID of the Nomad evaluation to monitor.
+
+General Options:
+
+  -timeout=<seconds>
+    Number of seconds to allow before exiting with an error. Defaults to 0, which waits indefinitely.
+
+  -log-level=<level>
+    Specify the verbosity level of Levant's logs. Valid values include DEBUG,
+    INFO, and WARN, in decreasing order of verbosity. The default is INFO.
+`
+    return strings.TrimSpace(helpText)
+}
+
+// Synopsis provides a brief summary of the monitor command.
+func (c *MonitorCommand) Synopsis() string {
+    return "Monitor a Nomad job until it completes"
+}
+
+// Run triggers a run of the Levant monitor function.
+func (c *MonitorCommand) Run(args []string) int {
+
+    var timeout uint64
+    var evalID string
+    var err error
+    config := &structs.Config{}
+
+    flags := c.Meta.FlagSet("monitor", FlagSetVars)
+    flags.Usage = func() { c.UI.Output(c.Help()) }
+
+    flags.Uint64Var(&timeout, "timeout", 0, "")
+    flags.StringVar(&config.LogLevel, "log-level", "INFO", "")
+
+    if err = flags.Parse(args); err != nil {
+        return 1
+    }
+
+    args = flags.Args()
+
+    if err = logging.SetupLogger(config.LogLevel, config.LogFormat); err != nil {
+        c.UI.Error(err.Error())
+        return 1
+    }
+
+    if len(args) == 1 {
+        evalID = args[0]
+    } else if len(args) == 0 {
+        c.UI.Error(c.Help())
+        c.UI.Error("\nERROR: Please provide an evaluation ID to monitor.")
+        return 1
+    } else {
+        c.UI.Error(c.Help())
+        c.UI.Error("\nERROR: Too many arguments; expected a single evaluation ID.")
+        return 1
+    }
+
+    // Trigger our monitor.
+    err = levant.StartMonitor(config, &evalID, timeout, &c.Meta.flagVars)
+    if err != nil {
+        // The error has already been reported, so just exit non-zero.
+        return 1
+    }
+
+    return 0
+}
diff --git a/commands.go b/commands.go
index d3b9960d9..508a9cd0a 100644
--- a/commands.go
+++ b/commands.go
@@ -36,6 +36,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
             Meta: meta,
         }, nil
     },
+    "monitor": func() (cli.Command, error) {
+        return &command.MonitorCommand{
+            Meta: meta,
+        }, nil
+    },
     "render": func() (cli.Command, error) {
         return &command.RenderCommand{
             Meta: meta,
diff --git a/levant/monitor.go b/levant/monitor.go
new file mode 100644
index 000000000..a674f08e7
--- /dev/null
+++ b/levant/monitor.go
@@ -0,0 +1,197 @@
+package levant
+
+import (
+    "errors"
+    "sync"
+    "time"
+
+    nomad "github.com/hashicorp/nomad/api"
+    nomadStructs "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/jrasell/levant/levant/structs"
+    "github.com/rs/zerolog/log"
+)
+
+// StartMonitor will start monitoring a job.
+func StartMonitor(config *structs.Config, evalID *string, timeoutSeconds uint64, flagVars *map[string]string) error {
+    // Create our new deployment object.
+    levantDep, err := newLevantDeployment(config)
+    if err != nil {
+        log.Error().Err(err).Msgf("levant/monitor: unable to setup Levant deployment: %v", err)
+        return err
+    }
+
+    // Start monitoring.
+    err = levantDep.monitor(evalID, timeoutSeconds)
+    if err != nil {
+        // The error has already been reported, so just pass it up.
+        return err
+    }
+    return nil
+}
+
+// monitor follows a job until it completes.
+func (l *levantDeployment) monitor(evalID *string, timeoutSeconds uint64) error {
+
+    // Set up our timeout. A timeout of zero yields a nil channel that never
+    // fires, so monitoring continues until the job completes.
+    timeout := time.Tick(time.Second * time.Duration(timeoutSeconds))
+
+    // Closing this channel stops the job monitor.
+    finish := make(chan bool)
+    defer close(finish)
+
+    // Set up some communication channels.
+    jobChan := make(chan *nomad.Job, 1)
+    errChan := make(chan error, 1)
+
+    // Get the evaluation.
+    eval, _, err := l.nomad.Evaluations().Info(*evalID, nil)
+    if err != nil {
+        log.Error().Err(err).Msgf("levant/monitor: unable to get evaluation %v: %v", *evalID, err)
+        return err
+    }
+
+    // Get the job name.
+    jobName := eval.JobID
+
+    // Start our monitor.
+    go l.monitorJobInfo(jobName, jobChan, errChan, finish)
+
+    for {
+        select {
+        case <-timeout:
+            log.Error().Msgf("levant/monitor: timeout reached while monitoring job %s", jobName)
+
+            // Get some additional information about the state of the job so
+            // we can run the allocation inspector on anything suspect.
+            var allocIDs []string
+            allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
+            if err != nil {
+                log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", *evalID, err)
+                return err
+            }
+
+            // Check to see if any of our allocations have tasks which are not
+            // yet running.
+            for _, alloc := range allocs {
+                for _, task := range alloc.TaskStates {
+                    if task.State != nomadStructs.TaskStateRunning {
+                        allocIDs = append(allocIDs, alloc.ID)
+                        // Once we have added the allocation we do not need to
+                        // add it again.
+                        break
+                    }
+                }
+            }
+
+            l.inspectAllocs(allocIDs)
+            return errors.New("timeout reached")
+
+        case err = <-errChan:
+            log.Error().Err(err).Msgf("levant/monitor: unable to query job %s: %v", jobName, err)
+            log.Error().Msg("levant/monitor: retrying...")
+
+        case job := <-jobChan:
+            // Depending on the state of the job we do different things.
+            switch *job.Status {
+
+            // If the job is dead we inspect why before deciding whether this
+            // counts as a failure.
+            case nomadStructs.JobStatusDead:
+                var allocIDs []string
+                allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
+                if err != nil {
+                    log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", *evalID, err)
+                    return err
+                }
+
+                // Check to see if any of our allocations failed.
+                for _, alloc := range allocs {
+                    for _, task := range alloc.TaskStates {
+                        if task.Failed {
+                            allocIDs = append(allocIDs, alloc.ID)
+                        }
+                    }
+                }
+                if len(allocIDs) > 0 {
+                    l.inspectAllocs(allocIDs)
+                    return errors.New("some or all allocations failed")
+                }
+
+                // Otherwise the job finished cleanly.
+                log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
+                return nil
+
+            case nomadStructs.JobStatusRunning:
+                log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
+                // For parameterized and periodic jobs, running is the final
+                // state, so we stop here.
+                if job.IsParameterized() {
+                    log.Info().Msgf("levant/monitor: job %s is parameterized. Running is its final state.", jobName)
+                    return nil
+                }
+                if job.IsPeriodic() {
+                    log.Info().Msgf("levant/monitor: job %s is periodic. Running is its final state.", jobName)
+                    return nil
+                }
+
+            default:
+                log.Debug().Msgf("levant/monitor: received unhandled job status %s", *job.Status)
+            }
+        }
+    }
+}
+
+// monitorJobInfo queries Nomad for information about a job and sends it down
+// the job channel whenever it changes.
+func (l *levantDeployment) monitorJobInfo(jobName string, jobChan chan<- *nomad.Job, errChan chan<- error, done chan bool) {
+
+    // Set up the Nomad QueryOptions to allow a blocking query with a timeout.
+    q := &nomad.QueryOptions{WaitIndex: 0, WaitTime: time.Second * 10}
+
+    for {
+        select {
+        case <-done:
+            // Allow us to exit on demand; technically we may still have to
+            // wait for an in-flight Nomad query to return.
+            return
+        default:
+            // Get our job info.
+            job, meta, err := l.nomad.Jobs().Info(jobName, q)
+            if err != nil {
+                errChan <- err
+                // Sleep a little before retrying.
+                time.Sleep(time.Second * 5)
+                continue
+            }
+            // Only take action if the information has changed.
+            if meta.LastIndex > q.WaitIndex {
+                q.WaitIndex = meta.LastIndex
+                jobChan <- job
+            } else {
+                log.Debug().Msgf("levant/monitor: job %s currently has status %s", jobName, *job.Status)
+            }
+        }
+    }
+}
+
+// inspectAllocs is a helper that calls the allocInspector for each of the
+// provided allocations and returns once all inspections have completed.
+func (l *levantDeployment) inspectAllocs(allocs []string) {
+    // If we have failed allocations then we print information about them.
+    if len(allocs) > 0 {
+        // Run through and gather messages for all of our failed allocations
+        // in parallel.
+        var wg sync.WaitGroup
+        wg.Add(len(allocs))
+
+        // Inspect each allocation.
+        for _, id := range allocs {
+            log.Debug().Msgf("levant/monitor: launching allocation inspector for alloc %v", id)
+            go l.allocInspector(id, &wg)
+        }
+
+        // Wait until our allocation inspectors have finished.
+        wg.Wait()
+    }
+}
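
For reviewers, a quick usage sketch of the new subcommand as wired up above (the evaluation ID shown is a made-up placeholder):

    $ levant monitor -log-level=DEBUG -timeout=300 a1b2c3d4-e5f6-7890-abcd-ef1234567890

With the code as written, the command exits 0 once the job reaches a final state (dead with no failed allocations, or running for parameterized and periodic jobs), and exits 1 on a timeout or failed allocations, after launching the allocation inspector on the offending allocations.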