Skip to content

Commit

Permalink
Move test helpers to asynqtest package
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Apr 11, 2022
1 parent 0149396 commit 1acd62c
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 208 deletions.
94 changes: 94 additions & 0 deletions internal/asynqtest/asynqtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []
return out
})

// SortRedisZSetEntryOpt is a cmp.Option to sort []redis.Z for comparison:
// entries are ordered by member when both members are strings, and by score
// otherwise. The input slice is never mutated.
var SortRedisZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []redis.Z) []redis.Z {
	out := append([]redis.Z(nil), in...) // Copy input to avoid mutating it
	sort.Slice(out, func(i, j int) bool {
		// TODO: If member is a comparable type (int, string, etc) compare by the member
		// Use generic comparable type here once update to go1.18
		mi, iIsStr := out[i].Member.(string)
		mj, jIsStr := out[j].Member.(string)
		if iIsStr && jIsStr {
			// Both members are strings; compare the members.
			// (Checking both sides avoids a panic on a mixed-type slice,
			// which the previous one-sided assertion could trigger.)
			return mi < mj
		}
		return out[i].Score < out[j].Score
	})
	return out
})

// IgnoreIDOpt is a cmp.Option to ignore the ID field in task messages when comparing.
var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")

Expand Down Expand Up @@ -522,3 +536,83 @@ func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient,
}
return res
}

// TaskSeedData holds the data required to seed tasks under the task key in test.
type TaskSeedData struct {
	// Msg is the task message stored under the task key.
	Msg *base.TaskMessage
	// State is recorded in the "state" field of the task hash.
	State base.TaskState
	// PendingSince, when non-zero, is written to the "pending_since" field
	// of the task hash as a Unix timestamp (seconds).
	PendingSince time.Time
}

// SeedTasks writes each task's data under its task key in redis and, for
// tasks with a non-empty UniqueKey, acquires the corresponding unique lock
// (SetNX with a one-minute TTL). The test is aborted on the first redis error.
func SeedTasks(tb testing.TB, r redis.UniversalClient, taskData []*TaskSeedData) {
	ctx := context.Background() // loop-invariant; hoisted out of the loop
	for _, data := range taskData {
		msg := data.Msg
		key := base.TaskKey(msg.Queue, msg.ID)
		v := map[string]interface{}{
			"msg":        MustMarshal(tb, msg),
			"state":      data.State.String(),
			"unique_key": msg.UniqueKey,
			"group":      msg.GroupKey,
		}
		if !data.PendingSince.IsZero() {
			// Zero time means "no pending_since"; stored as Unix seconds.
			v["pending_since"] = data.PendingSince.Unix()
		}
		if err := r.HSet(ctx, key, v).Err(); err != nil {
			tb.Fatalf("Failed to write task data in redis: %v", err)
		}
		if len(msg.UniqueKey) > 0 {
			if err := r.SetNX(ctx, msg.UniqueKey, msg.ID, 1*time.Minute).Err(); err != nil {
				tb.Fatalf("Failed to set unique lock in redis: %v", err)
			}
		}
	}
}

// SeedRedisZSets populates each named redis sorted set with the given
// entries, aborting the test on the first redis error.
func SeedRedisZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis.Z) {
	ctx := context.Background()
	for key, entries := range zsets {
		// FIXME: How come we can't simply do ZAdd(ctx, key, entries...) here?
		for _, entry := range entries {
			err := r.ZAdd(ctx, key, entry).Err()
			if err != nil {
				tb.Fatalf("Failed to seed zset (key=%q): %v", key, err)
			}
		}
	}
}

// SeedRedisSets seeds every redis set named by a map key with its members.
func SeedRedisSets(tb testing.TB, r redis.UniversalClient, sets map[string][]string) {
	for name, members := range sets {
		SeedRedisSet(tb, r, name, members)
	}
}

// SeedRedisSet adds each member to the redis set stored at key,
// aborting the test on the first redis error.
func SeedRedisSet(tb testing.TB, r redis.UniversalClient, key string, members []string) {
	ctx := context.Background()
	for _, member := range members {
		err := r.SAdd(ctx, key, member).Err()
		if err != nil {
			tb.Fatalf("Failed to seed set (key=%q): %v", key, err)
		}
	}
}

// SeedRedisLists pushes the given values onto each named redis list
// (via LPush), aborting the test on the first redis error.
func SeedRedisLists(tb testing.TB, r redis.UniversalClient, lists map[string][]string) {
	ctx := context.Background()
	for key, values := range lists {
		for _, value := range values {
			err := r.LPush(ctx, key, value).Err()
			if err != nil {
				tb.Fatalf("Failed to seed list (key=%q): %v", key, err)
			}
		}
	}
}

// AssertRedisZSets verifies that each named redis sorted set holds exactly
// the expected entries, ignoring order (via SortRedisZSetEntryOpt). A read
// failure aborts the test; a content mismatch is reported as a test error.
func AssertRedisZSets(t *testing.T, r redis.UniversalClient, wantZSets map[string][]redis.Z) {
	ctx := context.Background()
	for key, want := range wantZSets {
		got, err := r.ZRangeWithScores(ctx, key, 0, -1).Result()
		if err != nil {
			t.Fatalf("Failed to read zset (key=%q): %v", key, err)
		}
		diff := cmp.Diff(want, got, SortRedisZSetEntryOpt)
		if diff != "" {
			t.Errorf("mismatch found in zset (key=%q): (-want,+got)\n%s", key, diff)
		}
	}
}
70 changes: 35 additions & 35 deletions internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCurrentStats(t *testing.T) {
r.SetClock(timeutil.NewSimulatedClock(now))

tests := []struct {
tasks []*taskData
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
pending map[string][]string
Expand All @@ -88,14 +88,14 @@ func TestCurrentStats(t *testing.T) {
want *Stats
}{
{
tasks: []*taskData{
{msg: m1, state: base.TaskStatePending},
{msg: m2, state: base.TaskStateActive},
{msg: m3, state: base.TaskStateScheduled},
{msg: m4, state: base.TaskStateScheduled},
{msg: m5, state: base.TaskStatePending},
{msg: m6, state: base.TaskStatePending},
{msg: m7, state: base.TaskStateAggregating},
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStatePending},
{Msg: m2, State: base.TaskStateActive},
{Msg: m3, State: base.TaskStateScheduled},
{Msg: m4, State: base.TaskStateScheduled},
{Msg: m5, State: base.TaskStatePending},
{Msg: m6, State: base.TaskStatePending},
{Msg: m7, State: base.TaskStateAggregating},
},
allQueues: []string{"default", "critical", "low"},
allGroups: map[string][]string{
Expand Down Expand Up @@ -187,12 +187,12 @@ func TestCurrentStats(t *testing.T) {
},
},
{
tasks: []*taskData{
{msg: m1, state: base.TaskStatePending},
{msg: m2, state: base.TaskStateActive},
{msg: m3, state: base.TaskStateScheduled},
{msg: m4, state: base.TaskStateScheduled},
{msg: m6, state: base.TaskStatePending},
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStatePending},
{Msg: m2, State: base.TaskStateActive},
{Msg: m3, State: base.TaskStateScheduled},
{Msg: m4, State: base.TaskStateScheduled},
{Msg: m6, State: base.TaskStatePending},
},
allQueues: []string{"default", "critical", "low"},
pending: map[string][]string{
Expand Down Expand Up @@ -284,16 +284,16 @@ func TestCurrentStats(t *testing.T) {
t.Fatal(err)
}
}
SeedSet(t, r.client, base.AllQueues, tc.allQueues)
SeedSets(t, r.client, tc.allGroups)
SeedTasks(t, r.client, tc.tasks)
SeedLists(t, r.client, tc.pending)
SeedLists(t, r.client, tc.active)
SeedZSets(t, r.client, tc.scheduled)
SeedZSets(t, r.client, tc.retry)
SeedZSets(t, r.client, tc.archived)
SeedZSets(t, r.client, tc.completed)
SeedZSets(t, r.client, tc.groups)
h.SeedRedisSet(t, r.client, base.AllQueues, tc.allQueues)
h.SeedRedisSets(t, r.client, tc.allGroups)
h.SeedTasks(t, r.client, tc.tasks)
h.SeedRedisLists(t, r.client, tc.pending)
h.SeedRedisLists(t, r.client, tc.active)
h.SeedRedisZSets(t, r.client, tc.scheduled)
h.SeedRedisZSets(t, r.client, tc.retry)
h.SeedRedisZSets(t, r.client, tc.archived)
h.SeedRedisZSets(t, r.client, tc.completed)
h.SeedRedisZSets(t, r.client, tc.groups)
ctx := context.Background()
for qname, n := range tc.processed {
r.client.Set(ctx, base.ProcessedKey(qname, now), n, 0)
Expand Down Expand Up @@ -434,16 +434,16 @@ func TestGroupStats(t *testing.T) {
now := time.Now()

fixtures := struct {
tasks []*taskData
tasks []*h.TaskSeedData
allGroups map[string][]string
groups map[string][]*redis.Z
}{
tasks: []*taskData{
{msg: m1, state: base.TaskStateAggregating},
{msg: m2, state: base.TaskStateAggregating},
{msg: m3, state: base.TaskStateAggregating},
{msg: m4, state: base.TaskStateAggregating},
{msg: m5, state: base.TaskStateAggregating},
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
{Msg: m2, State: base.TaskStateAggregating},
{Msg: m3, State: base.TaskStateAggregating},
{Msg: m4, State: base.TaskStateAggregating},
{Msg: m5, State: base.TaskStateAggregating},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"group1", "group2"},
Expand Down Expand Up @@ -499,9 +499,9 @@ func TestGroupStats(t *testing.T) {

for _, tc := range tests {
h.FlushDB(t, r.client)
SeedTasks(t, r.client, fixtures.tasks)
SeedSets(t, r.client, fixtures.allGroups)
SeedZSets(t, r.client, fixtures.groups)
h.SeedTasks(t, r.client, fixtures.tasks)
h.SeedRedisSets(t, r.client, fixtures.allGroups)
h.SeedRedisZSets(t, r.client, fixtures.groups)

t.Run(tc.desc, func(t *testing.T) {
got, err := r.GroupStats(tc.qname)
Expand Down
17 changes: 17 additions & 0 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,10 @@ func (r *RDB) ListGroups(qname string) ([]string, error) {
// Output:
// Returns 0 if no aggregation set was created
// Returns 1 if an aggregation set was created
//
// Time Complexity:
// O(log(N) + M) with N being the number tasks in the group zset
// and M being the max size.
var aggregationCheckCmd = redis.NewScript(`
local size = redis.call("ZCARD", KEYS[1])
if size == 0 then
Expand Down Expand Up @@ -1118,6 +1122,12 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma
// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
// ------
// ARGV[1] -> task key prefix
//
// Output:
// Array of encoded task messages
//
// Time Complexity:
// O(N) with N being the number of tasks in the aggregation set.
var readAggregationSetCmd = redis.NewScript(`
local msgs = {}
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
Expand Down Expand Up @@ -1162,6 +1172,13 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag
// KEYS[2] -> asynq:{<qname>}:aggregation_sets
// -------
// ARGV[1] -> task key prefix
//
// Output:
// Redis status reply
//
// Time Complexity:
// max(O(N), O(log(M))) with N being the number of tasks in the aggregation set
// and M being the number of elements in the all-aggregation-sets list.
var deleteAggregationSetCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
Expand Down
Loading

0 comments on commit 1acd62c

Please sign in to comment.