diff --git a/.changelog/18794.txt b/.changelog/18794.txt new file mode 100644 index 00000000000..0fe9457ca42 --- /dev/null +++ b/.changelog/18794.txt @@ -0,0 +1,3 @@ +```release-note:improvement +actions: introduces the action concept to jobspecs, the web UI, CLI and API +``` diff --git a/api/allocations_exec.go b/api/allocations_exec.go index 5300f5f6082..49c673eccaa 100644 --- a/api/allocations_exec.go +++ b/api/allocations_exec.go @@ -28,6 +28,7 @@ type execSession struct { task string tty bool command []string + action string stdin io.Reader stdout io.Writer @@ -94,9 +95,15 @@ func (s *execSession) startConnection() (*websocket.Conn, error) { q.Params["tty"] = strconv.FormatBool(s.tty) q.Params["task"] = s.task q.Params["command"] = string(commandBytes) - reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", s.alloc.ID) + if s.action != "" { + q.Params["action"] = s.action + q.Params["allocID"] = s.alloc.ID + q.Params["group"] = s.alloc.TaskGroup + reqPath = fmt.Sprintf("/v1/job/%s/action", s.alloc.JobID) + } + var conn *websocket.Conn if nodeClient != nil { diff --git a/api/jobs.go b/api/jobs.go index 297e595c97e..ca7608a95ad 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -4,8 +4,10 @@ package api import ( + "context" "errors" "fmt" + "io" "net/url" "sort" "strconv" @@ -1514,3 +1516,30 @@ type JobEvaluateRequest struct { type EvalOptions struct { ForceReschedule bool } + +// ActionExec is used to run a pre-defined command inside a running task. +// The call blocks until command terminates (or an error occurs), and returns the exit code. +func (j *Jobs) ActionExec(ctx context.Context, + alloc *Allocation, task string, tty bool, command []string, + action string, + stdin io.Reader, stdout, stderr io.Writer, + terminalSizeCh <-chan TerminalSize, q *QueryOptions) (exitCode int, err error) { + + s := &execSession{ + client: j.client, + alloc: alloc, + task: task, + tty: tty, + command: command, + action: action, + + stdin: stdin, + stdout: stdout, + stderr: stderr, + + terminalSizeCh: terminalSizeCh, + q: q, + } + + return s.run(ctx) +} diff --git a/api/tasks.go b/api/tasks.go index 524b3f62550..b5851113098 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -731,6 +731,8 @@ type Task struct { // Workload Identities Identities []*WorkloadIdentity `hcl:"identity,block"` + + Actions []*Action `hcl:"action,block"` } func (t *Task) Canonicalize(tg *TaskGroup, job *Job) { @@ -1167,3 +1169,9 @@ type WorkloadIdentity struct { ServiceName string `hcl:"service_name,optional"` TTL time.Duration `mapstructure:"ttl" hcl:"ttl,optional"` } + +type Action struct { + Name string `hcl:"name,label"` + Command string `mapstructure:"command" hcl:"command"` + Args []string `mapstructure:"args" hcl:"args,optional"` +} diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 8eae2da78a3..fb6068a5d0c 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -177,7 +177,6 @@ func (a *Allocations) exec(conn io.ReadWriteCloser) { handleStreamResultError(err, code, encoder) return } - a.c.logger.Info("task exec session ended", "exec_id", execID) } @@ -216,6 +215,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e "task", req.Task, "command", req.Cmd, "tty", req.Tty, + "action", req.Action, } if ident != nil { if ident.ACLToken != nil { @@ -238,7 +238,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e // Check alloc-exec permission. if err != nil { - return nil, err + return pointer.Of(int64(400)), err } else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityAllocExec) { return nil, nstructs.ErrPermissionDenied } @@ -247,6 +247,20 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e if req.Task == "" { return pointer.Of(int64(400)), taskNotPresentErr } + + // If an action is present, go find the command and args + if req.Action != "" { + alloc, _ := a.c.GetAlloc(req.AllocID) + jobAction, err := validateActionExists(req.Action, req.Task, alloc) + if err != nil { + return pointer.Of(int64(400)), err + } + if jobAction != nil { + // append both Command and Args + req.Cmd = append([]string{jobAction.Command}, jobAction.Args...) + } + } + if len(req.Cmd) == 0 { return pointer.Of(int64(400)), errors.New("command is not present") } @@ -343,3 +357,14 @@ func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) { err := s.decoder.Decode(&req) return &req, err } + +func validateActionExists(actionName string, taskName string, alloc *nstructs.Allocation) (*nstructs.Action, error) { + t := alloc.LookupTask(taskName) + + for _, action := range t.Actions { + if action.Name == actionName { + return action, nil + } + } + return nil, fmt.Errorf("action %s not found", actionName) +} diff --git a/client/structs/structs.go b/client/structs/structs.go index 955725d8530..5075599e630 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -187,6 +187,9 @@ type AllocExecRequest struct { // Cmd is the command to be executed Cmd []string + // The name of a predefined command to be executed (optional) + Action string + structs.QueryOptions } diff --git a/command/action.go b/command/action.go new file mode 100644 index 00000000000..2d3dd791a0a --- /dev/null +++ b/command/action.go @@ -0,0 +1,317 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/api/contexts" + "github.com/hashicorp/nomad/helper/escapingio" + "github.com/posener/complete" +) + +type ActionCommand struct { + Meta + + Stdin io.Reader + Stdout io.WriteCloser + Stderr io.WriteCloser +} + +func (l *ActionCommand) Help() string { + helpText := ` +Usage: nomad action [options] + + Perform a predefined command inside the environment of the given allocation + and task. + + When ACLs are enabled, this command requires a token with the 'alloc-exec', + 'read-job', and 'list-jobs' capabilities for a task's namespace. If + the task driver does not have file system isolation (as with 'raw_exec'), + this command requires the 'alloc-node-exec', 'read-job', and 'list-jobs' + capabilities for the task's namespace. + +General Options: + + ` + generalOptionsUsage(usageOptsNoNamespace) + ` + +Action Specific Options: + + -job + Specifies the job in which the Action is defined + + -allocation + Specifies the allocation in which the Action is defined. If not provided, + a group and task name must be provided and a random allocation will be + selected from the job. + + -task + Specifies the task in which the Action is defined. Required if no + allocation is provided. + + -group + Specifies the group in which the Action is defined. Required if no + allocation is provided. + + -i + Pass stdin to the container, defaults to true. Pass -i=false to disable. + + -t + Allocate a pseudo-tty, defaults to true if stdin is detected to be a tty session. + Pass -t=false to disable explicitly. + + -e + Sets the escape character for sessions with a pty (default: '~'). The escape + character is only recognized at the beginning of a line. The escape character + followed by a dot ('.') closes the connection. Setting the character to + 'none' disables any escapes and makes the session fully transparent. + ` + return strings.TrimSpace(helpText) +} + +func (l *ActionCommand) Synopsis() string { + return "Run a pre-defined action from a Nomad task" +} + +func (l *ActionCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(l.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-task": complete.PredictAnything, + "-job": complete.PredictAnything, + "-allocation": complete.PredictAnything, + }) +} + +func (l *ActionCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := l.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Allocs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Allocs] + }) +} + +func (l *ActionCommand) Name() string { return "action" } + +func (l *ActionCommand) Run(args []string) int { + + var stdinOpt, ttyOpt bool + var task, allocation, job, group, escapeChar string + + flags := l.Meta.FlagSet(l.Name(), FlagSetClient) + flags.Usage = func() { l.Ui.Output(l.Help()) } + flags.StringVar(&task, "task", "", "") + flags.StringVar(&group, "group", "", "") + flags.StringVar(&allocation, "allocation", "", "") + flags.StringVar(&job, "job", "", "") + flags.BoolVar(&stdinOpt, "i", true, "") + flags.BoolVar(&ttyOpt, "t", isTty(), "") + flags.StringVar(&escapeChar, "e", "~", "") + + if err := flags.Parse(args); err != nil { + l.Ui.Error(fmt.Sprintf("Error parsing flags: %s", err)) + return 1 + } + + args = flags.Args() + + if len(args) < 1 { + l.Ui.Error("An action name is required") + return 1 + } + + if job == "" { + l.Ui.Error("A job ID is required") + return 1 + } + + if ttyOpt && !stdinOpt { + l.Ui.Error("-i must be enabled if running with tty") + return 1 + } + + if escapeChar == "none" { + escapeChar = "" + } + + if len(escapeChar) > 1 { + l.Ui.Error("-e requires 'none' or a single character") + return 1 + } + + client, err := l.Meta.Client() + if err != nil { + l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + var allocStub *api.AllocationListStub + // If no allocation provided, grab a random one from the job + if allocation == "" { + + // Group param cannot be empty if allocation is empty, + // since we'll need to get a random allocation from the group + if group == "" { + l.Ui.Error("A group name is required if no allocation is provided") + return 1 + } + + if task == "" { + l.Ui.Error("A task name is required if no allocation is provided") + return 1 + } + + jobID, ns, err := l.JobIDByPrefix(client, job, nil) + if err != nil { + l.Ui.Error(err.Error()) + return 1 + } + + allocStub, err = getRandomJobAlloc(client, jobID, group, ns) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) + return 1 + } + } else { + allocs, _, err := client.Allocations().PrefixList(sanitizeUUIDPrefix(allocation)) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + + if len(allocs) == 0 { + l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocation)) + return 1 + } + + if len(allocs) > 1 { + out := formatAllocListStubs(allocs, false, shortId) + l.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out)) + return 1 + } + + allocStub = allocs[0] + } + + q := &api.QueryOptions{Namespace: allocStub.Namespace} + alloc, _, err := client.Allocations().Info(allocStub.ID, q) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + if task != "" { + err = validateTaskExistsInAllocation(task, alloc) + } else { + task, err = lookupAllocTask(alloc) + } + if err != nil { + l.Ui.Error(err.Error()) + return 1 + } + + if !stdinOpt { + l.Stdin = bytes.NewReader(nil) + } + + if l.Stdin == nil { + l.Stdin = os.Stdin + } + + if l.Stdout == nil { + l.Stdout = os.Stdout + } + + if l.Stderr == nil { + l.Stderr = os.Stderr + } + + action := args[0] + + code, err := l.execImpl(client, alloc, task, job, action, ttyOpt, escapeChar, l.Stdin, l.Stdout, l.Stderr) + if err != nil { + l.Ui.Error(fmt.Sprintf("failed to exec into task: %v", err)) + return 1 + } + + return code +} + +// execImpl invokes the Alloc Exec api call, it also prepares and restores terminal states as necessary. +func (l *ActionCommand) execImpl(client *api.Client, alloc *api.Allocation, task string, job string, action string, tty bool, + escapeChar string, stdin io.Reader, stdout, stderr io.WriteCloser) (int, error) { + + sizeCh := make(chan api.TerminalSize, 1) + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + // When tty, ensures we capture all user input and monitor terminal resizes. + if tty { + if stdin == nil { + return -1, fmt.Errorf("stdin is null") + } + + inCleanup, err := setRawTerminal(stdin) + if err != nil { + return -1, err + } + defer inCleanup() + + outCleanup, err := setRawTerminalOutput(stdout) + if err != nil { + return -1, err + } + defer outCleanup() + + sizeCleanup, err := watchTerminalSize(stdout, sizeCh) + if err != nil { + return -1, err + } + defer sizeCleanup() + + if escapeChar != "" { + stdin = escapingio.NewReader(stdin, escapeChar[0], func(c byte) bool { + switch c { + case '.': + // need to restore tty state so error reporting here + // gets emitted at beginning of line + outCleanup() + inCleanup() + + stderr.Write([]byte("\nConnection closed\n")) + cancelFn() + return true + default: + return false + } + }) + } + } + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + go func() { + for range signalCh { + cancelFn() + } + }() + + return client.Jobs().ActionExec(ctx, + alloc, task, tty, make([]string, 0), action, stdin, stdout, stderr, sizeCh, nil) +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index a048f210f57..011d5fb25fc 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -12,8 +12,10 @@ import ( "strings" "github.com/golang/snappy" + "github.com/gorilla/websocket" "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" @@ -110,6 +112,11 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/submission"): jobID := strings.TrimSuffix(path, "/submission") return s.jobSubmissionCRUD(resp, req, jobID) + case strings.HasSuffix(path, "/actions"): + jobID := strings.TrimSuffix(path, "/actions") + return s.jobActions(resp, req, jobID) + case strings.HasSuffix(path, "/action"): + return s.jobRunAction(resp, req) default: return s.jobCRUD(resp, req, path) } @@ -333,6 +340,69 @@ func (s *HTTPServer) jobLatestDeployment(resp http.ResponseWriter, req *http.Req return out.Deployment, nil } +func (s *HTTPServer) jobActions(resp http.ResponseWriter, req *http.Request, jobID string) (any, error) { + if req.Method != http.MethodGet { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.JobSpecificRequest{ + JobID: jobID, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.ActionListResponse + if err := s.agent.RPC("Job.GetActions", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &structs.QueryMeta{}) + + return out.Actions, nil +} + +func (s *HTTPServer) jobRunAction(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + + s.logger.Info("jobRunAction called") + + // Build the request and parse the ACL token + task := req.URL.Query().Get("task") + action := req.URL.Query().Get("action") + allocID := req.URL.Query().Get("allocID") + isTTY := false + err := error(nil) + if tty := req.URL.Query().Get("tty"); tty != "" { + isTTY, err = strconv.ParseBool(tty) + if err != nil { + return nil, fmt.Errorf("tty value is not a boolean: %v", err) + } + } + + args := cstructs.AllocExecRequest{ + Task: task, + Action: action, + AllocID: allocID, + Tty: isTTY, + } + + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + conn, err := s.wsUpgrader.Upgrade(resp, req, nil) + + if err != nil { + return nil, fmt.Errorf("failed to upgrade connection: %v", err) + } + + if err := readWsHandshake(conn.ReadJSON, req, &args.QueryOptions); err != nil { + conn.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(toWsCode(400), err.Error())) + return nil, err + } + + return s.execStreamImpl(conn, &args) +} + func (s *HTTPServer) jobSubmissionCRUD(resp http.ResponseWriter, req *http.Request, jobID string) (*structs.JobSubmission, error) { version, err := strconv.ParseUint(req.URL.Query().Get("version"), 10, 64) if err != nil { @@ -1341,6 +1411,11 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, Sidecar: apiTask.Lifecycle.Sidecar, } } + + for _, action := range apiTask.Actions { + act := ApiActionToStructsAction(job, action) + structsTask.Actions = append(structsTask.Actions, act) + } } // apiWaitConfigToStructsWaitConfig is a copy and type conversion between the API @@ -1383,6 +1458,14 @@ func ApiCSIPluginConfigToStructsCSIPluginConfig(apiConfig *api.TaskCSIPluginConf return sc } +func ApiActionToStructsAction(job *structs.Job, action *api.Action) *structs.Action { + return &structs.Action{ + Name: action.Name, + Args: slices.Clone(action.Args), + Command: action.Command, + } +} + func ApiResourcesToStructs(in *api.Resources) *structs.Resources { if in == nil { return nil diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 690525766f6..0f59b6834d6 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1247,6 +1247,131 @@ func TestHTTP_Job_ScaleStatus(t *testing.T) { }) } +func TestHTTP_JobActions(t *testing.T) { + ci.Parallel(t) + httpTest(t, nil, func(s *TestAgent) { + job := mock.Job() + + regReq := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var regResp structs.JobRegisterResponse + must.NoError(t, s.Agent.RPC("Job.Register", ®Req, ®Resp)) + + // Make the HTTP request to get job actions + req, err := http.NewRequest("GET", "/v1/job/"+job.ID+"/actions", nil) + must.NoError(t, err) + respW := httptest.NewRecorder() + + obj, err := s.Server.JobSpecificRequest(respW, req) + must.NoError(t, err) + + // Check the output + actionsResp := obj.([]*structs.JobAction) + + // Two actions by default, both in Task web and Group web + must.Len(t, 2, actionsResp, must.Sprint("expected 2 actions")) + + must.Eq(t, "date test", actionsResp[0].Name) + + must.Eq(t, "echo test", actionsResp[1].Name) + + // Both have Args lists length of 1 + must.Len(t, 1, actionsResp[0].Args, must.Sprint("expected 1 arg")) + must.Len(t, 1, actionsResp[1].Args, must.Sprint("expected 1 arg")) + + // Both pull the name of their task/group up with them + must.Eq(t, "web", actionsResp[0].TaskName) + must.Eq(t, "web", actionsResp[1].TaskName) + + // A job with no actions + job2 := mock.Job() + job2.TaskGroups[0].Tasks[0].Actions = nil + regReq2 := structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var regResp2 structs.JobRegisterResponse + must.NoError(t, s.Agent.RPC("Job.Register", ®Req2, ®Resp2)) + + // Make the HTTP request to get job actions + req2, err := http.NewRequest("GET", "/v1/job/"+job2.ID+"/actions", nil) + must.NoError(t, err) + + respW2 := httptest.NewRecorder() + + obj2, err := s.Server.JobSpecificRequest(respW2, req2) + must.NoError(t, err) + + // Check the output + actionsResp2 := obj2.([]*structs.JobAction) + must.Len(t, 0, actionsResp2, must.Sprint("no actions received")) + + // Construct a new job with 2 taskgroups + job3 := mock.ActionsJob() + + regReq3 := structs.JobRegisterRequest{ + Job: job3, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var regResp3 structs.JobRegisterResponse + must.NoError(t, s.Agent.RPC("Job.Register", ®Req3, ®Resp3)) + + // Make the HTTP request to get job actions + req3, err := http.NewRequest("GET", "/v1/job/"+job3.ID+"/actions", nil) + must.NoError(t, err) + + respW3 := httptest.NewRecorder() + + obj3, err := s.Server.JobSpecificRequest(respW3, req3) + must.NoError(t, err) + + // Check the output + // 3 task groups: g, g1, g2 + // g has 3 tasks: t, t1, t2 + // g1 has 1 task: t + // g2 has 1 task: t + // All tasks have 2 actions: date test, echo test + // Total actions: 2 * (3 + 1 + 1) = 10 + actionsResp3 := obj3.([]*structs.JobAction) + + must.Len(t, 10, actionsResp3, must.Sprint("expected 10 actions")) + + // Five of the actions have a Name of date test, 5 have a Name of echo test + dateTestCount := 0 + echoTestCount := 0 + for _, action := range actionsResp3 { + if action.Name == "date test" { + dateTestCount++ + } else if action.Name == "echo test" { + echoTestCount++ + } + } + must.Eq(t, 5, dateTestCount) + must.Eq(t, 5, echoTestCount) + + // 3 actions have a TaskGroupName of g + groupCount := 0 + for _, action := range actionsResp3 { + if action.TaskGroupName == "g" { + groupCount++ + } + } + must.Eq(t, 6, groupCount) + + }) +} + func TestHTTP_JobForceEvaluate(t *testing.T) { ci.Parallel(t) httpTest(t, nil, func(s *TestAgent) { diff --git a/command/agent/testingutils_test.go b/command/agent/testingutils_test.go index 22a3f234f47..51787cb1d08 100644 --- a/command/agent/testingutils_test.go +++ b/command/agent/testingutils_test.go @@ -78,6 +78,19 @@ func MockJob() *api.Job { PortLabel: "admin", }, }, + // actions + Actions: []*api.Action{ + { + Name: "date test", + Command: "/bin/date", + Args: []string{"-u"}, + }, + { + Name: "echo test", + Command: "/bin/echo", + Args: []string{"hello world"}, + }, + }, LogConfig: api.DefaultLogConfig(), Resources: &api.Resources{ CPU: pointer.Of(500), diff --git a/command/alloc_exec.go b/command/alloc_exec.go index af1cf0d03f0..888f513039c 100644 --- a/command/alloc_exec.go +++ b/command/alloc_exec.go @@ -168,7 +168,7 @@ func (l *AllocExecCommand) Run(args []string) int { return 1 } - allocStub, err = getRandomJobAlloc(client, jobID, ns) + allocStub, err = getRandomJobAlloc(client, jobID, "", ns) if err != nil { l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) return 1 diff --git a/command/alloc_fs.go b/command/alloc_fs.go index 82d86411cff..60d99e1daa6 100644 --- a/command/alloc_fs.go +++ b/command/alloc_fs.go @@ -173,7 +173,7 @@ func (f *AllocFSCommand) Run(args []string) int { return 1 } - allocID, err = getRandomJobAllocID(client, jobID, ns) + allocID, err = getRandomJobAllocID(client, jobID, "", ns) if err != nil { f.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) return 1 @@ -387,7 +387,7 @@ func (f *AllocFSCommand) followFile(client *api.Client, alloc *api.Allocation, // Get Random Allocation from a known jobID. Prefer to use a running allocation, // but use a dead allocation if no running allocations are found -func getRandomJobAlloc(client *api.Client, jobID, namespace string) (*api.AllocationListStub, error) { +func getRandomJobAlloc(client *api.Client, jobID, taskGroupName, namespace string) (*api.AllocationListStub, error) { var runningAllocs []*api.AllocationListStub q := &api.QueryOptions{ Namespace: namespace, @@ -403,6 +403,19 @@ func getRandomJobAlloc(client *api.Client, jobID, namespace string) (*api.Alloca return nil, fmt.Errorf("job %q doesn't exist or it has no allocations", jobID) } + if taskGroupName != "" { + var filteredAllocs []*api.AllocationListStub + for _, alloc := range allocs { + if alloc.TaskGroup == taskGroupName { + filteredAllocs = append(filteredAllocs, alloc) + } + } + allocs = filteredAllocs + if len(allocs) == 0 { + return nil, fmt.Errorf("task group %q doesn't exist or it has no allocations", taskGroupName) + } + } + for _, v := range allocs { if v.ClientStatus == "running" { runningAllocs = append(runningAllocs, v) @@ -419,8 +432,8 @@ func getRandomJobAlloc(client *api.Client, jobID, namespace string) (*api.Alloca return alloc, err } -func getRandomJobAllocID(client *api.Client, jobID, namespace string) (string, error) { - alloc, err := getRandomJobAlloc(client, jobID, namespace) +func getRandomJobAllocID(client *api.Client, jobID, group, namespace string) (string, error) { + alloc, err := getRandomJobAlloc(client, jobID, group, namespace) if err != nil { return "", err } diff --git a/command/alloc_logs.go b/command/alloc_logs.go index fb430a753ca..3d9c2fc6962 100644 --- a/command/alloc_logs.go +++ b/command/alloc_logs.go @@ -173,7 +173,7 @@ func (l *AllocLogsCommand) Run(args []string) int { return 1 } - allocID, err = getRandomJobAllocID(client, jobID, ns) + allocID, err = getRandomJobAllocID(client, jobID, "", ns) if err != nil { l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) return 1 diff --git a/command/commands.go b/command/commands.go index d665c80e5e8..093c6beebe1 100644 --- a/command/commands.go +++ b/command/commands.go @@ -235,6 +235,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "action": func() (cli.Command, error) { + return &ActionCommand{ + Meta: meta, + }, nil + }, "alloc": func() (cli.Command, error) { return &AllocCommand{ Meta: meta, diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index 5d596738d94..08788d8ebdd 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -46,6 +46,7 @@ var ( "kind", "volume_mount", "csi_plugin", + "actions", ) sidecarTaskKeys = append(commonTaskKeys, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 6773d5b9aeb..edeaa04d4b6 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1720,6 +1720,62 @@ func (j *Job) LatestDeployment(args *structs.JobSpecificRequest, return j.srv.blockingRPC(&opts) } +// jobActions is used to parse through a job's taskgroups' tasks and aggregate their actions, flattened +func (j *Job) GetActions(args *structs.JobSpecificRequest, reply *structs.ActionListResponse) error { + // authenticate, measure, and forward + authErr := j.srv.Authenticate(j.ctx, args) + if done, err := j.srv.forward("Job.GetActions", args, args, reply); done { + return err + } + j.srv.MeasureRPCRate("job", structs.RateMetricRead, args) + if authErr != nil { + return structs.ErrPermissionDenied + } + defer metrics.MeasureSince([]string{"nomad", "job", "get_actions"}, time.Now()) + + // Check for read-job permissions + if aclObj, err := j.srv.ResolveACL(args); err != nil { + return err + } else if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + + // Validate the arguments + if args.JobID == "" { + return fmt.Errorf("JobID required for actions") + } + + // Grab the job + job, err := j.srv.fsm.State().JobByID(nil, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + if job == nil { + return structs.NewErrUnknownJob(args.JobID) + } + + // Get its task groups' tasks' actions + jobActions := make([]*structs.JobAction, 0) + for _, tg := range job.TaskGroups { + for _, task := range tg.Tasks { + for _, action := range task.Actions { + jobAction := &structs.JobAction{ + Action: *action, + TaskName: task.Name, + TaskGroupName: tg.Name, + } + jobActions = append(jobActions, jobAction) + } + } + } + + reply.Actions = jobActions + + j.srv.setQueryMeta(&reply.QueryMeta) + + return nil +} + // Plan is used to cause a dry-run evaluation of the Job and return the results // with a potential diff containing annotations. func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error { diff --git a/nomad/mock/job.go b/nomad/mock/job.go index e708f1bb941..e2e3c1b699a 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -76,6 +76,18 @@ func Job() *structs.Job { Env: map[string]string{ "FOO": "bar", }, + Actions: []*structs.Action{ + { + Name: "date test", + Command: "/bin/date", + Args: []string{"-u"}, + }, + { + Name: "echo test", + Command: "/bin/echo", + Args: []string{"hello world"}, + }, + }, Services: []*structs.Service{ { Name: "${TASK}-frontend", @@ -693,3 +705,38 @@ func BigBenchmarkJob() *structs.Job { return job } + +// A multi-group, multi-task job with actions testing. +func ActionsJob() *structs.Job { + job := MinJob() + + for i := 0; i < 2; i++ { + tg := job.TaskGroups[0].Copy() + tg.Name = fmt.Sprintf("g%d", i+1) + job.TaskGroups = append(job.TaskGroups, tg) + } + + for i := 0; i < 2; i++ { + task := job.TaskGroups[0].Tasks[0].Copy() + task.Name = fmt.Sprintf("t%d", i+1) + job.TaskGroups[0].Tasks = append(job.TaskGroups[0].Tasks, task) + } + + for _, tg := range job.TaskGroups { + for _, task := range tg.Tasks { + task.Actions = []*structs.Action{ + { + Name: "date test", + Command: "/bin/date", + Args: []string{"-u"}, + }, + { + Name: "echo test", + Command: "/bin/echo", + Args: []string{"hello world"}, + }, + } + } + } + return job +} diff --git a/nomad/structs/actions.go b/nomad/structs/actions.go new file mode 100644 index 00000000000..5eecc20de44 --- /dev/null +++ b/nomad/structs/actions.go @@ -0,0 +1,49 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +// Actions are executable commands that can be run on an allocation within +// the context of a task. They are left open-ended enough to be applied to +// other Nomad concepts like Nodes in the future. + +package structs + +import "slices" + +type Action struct { + Name string + Command string + Args []string +} + +type JobAction struct { + Action + TaskName string + TaskGroupName string +} + +type ActionListResponse struct { + Actions []*JobAction + QueryMeta +} + +func (a *Action) Copy() *Action { + if a == nil { + return nil + } + na := new(Action) + *na = *a + na.Args = slices.Clone(a.Args) + return na +} + +func (a *Action) Equal(o *Action) bool { + if a == o { + return true + } + if a == nil || o == nil { + return false + } + return a.Name == o.Name && + a.Command == o.Command && + slices.Equal(a.Args, o.Args) +} diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 4c688483a67..46987245a12 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -571,9 +571,76 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) { diff.Objects = append(diff.Objects, altIDDiffs...) } + // Actions diff + if aDiffs := actionDiffs(t.Actions, other.Actions, contextual); aDiffs != nil { + diff.Objects = append(diff.Objects, aDiffs...) + } + return diff, nil } +func actionDiff(old, new *Action, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Action"} + var oldPrimitiveFlat, newPrimitiveFlat map[string]string + + if reflect.DeepEqual(old, new) { + return nil + } else if old == nil { + old = &Action{} + diff.Type = DiffTypeAdded + newPrimitiveFlat = flatmap.Flatten(new, nil, true) + } else if new == nil { + new = &Action{} + diff.Type = DiffTypeDeleted + oldPrimitiveFlat = flatmap.Flatten(old, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimitiveFlat = flatmap.Flatten(old, nil, true) + newPrimitiveFlat = flatmap.Flatten(new, nil, true) + } + + // Diff the primitive fields + diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // Diff the Args field using stringSetDiff + if setDiff := stringSetDiff(old.Args, new.Args, "Args", contextual); setDiff != nil { + diff.Objects = append(diff.Objects, setDiff) + } + + return diff +} + +// actionDiffs diffs a set of actions. If contextual diff is enabled, unchanged +// fields within objects nested in the actions will be returned. +func actionDiffs(old, new []*Action, contextual bool) []*ObjectDiff { + var diffs []*ObjectDiff + + for i := 0; i < len(old) && i < len(new); i++ { + oldAction := old[i] + newAction := new[i] + + if diff := actionDiff(oldAction, newAction, contextual); diff != nil { + diffs = append(diffs, diff) + } + } + + for i := len(new); i < len(old); i++ { + if diff := actionDiff(old[i], nil, contextual); diff != nil { + diffs = append(diffs, diff) + } + } + + for i := len(old); i < len(new); i++ { + if diff := actionDiff(nil, new[i], contextual); diff != nil { + diffs = append(diffs, diff) + } + } + + sort.Sort(ObjectDiffs(diffs)) + + return diffs +} + func (t *TaskDiff) GoString() string { var out string if len(t.Annotations) == 0 { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index eb256c10e8a..c9caa4a4887 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -8673,6 +8673,195 @@ func TestTaskDiff(t *testing.T) { }, }, }, + { + Name: "Actions added", + Old: &Task{}, + New: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "echo", + Args: []string{"bar"}, + }, + }, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "Action", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Command", + Old: "", + New: "echo", + }, + { + Type: DiffTypeAdded, + Name: "Name", + Old: "", + New: "foo", + }, + }, + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "Args", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Args", + Old: "", + New: "bar", + }, + }, + }, + }, + }, + }, + }, + }, + { + Name: "Actions removed", + Old: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "echo", + Args: []string{"bar"}, + }, + }, + }, + New: &Task{}, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeDeleted, + Name: "Action", + Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "Command", + Old: "echo", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "Name", + Old: "foo", + New: "", + }, + }, + Objects: []*ObjectDiff{ + { + Type: DiffTypeDeleted, + Name: "Args", + Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "Args", + Old: "bar", + New: "", + }, + }, + }, + }, + }, + }, + }, + }, + { + Name: "Actions edited", + Old: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "bar", + Args: []string{"hello world"}, + }, + }, + }, + New: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "baz", + Args: []string{"hello world"}, + }, + }, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Action", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "Command", + Old: "bar", + New: "baz", + }, + }, + }, + }, + }, + }, + { + Name: "Action Args edited", + Old: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "echo", + // Multiple strings of "foo" and "bar" + Args: []string{"bar"}, + }, + }, + }, + New: &Task{ + Actions: []*Action{ + { + Name: "foo", + Command: "echo", + Args: []string{"baz"}, + }, + }, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Action", + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Args", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Args", + Old: "", + New: "baz", + }, + { + Type: DiffTypeDeleted, + Name: "Args", + Old: "bar", + New: "", + }, + }, + }, + }, + }, + }, + }, + }, } for _, c := range cases { diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 25333fc800e..85793cb5ecd 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -338,6 +338,19 @@ func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta { return c } +func CopySliceActions(s []*Action) []*Action { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*Action, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 55dff8331a1..a756c6ec3ef 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7571,6 +7571,9 @@ type Task struct { // Identities are the alternate workload identities for use with 3rd party // endpoints. Identities []*WorkloadIdentity + + // Alloc-exec-like runnable commands + Actions []*Action } func (t *Task) UsesCores() bool { @@ -7657,6 +7660,7 @@ func (t *Task) Copy() *Task { nt.Lifecycle = nt.Lifecycle.Copy() nt.Identity = nt.Identity.Copy() nt.Identities = helper.CopySlice(nt.Identities) + nt.Actions = CopySliceActions(nt.Actions) if t.Artifacts != nil { artifacts := make([]*TaskArtifact, 0, len(t.Artifacts)) diff --git a/ui/app/models/action.js b/ui/app/models/action.js new file mode 100644 index 00000000000..af51a1ee056 --- /dev/null +++ b/ui/app/models/action.js @@ -0,0 +1,15 @@ +/** + * Copyright (c) HashiCorp, Inc. + * SPDX-License-Identifier: BUSL-1.1 + */ + +import { attr } from '@ember-data/model'; +import { fragmentOwner } from 'ember-data-model-fragments/attributes'; +import Fragment from 'ember-data-model-fragments/fragment'; + +export default class ActionModel extends Fragment { + @attr('string') name; + @attr('string') command; + @attr() args; + @fragmentOwner() task; +} diff --git a/ui/app/models/job.js b/ui/app/models/job.js index db9daedab78..f017fa871a8 100644 --- a/ui/app/models/job.js +++ b/ui/app/models/job.js @@ -155,6 +155,18 @@ export default class Job extends Model { @hasMany('recommendation-summary') recommendationSummaries; + get actions() { + return this.taskGroups.reduce((acc, taskGroup) => { + return acc.concat( + taskGroup.tasks + .map((task) => { + return task.get('actions').toArray(); + }) + .reduce((taskAcc, taskActions) => taskAcc.concat(taskActions), []) + ); + }, []); + } + @computed('taskGroups.@each.drivers') get drivers() { return this.taskGroups diff --git a/ui/app/models/task.js b/ui/app/models/task.js index d7b5df828c0..2651e1dab97 100644 --- a/ui/app/models/task.js +++ b/ui/app/models/task.js @@ -19,6 +19,8 @@ export default class Task extends Fragment { @attr('string') driver; @attr('string') kind; + @fragmentArray('action') actions; + @attr() meta; @computed('taskGroup.mergedMeta', 'meta') diff --git a/ui/app/services/sockets.js b/ui/app/services/sockets.js index 3fcdd746346..939d2bb9507 100644 --- a/ui/app/services/sockets.js +++ b/ui/app/services/sockets.js @@ -3,6 +3,7 @@ * SPDX-License-Identifier: BUSL-1.1 */ +// @ts-check import Service from '@ember/service'; import config from 'nomad-ui/config/environment'; import { getOwner } from '@ember/application'; diff --git a/ui/tests/unit/models/job-test.js b/ui/tests/unit/models/job-test.js index 9e7d82e2308..7f61eca619c 100644 --- a/ui/tests/unit/models/job-test.js +++ b/ui/tests/unit/models/job-test.js @@ -139,6 +139,121 @@ module('Unit | Model | job', function (hooks) { ); }); + test('actions are aggregated from taskgroups tasks', function (assert) { + const job = run(() => + this.owner.lookup('service:store').createRecord('job', { + name: 'example', + taskGroups: [ + { + name: 'one', + count: 0, + tasks: [ + { + name: '1.1', + actions: [ + { + name: 'one', + command: 'date', + args: ['+%s'], + }, + { + name: 'two', + command: 'sh', + args: ['-c "echo hello"'], + }, + ], + }, + ], + }, + { + name: 'two', + count: 0, + tasks: [ + { + name: '2.1', + }, + ], + }, + { + name: 'three', + count: 0, + tasks: [ + { + name: '3.1', + actions: [ + { + name: 'one', + command: 'date', + args: ['+%s'], + }, + ], + }, + { + name: '3.2', + actions: [ + { + name: 'one', + command: 'date', + args: ['+%s'], + }, + ], + }, + ], + }, + ], + }) + ); + + assert.equal( + job.get('actions.length'), + 4, + 'Job draws actions from its task groups tasks' + ); + + // Three actions named one, one named two + assert.equal( + job.get('actions').filterBy('name', 'one').length, + 3, + 'Job has three actions named one' + ); + assert.equal( + job.get('actions').filterBy('name', 'two').length, + 1, + 'Job has one action named two' + ); + + // Job's actions mapped by task.name return 1.1, 1.1, 3.1, 3.2 + assert.equal( + job.get('actions').mapBy('task.name').length, + 4, + 'Job action fragments surface their task properties' + ); + assert.equal( + job + .get('actions') + .mapBy('task.name') + .filter((name) => name === '1.1').length, + 2, + 'Two of the job actions are from task 1.1' + ); + assert.equal( + job + .get('actions') + .mapBy('task.name') + .filter((name) => name === '3.1').length, + 1, + 'One of the job actions is from task 3.1' + ); + assert.equal( + job + .get('actions') + .mapBy('task.name') + .filter((name) => name === '3.2').length, + 1, + 'One of the job actions is from task 3.2' + ); + }); + module('#parse', function () { test('it parses JSON', async function (assert) { const store = this.owner.lookup('service:store');