From 7fd8111ebc65fc3141768c04a70d61decab1f36f Mon Sep 17 00:00:00 2001 From: hungtcs Date: Thu, 18 Apr 2024 10:04:29 +0800 Subject: [PATCH] feat: add finished handler to server config --- processor.go | 11 +++++++++++ server.go | 22 +++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/processor.go b/processor.go index 0ba9890dd..a74768c3b 100644 --- a/processor.go +++ b/processor.go @@ -44,6 +44,8 @@ type processor struct { errHandler ErrorHandler shutdownTimeout time.Duration + finishedHandler FinishedHandler + // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest @@ -85,6 +87,7 @@ type processorParams struct { queues map[string]int strictPriority bool errHandler ErrorHandler + finishedHandler FinishedHandler shutdownTimeout time.Duration starting chan<- *workerInfo finished chan<- *base.TaskMessage @@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor { quit: make(chan struct{}), abort: make(chan struct{}), errHandler: params.errHandler, + finishedHandler: params.finishedHandler, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), shutdownTimeout: params.shutdownTimeout, starting: params.starting, @@ -276,6 +280,9 @@ func (p *processor) handleSucceededMessage(l *base.Lease, msg *base.TaskMessage) } else { p.markAsDone(l, msg) } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } } func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) { @@ -380,6 +387,10 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) { deadline: l.Deadline(), } } + if p.finishedHandler != nil { + + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } } // queues returns a list of queues to query. diff --git a/server.go b/server.go index cc6f41830..e7ebd7f28 100644 --- a/server.go +++ b/server.go @@ -103,7 +103,7 @@ type Config struct { // If BaseContext is nil, the default is context.Background(). // If this is defined, then it MUST return a non-nil context BaseContext func() context.Context - + // TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty. // // If unset, zero or a negative value, the interval is set to 1 second. @@ -183,6 +183,11 @@ type Config struct { ErrorHandler ErrorHandler + // FinishedHandler handles a task that has been processed. + // + // FinishedHandler is called when the task status becomes completed or archived. + FinishedHandler FinishedHandler + // Logger specifies the logger used by the server instance. // // If unset, default logger is used. @@ -275,6 +280,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro fn(ctx, task, err) } +// An FinishedHandler handles a task that has been processed. +type FinishedHandler interface { + HandleFinished(task *TaskInfo) +} + +// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler. +// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f. +type FinishedHandlerFunc func(task *TaskInfo) + +// HandleFinished calls fn(ctx, task, err) +func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) { + fn(task) +} + // RetryDelayFunc calculates the retry delay duration for a failed task given // the retry count, error, and the task. // @@ -529,6 +548,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { queues: queues, strictPriority: cfg.StrictPriority, errHandler: cfg.ErrorHandler, + finishedHandler: cfg.FinishedHandler, shutdownTimeout: shutdownTimeout, starting: starting, finished: finished,