diff --git a/broker/client.go b/broker/client.go index 447de875..30d012e8 100644 --- a/broker/client.go +++ b/broker/client.go @@ -3,6 +3,7 @@ package broker import ( + "context" "errors" "net" "reflect" @@ -31,17 +32,18 @@ const ( ) type client struct { - typ int - mu sync.Mutex - broker *Broker - conn net.Conn - info info - route route - status int - closed chan int - smu sync.RWMutex - subs map[string]*subscription - rsubs map[string]*subInfo + typ int + mu sync.Mutex + broker *Broker + conn net.Conn + info info + route route + status int + smu sync.RWMutex + subs map[string]*subscription + rsubs map[string]*subInfo + ctx context.Context + cancelFunc context.CancelFunc } type subInfo struct { @@ -79,12 +81,11 @@ func (c *client) init() { c.smu.Lock() defer c.smu.Unlock() c.status = Connected - c.closed = make(chan int, 1) - c.rsubs = make(map[string]*subInfo) c.subs = make(map[string]*subscription, 10) c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] + c.ctx, c.cancelFunc = context.WithCancel(context.Background()) } func (c *client) keepAlive(ch chan int) { @@ -111,10 +112,8 @@ func (c *client) keepAlive(ch chan int) { timer.Stop() return - case _, ok := <-c.closed: - if !ok { - return - } + case <-c.ctx.Done(): + return } } } @@ -130,23 +129,28 @@ func (c *client) readLoop() { go c.keepAlive(ch) for { - packet, err := packets.ReadPacket(nc) - if err != nil { - log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) - break - } - // keepalive channel - ch <- 1 + select { + case <-c.ctx.Done(): + return + default: + packet, err := packets.ReadPacket(nc) + if err != nil { + log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + msg := &Message{client: c, packet: DisconnectdPacket} + b.SubmitWork(msg) + return + } + // keepalive channel + ch <- 1 - msg := &Message{ - client: c, - packet: packet, + msg := &Message{ + client: c, + packet: packet, + } + b.SubmitWork(msg) } - b.SubmitWork(msg) } - msg := &Message{client: c, packet: DisconnectdPacket} - b.SubmitWork(msg) } func ProcessMessage(msg *Message) { @@ -500,6 +504,8 @@ func (c *client) Close() { return } + c.cancelFunc() + c.status = Disconnected //wait for message complete time.Sleep(1 * time.Second) @@ -512,8 +518,6 @@ func (c *client) Close() { c.smu.Unlock() - close(c.closed) - b := c.broker subs := c.subs if b != nil { diff --git a/broker/info.go b/broker/info.go index 1e6e715a..c114eef3 100644 --- a/broker/info.go +++ b/broker/info.go @@ -4,10 +4,11 @@ package broker import ( "fmt" + "time" + simplejson "github.com/bitly/go-simplejson" "github.com/eclipse/paho.mqtt.golang/packets" "go.uber.org/zap" - "time" ) func (c *client) SendInfo() { @@ -35,10 +36,8 @@ func (c *client) StartPing() { log.Error("ping error: ", zap.Error(err)) c.Close() } - case _, ok := <-c.closed: - if !ok { - return - } + case <-c.ctx.Done(): + return } } }