Skip to content

Commit

Permalink
fix locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Pete Fraedrich committed Mar 27, 2020
1 parent 76c14c3 commit 4a803a6
Showing 1 changed file with 57 additions and 35 deletions.
92 changes: 57 additions & 35 deletions consulmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,37 +216,13 @@ func (mq *MQ) loadIndex(q string) ([]string, *api.KVPair, error) {
return idx, obj, nil
}

func (mq *MQ) writeIndex(q string, idx []string, kv *api.KVPair) error {
sess, _, err := mq.session.CreateNoChecks(nil, nil)
if err != nil {
return err
}
func (mq *MQ) writeIndex(idx []string, kv *api.KVPair) error {
b, err := json.MarshalIndent(idx, "", " ")
if err != nil {
return err
}
k := &api.KVPair{
Key: mq.q.RootPath + q + "/_index",
Value: b,
Session: sess,
}
lock := false
kv.Session = sess
for i := 0; i <= 10; i++ {
locked, _, err := mq.kv.Acquire(kv, nil)
if err != nil {
return err
}
if locked {
lock = locked
}
}
defer mq.unlock(k)
if !lock {
return fmt.Errorf("unable to acquire index lock for queue " + q)
}

_, err = mq.kv.Put(k, nil)
kv.Value = b
_, err = mq.kv.Put(kv, nil)
if err != nil {
return err
}
Expand All @@ -264,40 +240,55 @@ func (mq *MQ) unlock(kv *api.KVPair) {
}

func (mq *MQ) indexPush(queue string, id string) error {
idx, kv, err := mq.loadIndex(queue)
kv, err := mq.lock(queue)
if err != nil {
return err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
if err != nil {
return err
}
idx = append(idx, id)
err = mq.writeIndex(queue, idx, kv)
err = mq.writeIndex(idx, kv)
if err != nil {
return err
}
return nil
}

func (mq *MQ) indexPushFirst(queue string, id string) error {
idx, kv, err := mq.loadIndex(queue)
kv, err := mq.lock(queue)
if err != nil {
return err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
if err != nil {
return err
}
idx = append([]string{id}, idx...)
err = mq.writeIndex(queue, idx, kv)
err = mq.writeIndex(idx, kv)
if err != nil {
return err
}
return nil
}

func (mq *MQ) indexPop(queue string) (string, int, error) {
idx, kv, err := mq.loadIndex(queue)
kv, err := mq.lock(queue)
if err != nil {
return "", -1, err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
if err != nil {
return "", len(idx), err
}
var id string
if len(idx) > 0 {
id, idx = idx[0], idx[1:]
err = mq.writeIndex(queue, idx, kv)
err = mq.writeIndex(idx, kv)
if err != nil {
return "", len(idx), err
}
Expand All @@ -306,7 +297,12 @@ func (mq *MQ) indexPop(queue string) (string, int, error) {
}

func (mq *MQ) indexPopLast(queue string) (string, int, error) {
idx, kv, err := mq.loadIndex(queue)
kv, err := mq.lock(queue)
if err != nil {
return "", -1, err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(queue)
if err != nil {
return "", len(idx), err
}
Expand All @@ -315,13 +311,39 @@ func (mq *MQ) indexPopLast(queue string) (string, int, error) {
id = idx[len(idx)-1]
idx[len(idx)-1] = ""
idx = idx[:len(idx)-1]
err = mq.writeIndex(queue, idx, kv)
err = mq.writeIndex(idx, kv)
if err != nil {
return "", len(idx), err
}
}
return id, len(idx), nil
}

func (mq *MQ) lock(q string) (*api.KVPair, error) {
sess, _, err := mq.session.CreateNoChecks(nil, nil)
if err != nil {
return nil, err
}

kv := &api.KVPair{
Key: mq.q.RootPath + q + "/_index",
Session: sess,
}
lock := false
kv.Session = sess
for i := 0; i <= 10; i++ {
locked, _, err := mq.kv.Acquire(kv, nil)
if err != nil {
return kv, err
}
if locked {
lock = locked
}
}
if !lock {
return kv, fmt.Errorf("unable to acquire index lock for queue " + q)
}
return kv, nil
}

// Push an object to the rear of the queue. Push returns a QueueObject with the object ID,
Expand Down

0 comments on commit 4a803a6

Please sign in to comment.