From 7db7edaa175da42c748c25961a742c17a7643a10 Mon Sep 17 00:00:00 2001 From: zhouyuyan Date: Wed, 17 Jan 2018 09:39:07 +0800 Subject: [PATCH] cluster fix --- broker/broker.go | 10 ++-------- broker/client.go | 2 +- broker/info.go | 11 ++++++----- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index aadd98f8..2d773828 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -288,17 +288,11 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { conn: conn, info: info, } + c.init() cid := c.info.clientID - if typ == ROUTER { - c.route = route{ - remoteID: "", - remoteUrl: conn.RemoteAddr().String(), - } - } - var msgPool *MessagePool var exist bool var old interface{} @@ -358,7 +352,7 @@ func (b *Broker) connectRouter(url, remoteID string) { } break } - route := &route{ + route := route{ remoteID: remoteID, remoteUrl: conn.RemoteAddr().String(), } diff --git a/broker/client.go b/broker/client.go index 7666b8a7..2f3f38d5 100644 --- a/broker/client.go +++ b/broker/client.go @@ -32,7 +32,7 @@ type client struct { broker *Broker conn net.Conn info info - route *route + route route status int closed chan int smu sync.RWMutex diff --git a/broker/info.go b/broker/info.go index c157f70a..b3ac6045 100644 --- a/broker/info.go +++ b/broker/info.go @@ -110,13 +110,14 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { } b.mu.Unlock() - // log.Info("isforword: ", isForward) if !isForward { - route := &route{ - remoteUrl: rurl, - remoteID: rid, + if c.typ == ROUTER { + route := route{ + remoteUrl: rurl, + remoteID: rid, + } + c.route = route } - c.route = route go b.SendLocalSubsToRouter(c) // log.Info("BroadcastInfoMessage starting... ")