From 1e13014380a251e90447d500f29798e90af3f083 Mon Sep 17 00:00:00 2001 From: hungtcs Date: Fri, 19 Apr 2024 09:30:02 +0800 Subject: [PATCH] feat: add finished handler to server --- processor.go | 37 ++++++++++++++++++++++++++++++++++--- server.go | 22 +++++++++++++++++++++- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/processor.go b/processor.go index 0ba9890dd..80c8fb6b8 100644 --- a/processor.go +++ b/processor.go @@ -44,6 +44,8 @@ type processor struct { errHandler ErrorHandler shutdownTimeout time.Duration + finishedHandler FinishedHandler + // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest @@ -85,6 +87,7 @@ type processorParams struct { queues map[string]int strictPriority bool errHandler ErrorHandler + finishedHandler FinishedHandler shutdownTimeout time.Duration starting chan<- *workerInfo finished chan<- *base.TaskMessage @@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor { quit: make(chan struct{}), abort: make(chan struct{}), errHandler: params.errHandler, + finishedHandler: params.finishedHandler, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), shutdownTimeout: params.shutdownTimeout, starting: params.starting, @@ -291,12 +295,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) { p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.broker.MarkAsComplete(ctx, msg) + if err := p.broker.MarkAsComplete(ctx, msg); err != nil { + return err + } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } + return nil }, errMsg: errMsg, deadline: l.Deadline(), } } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } } func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) { @@ -311,12 +324,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) { p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.broker.Done(ctx, msg) + if err := p.broker.Done(ctx, msg); err != nil { + return err + } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } + return nil }, errMsg: errMsg, deadline: l.Deadline(), } } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } } // SkipRetry is used as a return value from Handler.ProcessTask to indicate that @@ -374,12 +396,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) { p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.broker.Archive(ctx, msg, e.Error()) + if err := p.broker.Archive(ctx, msg, e.Error()); err != nil { + return err + } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } + return nil }, errMsg: errMsg, deadline: l.Deadline(), } } + if p.finishedHandler != nil { + p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil)) + } } // queues returns a list of queues to query. diff --git a/server.go b/server.go index cc6f41830..e7ebd7f28 100644 --- a/server.go +++ b/server.go @@ -103,7 +103,7 @@ type Config struct { // If BaseContext is nil, the default is context.Background(). // If this is defined, then it MUST return a non-nil context BaseContext func() context.Context - + // TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty. // // If unset, zero or a negative value, the interval is set to 1 second. @@ -183,6 +183,11 @@ type Config struct { ErrorHandler ErrorHandler + // FinishedHandler handles a task that has been processed. + // + // FinishedHandler is called when the task status becomes completed or archived. + FinishedHandler FinishedHandler + // Logger specifies the logger used by the server instance. // // If unset, default logger is used. @@ -275,6 +280,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro fn(ctx, task, err) } +// An FinishedHandler handles a task that has been processed. +type FinishedHandler interface { + HandleFinished(task *TaskInfo) +} + +// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler. +// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f. +type FinishedHandlerFunc func(task *TaskInfo) + +// HandleFinished calls fn(ctx, task, err) +func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) { + fn(task) +} + // RetryDelayFunc calculates the retry delay duration for a failed task given // the retry count, error, and the task. // @@ -529,6 +548,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { queues: queues, strictPriority: cfg.StrictPriority, errHandler: cfg.ErrorHandler, + finishedHandler: cfg.FinishedHandler, shutdownTimeout: shutdownTimeout, starting: starting, finished: finished,