Skip to content

Commit

Permalink
Change Server API
Browse files Browse the repository at this point in the history
* Rename ServerStatus to ServerState internally

* Rename terminate to shutdown internally

* Update Scheduler API to match Server API
  • Loading branch information
hibiken committed Jun 29, 2021
1 parent 4768124 commit 9c95c41
Show file tree
Hide file tree
Showing 26 changed files with 175 additions and 169 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- NewTask function now takes array of bytes as payload.
- Task `Type` and `Payload` should be accessed by a method call.
- `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`.
- `Scheduler` API has changed. Renamed `Stop` to `Shutdown`.
- Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script

Expand Down
12 changes: 6 additions & 6 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ExampleServer_Run() {
}
}

func ExampleServer_Stop() {
func ExampleServer_Shutdown() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
Expand All @@ -47,10 +47,10 @@ func ExampleServer_Stop() {
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
<-sigs // wait for termination signal

srv.Stop()
srv.Shutdown()
}

func ExampleServer_Quiet() {
func ExampleServer_Stop() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
Expand All @@ -70,13 +70,13 @@ func ExampleServer_Quiet() {
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Quiet() // stop processing new tasks
srv.Stop() // stop processing new tasks
continue
}
break
break // received SIGTERM or SIGINT signal
}

srv.Stop()
srv.Shutdown()
}

func ExampleScheduler() {
Expand Down
2 changes: 1 addition & 1 deletion forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newForwarder(params forwarderParams) *forwarder {
}
}

func (f *forwarder) terminate() {
func (f *forwarder) shutdown() {
f.logger.Debug("Forwarder shutting down...")
// Signal the forwarder goroutine to stop polling.
f.done <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestForwarder(t *testing.T) {
var wg sync.WaitGroup
s.start(&wg)
time.Sleep(tc.wait)
s.terminate()
s.shutdown()

for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r, qname)
Expand Down
2 changes: 1 addition & 1 deletion healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newHealthChecker(params healthcheckerParams) *healthchecker {
}
}

func (hc *healthchecker) terminate() {
func (hc *healthchecker) shutdown() {
if hc.healthcheckFunc == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestHealthChecker(t *testing.T) {
}
mu.Unlock()

hc.terminate()
hc.shutdown()
}

func TestHealthCheckerWhenRedisDown(t *testing.T) {
Expand Down Expand Up @@ -99,5 +99,5 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) {
}
mu.Unlock()

hc.terminate()
hc.shutdown()
}
12 changes: 6 additions & 6 deletions heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type heartbeater struct {
started time.Time
workers map[string]*workerInfo

// status is shared with other goroutine but is concurrency safe.
status *base.ServerStatus
// state is shared with other goroutine but is concurrency safe.
state *base.ServerState

// channels to receive updates on active workers.
starting <-chan *workerInfo
Expand All @@ -55,7 +55,7 @@ type heartbeaterParams struct {
concurrency int
queues map[string]int
strictPriority bool
status *base.ServerStatus
state *base.ServerState
starting <-chan *workerInfo
finished <-chan *base.TaskMessage
}
Expand All @@ -79,14 +79,14 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
queues: params.queues,
strictPriority: params.strictPriority,

status: params.status,
state: params.state,
workers: make(map[string]*workerInfo),
starting: params.starting,
finished: params.finished,
}
}

func (h *heartbeater) terminate() {
func (h *heartbeater) shutdown() {
h.logger.Debug("Heartbeater shutting down...")
// Signal the heartbeater goroutine to stop.
h.done <- struct{}{}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (h *heartbeater) beat() {
Concurrency: h.concurrency,
Queues: h.queues,
StrictPriority: h.strictPriority,
Status: h.status.String(),
Status: h.state.String(),
Started: h.started,
ActiveWorkerCount: len(h.workers),
}
Expand Down
32 changes: 17 additions & 15 deletions heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func TestHeartbeater(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)

status := base.NewServerStatus(base.StatusIdle)
state := base.NewServerState()
hb := newHeartbeater(heartbeaterParams{
logger: testLogger,
broker: rdbClient,
interval: tc.interval,
concurrency: tc.concurrency,
queues: tc.queues,
strictPriority: false,
status: status,
state: state,
starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage),
})
Expand All @@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) {
hb.host = tc.host
hb.pid = tc.pid

status.Set(base.StatusRunning)
state.Set(base.StateActive)
var wg sync.WaitGroup
hb.start(&wg)

Expand All @@ -65,7 +65,7 @@ func TestHeartbeater(t *testing.T) {
Queues: tc.queues,
Concurrency: tc.concurrency,
Started: time.Now(),
Status: "running",
Status: "active",
}

// allow for heartbeater to write to redis
Expand All @@ -74,49 +74,49 @@ func TestHeartbeater(t *testing.T) {
ss, err := rdbClient.ListServers()
if err != nil {
t.Errorf("could not read server info from redis: %v", err)
hb.terminate()
hb.shutdown()
continue
}

if len(ss) != 1 {
t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss))
hb.terminate()
hb.shutdown()
continue
}

if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
hb.terminate()
hb.shutdown()
continue
}

// status change
status.Set(base.StatusStopped)
state.Set(base.StateClosed)

// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)

want.Status = "stopped"
want.Status = "closed"
ss, err = rdbClient.ListServers()
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
hb.terminate()
hb.shutdown()
continue
}

if len(ss) != 1 {
t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss))
hb.terminate()
hb.shutdown()
continue
}

if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
hb.terminate()
hb.shutdown()
continue
}

hb.terminate()
hb.shutdown()
}
}

Expand All @@ -131,14 +131,16 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r)
state := base.NewServerState()
state.Set(base.StateActive)
hb := newHeartbeater(heartbeaterParams{
logger: testLogger,
broker: testBroker,
interval: time.Second,
concurrency: 10,
queues: map[string]int{"default": 1},
strictPriority: false,
status: base.NewServerStatus(base.StatusRunning),
state: state,
starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage),
})
Expand All @@ -150,5 +152,5 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
// wait for heartbeater to try writing data to redis
time.Sleep(2 * time.Second)

hb.terminate()
hb.shutdown()
}
53 changes: 28 additions & 25 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,60 +215,63 @@ type Z struct {
Score int64
}

// ServerStatus represents status of a server.
// ServerStatus methods are concurrency safe.
type ServerStatus struct {
// ServerState represents state of a server.
// ServerState methods are concurrency safe.
type ServerState struct {
mu sync.Mutex
val ServerStatusValue
val ServerStateValue
}

// NewServerStatus returns a new status instance given an initial value.
func NewServerStatus(v ServerStatusValue) *ServerStatus {
return &ServerStatus{val: v}
// NewServerState returns a new state instance.
// Initial state is set to StateNew.
func NewServerState() *ServerState {
return &ServerState{val: StateNew}
}

type ServerStatusValue int
type ServerStateValue int

const (
// StatusIdle indicates the server is in idle state.
StatusIdle ServerStatusValue = iota
// StateNew represents a new server. Server begins in
// this state and then transition to StatusActive when
// Start or Run is callled.
StateNew ServerStateValue = iota

// StatusRunning indicates the server is up and active.
StatusRunning
// StateActive indicates the server is up and active.
StateActive

// StatusQuiet indicates the server is up but not active.
StatusQuiet
// StateStopped indicates the server is up but no longer processing new tasks.
StateStopped

// StatusStopped indicates the server server has been stopped.
StatusStopped
// StateClosed indicates the server has been shutdown.
StateClosed
)

var statuses = []string{
"idle",
"running",
"quiet",
var serverStates = []string{
"new",
"active",
"stopped",
"closed",
}

func (s *ServerStatus) String() string {
func (s *ServerState) String() string {
s.mu.Lock()
defer s.mu.Unlock()
if StatusIdle <= s.val && s.val <= StatusStopped {
return statuses[s.val]
if StateNew <= s.val && s.val <= StateClosed {
return serverStates[s.val]
}
return "unknown status"
}

// Get returns the status value.
func (s *ServerStatus) Get() ServerStatusValue {
func (s *ServerState) Get() ServerStateValue {
s.mu.Lock()
v := s.val
s.mu.Unlock()
return v
}

// Set sets the status value.
func (s *ServerStatus) Set(v ServerStatusValue) {
func (s *ServerState) Set(v ServerStateValue) {
s.mu.Lock()
s.val = v
s.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions internal/base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
Concurrency: 10,
Queues: map[string]int{"default": 1, "critical": 2},
StrictPriority: false,
Status: "running",
Status: "active",
Started: time.Now().Add(-3 * time.Hour),
ActiveWorkerCount: 8,
},
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {
// Test for status being accessed by multiple goroutines.
// Run with -race flag to check for data race.
func TestStatusConcurrentAccess(t *testing.T) {
status := NewServerStatus(StatusIdle)
status := NewServerState()

var wg sync.WaitGroup

Expand All @@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
status.Set(StatusStopped)
status.Set(StateClosed)
_ = status.String()
}()

Expand Down
2 changes: 1 addition & 1 deletion internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3305,7 +3305,7 @@ func TestListServers(t *testing.T) {
ServerID: "server123",
Concurrency: 10,
Queues: map[string]int{"default": 1},
Status: "running",
Status: "active",
Started: started1,
ActiveWorkerCount: 0,
}
Expand Down
Loading

0 comments on commit 9c95c41

Please sign in to comment.