From 39718f8beac042659a5921f4dd45b4435108b848 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 7 Apr 2022 06:19:04 -0700 Subject: [PATCH] Always enqueue the aggregated task in the same queue --- aggregator.go | 2 +- server.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/aggregator.go b/aggregator.go index 7340f3cf..9f8da70a 100644 --- a/aggregator.go +++ b/aggregator.go @@ -160,7 +160,7 @@ func (a *aggregator) aggregate(t time.Time) { } aggregatedTask := a.ga.Aggregate(gname, tasks) ctx, cancel := context.WithDeadline(context.Background(), deadline) - if _, err := a.client.EnqueueContext(ctx, aggregatedTask); err != nil { + if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil { a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v", qname, gname, aggregationSetID, err) cancel() diff --git a/server.go b/server.go index 945a7c37..bbe5d1f6 100644 --- a/server.go +++ b/server.go @@ -224,8 +224,12 @@ type Config struct { // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. type GroupAggregator interface { - // Aggregate aggregates the given tasks which belong to a same group + // Aggregate aggregates the given tasks which belong to a same group with the given groupKey // and returns a new task which is the aggregation of those tasks. + // + // Use NewTask(typename, payload, opts...) to set any options for the aggregated task. + // Queue option will be ignored and the aggregated task will always be enqueued to the same queue + // the group belonged. Aggregate(groupKey string, tasks []*Task) *Task }