Skip to content

Commit

Permalink
Add task enqueue command to cli (#918)
Browse files Browse the repository at this point in the history
  • Loading branch information
mboorstin authored Oct 26, 2024
1 parent 1e102a5 commit 013190b
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 0 deletions.
40 changes: 40 additions & 0 deletions tools/asynq/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strings"
"text/tabwriter"
"time"
"unicode"
"unicode/utf8"

Expand Down Expand Up @@ -369,6 +370,11 @@ func createRDB() *rdb.RDB {
return rdb.NewRDB(c)
}

// createClient creates a Client instance using flag values and returns it.
func createClient() *asynq.Client {
return asynq.NewClient(getRedisConnOpt())
}

// createInspector creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector {
return asynq.NewInspector(getRedisConnOpt())
Expand Down Expand Up @@ -456,3 +462,37 @@ func isPrintable(data []byte) bool {
}
return !isAllSpace
}

// Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) time.Duration {
durationStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

duration, err := time.ParseDuration(durationStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

return duration
}

// Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) time.Time {
timeStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

return timeVal
}
118 changes: 118 additions & 0 deletions tools/asynq/cmd/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ func init() {
taskRunCmd.MarkFlagRequired("queue")
taskRunCmd.MarkFlagRequired("id")

taskCmd.AddCommand(taskEnqueueCmd)
taskEnqueueCmd.Flags().StringP("type_name", "t", "", "type name to enqueue the task as (required)")
taskEnqueueCmd.Flags().StringP("payload", "l", "", "payload to enqueue (required)")
// The following are the various OptionTypes; if not specified we won't pass them so that composeOptions()
// can apply its own defaults
taskEnqueueCmd.Flags().Int("retry", 0, "maximum retries")
taskEnqueueCmd.Flags().String("queue", "", "queue to enqueue the task to")
taskEnqueueCmd.Flags().String("id", "", "id to enqueue the task as")
taskEnqueueCmd.Flags().String("timeout", "", "timeout for the task (how long it can run); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("deadline", "", "deadline for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("unique", "", "unique period for the task (duration within which it is guaranteed to be unique); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("process_at", "", "process at time for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("process_in", "", "process in window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("retention", "", "retention window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("group", "", "group for the task")
taskEnqueueCmd.MarkFlagRequired("type_name")
taskEnqueueCmd.MarkFlagRequired("payload")

taskCmd.AddCommand(taskArchiveAllCmd)
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)")
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)")
Expand Down Expand Up @@ -151,6 +169,16 @@ var taskRunCmd = &cobra.Command{
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}

var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task",
Args: cobra.NoArgs,
Run: taskEnqueue,
Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
}

var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state",
Expand Down Expand Up @@ -521,6 +549,95 @@ func taskRun(cmd *cobra.Command, args []string) {
fmt.Println("task is now pending")
}

func taskEnqueue(cmd *cobra.Command, args []string) {
typeName, err := cmd.Flags().GetString("type_name")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

payload, err := cmd.Flags().GetString("payload")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

// For all of the optional flags, we need to explicitly check whether they were set or
// not; for consistency we want to use the defaults set in composeOptions() rather than
// the ones in the flag definitions.
opts := []asynq.Option{}
if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.MaxRetry(retry))
}

if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Queue(queue))
}

if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.TaskID(id))
}

if cmd.Flags().Changed("timeout") {
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
}

if cmd.Flags().Changed("deadline") {
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
}

if cmd.Flags().Changed("unique") {
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
}

if cmd.Flags().Changed("process_at") {
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
}

if cmd.Flags().Changed("process_in") {
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
}

if cmd.Flags().Changed("retention") {
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
}

if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Group(group))
}

c := createClient()
task := asynq.NewTask(typeName, []byte(payload), opts...)

taskInfo, err := c.Enqueue(task)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
}

func taskArchiveAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
Expand Down Expand Up @@ -653,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) {
}
fmt.Printf("%d tasks are now pending\n", n)
}

0 comments on commit 013190b

Please sign in to comment.