diff --git a/consulmq.go b/consulmq.go index dd6c99a..c1bddff 100644 --- a/consulmq.go +++ b/consulmq.go @@ -216,37 +216,13 @@ func (mq *MQ) loadIndex(q string) ([]string, *api.KVPair, error) { return idx, obj, nil } -func (mq *MQ) writeIndex(q string, idx []string, kv *api.KVPair) error { - sess, _, err := mq.session.CreateNoChecks(nil, nil) - if err != nil { - return err - } +func (mq *MQ) writeIndex(idx []string, kv *api.KVPair) error { b, err := json.MarshalIndent(idx, "", " ") if err != nil { return err } - k := &api.KVPair{ - Key: mq.q.RootPath + q + "/_index", - Value: b, - Session: sess, - } - lock := false - kv.Session = sess - for i := 0; i <= 10; i++ { - locked, _, err := mq.kv.Acquire(kv, nil) - if err != nil { - return err - } - if locked { - lock = locked - } - } - defer mq.unlock(k) - if !lock { - return fmt.Errorf("unable to acquire index lock for queue " + q) - } - - _, err = mq.kv.Put(k, nil) + kv.Value = b + _, err = mq.kv.Put(kv, nil) if err != nil { return err } @@ -264,12 +240,17 @@ func (mq *MQ) unlock(kv *api.KVPair) { } func (mq *MQ) indexPush(queue string, id string) error { - idx, kv, err := mq.loadIndex(queue) + kv, err := mq.lock(queue) + if err != nil { + return err + } + defer mq.unlock(kv) + idx, _, err := mq.loadIndex(queue) if err != nil { return err } idx = append(idx, id) - err = mq.writeIndex(queue, idx, kv) + err = mq.writeIndex(idx, kv) if err != nil { return err } @@ -277,12 +258,17 @@ func (mq *MQ) indexPush(queue string, id string) error { } func (mq *MQ) indexPushFirst(queue string, id string) error { - idx, kv, err := mq.loadIndex(queue) + kv, err := mq.lock(queue) + if err != nil { + return err + } + defer mq.unlock(kv) + idx, _, err := mq.loadIndex(queue) if err != nil { return err } idx = append([]string{id}, idx...) - err = mq.writeIndex(queue, idx, kv) + err = mq.writeIndex(idx, kv) if err != nil { return err } @@ -290,14 +276,19 @@ func (mq *MQ) indexPushFirst(queue string, id string) error { } func (mq *MQ) indexPop(queue string) (string, int, error) { - idx, kv, err := mq.loadIndex(queue) + kv, err := mq.lock(queue) + if err != nil { + return "", -1, err + } + defer mq.unlock(kv) + idx, _, err := mq.loadIndex(queue) if err != nil { return "", len(idx), err } var id string if len(idx) > 0 { id, idx = idx[0], idx[1:] - err = mq.writeIndex(queue, idx, kv) + err = mq.writeIndex(idx, kv) if err != nil { return "", len(idx), err } @@ -306,7 +297,12 @@ func (mq *MQ) indexPop(queue string) (string, int, error) { } func (mq *MQ) indexPopLast(queue string) (string, int, error) { - idx, kv, err := mq.loadIndex(queue) + kv, err := mq.lock(queue) + if err != nil { + return "", -1, err + } + defer mq.unlock(kv) + idx, _, err := mq.loadIndex(queue) if err != nil { return "", len(idx), err } @@ -315,13 +311,39 @@ func (mq *MQ) indexPopLast(queue string) (string, int, error) { id = idx[len(idx)-1] idx[len(idx)-1] = "" idx = idx[:len(idx)-1] - err = mq.writeIndex(queue, idx, kv) + err = mq.writeIndex(idx, kv) if err != nil { return "", len(idx), err } } return id, len(idx), nil +} +func (mq *MQ) lock(q string) (*api.KVPair, error) { + sess, _, err := mq.session.CreateNoChecks(nil, nil) + if err != nil { + return nil, err + } + + kv := &api.KVPair{ + Key: mq.q.RootPath + q + "/_index", + Session: sess, + } + lock := false + kv.Session = sess + for i := 0; i <= 10; i++ { + locked, _, err := mq.kv.Acquire(kv, nil) + if err != nil { + return kv, err + } + if locked { + lock = locked + } + } + if !lock { + return kv, fmt.Errorf("unable to acquire index lock for queue " + q) + } + return kv, nil } // Push an object to the rear of the queue. Push returns a QueueObject with the object ID,