Unique Tasks
The unique tasks feature in Asynq makes it simple to ensure that only one copy of a task is enqueued in Redis.
This feature is useful when you want to deduplicate tasks to ensure that you are not creating redundant work.
Asynq's unique task feature is based on uniqueness locks. When enqueueing a task with the Unique option, the Client checks whether it can acquire a lock for the given task. The task is enqueued only if the lock can be acquired. If another task already holds the lock, the Client returns an error (see the example code below on how to inspect the error).
The uniqueness lock has a TTL associated with it to avoid holding the lock forever.
The lock will be released after the TTL or if the task holding the lock gets processed successfully before the TTL.
One important thing to note is that Asynq's unique task feature provides best-effort uniqueness. In other words, a duplicate task can be enqueued if the lock expires before the original task gets processed.
The uniqueness of a task is based on the following properties:
- Type
- Payload
- Queue
So if there's a task with the same type and payload, and if it's enqueued to the same queue, then another task with these same properties won't be enqueued until the lock has been released.
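One way to picture this rule is as a single lock key derived from the three properties. The sketch below is illustrative only: uniqueKey is a hypothetical helper, and the key layout and the choice of SHA-256 are assumptions made for the example, not the format Asynq stores in Redis. The queue names "default" and "critical" are also just sample values.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// uniqueKey is a hypothetical helper that combines the queue name, the task
// type, and a digest of the payload into one lock key. The layout is
// illustrative and is not claimed to match Asynq's internal key format.
func uniqueKey(queue, taskType string, payload map[string]interface{}) (string, error) {
	// encoding/json sorts map keys, so equal payloads encode identically.
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return fmt.Sprintf("%s:%s:%s", queue, taskType, hex.EncodeToString(sum[:])), nil
}

func main() {
	k1, _ := uniqueKey("default", "example", map[string]interface{}{"a": 123})
	k2, _ := uniqueKey("default", "example", map[string]interface{}{"a": 123})
	k3, _ := uniqueKey("critical", "example", map[string]interface{}{"a": 123})
	k4, _ := uniqueKey("default", "example", map[string]interface{}{"a": 124})

	fmt.Println(k1 == k2) // true: same queue, type, and payload
	fmt.Println(k1 == k3) // false: different queue
	fmt.Println(k1 == k4) // false: different payload
}
```

Under this model, two tasks contend for the same lock only when all three properties match, which is why the same task can be enqueued to two different queues without being treated as a duplicate.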
c := asynq.NewClient(redis)
t1 := asynq.NewTask("example", map[string]interface{}{"a": 123})
// t1 will hold the uniqueness lock for the next hour.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
	// handle duplicate task
case err != nil:
	// handle other errors
}
t2 := asynq.NewTask("example", map[string]interface{}{"a": 123})
// t2 cannot be enqueued because it's a duplicate of t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
	// handle duplicate task
case err != nil:
	// handle other errors
}
In the above example, t2 won't be enqueued since it is a duplicate of t1.
The returned error value can be inspected using errors.Is to see if it wraps the asynq.ErrDuplicateTask error.