Skip to content

Commit

Permalink
Update Client.Enqueue to return TaskInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jun 29, 2021
1 parent 09cbea6 commit b835090
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 173 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`.
- `Scheduler` API has changed. Renamed `Stop` to `Shutdown`.
- Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script)
- `Client.Enqueue` now returns `TaskInfo`
- Renamed pending key (TODO: need migration script

## [0.17.2] - 2021-06-06
Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,23 +177,23 @@ func main() {
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err := c.Enqueue(t)
info, err := c.Enqueue(t)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue())


// ------------------------------------------------------------
// Example 2: Schedule task to be processed in the future.
// Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------

res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
info, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatalf("could not schedule task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue())


// ----------------------------------------------------------------------------
Expand All @@ -207,22 +207,22 @@ func main() {
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err = c.Enqueue(t)
info, err = c.Enqueue(t)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue())

// ---------------------------------------------------------------------------
// Example 4: Pass options to tune task processing behavior at enqueue time.
// Options passed at enqueue time override default ones.
// ---------------------------------------------------------------------------

res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
info, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue())
}
```

Expand Down
55 changes: 8 additions & 47 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,55 +254,21 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.opts[taskType] = opts
}

// A Result holds enqueued task's metadata.
type Result struct {
// ID is a unique identifier for the task.
ID string

// EnqueuedAt is the time the task was enqueued in UTC.
EnqueuedAt time.Time

// ProcessAt indicates when the task should be processed.
ProcessAt time.Time

// Retry is the maximum number of retry for the task.
Retry int

// Queue is a name of the queue the task is enqueued to.
Queue string

// Timeout is the timeout value for the task.
// Counting for timeout starts when a worker starts processing the task.
// If task processing doesn't complete within the timeout, the task will be retried.
// The value zero means no timeout.
//
// If deadline is set, min(now+timeout, deadline) is used, where the now is the time when
// a worker starts processing the task.
Timeout time.Duration

// Deadline is the deadline value for the task.
// If task processing doesn't complete before the deadline, the task will be retried.
// The value time.Unix(0, 0) means no deadline.
//
// If timeout is set, min(now+timeout, deadline) is used, where the now is the time when
// a worker starts processing the task.
Deadline time.Time
}

// Close closes the connection with redis.
func (c *Client) Close() error {
return c.rdb.Close()
}

// Enqueue enqueues the given task to be processed asynchronously.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
// Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
// If no ProcessAt or ProcessIn options are passed, the task will be processed immediately.
func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
c.mu.Lock()
if defaults, ok := c.opts[task.Type()]; ok {
opts = append(defaults, opts...)
Expand Down Expand Up @@ -339,27 +305,22 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
UniqueKey: uniqueKey,
}
now := time.Now()
var state base.TaskState
if opt.processAt.Before(now) || opt.processAt.Equal(now) {
opt.processAt = now
err = c.enqueue(msg, opt.uniqueTTL)
state = base.TaskStatePending
} else {
err = c.schedule(msg, opt.processAt, opt.uniqueTTL)
state = base.TaskStateScheduled
}
switch {
case errors.Is(err, errors.ErrDuplicateTask):
return nil, fmt.Errorf("%w", ErrDuplicateTask)
case err != nil:
return nil, err
}
return &Result{
ID: msg.ID.String(),
EnqueuedAt: time.Now().UTC(),
ProcessAt: opt.processAt,
Queue: msg.Queue,
Retry: msg.Retry,
Timeout: timeout,
Deadline: deadline,
}, nil
return &TaskInfo{msg, state, opt.processAt}, nil
}

func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
Expand Down
Loading

0 comments on commit b835090

Please sign in to comment.