Skip to content

Commit

Permalink
patch from etcd-io#102
Browse files Browse the repository at this point in the history
  • Loading branch information
kshvakov committed Mar 15, 2019
1 parent ac03d75 commit 9c485d8
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 70 deletions.
1 change: 1 addition & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
)

const (
opError = -1
opCreate = 1
opDelete = 2
opExists = 3
Expand Down
4 changes: 4 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ func (r *MultiResponse) Encode(buf []byte) (int, error) {
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.String))
case opSetData:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.Stat))
case opError:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(&op.Header.Err))
}
total += n
if err != nil {
Expand Down Expand Up @@ -371,6 +373,8 @@ func (r *MultiResponse) Decode(buf []byte) (int, error) {
res.Stat = new(Stat)
w = reflect.ValueOf(res.Stat)
case opCheck, opDelete:
case opError:
w = reflect.ValueOf(&res.Header.Err)
}
if w.IsValid() {
n, err := decodePacketValue(buf[total:], w)
Expand Down
2 changes: 1 addition & 1 deletion integration/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewContainer(containerName, dockerFile string, ports []string) (*Container,
}

func newContainerFiles(cfg ContainerConfig) (c *Container, err error) {
dc, err := docker.NewClient("unix://var/run/docker.sock")
dc, err := docker.NewClient("unix:///var/run/docker.sock")
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions integration/drill/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
FROM java:openjdk-8-jdk
# needs jdk to submit sql queries!?

ENV DRILL_VERSION 1.15.0

RUN mkdir -p /opt/drill && \
wget -q -O - http://www-us.apache.org/dist/drill/drill-1.10.0/apache-drill-1.10.0.tar.gz | tar -zxvf - -C /opt/drill
wget -q -O - http://www-us.apache.org/dist/drill/drill-${DRILL_VERSION}/apache-drill-${DRILL_VERSION}.tar.gz | tar -zxvf - -C /opt/drill

EXPOSE 8047

ENV DRILL_HOME /opt/drill/apache-drill-1.10.0
ENV DRILL_HOME /opt/drill/apache-drill-${DRILL_VERSION}
ENV DRILL_LOG_DIR ${DRILL_HOME}/log/
ENV DRILL_LOG_PREFIX ${DRILL_LOG_PATH}/drill

Expand All @@ -23,8 +25,8 @@ ENTRYPOINT ["java",\
"-Ddrill.exec.enable-epoll=false",\
"-XX:+CMSClassUnloadingEnabled",\
"-XX:+UseG1GC",\
"-Dlog.path=/opt/drill/apache-drill-1.10.0/log/drillbit.log",\
"-Dlog.query.path=/opt/drill/apache-drill-1.10.0/log/drillbit_queries.json",\
"-Dlog.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit.log",\
"-Dlog.query.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit_queries.json",\
"-cp",\
"/opt/drill/apache-drill-1.10.0/conf:/opt/drill/apache-drill-1.10.0/jars/*:/opt/drill/apache-drill-1.10.0/jars/ext/*:/opt/drill/apache-drill-1.10.0/jars/3rdparty/*:/opt/drill/apache-drill-1.10.0/jars/classb/*",\
"/opt/drill/apache-drill-${DRILL_VERSION}/conf:/opt/drill/apache-drill-${DRILL_VERSION}/jars/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/ext/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/3rdparty/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/classb/*",\
"org.apache.drill.exec.server.Drillbit"]
50 changes: 23 additions & 27 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package integration

import (
"net"
"testing"
"time"

Expand Down Expand Up @@ -557,26 +556,6 @@ func TestCreateInvalidPath(t *testing.T) {
})
}

func TestRUOK(t *testing.T) {
zkclus := newZKCluster(t)
defer zkclus.Close(t)

conn, err := net.Dial("tcp", zkclus.Addr())
if err != nil {
t.Fatal(err)
}
if _, err := conn.Write([]byte("ruok")); err != nil {
t.Fatal(err)
}
buf := make([]byte, 4)
if _, err := conn.Read(buf); err != nil {
t.Fatal(err)
}
if string(buf) != "imok" {
t.Fatalf(`expected "imok", got %q`, string(buf))
}
}

func TestMultiOp(t *testing.T) { runTest(t, testMultiOp) }

func testMultiOp(t *testing.T, c *zk.Conn) {
Expand Down Expand Up @@ -626,8 +605,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
// test create+delete on same key == no key
ops = []interface{}{
Expand All @@ -653,8 +632,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
&zk.CheckVersionRequest{Path: "/foo", Version: 2},
}
_, err = c.Multi(ops...)
if err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if err == nil || err.Error() != zetcd.ErrBadVersion.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrBadVersion, err)
}
if _, s1, err = c.Get("/test1"); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got (%v,%v)", zetcd.ErrNoNode, s1, err)
Expand All @@ -681,8 +660,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/missing-key", Version: 0},
}
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNoNode, err)
}
// test empty operation list
if resp, err = c.Multi(); err != nil || len(resp) != 0 {
Expand All @@ -705,6 +684,23 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
if s1.Mzxid != s2.Mzxid {
t.Fatalf("expected zxids in %+v to match %+v", *s1, *s2)
}
// test partial success
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/test2", Version: 0},
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if resp, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
if len(resp) != 2 {
t.Fatalf("expected %d results, got %d", 2, len(resp))
}
if resp[0].Error != nil {
t.Fatalf("expected checkop error to be nil, got %v", resp[0].Error)
}
if resp[1].Error == nil || resp[1].Error.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected createop error to be %v, got %v", zetcd.ErrNodeExists.Error(), resp[1].Error)
}
}

func runTest(t *testing.T, f func(*testing.T, *zk.Conn)) {
Expand Down
28 changes: 28 additions & 0 deletions integration/integration_zetcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build !zkdocker,!xchk

package integration

import (
"net"
"testing"
)

func TestRUOK(t *testing.T) {
zkclus := newZKCluster(t)
defer zkclus.Close(t)

conn, err := net.Dial("tcp", zkclus.Addr())
if err != nil {
t.Fatal(err)
}
if _, err := conn.Write([]byte("ruok")); err != nil {
t.Fatal(err)
}
buf := make([]byte, 4)
if _, err := conn.Read(buf); err != nil {
t.Fatal(err)
}
if string(buf) != "imok" {
t.Fatalf(`expected "imok", got %q`, string(buf))
}
}
3 changes: 2 additions & 1 deletion integration/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ RUN apt-get install -y wget supervisor dnsutils
RUN rm -rf /var/lib/apt/lists/*; apt-get clean

ENV SCALA_VERSION 2.11
ENV KAFKA_VERSION 0.11.0.0
ENV KAFKA_VERSION 0.11.0.3
RUN wget -q http://www-us.apache.org/dist/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
RUN tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && mv /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION" /kafka
# 9092 is kafka port
EXPOSE 9092

COPY kafka/ /kafka/config/
ADD kafka/run.sh /kafka/run.sh
RUN chmod uga+x /kafka/run.sh
ENTRYPOINT [ "/bin/bash", "/kafka/run.sh" ]
3 changes: 1 addition & 2 deletions integration/kafka/kafka.chroot.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# NOTE: assumes port 30001 for cross-checking configuration
zookeeper.connect=172.17.0.1:30001/kafka-chroot
zookeeper.connect=172.17.0.1:2181/kafka-chroot
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
3 changes: 1 addition & 2 deletions integration/kafka/kafka.server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# NOTE: assumes port 30001 for cross-checking configuration
zookeeper.connect=172.17.0.1:30001
zookeeper.connect=172.17.0.1:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func serveRequest(s Session, zke ZK, zkreq ZKRequest) error {
}
zkresp := DispatchZK(zke, zkreq.xid, zkreq.req)
if zkresp.Err != nil {
glog.V(9).Infof("dispatch error", zkresp.Err)
glog.V(9).Infof("dispatch error %v", zkresp.Err)
return zkresp.Err
}
if zkresp.Hdr.Err == 0 {
Expand Down
8 changes: 7 additions & 1 deletion xchk/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,13 @@ func (xchk *zkXchk) GetChildren2(xid zetcd.Xid, op *zetcd.GetChildren2Request) z
return or
}

func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { panic("wut") }
func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse {
cf := func() zetcd.ZKResponse { return xchk.cZK.Multi(xid, op) }
of := func() zetcd.ZKResponse { return xchk.oZK.Multi(xid, op) }
cr, or, err := xchk.xchkResp(cf, of)
defer func() { xchk.reportErr(cr, or, err) }()
return or
}

func (xchk *zkXchk) Close(xid zetcd.Xid, op *zetcd.CloseRequest) zetcd.ZKResponse {
cf := func() zetcd.ZKResponse { return xchk.cZK.Close(xid, op) }
Expand Down
57 changes: 27 additions & 30 deletions zketcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,31 @@ func (z *zkEtcd) Sync(xid Xid, op *SyncRequest) ZKResponse {

func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
bs := make([]opBundle, len(mreq.Ops))
mresp := &MultiResponse{
Ops: make([]MultiResponseOp, len(mreq.Ops)),
DoneHeader: MultiHeader{Type: opMulti},
}
if len(mreq.Ops) == 0 {
zxid, zerr := z.incrementAndGetZxid()
if zerr != nil {
return mkErr(zerr)
}
return mkZKResp(xid, zxid, mresp)
}
for i, op := range mreq.Ops {
switch req := op.Op.(type) {
case *CreateRequest:
bs[i] = z.mkCreateTxnOp(req)
mresp.Ops[i].Header.Type = opCreate
case *DeleteRequest:
bs[i] = z.mkDeleteTxnOp(req)
mresp.Ops[i].Header.Type = opDelete
case *SetDataRequest:
bs[i] = z.mkSetDataTxnOp(req)
mresp.Ops[i].Header.Type = opSetData
case *CheckVersionRequest:
bs[i] = z.mkCheckVersionPathTxnOp(req)
mresp.Ops[i].Header.Type = opCheck
default:
panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op))
}
Expand All @@ -491,61 +506,43 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
}

apply := func(s v3sync.STM) error {
for _, b := range bs {
for i, b := range bs {
if err := b.apply(s); err != nil {
var ok bool
mresp.Ops[i].Header.Type = opError
if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok {
mresp.Ops[i].Header.Err = errAPIError
}
return err
}
}
return nil
}

reply := func(xid Xid, zxid ZXid) ZKResponse {
ops := make([]MultiResponseOp, len(bs))
for i, b := range bs {
resp := b.reply(xid, zxid)
ops[i].Header = MultiHeader{Err: 0}
switch r := resp.Resp.(type) {
case *CreateResponse:
ops[i].Header.Type = opCreate
ops[i].String = r.Path
mresp.Ops[i].String = r.Path
case *SetDataResponse:
ops[i].Header.Type = opSetData
ops[i].Stat = &r.Stat
case *DeleteResponse:
ops[i].Header.Type = opDelete
case *struct{}:
ops[i].Header.Type = opCheck
default:
panic(fmt.Sprintf("unknown multi %+v %T", resp, resp))
mresp.Ops[i].Stat = &r.Stat
}
}
mresp := &MultiResponse{
Ops: ops,
DoneHeader: MultiHeader{Type: opMulti},
}
return mkZKResp(xid, zxid, mresp)
}

resp, err := z.doSTM(apply, prefetch...)
resp, _ := z.doSTM(apply, prefetch...)
if resp == nil {
// txn aborted, possibly due to any API error
if _, ok := errorToErrCode[err]; !ok {
// aborted due to non-API error
return mkErr(err)
}
zxid, zerr := z.incrementAndGetZxid()
if zerr != nil {
return mkErr(zerr)
}
// zkdocker seems to always return API error...
zkresp := apiErrToZKErr(xid, zxid, err)
zkresp.Hdr.Err = errAPIError
return zkresp
return reply(xid, zxid)
}

mresp := reply(xid, ZXid(resp.Header.Revision))
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v", *mreq, resp.Header.Revision, *resp)
return mresp
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp)
return reply(xid, ZXid(resp.Header.Revision))
}

func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle {
Expand Down

0 comments on commit 9c485d8

Please sign in to comment.