Skip to content

Commit

Permalink
Improve batch job and restart behavior (#30)
Browse files Browse the repository at this point in the history
The motivation for this was starting to use nomad batch jobs. Using
these depends on being able to get an exit code from the container,
which the driver didn't support so far. Changes to pot to make
this work are in bsdpot/pot#200 .

When a batch job returns an error code != 0, the restart behavior
in Nomad's restart stanza is applied. Restarts happen in the
context of the same allocation - this is also true for jobs
of type `service`. As the pot nomad driver would base the potName
on this data, the container name would be recycled. This
resulted in all kinds of problems when restarting tasks
rapidly.

To correct this, I changed the naming of pots from

    jobname + taskname + "_" + allocId

to

    taskname + "_" + invocationId + "_" + allocId

Having invocationId in there makes sure that each container
name is actually unique (for the sake of not changing pot
in this respect, `invocationId + "_" + allocId` are passed in as
allocId when calling `pot prepare`). The resulting pot names
look quite okay (the way I structured jobs, they actually look
better/are easier on the eyes, but that's subjective).

Retrieving the exit code makes use of a new pot feature in the
review mentioned above. This is a two level process:

1. If in potWait (no Nomad restart happened):
   Check if `pot start` returned a distinct error code and if it did,
   use `pot last-run-stats` to retrieve the process' exit code.
2. If in recoverWait (Nomad restart happened):
   Always use `pot last-run-stats` to retrieve the process' exit code.

In both cases, the pot container is destroyed immediately once finished
to avoid piling up stale pots that would need to be garbage collected
with `pot prune` (which can get quite expensive). In the future,
a parameter could be added to make this behavior configurable.

I hope I didn't miss any potential code paths (batch jobs rely on
getting reliable results from the driver).

Example batch job definition:

    job "cmd" {
      datacenters = ["dc1"]
      type = "batch"

      group "cmd-group" {
        task "command" {
          driver = "pot"

          restart { # aggressive
            interval = "30m"
            attempts = 200
            delay    = "0s"
            mode     = "fail"
          }

          config {
            image = "https://pottery.example.org"
            pot = "command_13_0"
            tag = "0.1"
            command = "/bin/sh"
            args = ["-c", "'date; false'"]
          }
        }
      }
    }
  • Loading branch information
grembo authored Jul 6, 2022
1 parent 36f92e4 commit fa96104
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 20 deletions.
39 changes: 36 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -295,7 +297,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("unable to recover a container that is not running")
} else {
se.containerPid = alive
completeName := handle.Config.JobName + handle.Config.Name + "_" + handle.Config.AllocID
parts := strings.Split(handle.Config.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]
Sout, err := se.Stdout()
if err != nil {
d.logger.Error("Error setting stdout with", "err", err)
Expand Down Expand Up @@ -433,7 +436,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}
} else {
se.containerPid = alive
completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

se.cmd = &exec.Cmd{
Args: []string{"/usr/local/bin/pot", "start", completeName},
Expand Down Expand Up @@ -505,16 +509,45 @@ OuterLoop:

handle, _ := d.tasks.Get(id)
handle.procState = drivers.TaskStateExited

last_run, err := se.getContainerLastRunStats(handle.taskConfig)
if err != nil {
d.logger.Error("Error getting container last-run-stats with err: ", err)
handle.exitResult.ExitCode = defaultFailedCode
} else {
handle.exitResult.ExitCode = last_run.ExitCode
}

err = se.destroyContainer(handle.taskConfig)
if err != nil {
d.logger.Error("Error destroying container with err: ", err)
}
}

// potWait blocks until the command held in se.cmd finishes, records the
// resulting exit code on the task handle registered under taskID, marks the
// task as exited and finally destroys the pot container.
//
// Exit code selection when se.cmd.Wait returns an error:
//   - defaultFailedCode is used as the baseline;
//   - if the command exited with status 125 (the enclosed process exited with
//     an error), the exit code reported by getContainerLastRunStats replaces
//     the baseline, provided that call succeeds.
//
// Failures while fetching the last-run stats or destroying the container are
// logged and otherwise ignored, since there is no caller to report them to.
func (d *Driver) potWait(taskID string, se syexec) {
	// Exit status of se.cmd signalling that the enclosed process exited with error.
	const enclosedProcessFailed = 125

	// The second result is deliberately ignored; the nil check below covers
	// the "no such task" case without depending on its exact type.
	handle, _ := d.tasks.Get(taskID)

	waitErr := se.cmd.Wait()

	if handle == nil {
		// Without a handle there is nowhere to store the result and no task
		// config to destroy the container with; bail out instead of panicking.
		d.logger.Error("Task handle not found in potWait", "taskID", taskID)
		return
	}

	if waitErr != nil {
		d.logger.Error("Error exiting se.cmd.Wait in potWait", "Err", waitErr)
		handle.exitResult.ExitCode = defaultFailedCode

		if exitError, ok := waitErr.(*exec.ExitError); ok {
			// Checked assertion: Sys() is platform dependent and must not panic here.
			if ws, ok := exitError.Sys().(syscall.WaitStatus); ok && ws.ExitStatus() == enclosedProcessFailed {
				lastRun, err := se.getContainerLastRunStats(handle.taskConfig)
				if err != nil {
					d.logger.Error("Error getting container last-run-stats", "err", err)
				} else {
					handle.exitResult.ExitCode = lastRun.ExitCode
				}
			}
		}
	}

	// Publish the exited state only after the exit code has been recorded, so
	// the handle is never marked exited while still carrying a stale exit code.
	handle.procState = drivers.TaskStateExited

	if err := se.destroyContainer(handle.taskConfig); err != nil {
		d.logger.Error("Error destroying container", "err", err)
	}
}

// WaitTask waits for task completion
Expand Down
1 change: 1 addition & 0 deletions driver/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (h *taskHandle) run() {

if h.syexec.ExitError != nil {
h.exitResult.Err = h.syexec.ExitError
h.exitResult.ExitCode = h.syexec.exitCode
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
Expand Down
63 changes: 59 additions & 4 deletions driver/pot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type syexec struct {
argvStart []string
argvStop []string
argvStats []string
argvLastRunStats []string
argvDestroy []string
cmd *exec.Cmd
cachedir string
Expand Down Expand Up @@ -67,6 +68,10 @@ type potStats struct {
} `json:"ResourceUsage"`
}

// lastRunStats is the JSON document decoded from the output of the
// `pot last-run-stats` command.
type lastRunStats struct {
// ExitCode is populated from the "ExitCode" JSON key.
ExitCode int `json:"ExitCode"`
}

var potStatistics map[string]potStats

func init() {
Expand Down Expand Up @@ -364,8 +369,8 @@ func (s *syexec) containerStats(commandCfg *drivers.TaskConfig) (stats potStats,

func (s *syexec) checkContainerAlive(commandCfg *drivers.TaskConfig) int {
s.logger.Trace("Checking if pot is alive", "Checking")
completeName := commandCfg.JobName + commandCfg.Name
potName := completeName + "_" + commandCfg.AllocID
parts := strings.Split(commandCfg.ID, "/")
potName := parts[1] + "_" + parts[2] + "_" + parts[0]
s.logger.Trace("Allocation name beeing check for liveness", "alive", potName)

psCommand := "/bin/sh /usr/local/bin/pot start " + potName
Expand Down Expand Up @@ -395,8 +400,8 @@ func (s *syexec) checkContainerAlive(commandCfg *drivers.TaskConfig) int {

func (s *syexec) checkContainerExists(commandCfg *drivers.TaskConfig) int {
s.logger.Debug("Checking if pot is alive")
completeName := commandCfg.JobName + commandCfg.Name
potName := completeName + "_" + commandCfg.AllocID
parts := strings.Split(commandCfg.ID, "/")
potName := parts[1] + "_" + parts[2] + "_" + parts[0]
s.logger.Trace("Allocation name beeing check for liveness", "alive", potName)

pidCommand := "/usr/local/bin/pot ls -q | grep " + potName
Expand All @@ -420,3 +425,53 @@ func (s *syexec) checkContainerExists(commandCfg *drivers.TaskConfig) int {

return 0
}

// getContainerLastRunStats runs the pot binary with s.argvLastRunStats, using
// the task directory of commandCfg as working directory, and decodes the JSON
// written to stdout into a lastRunStats value.
//
// Side effects: s.exitCode, s.cmd and s.state are overwritten with the result
// of this invocation.
//
// It returns a zero lastRunStats and a non-nil error when the command cannot
// be run, exits with a non-zero status, or prints output that is not valid
// JSON for lastRunStats.
func (s *syexec) getContainerLastRunStats(commandCfg *drivers.TaskConfig) (stats lastRunStats, err error) {
	s.logger.Debug("launching LastRunStatsContainer command", "args", strings.Join(s.argvLastRunStats, " "))

	cmd := exec.Command(potBIN, s.argvLastRunStats...)

	// set the task dir as the working directory for the command
	cmd.Dir = commandCfg.TaskDir().Dir
	cmd.Path = potBIN
	cmd.Args = append([]string{cmd.Path}, s.argvLastRunStats...)

	var outb, errb bytes.Buffer
	cmd.Stdout = &outb
	cmd.Stderr = &errb

	// Run the process and translate the outcome into an exit code.
	runErr := cmd.Run()
	switch e := runErr.(type) {
	case nil:
		// Run only returns nil when the command exited with status 0.
		s.exitCode = 0
	case *exec.ExitError:
		// Checked assertion: Sys() is platform dependent and must not panic here.
		if ws, ok := e.Sys().(syscall.WaitStatus); ok {
			s.exitCode = ws.ExitStatus()
		} else {
			s.exitCode = defaultFailedCode
		}
	default:
		// The command could not be started or its I/O failed; no exit status exists.
		s.logger.Error("Could not get exit code for container last-run-stats ", "pot", s.argvLastRunStats)
		s.exitCode = defaultFailedCode
	}

	s.cmd = cmd

	// cmd.Process is nil when the command never started, so only read the
	// pid when a process actually exists.
	state := &psState{ExitCode: s.exitCode, Time: time.Now()}
	if cmd.Process != nil {
		state.Pid = cmd.Process.Pid
	}
	s.state = state

	if runErr != nil {
		return lastRunStats{}, errors.New("Pot exit code different than 0")
	}

	if err := json.Unmarshal(outb.Bytes(), &stats); err != nil {
		s.logger.Error("Error unmarshaling json", "err", err)
		return lastRunStats{}, err
	}

	return stats, nil
}
33 changes: 20 additions & 13 deletions driver/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro

argv = append(argv, "prepare", "-U", taskCfg.Image, "-p", taskCfg.Pot, "-t", taskCfg.Tag)

if cfg.AllocID != "" {
argv = append(argv, "-a", cfg.AllocID)
}

if len(taskCfg.Args) > 0 {
if taskCfg.Command == "" {
err := errors.New("command can not be empty if arguments are provided")
Expand Down Expand Up @@ -59,12 +55,17 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro
}
}

completeName := cfg.JobName + cfg.Name
argv = append(argv, "-n", completeName, "-v")
parts := strings.Split(cfg.ID, "/")
baseName := parts[1]
jobIDAllocID := parts[2] + "_" + parts[0]
if jobIDAllocID != "" {
argv = append(argv, "-a", jobIDAllocID)
}
argv = append(argv, "-n", baseName, "-v")

se.argvCreate = argv

potName := completeName + "_" + cfg.AllocID
potName := baseName + "_" + jobIDAllocID

//Mount local
commandLocal := "mount-in -p " + potName + " -d " + cfg.TaskDir().LocalDir + " -m /local"
Expand Down Expand Up @@ -140,14 +141,18 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro
argvStop = append(argvStop, "stop", potName)
se.argvStop = argvStop

argvDestroy := make([]string, 0, 50)
argvDestroy = append(argvDestroy, "destroy", "-p", potName)
se.argvDestroy = argvDestroy

argvStats := make([]string, 0, 50)
argvStats = append(argvStats, "get-rss", "-p", potName, "-J")
se.argvStats = argvStats

argvLastRunStats := make([]string, 0, 50)
argvLastRunStats = append(argvLastRunStats, "last-run-stats", "-p", potName)
se.argvLastRunStats = argvLastRunStats

argvDestroy := make([]string, 0, 50)
argvDestroy = append(argvDestroy, "destroy", "-p", potName)
se.argvDestroy = argvDestroy

return se, nil
}

Expand Down Expand Up @@ -209,7 +214,8 @@ func prepareStop(cfg *drivers.TaskConfig, taskCfg TaskConfig) syexec {

argv = append(argv, "stop")

completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

argv = append(argv, completeName)

Expand All @@ -229,7 +235,8 @@ func prepareDestroy(cfg *drivers.TaskConfig, taskCfg TaskConfig) syexec {

argv = append(argv, "destroy")

completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

argv = append(argv, "-p", completeName)

Expand Down

0 comments on commit fa96104

Please # to comment.