Skip to content

Commit

Permalink
fix JSON parsing issue with index
Browse files Browse the repository at this point in the history
  • Loading branch information
Pete Fraedrich committed Mar 27, 2020
1 parent 4a803a6 commit fc763f5
Showing 1 changed file with 11 additions and 18 deletions.
29 changes: 11 additions & 18 deletions consulmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,13 @@ func (mq *MQ) createPaths() error {
return nil
}

func (mq *MQ) loadIndex(q string) ([]string, *api.KVPair, error) {
obj, _, err := mq.kv.Get(mq.q.RootPath+q+"/_index", nil)
if err != nil || obj == nil {
return nil, obj, err
}
func (mq *MQ) loadIndex(kv *api.KVPair) ([]string, *api.KVPair, error) {
var idx []string
err = json.Unmarshal(obj.Value, &idx)
err := json.Unmarshal(kv.Value, &idx)
if err != nil {
return nil, obj, err
return nil, kv, err
}
return idx, obj, nil
return idx, kv, nil
}

func (mq *MQ) writeIndex(idx []string, kv *api.KVPair) error {
Expand Down Expand Up @@ -245,7 +241,7 @@ func (mq *MQ) indexPush(queue string, id string) error {
return err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
idx, _, err := mq.loadIndex(kv)
if err != nil {
return err
}
Expand All @@ -263,7 +259,7 @@ func (mq *MQ) indexPushFirst(queue string, id string) error {
return err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
idx, _, err := mq.loadIndex(kv)
if err != nil {
return err
}
Expand All @@ -281,7 +277,7 @@ func (mq *MQ) indexPop(queue string) (string, int, error) {
return "", -1, err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
idx, _, err := mq.loadIndex(kv)
if err != nil {
return "", len(idx), err
}
Expand All @@ -302,7 +298,7 @@ func (mq *MQ) indexPopLast(queue string) (string, int, error) {
return "", -1, err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
idx, _, err := mq.loadIndex(kv)
if err != nil {
return "", len(idx), err
}
Expand All @@ -324,20 +320,17 @@ func (mq *MQ) lock(q string) (*api.KVPair, error) {
if err != nil {
return nil, err
}

kv := &api.KVPair{
Key: mq.q.RootPath + q + "/_index",
Session: sess,
}
lock := false
kv, _, err := mq.kv.Get(mq.q.RootPath+q+"/_index", nil)
kv.Session = sess
lock := false
for i := 0; i <= 10; i++ {
locked, _, err := mq.kv.Acquire(kv, nil)
if err != nil {
return kv, err
}
if locked {
lock = locked
break
}
}
if !lock {
Expand Down

0 comments on commit fc763f5

Please sign in to comment.