Skip to content
This repository has been archived by the owner on Oct 7, 2023. It is now read-only.

Issue98fix #102

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
)

const (
opError = -1
opCreate = 1
opDelete = 2
opExists = 3
Expand Down
2 changes: 2 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ func (r *MultiResponse) Encode(buf []byte) (int, error) {
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.String))
case opSetData:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.Stat))
case opError:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(&op.Header.Err))
}
total += n
if err != nil {
Expand Down
29 changes: 23 additions & 6 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
// test create+delete on same key == no key
ops = []interface{}{
Expand All @@ -653,8 +653,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
&zk.CheckVersionRequest{Path: "/foo", Version: 2},
}
_, err = c.Multi(ops...)
if err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if err == nil || err.Error() != zetcd.ErrBadVersion.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrBadVersion, err)
}
if _, s1, err = c.Get("/test1"); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got (%v,%v)", zetcd.ErrNoNode, s1, err)
Expand All @@ -681,8 +681,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/missing-key", Version: 0},
}
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNoNode, err)
}
// test empty operation list
if resp, err = c.Multi(); err != nil || len(resp) != 0 {
Expand All @@ -705,6 +705,23 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
if s1.Mzxid != s2.Mzxid {
t.Fatalf("expected zxids in %+v to match %+v", *s1, *s2)
}
// test partial success
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/test2", Version: 0},
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if resp, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
if len(resp) != 2 {
t.Fatalf("expected %d results, got %d", 2, len(resp))
}
if resp[0].Error != nil {
t.Fatalf("expected checkop error to be nil, got %v", resp[0].Error)
}
if resp[1].Error == nil || resp[1].Error.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected createop error to be %v, got %v", zetcd.ErrNodeExists.Error(), resp[1].Error)
}
}

func runTest(t *testing.T, f func(*testing.T, *zk.Conn)) {
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func serveRequest(s Session, zke ZK, zkreq ZKRequest) error {
}
zkresp := DispatchZK(zke, zkreq.xid, zkreq.req)
if zkresp.Err != nil {
glog.V(9).Infof("dispatch error", zkresp.Err)
glog.V(9).Infof("dispatch error %v", zkresp.Err)
return zkresp.Err
}
if zkresp.Hdr.Err == 0 {
Expand Down
50 changes: 20 additions & 30 deletions zketcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,23 @@ func (z *zkEtcd) Sync(xid Xid, op *SyncRequest) ZKResponse {

func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
bs := make([]opBundle, len(mreq.Ops))
mresp := &MultiResponse{
Ops: make([]MultiResponseOp, len(mreq.Ops)),
DoneHeader: MultiHeader{Type: opMulti},
}
for i, op := range mreq.Ops {
switch req := op.Op.(type) {
case *CreateRequest:
bs[i] = z.mkCreateTxnOp(req)
mresp.Ops[i].Header.Type = opCreate
case *DeleteRequest:
bs[i] = z.mkDeleteTxnOp(req)
mresp.Ops[i].Header.Type = opDelete
case *SetDataRequest:
bs[i] = z.mkSetDataTxnOp(req)
mresp.Ops[i].Header.Type = opSetData
case *CheckVersionRequest:
mresp.Ops[i].Header.Type = opCheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this after mkCheckVersionPathTxnOp to be consistent with the other cases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change has been made.

I have not been able to get the xchk,docker tests to run yet, they just seem to hang. I can't get them to run for the master branch either.

Do you have any instructions on setting that up?

bs[i] = z.mkCheckVersionPathTxnOp(req)
default:
panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op))
Expand All @@ -491,61 +499,43 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
}

apply := func(s v3sync.STM) error {
for _, b := range bs {
for i, b := range bs {
if err := b.apply(s); err != nil {
var ok bool
mresp.Ops[i].Header.Type = opError
if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok {
mresp.Ops[i].Header.Err = errAPIError
}
return err
}
}
return nil
}

reply := func(xid Xid, zxid ZXid) ZKResponse {
ops := make([]MultiResponseOp, len(bs))
for i, b := range bs {
resp := b.reply(xid, zxid)
ops[i].Header = MultiHeader{Err: 0}
switch r := resp.Resp.(type) {
case *CreateResponse:
ops[i].Header.Type = opCreate
ops[i].String = r.Path
mresp.Ops[i].String = r.Path
case *SetDataResponse:
ops[i].Header.Type = opSetData
ops[i].Stat = &r.Stat
case *DeleteResponse:
ops[i].Header.Type = opDelete
case *struct{}:
ops[i].Header.Type = opCheck
default:
panic(fmt.Sprintf("unknown multi %+v %T", resp, resp))
mresp.Ops[i].Stat = &r.Stat
}
}
mresp := &MultiResponse{
Ops: ops,
DoneHeader: MultiHeader{Type: opMulti},
}
return mkZKResp(xid, zxid, mresp)
}

resp, err := z.doSTM(apply, prefetch...)
resp, _ := z.doSTM(apply, prefetch...)
if resp == nil {
// txn aborted, possibly due to any API error
if _, ok := errorToErrCode[err]; !ok {
// aborted due to non-API error
return mkErr(err)
}
zxid, zerr := z.incrementAndGetZxid()
if zerr != nil {
return mkErr(zerr)
}
// zkdocker seems to always return API error...
zkresp := apiErrToZKErr(xid, zxid, err)
zkresp.Hdr.Err = errAPIError
return zkresp
return reply(xid, zxid)
}

mresp := reply(xid, ZXid(resp.Header.Revision))
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v", *mreq, resp.Header.Revision, *resp)
return mresp
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp)
return reply(xid, ZXid(resp.Header.Revision))
}

func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle {
Expand Down