From 4f00f52c1d72a2f4ed3c0c5c642b365ae335905a Mon Sep 17 00:00:00 2001 From: Pior Bastida Date: Thu, 7 Nov 2024 06:34:28 +0100 Subject: [PATCH] Add the scheduler option HeartbeatInterval (#956) * Add the scheduler option HeartbeatInterval * Fix possible premature expiration of scheduler entries --- internal/rdb/rdb.go | 6 ++--- scheduler.go | 66 ++++++++++++++++++++++++++++----------------- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index e358b015..22df5060 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1437,7 +1437,7 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error { // KEYS[1] -> asynq:schedulers:{} // ARGV[1] -> TTL in seconds -// ARGV[2:] -> schedler entries +// ARGV[2:] -> scheduler entries var writeSchedulerEntriesCmd = redis.NewScript(` redis.call("DEL", KEYS[1]) for i = 2, #ARGV do @@ -1468,10 +1468,10 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule } // ClearSchedulerEntries deletes scheduler entries data from redis. -func (r *RDB) ClearSchedulerEntries(scheduelrID string) error { +func (r *RDB) ClearSchedulerEntries(schedulerID string) error { var op errors.Op = "rdb.ClearSchedulerEntries" ctx := context.Background() - key := base.SchedulerEntriesKey(scheduelrID) + key := base.SchedulerEntriesKey(schedulerID) if err := r.client.ZRem(ctx, base.AllSchedulers, key).Err(); err != nil { return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err}) } diff --git a/scheduler.go b/scheduler.go index 5e8781a0..ecb3c3c0 100644 --- a/scheduler.go +++ b/scheduler.go @@ -26,16 +26,17 @@ type Scheduler struct { state *serverState - logger *log.Logger - client *Client - rdb *rdb.RDB - cron *cron.Cron - location *time.Location - done chan struct{} - wg sync.WaitGroup - preEnqueueFunc func(task *Task, opts []Option) - postEnqueueFunc func(info *TaskInfo, err error) - errHandler func(task *Task, opts []Option, err error) + heartbeatInterval time.Duration + logger *log.Logger + client *Client + rdb *rdb.RDB + cron *cron.Cron + location *time.Location + done chan struct{} + wg sync.WaitGroup + preEnqueueFunc func(task *Task, opts []Option) + postEnqueueFunc func(info *TaskInfo, err error) + errHandler func(task *Task, opts []Option, err error) // guards idmap mu sync.Mutex @@ -48,6 +49,8 @@ type Scheduler struct { sharedConnection bool } +const defaultHeartbeatInterval = 10 * time.Second + // NewScheduler returns a new Scheduler instance given the redis connection option. // The parameter opts is optional, defaults will be used if opts is set to nil func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { @@ -68,6 +71,11 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * opts = &SchedulerOpts{} } + heartbeatInterval := opts.HeartbeatInterval + if heartbeatInterval <= 0 { + heartbeatInterval = defaultHeartbeatInterval + } + logger := log.NewLogger(opts.Logger) loglevel := opts.LogLevel if loglevel == level_unspecified { @@ -81,18 +89,19 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * } return &Scheduler{ - id: generateSchedulerID(), - state: &serverState{value: srvStateNew}, - logger: logger, - client: NewClientFromRedisClient(c), - rdb: rdb.NewRDB(c), - cron: cron.New(cron.WithLocation(loc)), - location: loc, - done: make(chan struct{}), - preEnqueueFunc: opts.PreEnqueueFunc, - postEnqueueFunc: opts.PostEnqueueFunc, - errHandler: opts.EnqueueErrorHandler, - idmap: make(map[string]cron.EntryID), + id: generateSchedulerID(), + state: &serverState{value: srvStateNew}, + heartbeatInterval: heartbeatInterval, + logger: logger, + client: NewClientFromRedisClient(c), + rdb: rdb.NewRDB(c), + cron: cron.New(cron.WithLocation(loc)), + location: loc, + done: make(chan struct{}), + preEnqueueFunc: opts.PreEnqueueFunc, + postEnqueueFunc: opts.PostEnqueueFunc, + errHandler: opts.EnqueueErrorHandler, + idmap: make(map[string]cron.EntryID), } } @@ -106,6 +115,15 @@ func generateSchedulerID() string { // SchedulerOpts specifies scheduler options. type SchedulerOpts struct { + // HeartbeatInterval specifies the interval between scheduler heartbeats. + // + // If unset, zero or a negative value, the interval is set to 10 second. + // + // Note: Setting this value too low may add significant load to redis. + // + // By default, HeartbeatInterval is set to 10 seconds. + HeartbeatInterval time.Duration + // Logger specifies the logger used by the scheduler instance. // // If unset, the default logger is used. @@ -284,7 +302,7 @@ func (s *Scheduler) Shutdown() { func (s *Scheduler) runHeartbeater() { defer s.wg.Done() - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(s.heartbeatInterval) for { select { case <-s.done: @@ -317,7 +335,7 @@ func (s *Scheduler) beat() { entries = append(entries, e) } s.logger.Debugf("Writing entries %v", entries) - if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil { + if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval*2); err != nil { s.logger.Warnf("Scheduler could not write heartbeat data: %v", err) } }