Skip to content

Commit

Permalink
New endpoint: job/:id/actions (#18690)
Browse files Browse the repository at this point in the history
* unused param in action converter

* backing out of parse_job level and moved toward new endpoint level

* Adds taskName and taskGroupName to actions at job level

* Unmodified job mock actions tests

* actionless job test

* actionless job test

* Multi group multi task actions test

* HTTP method check for GET, cleaner errors in job_endpoint_test

* decomment
  • Loading branch information
philrenaud committed Oct 18, 2023
1 parent aeaa428 commit d6c350a
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 0 deletions.
25 changes: 25 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/submission"):
jobID := strings.TrimSuffix(path, "/submission")
return s.jobSubmissionCRUD(resp, req, jobID)
case strings.HasSuffix(path, "/actions"):
jobID := strings.TrimSuffix(path, "/actions")
return s.jobActions(resp, req, jobID)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -333,6 +336,28 @@ func (s *HTTPServer) jobLatestDeployment(resp http.ResponseWriter, req *http.Req
return out.Deployment, nil
}

func (s *HTTPServer) jobActions(resp http.ResponseWriter, req *http.Request, jobID string) (any, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}

args := structs.JobSpecificRequest{
JobID: jobID,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.ActionListResponse
if err := s.agent.RPC("Job.GetActions", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &structs.QueryMeta{})

return out.Actions, nil
}

func (s *HTTPServer) jobSubmissionCRUD(resp http.ResponseWriter, req *http.Request, jobID string) (*structs.JobSubmission, error) {
version, err := strconv.ParseUint(req.URL.Query().Get("version"), 10, 64)
if err != nil {
Expand Down
125 changes: 125 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,131 @@ func TestHTTP_Job_ScaleStatus(t *testing.T) {
})
}

func TestHTTP_JobActions(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
job := mock.Job()

regReq := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var regResp structs.JobRegisterResponse
must.NoError(t, s.Agent.RPC("Job.Register", &regReq, &regResp))

// Make the HTTP request to get job actions
req, err := http.NewRequest("GET", "/v1/job/"+job.ID+"/actions", nil)
must.NoError(t, err)
respW := httptest.NewRecorder()

obj, err := s.Server.JobSpecificRequest(respW, req)
must.NoError(t, err)

// Check the output
actionsResp := obj.([]*structs.JobAction)

// Two actions by default, both in Task web and Group web
must.Len(t, 2, actionsResp, must.Sprint("expected 2 actions"))

must.Eq(t, "date test", actionsResp[0].Name)

must.Eq(t, "echo test", actionsResp[1].Name)

// Both have Args lists length of 1
must.Len(t, 1, actionsResp[0].Args, must.Sprint("expected 1 arg"))
must.Len(t, 1, actionsResp[1].Args, must.Sprint("expected 1 arg"))

// Both pull the name of their task/group up with them
must.Eq(t, "web", actionsResp[0].TaskName)
must.Eq(t, "web", actionsResp[1].TaskName)

// A job with no actions
job2 := mock.Job()
job2.TaskGroups[0].Tasks[0].Actions = nil
regReq2 := structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var regResp2 structs.JobRegisterResponse
must.NoError(t, s.Agent.RPC("Job.Register", &regReq2, &regResp2))

// Make the HTTP request to get job actions
req2, err := http.NewRequest("GET", "/v1/job/"+job2.ID+"/actions", nil)
must.NoError(t, err)

respW2 := httptest.NewRecorder()

obj2, err := s.Server.JobSpecificRequest(respW2, req2)
must.NoError(t, err)

// Check the output
actionsResp2 := obj2.([]*structs.JobAction)
must.Len(t, 0, actionsResp2, must.Sprint("no actions received"))

// Construct a new job with 2 taskgroups
job3 := mock.ActionsJob()

regReq3 := structs.JobRegisterRequest{
Job: job3,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var regResp3 structs.JobRegisterResponse
must.NoError(t, s.Agent.RPC("Job.Register", &regReq3, &regResp3))

// Make the HTTP request to get job actions
req3, err := http.NewRequest("GET", "/v1/job/"+job3.ID+"/actions", nil)
must.NoError(t, err)

respW3 := httptest.NewRecorder()

obj3, err := s.Server.JobSpecificRequest(respW3, req3)
must.NoError(t, err)

// Check the output
// 3 task groups: g, g1, g2
// g has 3 tasks: t, t1, t2
// g1 has 1 task: t
// g2 has 1 task: t
// All tasks have 2 actions: date test, echo test
// Total actions: 2 * (3 + 1 + 1) = 10
actionsResp3 := obj3.([]*structs.JobAction)

must.Len(t, 10, actionsResp3, must.Sprint("expected 10 actions"))

// Five of the actions have a Name of date test, 5 have a Name of echo test
dateTestCount := 0
echoTestCount := 0
for _, action := range actionsResp3 {
if action.Name == "date test" {
dateTestCount++
} else if action.Name == "echo test" {
echoTestCount++
}
}
must.Eq(t, 5, dateTestCount)
must.Eq(t, 5, echoTestCount)

// 3 actions have a TaskGroupName of g
groupCount := 0
for _, action := range actionsResp3 {
if action.TaskGroupName == "g" {
groupCount++
}
}
must.Eq(t, 6, groupCount)

})
}

func TestHTTP_JobForceEvaluate(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
Expand Down
56 changes: 56 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,62 @@ func (j *Job) LatestDeployment(args *structs.JobSpecificRequest,
return j.srv.blockingRPC(&opts)
}

// jobActions is used to parse through a job's taskgroups' tasks and aggregate their actions, flattened
func (j *Job) GetActions(args *structs.JobSpecificRequest, reply *structs.ActionListResponse) error {
// authenticate, measure, and forward
authErr := j.srv.Authenticate(j.ctx, args)
if done, err := j.srv.forward("Job.GetActions", args, args, reply); done {
return err
}
j.srv.MeasureRPCRate("job", structs.RateMetricRead, args)
if authErr != nil {
return structs.ErrPermissionDenied
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_actions"}, time.Now())

// Check for read-job permissions
if aclObj, err := j.srv.ResolveACL(args); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}

// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("JobID required for actions")
}

// Grab the job
job, err := j.srv.fsm.State().JobByID(nil, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
if job == nil {
return structs.NewErrUnknownJob(args.JobID)
}

// Get its task groups' tasks' actions
jobActions := make([]*structs.JobAction, 0)
for _, tg := range job.TaskGroups {
for _, task := range tg.Tasks {
for _, action := range task.Actions {
jobAction := &structs.JobAction{
Action: *action,
TaskName: task.Name,
TaskGroupName: tg.Name,
}
jobActions = append(jobActions, jobAction)
}
}
}

reply.Actions = jobActions

j.srv.setQueryMeta(&reply.QueryMeta)

return nil
}

// Plan is used to cause a dry-run evaluation of the Job and return the results
// with a potential diff containing annotations.
func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error {
Expand Down
35 changes: 35 additions & 0 deletions nomad/mock/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,3 +705,38 @@ func BigBenchmarkJob() *structs.Job {

return job
}

// A multi-group, multi-task job with actions testing.
func ActionsJob() *structs.Job {
job := MinJob()

for i := 0; i < 2; i++ {
tg := job.TaskGroups[0].Copy()
tg.Name = fmt.Sprintf("g%d", i+1)
job.TaskGroups = append(job.TaskGroups, tg)
}

for i := 0; i < 2; i++ {
task := job.TaskGroups[0].Tasks[0].Copy()
task.Name = fmt.Sprintf("t%d", i+1)
job.TaskGroups[0].Tasks = append(job.TaskGroups[0].Tasks, task)
}

for _, tg := range job.TaskGroups {
for _, task := range tg.Tasks {
task.Actions = []*structs.Action{
{
Name: "date test",
Command: "/bin/date",
Args: []string{"-u"},
},
{
Name: "echo test",
Command: "/bin/echo",
Args: []string{"hello world"},
},
}
}
}
return job
}
11 changes: 11 additions & 0 deletions nomad/structs/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ type Action struct {
Args []string
}

type JobAction struct {
Action
TaskName string
TaskGroupName string
}

type ActionListResponse struct {
Actions []*JobAction
QueryMeta
}

func (a *Action) Copy() *Action {
if a == nil {
return nil
Expand Down

0 comments on commit d6c350a

Please sign in to comment.