From 9e548fc0978e8d719c447bbb7030c7775d7b6886 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Tue, 19 Sep 2023 10:16:51 +0200 Subject: [PATCH] Implement reusing redis client --- client.go | 26 +++++++++++++++++------ client_test.go | 25 +++++++++++++++++----- inspector.go | 19 +++++++++++++++-- inspector_test.go | 20 +++++++++++++----- scheduler.go | 22 ++++++++++++++++---- scheduler_test.go | 24 +++++++++++++++++++++ server.go | 45 ++++++++++++++++++++++++++-------------- server_test.go | 53 +++++++++++++++++++++++++++++++++++------------ 8 files changed, 184 insertions(+), 50 deletions(-) diff --git a/client.go b/client.go index 8f2f92c4..a54e0bcf 100644 --- a/client.go +++ b/client.go @@ -10,11 +10,11 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // A Client is responsible for scheduling tasks. @@ -25,15 +25,26 @@ import ( // Clients are safe for concurrent use by multiple goroutines. type Client struct { broker base.Broker + // When a Client has been created with an existing Redis connection, we do + // not want to close it. + sharedConnection bool } // NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { - c, ok := r.MakeRedisClient().(redis.UniversalClient) + redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - return &Client{broker: rdb.NewRDB(c)} + client := NewClientFromRedisClient(redisClient) + client.sharedConnection = false + return client +} + +// NewClientFromRedisClient returns a new Client instance given a redis client. +// Warning: the redis client will not be closed by Asynq, you are responsible for closing. +func NewClientFromRedisClient(c redis.UniversalClient) *Client { + return &Client{broker: rdb.NewRDB(c), sharedConnection: true} } type OptionType int @@ -150,9 +161,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } // TTL duration must be greater than or equal to 1 second. // // Uniqueness of a task is based on the following properties: -// - Task Type -// - Task Payload -// - Queue Name +// - Task Type +// - Task Payload +// - Queue Name func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } @@ -307,6 +318,9 @@ var ( // Close closes the connection with redis. func (c *Client) Close() error { + if c.sharedConnection { + return fmt.Errorf("redis connection is shared so the Client can't be closed through asynq") + } return c.broker.Close() } diff --git a/client_test.go b/client_test.go index da24d13d..a73114a1 100644 --- a/client_test.go +++ b/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/hibiken/asynq/internal/base" h "github.com/hibiken/asynq/internal/testutil" + "github.com/redis/go-redis/v9" ) func TestClientEnqueueWithProcessAtOption(t *testing.T) { @@ -143,11 +144,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { } } -func TestClientEnqueue(t *testing.T) { - r := setup(t) - client := NewClient(getRedisConnOpt(t)) - defer client.Close() - +func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) { task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() @@ -478,6 +475,24 @@ func TestClientEnqueue(t *testing.T) { } } +func TestClientEnqueue(t *testing.T) { + r := setup(t) + client := NewClient(getRedisConnOpt(t)) + defer client.Close() + testClientEnqueue(t, client, r) +} + +func TestClientFromRedisClientEnqueue(t *testing.T) { + r := setup(t) + redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient) + client := NewClientFromRedisClient(redisClient) + testClientEnqueue(t, client, r) + err := client.Close() + if err == nil { + t.Error("client.Close() should have failed because of a shared client but it didn't") + } +} + func TestClientEnqueueWithGroupOption(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) diff --git a/inspector.go b/inspector.go index ee99f525..5990390f 100644 --- a/inspector.go +++ b/inspector.go @@ -10,16 +10,19 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // Inspector is a client interface to inspect and mutate the state of // queues and tasks. type Inspector struct { rdb *rdb.RDB + // When an Inspector has been created with an existing Redis connection, we do + // not want to close it. + sharedConnection bool } // New returns a new instance of Inspector. @@ -28,13 +31,25 @@ func NewInspector(r RedisConnOpt) *Inspector { if !ok { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } + inspector := NewInspectorFromRedisClient(c) + inspector.sharedConnection = false + return inspector +} + +// NewFromRedisClient returns a new instance of Inspector. +// Warning: the redis client will not be closed by Asynq, you are responsible for closing. +func NewInspectorFromRedisClient(c redis.UniversalClient) *Inspector { return &Inspector{ - rdb: rdb.NewRDB(c), + rdb: rdb.NewRDB(c), + sharedConnection: true, } } // Close closes the connection with redis. func (i *Inspector) Close() error { + if i.sharedConnection { + return fmt.Errorf("redis connection is shared so the Inspector can't be closed through asynq") + } return i.rdb.Close() } diff --git a/inspector_test.go b/inspector_test.go index 0d47d06d..1d30a030 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -22,11 +22,7 @@ import ( "github.com/redis/go-redis/v9" ) -func TestInspectorQueues(t *testing.T) { - r := setup(t) - defer r.Close() - inspector := NewInspector(getRedisConnOpt(t)) - +func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalClient) { tests := []struct { queues []string }{ @@ -52,7 +48,21 @@ func TestInspectorQueues(t *testing.T) { t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff) } } +} + +func TestInspectorQueues(t *testing.T) { + r := setup(t) + defer r.Close() + inspector := NewInspector(getRedisConnOpt(t)) + testInspectorQueues(t, inspector, r) +} +func TestInspectorFromRedisClientQueues(t *testing.T) { + r := setup(t) + defer r.Close() + redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient) + inspector := NewInspectorFromRedisClient(redisClient) + testInspectorQueues(t, inspector, r) } func TestInspectorDeleteQueue(t *testing.T) { diff --git a/scheduler.go b/scheduler.go index 2655a946..affbeec4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,11 +10,11 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" ) @@ -43,15 +43,27 @@ type Scheduler struct { // to avoid using cron.EntryID as the public API of // the Scheduler. idmap map[string]cron.EntryID + // When a Scheduler has been created with an existing Redis connection, we do + // not want to close it. + sharedConnection bool } // NewScheduler returns a new Scheduler instance given the redis connection option. // The parameter opts is optional, defaults will be used if opts is set to nil func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { - c, ok := r.MakeRedisClient().(redis.UniversalClient) + redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } + scheduler := NewSchedulerFromRedisClient(redisClient, opts) + scheduler.sharedConnection = false + return scheduler +} + +// NewSchedulerFromRedisClient returns a new Scheduler instance given a redis client. +// The parameter opts is optional, defaults will be used if opts is set to nil. +// Warning: the redis client will not be closed by Asynq, you are responsible for closing. +func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler { if opts == nil { opts = &SchedulerOpts{} } @@ -72,7 +84,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { id: generateSchedulerID(), state: &serverState{value: srvStateNew}, logger: logger, - client: NewClient(r), + client: NewClientFromRedisClient(c), rdb: rdb.NewRDB(c), cron: cron.New(cron.WithLocation(loc)), location: loc, @@ -262,7 +274,9 @@ func (s *Scheduler) Shutdown() { s.clearHistory() s.client.Close() - s.rdb.Close() + if !s.sharedConnection { + s.rdb.Close() + } s.logger.Info("Scheduler stopped") } diff --git a/scheduler_test.go b/scheduler_test.go index fea048e3..a35f3909 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -5,6 +5,7 @@ package asynq import ( + "github.com/redis/go-redis/v9" "sync" "testing" "time" @@ -58,6 +59,7 @@ func TestSchedulerRegister(t *testing.T) { r := setup(t) + // Tests for new redis connection. for _, tc := range tests { scheduler := NewScheduler(getRedisConnOpt(t), nil) if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { @@ -75,6 +77,28 @@ func TestSchedulerRegister(t *testing.T) { t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff) } } + + r = setup(t) + + // Tests for existing redis connection. + for _, tc := range tests { + redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient) + scheduler := NewSchedulerFromRedisClient(redisClient, nil) + if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { + t.Fatal(err) + } + + if err := scheduler.Start(); err != nil { + t.Fatal(err) + } + time.Sleep(tc.wait) + scheduler.Shutdown() + + got := testutil.GetPendingMessages(t, r, tc.queue) + if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" { + t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff) + } + } } func TestSchedulerWhenRedisDown(t *testing.T) { diff --git a/server.go b/server.go index e31ce2f2..88d9fcc6 100644 --- a/server.go +++ b/server.go @@ -15,10 +15,10 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // Server is responsible for task processing and task lifecycle management. @@ -37,6 +37,9 @@ type Server struct { logger *log.Logger broker base.Broker + // When a Server has been created with an existing Redis connection, we do + // not want to close it. + sharedConnection bool state *serverState @@ -402,10 +405,19 @@ const ( // NewServer returns a new Server given a redis connection option // and server configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { - c, ok := r.MakeRedisClient().(redis.UniversalClient) + redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } + server := NewServerFromRedisClient(redisClient, cfg) + server.sharedConnection = false + return server +} + +// NewServerFromRedisClient returns a new Server given a redis client +// and server configuration. +// Warning: the redis client will not be closed by Asynq, you are responsible for closing. +func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { baseCtxFn := cfg.BaseContext if baseCtxFn == nil { baseCtxFn = context.Background @@ -545,18 +557,19 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { groupAggregator: cfg.GroupAggregator, }) return &Server{ - logger: logger, - broker: rdb, - state: srvState, - forwarder: forwarder, - processor: processor, - syncer: syncer, - heartbeater: heartbeater, - subscriber: subscriber, - recoverer: recoverer, - healthchecker: healthchecker, - janitor: janitor, - aggregator: aggregator, + logger: logger, + broker: rdb, + sharedConnection: true, + state: srvState, + forwarder: forwarder, + processor: processor, + syncer: syncer, + heartbeater: heartbeater, + subscriber: subscriber, + recoverer: recoverer, + healthchecker: healthchecker, + janitor: janitor, + aggregator: aggregator, } } @@ -684,7 +697,9 @@ func (srv *Server) Shutdown() { srv.heartbeater.shutdown() srv.wg.Wait() - srv.broker.Close() + if !srv.sharedConnection { + srv.broker.Close() + } srv.logger.Info("Exiting") } diff --git a/server_test.go b/server_test.go index 724b48a1..da7aac5b 100644 --- a/server_test.go +++ b/server_test.go @@ -14,22 +14,12 @@ import ( "github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/testbroker" "github.com/hibiken/asynq/internal/testutil" + + "github.com/redis/go-redis/v9" "go.uber.org/goleak" ) -func TestServer(t *testing.T) { - // https://github.com/go-redis/redis/issues/1029 - ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper") - defer goleak.VerifyNone(t, ignoreOpt) - - redisConnOpt := getRedisConnOpt(t) - c := NewClient(redisConnOpt) - defer c.Close() - srv := NewServer(redisConnOpt, Config{ - Concurrency: 10, - LogLevel: testLogLevel, - }) - +func testServer(t *testing.T, c *Client, srv *Server) { // no-op handler h := func(ctx context.Context, task *Task) error { return nil @@ -53,6 +43,43 @@ func TestServer(t *testing.T) { srv.Shutdown() } +func TestServer(t *testing.T) { + // https://github.com/go-redis/redis/issues/1029 + ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper") + defer goleak.VerifyNone(t, ignoreOpt) + + redisConnOpt := getRedisConnOpt(t) + c := NewClient(redisConnOpt) + defer c.Close() + srv := NewServer(redisConnOpt, Config{ + Concurrency: 10, + LogLevel: testLogLevel, + }) + + testServer(t, c, srv) +} + +func TestServerFromRedisClient(t *testing.T) { + // https://github.com/go-redis/redis/issues/1029 + ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper") + defer goleak.VerifyNone(t, ignoreOpt) + + redisConnOpt := getRedisConnOpt(t) + redisClient := redisConnOpt.MakeRedisClient().(redis.UniversalClient) + c := NewClientFromRedisClient(redisClient) + srv := NewServerFromRedisClient(redisClient, Config{ + Concurrency: 10, + LogLevel: testLogLevel, + }) + + testServer(t, c, srv) + + err := c.Close() + if err == nil { + t.Error("client.Close() should have failed because of a shared client but it didn't") + } +} + func TestServerRun(t *testing.T) { // https://github.com/go-redis/redis/issues/1029 ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")