diff --git a/consulmq.go b/consulmq.go index c1bddff..833f63c 100644 --- a/consulmq.go +++ b/consulmq.go @@ -203,17 +203,13 @@ func (mq *MQ) createPaths() error { return nil } -func (mq *MQ) loadIndex(q string) ([]string, *api.KVPair, error) { - obj, _, err := mq.kv.Get(mq.q.RootPath+q+"/_index", nil) - if err != nil || obj == nil { - return nil, obj, err - } +func (mq *MQ) loadIndex(kv *api.KVPair) ([]string, *api.KVPair, error) { var idx []string - err = json.Unmarshal(obj.Value, &idx) + err := json.Unmarshal(kv.Value, &idx) if err != nil { - return nil, obj, err + return nil, kv, err } - return idx, obj, nil + return idx, kv, nil } func (mq *MQ) writeIndex(idx []string, kv *api.KVPair) error { @@ -245,7 +241,7 @@ func (mq *MQ) indexPush(queue string, id string) error { return err } defer mq.unlock(kv) - idx, _, err := mq.loadIndex(queue) + idx, _, err := mq.loadIndex(kv) if err != nil { return err } @@ -263,7 +259,7 @@ func (mq *MQ) indexPushFirst(queue string, id string) error { return err } defer mq.unlock(kv) - idx, _, err := mq.loadIndex(queue) + idx, _, err := mq.loadIndex(kv) if err != nil { return err } @@ -281,7 +277,7 @@ func (mq *MQ) indexPop(queue string) (string, int, error) { return "", -1, err } defer mq.unlock(kv) - idx, _, err := mq.loadIndex(queue) + idx, _, err := mq.loadIndex(kv) if err != nil { return "", len(idx), err } @@ -302,7 +298,7 @@ func (mq *MQ) indexPopLast(queue string) (string, int, error) { return "", -1, err } defer mq.unlock(kv) - idx, _, err := mq.loadIndex(queue) + idx, _, err := mq.loadIndex(kv) if err != nil { return "", len(idx), err } @@ -324,13 +320,9 @@ func (mq *MQ) lock(q string) (*api.KVPair, error) { if err != nil { return nil, err } - - kv := &api.KVPair{ - Key: mq.q.RootPath + q + "/_index", - Session: sess, - } - lock := false + kv, _, err := mq.kv.Get(mq.q.RootPath+q+"/_index", nil) kv.Session = sess + lock := false for i := 0; i <= 10; i++ { locked, _, err := mq.kv.Acquire(kv, nil) if err != nil { @@ -338,6 +330,7 @@ func (mq *MQ) lock(q string) (*api.KVPair, error) { } if locked { lock = locked + break } } if !lock {