Skip to content

Commit

Permalink
Preserve artifact even if a step failed, improve logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
gnutix committed Nov 23, 2023
1 parent 7234651 commit f431460
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 40 deletions.
70 changes: 41 additions & 29 deletions provisioner/internal/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ import (

func RunContainer(
ctx context.Context,
log *slog.Logger,
docker *client.Client,
task *scheduler.Task,
fs WorkspaceFS,
runConfig scheduler.RunTaskConfig,
) (int, error) {
cleanup := func(what string, thunk func() error, args ...any) {
tryTo := func(what string, thunk func() error, args ...any) {
if err := thunk(); err != nil {
args = append([]any{"error", err}, args...)
log.Error("Failed to cleanup "+what, args...)
task.Log.Error("Failed to "+what, args...)
}
}

Expand All @@ -45,8 +44,8 @@ func RunContainer(
return -1, fmt.Errorf("failed to create docker network: %w", err)
}
networkId := netResp.ID
defer cleanup(
"network",
defer tryTo(
"remove Docker network",
func() error {
return docker.NetworkRemove(context.Background(), networkId)
},
Expand All @@ -58,8 +57,8 @@ func RunContainer(
if err := taskFs.MkDir("/"); err != nil {
return -1, fmt.Errorf("failed to create workspace: %w", err)
}
defer cleanup(
"workspace",
defer tryTo(
"remove task workspace",
func() error {
return taskFs.Delete("/")
},
Expand All @@ -77,7 +76,7 @@ func RunContainer(
serviceContainers := map[string]string{}

for _, service := range task.Job.Services {
serviceLog := log.With("service", service.Name)
serviceLog := task.Log.With("service", service.Name)

env := lo.Map(service.Env, func(jobEnv *proto.Job_Env, _ int) string {
return fmt.Sprintf("%s=%s", jobEnv.Key, jobEnv.Value)
Expand Down Expand Up @@ -131,8 +130,8 @@ func RunContainer(
if err != nil {
return -1, fmt.Errorf("failed to create docker container for service '%s': %w", service.Name, err)
}
defer cleanup(
"service container",
defer tryTo(
"remove service container",
func() error {
return docker.ContainerRemove(context.Background(), resp.ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true})
},
Expand All @@ -149,7 +148,7 @@ func RunContainer(
for _, service := range task.Job.Services {
go func(service *proto.Job_Service) {
defer wg.Done()
serviceLog := log.With("service", service.Name)
serviceLog := task.Log.With("service", service.Name)

serviceLog.Debug("Starting service container")
containerId := serviceContainers[service.Name]
Expand Down Expand Up @@ -237,9 +236,10 @@ func RunContainer(

// Create and execute steps containers
var status container.WaitResponse
var stepError error
for i, image := range task.Job.Steps {
// Using a func here so that deferred calls run at the end of each iteration
if err := func(stepIndex int) error {
stepError = func(stepIndex int) error {
secretEnv := []string{}
for _, secret := range task.Job.Secrets {
if runConfig.SecretLoader == nil {
Expand Down Expand Up @@ -294,11 +294,12 @@ func RunContainer(
if err != nil {
return fmt.Errorf("failed to create docker container for step %d: %w", stepIndex, err)
}
defer cleanup(
"step container",
defer tryTo(
"remove step container",
func() error {
return docker.ContainerRemove(context.Background(), resp.ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true})
},
"step", stepIndex,
)

// Start main container
Expand Down Expand Up @@ -328,26 +329,37 @@ func RunContainer(
}

return nil
}(i + 1); err != nil {
return -1, err
}(i + 1)

// There's no point in executing further steps if one of them failed
if stepError != nil {
break
}
}

// Preserve artifacts
if runConfig.ArtifactPreserver != nil {
reader, err := taskFs.Archive("/output")
if err != nil {
return -1, fmt.Errorf("failed to archive 'output' directory: %w", err)
}
defer reader.Close()
// Here we don't use defer because the task workspace is removed in a defer statement already
tryTo(
"preserve artifact",
func() error {
if runConfig.ArtifactPreserver != nil {
task.Log.Debug("Preserve artifact")

if err := runConfig.ArtifactPreserver(reader, task); err != nil {
return -1, fmt.Errorf("failed to preserve artifacts: %w", err)
}
}
reader, err := taskFs.Archive("/output")
if err != nil {
return fmt.Errorf("failed to archive 'output' directory: %w", err)
}
defer reader.Close()

if err := runConfig.ArtifactPreserver(reader, task); err != nil {
return fmt.Errorf("failed to preserve artifacts: %w", err)
}
}
return nil
},
)

if status.StatusCode != 0 {
return int(status.StatusCode), fmt.Errorf("task execution ended with status: %d", status.StatusCode)
if stepError != nil {
return lo.Ternary(status.StatusCode != 0, int(status.StatusCode), -1), fmt.Errorf("task execution ended with error: %w", stepError)
}

return 0, nil
Expand Down
2 changes: 1 addition & 1 deletion provisioner/local/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (n *Node) Name() string {
}

func (n *Node) RunTask(task *scheduler.Task, runConfig scheduler.RunTaskConfig) (int, error) {
return internal.RunContainer(n.ctx, n.log, n.docker, task, n.provisioner.fs, runConfig)
return internal.RunContainer(n.ctx, n.docker, task, n.provisioner.fs, runConfig)
}

func (*Node) Terminate() error {
Expand Down
2 changes: 1 addition & 1 deletion provisioner/openstack/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (n *Node) RunTask(task *scheduler.Task, runConfig scheduler.RunTaskConfig)
}
}

return internal.RunContainer(context.TODO(), n.log, n.docker, task, n.fs, runConfig)
return internal.RunContainer(context.TODO(), n.docker, task, n.fs, runConfig)
}

func (n *Node) ensureNodeHasImage(image string) error {
Expand Down
13 changes: 6 additions & 7 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ func (s *Scheduler) Run() {
task := &Task{
Job: job,
Name: name,

log: s.log.With(slog.Group("task", "job", job.FQN(), "name", name)),
Log: s.log.With(slog.Group("task", "job", job.FQN(), "name", name)),
}

task.log.Debug("Queuing task")
task.Log.Debug("Queuing task")
s.broadcast(EventTaskQueued{Job: task.Job.FQN(), Task: task.Name})
s.tasksQueue = append(s.tasksQueue, task)
}
Expand Down Expand Up @@ -212,7 +211,7 @@ func (s *Scheduler) scheduleTaskOnOnlineNode() bool {
nodeState.tasks[slot] = nextTask
s.tasksQueue = s.tasksQueue[1:]

nextTask.log.Info("Scheduling task on node", "slot", fmt.Sprintf("%s:%d", nodeState.node.Name(), slot), "remainingTasks", len(s.tasksQueue))
nextTask.Log.Info("Scheduling task on node", "slot", fmt.Sprintf("%s:%d", nodeState.node.Name(), slot), "remainingTasks", len(s.tasksQueue))
s.broadcast(EventNodeSlotUpdated{Node: nodeState.nodeName, Slot: slot, Task: &NodeSlotTask{nextTask.Job.Name, nextTask.Name}})

go s.watchTaskExecution(nodeState, slot, nextTask)
Expand Down Expand Up @@ -390,14 +389,14 @@ func (s *Scheduler) watchTaskExecution(nodeState *nodeState, slot int, task *Tas
SecretLoader: s.config.SecretLoader,
}

task.log.Info("Running task")
task.Log.Info("Running task")
s.broadcast(EventTaskRunning{Job: task.Job.FQN(), Task: task.Name})

if exitCode, err := node.RunTask(task, runConfig); err != nil {
task.log.Warn("Task failed", "error", err)
task.Log.Warn("Task failed", "error", err)
s.broadcast(EventTaskFailed{Job: task.Job.FQN(), Task: task.Name, ExitCode: exitCode})
} else {
task.log.Info("Task completed")
task.Log.Info("Task completed")
s.broadcast(EventTaskCompleted{Job: task.Job.FQN(), Task: task.Name})
}

Expand Down
3 changes: 1 addition & 2 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ func (ts TaskStatus) AsProto() proto.TaskStatus_Status {
type Task struct {
Job *Job
Name string

log *slog.Logger
Log *slog.Logger
}

func (t *Task) FQN() string {
Expand Down

0 comments on commit f431460

Please sign in to comment.