Skip to content

Commit

Permalink
context (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
chowyu08 authored May 10, 2018
1 parent 684584b commit 84e7fe2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 37 deletions.
68 changes: 36 additions & 32 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package broker

import (
"context"
"errors"
"net"
"reflect"
Expand Down Expand Up @@ -31,17 +32,18 @@ const (
)

type client struct {
typ int
mu sync.Mutex
broker *Broker
conn net.Conn
info info
route route
status int
closed chan int
smu sync.RWMutex
subs map[string]*subscription
rsubs map[string]*subInfo
typ int
mu sync.Mutex
broker *Broker
conn net.Conn
info info
route route
status int
smu sync.RWMutex
subs map[string]*subscription
rsubs map[string]*subInfo
ctx context.Context
cancelFunc context.CancelFunc
}

type subInfo struct {
Expand Down Expand Up @@ -79,12 +81,11 @@ func (c *client) init() {
c.smu.Lock()
defer c.smu.Unlock()
c.status = Connected
c.closed = make(chan int, 1)

c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
}

func (c *client) keepAlive(ch chan int) {
Expand All @@ -111,10 +112,8 @@ func (c *client) keepAlive(ch chan int) {

timer.Stop()
return
case _, ok := <-c.closed:
if !ok {
return
}
case <-c.ctx.Done():
return
}
}
}
Expand All @@ -130,23 +129,28 @@ func (c *client) readLoop() {
go c.keepAlive(ch)

for {
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
break
}
// keepalive channel
ch <- 1
select {
case <-c.ctx.Done():
return
default:
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
return
}
// keepalive channel
ch <- 1

msg := &Message{
client: c,
packet: packet,
msg := &Message{
client: c,
packet: packet,
}
b.SubmitWork(msg)
}
b.SubmitWork(msg)
}

msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
}

func ProcessMessage(msg *Message) {
Expand Down Expand Up @@ -500,6 +504,8 @@ func (c *client) Close() {
return
}

c.cancelFunc()

c.status = Disconnected
//wait for message complete
time.Sleep(1 * time.Second)
Expand All @@ -512,8 +518,6 @@ func (c *client) Close() {

c.smu.Unlock()

close(c.closed)

b := c.broker
subs := c.subs
if b != nil {
Expand Down
9 changes: 4 additions & 5 deletions broker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package broker

import (
"fmt"
"time"

simplejson "github.com/bitly/go-simplejson"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
"time"
)

func (c *client) SendInfo() {
Expand Down Expand Up @@ -35,10 +36,8 @@ func (c *client) StartPing() {
log.Error("ping error: ", zap.Error(err))
c.Close()
}
case _, ok := <-c.closed:
if !ok {
return
}
case <-c.ctx.Done():
return
}
}
}
Expand Down

0 comments on commit 84e7fe2

Please sign in to comment.