Skip to content

Commit

Permalink
Implement reusing redis client
Browse files Browse the repository at this point in the history
  • Loading branch information
jerbob92 committed Sep 19, 2023
1 parent 6a7bf2c commit 9e548fc
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 50 deletions.
26 changes: 20 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// A Client is responsible for scheduling tasks.
Expand All @@ -25,15 +25,26 @@ import (
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
broker base.Broker
// When a Client has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}

// NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
return &Client{broker: rdb.NewRDB(c)}
client := NewClientFromRedisClient(redisClient)
client.sharedConnection = false
return client
}

// NewClientFromRedisClient returns a new Client instance given a redis client.
// Warning: the redis client will not be closed by Asynq, you are responsible for closing.
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
}

type OptionType int
Expand Down Expand Up @@ -150,9 +161,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
Expand Down Expand Up @@ -307,6 +318,9 @@ var (

// Close closes the connection with redis.
func (c *Client) Close() error {
if c.sharedConnection {
return fmt.Errorf("redis connection is shared so the Client can't be closed through asynq")
}
return c.broker.Close()
}

Expand Down
25 changes: 20 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)

func TestClientEnqueueWithProcessAtOption(t *testing.T) {
Expand Down Expand Up @@ -143,11 +144,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
}
}

func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()

func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) {
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "[email protected]", "from": "[email protected]"}))
now := time.Now()

Expand Down Expand Up @@ -478,6 +475,24 @@ func TestClientEnqueue(t *testing.T) {
}
}

func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
testClientEnqueue(t, client, r)
}

func TestClientFromRedisClientEnqueue(t *testing.T) {
r := setup(t)
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
client := NewClientFromRedisClient(redisClient)
testClientEnqueue(t, client, r)
err := client.Close()
if err == nil {
t.Error("client.Close() should have failed because of a shared client but it didn't")
}
}

func TestClientEnqueueWithGroupOption(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
Expand Down
19 changes: 17 additions & 2 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
// When an Inspector has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}

// New returns a new instance of Inspector.
Expand All @@ -28,13 +31,25 @@ func NewInspector(r RedisConnOpt) *Inspector {
if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
}
inspector := NewInspectorFromRedisClient(c)
inspector.sharedConnection = false
return inspector
}

// NewFromRedisClient returns a new instance of Inspector.
// Warning: the redis client will not be closed by Asynq, you are responsible for closing.
func NewInspectorFromRedisClient(c redis.UniversalClient) *Inspector {
return &Inspector{
rdb: rdb.NewRDB(c),
rdb: rdb.NewRDB(c),
sharedConnection: true,
}
}

// Close closes the connection with redis.
func (i *Inspector) Close() error {
if i.sharedConnection {
return fmt.Errorf("redis connection is shared so the Inspector can't be closed through asynq")

Check warning on line 51 in inspector.go

View check run for this annotation

Codecov / codecov/patch

inspector.go#L51

Added line #L51 was not covered by tests
}
return i.rdb.Close()
}

Expand Down
20 changes: 15 additions & 5 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ import (
"github.com/redis/go-redis/v9"
)

func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))

func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalClient) {
tests := []struct {
queues []string
}{
Expand All @@ -52,7 +48,21 @@ func TestInspectorQueues(t *testing.T) {
t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
}
}
}

func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
testInspectorQueues(t, inspector, r)
}

func TestInspectorFromRedisClientQueues(t *testing.T) {
r := setup(t)
defer r.Close()
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
inspector := NewInspectorFromRedisClient(redisClient)
testInspectorQueues(t, inspector, r)
}

func TestInspectorDeleteQueue(t *testing.T) {
Expand Down
22 changes: 18 additions & 4 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)

Expand Down Expand Up @@ -43,15 +43,27 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}

// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
return scheduler
}

// NewSchedulerFromRedisClient returns a new Scheduler instance given a redis client.
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: the redis client will not be closed by Asynq, you are responsible for closing.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
Expand All @@ -72,7 +84,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
logger: logger,
client: NewClient(r),
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
Expand Down Expand Up @@ -262,7 +274,9 @@ func (s *Scheduler) Shutdown() {

s.clearHistory()
s.client.Close()
s.rdb.Close()
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
}

Expand Down
24 changes: 24 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package asynq

import (
"github.com/redis/go-redis/v9"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestSchedulerRegister(t *testing.T) {

r := setup(t)

// Tests for new redis connection.
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
Expand All @@ -75,6 +77,28 @@ func TestSchedulerRegister(t *testing.T) {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}

r = setup(t)

// Tests for existing redis connection.
for _, tc := range tests {
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
scheduler := NewSchedulerFromRedisClient(redisClient, nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err)
}

if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
scheduler.Shutdown()

got := testutil.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
}

func TestSchedulerWhenRedisDown(t *testing.T) {
Expand Down
45 changes: 30 additions & 15 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// Server is responsible for task processing and task lifecycle management.
Expand All @@ -37,6 +37,9 @@ type Server struct {
logger *log.Logger

broker base.Broker
// When a Server has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool

state *serverState

Expand Down Expand Up @@ -402,10 +405,19 @@ const (
// NewServer returns a new Server given a redis connection option
// and server configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
server := NewServerFromRedisClient(redisClient, cfg)
server.sharedConnection = false
return server
}

// NewServerFromRedisClient returns a new Server given a redis client
// and server configuration.
// Warning: the redis client will not be closed by Asynq, you are responsible for closing.
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
baseCtxFn := cfg.BaseContext
if baseCtxFn == nil {
baseCtxFn = context.Background
Expand Down Expand Up @@ -545,18 +557,19 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
groupAggregator: cfg.GroupAggregator,
})
return &Server{
logger: logger,
broker: rdb,
state: srvState,
forwarder: forwarder,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
healthchecker: healthchecker,
janitor: janitor,
aggregator: aggregator,
logger: logger,
broker: rdb,
sharedConnection: true,
state: srvState,
forwarder: forwarder,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
healthchecker: healthchecker,
janitor: janitor,
aggregator: aggregator,
}
}

Expand Down Expand Up @@ -684,7 +697,9 @@ func (srv *Server) Shutdown() {
srv.heartbeater.shutdown()
srv.wg.Wait()

srv.broker.Close()
if !srv.sharedConnection {
srv.broker.Close()
}
srv.logger.Info("Exiting")
}

Expand Down
Loading

0 comments on commit 9e548fc

Please sign in to comment.