Skip to content

Commit

Permalink
Add batch actions to inspector for aggregating tasks
Browse files Browse the repository at this point in the history
Added:
- Inspector.DeleteAllAggregatingTasks
- Inspector.ArchiveAllAggregatingTasks
- Inspector.RunAllAggregatingTasks
  • Loading branch information
hibiken committed Apr 11, 2022
1 parent de139cc commit a369443
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,14 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) {
return int(n), err
}

// TODO: comment
// DeleteAllAggregatingTasks deletes all tasks from the specified group,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) {
// TODO: implement this
return 0, nil
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname)
return int(n), err
}

// DeleteTask deletes a task with the given id from the given queue.
Expand All @@ -588,8 +592,8 @@ func (i *Inspector) DeleteTask(qname, id string) error {

}

// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
Expand All @@ -598,8 +602,8 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
return int(n), err
}

// RunAllRetryTasks transition all retry tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllRetryTasks schedules all retry tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
Expand All @@ -608,8 +612,8 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
return int(n), err
}

// RunAllArchivedTasks transition all archived tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllArchivedTasks schedules all archived tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
Expand All @@ -618,6 +622,16 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
return int(n), err
}

// RunAllAggregatingTasks schedules all tasks from the given grou to run.
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.RunAllAggregatingTasks(qname, gname)
return int(n), err
}

// RunTask updates the task to pending state given a queue name and task id.
// The task needs to be in scheduled, retry, or archived state, otherwise RunTask
// will return an error.
Expand Down Expand Up @@ -671,10 +685,14 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
return int(n), err
}

// TODO: comment
// ArchiveAllAggregatingTasks archives all tasks from the given group,
// and reports the number of tasks archived.
func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) {
// TODO: implement this
return 0, nil
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname)
return int(n), err
}

// ArchiveTask archives a task with the given id in the given queue.
Expand Down

0 comments on commit a369443

Please sign in to comment.