Skip to content

Commit

Permalink
add cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuyan committed Jan 16, 2018
1 parent 123bb72 commit 1d6f6a4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 69 deletions.
86 changes: 49 additions & 37 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type Broker struct {
id string
cid uint64
mu sync.Mutex
config *Config
tlsConfig *tls.Config
AclConfig *acl.ACLConfig
Expand Down Expand Up @@ -235,12 +236,6 @@ func (b *Broker) StartClusterListening() {
tmpDelay = ACCEPT_MIN_SLEEP

go b.handleConnection(ROUTER, conn, idx)
if idx == 1 {
idx = 0
} else {
idx = idx + 1
}

}
}

Expand Down Expand Up @@ -297,6 +292,13 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {

cid := c.info.clientID

if typ == ROUTER {
c.route = route{
remoteID: "",
remoteUrl: conn.RemoteAddr().String(),
}
}

var msgPool *MessagePool
var exist bool
var old interface{}
Expand Down Expand Up @@ -330,13 +332,16 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c)
}

c.readLoop()
go c.readLoop()
if typ == ROUTER {
c.SendInfo()
c.StartPing()
}
}

func (b *Broker) ConnectToRouters() {
for i := 0; i < len(b.config.Cluster.Routes); i++ {
url := b.config.Cluster.Routes[i]
go b.connectRouter(url, "")
for _, v := range b.config.Cluster.Routes {
go b.connectRouter(v, "")
}
}

Expand All @@ -347,32 +352,41 @@ func (b *Broker) connectRouter(url, remoteID string) {
conn, err = net.Dial("tcp", url)
if err != nil {
log.Error("Error trying to connect to route: ", err)
select {
case <-time.After(DEFAULT_ROUTE_CONNECT):
log.Debug("Connect to route timeout ,retry...")
continue
}
log.Debug("Connect to route timeout ,retry...")
time.Sleep(5 * time.Second)
continue
}
break
}
route := &route{
remoteID: remoteID,
remoteUrl: url,
remoteUrl: conn.RemoteAddr().String(),
}
cid := GenUniqueId()

info := info{
clientID: cid,
clientID: cid,
keepalive: 60,
}

c := &client{
typ: REMOTE,
conn: conn,
route: route,
info: info,
broker: b,
typ: REMOTE,
conn: conn,
route: route,
info: info,
}
c.init()
b.remotes.Store(cid, c)

c.mp = MSGPool[(MessagePoolNum + 1)].GetPool()

c.SendConnect()
c.SendInfo()
c.StartPing()

go c.readLoop()
go c.StartPing()

}

func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
Expand All @@ -381,9 +395,7 @@ func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
v, ok := value.(*client)
if ok {
if v.route.remoteUrl == url {
// if v.route.remoteID == "" || v.route.remoteID != remoteID {
v.route.remoteID = remoteID
// }
exist = true
return false
}
Expand All @@ -406,14 +418,16 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
}
return true
})
err := c.WriterPacket(subInfo)
if err != nil {
log.Error("Send localsubs To Router error :", err)
if len(subInfo.Topics) > 0 {
err := c.WriterPacket(subInfo)
if err != nil {
log.Error("Send localsubs To Router error :", err)
}
}
}

func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket) {
b.remotes.Range(func(key, value interface{}) bool {
b.routes.Range(func(key, value interface{}) bool {
r, ok := value.(*client)
if ok {
if r.route.remoteID == remoteID {
Expand All @@ -428,7 +442,8 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke
}

func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
b.remotes.Range(func(key, value interface{}) bool {

b.routes.Range(func(key, value interface{}) bool {
r, ok := value.(*client)
if ok {
r.WriterPacket(packet)
Expand All @@ -455,7 +470,6 @@ func (b *Broker) removeClient(c *client) {
func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName
r := b.sl.Match(topic)
// log.Info("psubs num: ", len(r.psubs))
if len(r.psubs) == 0 {
return
}
Expand All @@ -472,14 +486,12 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {

func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) {

ubsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
for topic, _ := range subs {
// topic := sub.topic
// if sub.queue {
// topic = "$queue/" + sub.topic
// }
ubsub.Topics = append(ubsub.Topics, topic)
unsub.Topics = append(unsub.Topics, topic)
}
b.BroadcastSubOrUnsubMessage(ubsub)

if len(unsub.Topics) > 0 {
b.BroadcastSubOrUnsubMessage(unsub)
}
}
69 changes: 42 additions & 27 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type client struct {
info info
route *route
status int
closed chan int
smu sync.RWMutex
mp *MessagePool
subs map[string]*subscription
Expand Down Expand Up @@ -75,12 +76,10 @@ func (c *client) init() {
c.smu.Lock()
defer c.smu.Unlock()
c.status = Connected
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string]*subInfo)
} else if typ == CLIENT {
c.subs = make(map[string]*subscription, 10)
}
c.closed = make(chan int, 1)

c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
Expand All @@ -101,6 +100,10 @@ func (c *client) keepAlive(ch chan int) {
msgPool.queue <- msg
timer.Stop()
return
case _, ok := <-c.closed:
if !ok {
return
}
}
}
}
Expand All @@ -121,8 +124,9 @@ func (c *client) readLoop() {
log.Error("read packet error: ", err, " clientID = ", c.info.clientID)
break
}
// log.Info("recv buf: ", packet)

ch <- 1

msg := &Message{
client: c,
packet: packet,
Expand Down Expand Up @@ -176,6 +180,11 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}

topic := packet.TopicName
if topic == BrokerInfoTopic && c.typ != CLIENT {
c.ProcessInfo(packet)
return
}

if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID)
return
Expand Down Expand Up @@ -228,8 +237,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}

for _, sub := range r.psubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
if sub.client.typ == REMOTE {
if typ == REMOTE {
continue
}
}
Expand All @@ -248,8 +257,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if exist {
// log.Info("queue index : ", cnt)
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
if sub.client.typ == REMOTE {
if c.typ == REMOTE {
continue
}
}
Expand Down Expand Up @@ -350,7 +359,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
retcodes = append(retcodes, qoss[i])
continue
}
case ROUTER:
case REMOTE:
if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo
Expand Down Expand Up @@ -405,29 +414,25 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
topics := packet.Topics

for _, t := range topics {
var sub *subscription
ok := false

switch typ {
case CLIENT:
sub, ok = c.subs[t]
case ROUTER:
sub, ok := c.subs[t]
if ok {
c.unsubscribe(sub)
}
case REMOTE:
subinfo, ok := c.rsubs[t]
if ok {
subinfo.num = subinfo.num - 1
if subinfo.num < 1 {
sub = subinfo.sub
delete(c.rsubs, t)
c.unsubscribe(subinfo.sub)
} else {
c.rsubs[t] = subinfo
sub = nil
}
} else {
return
}
}
if ok {
go c.unsubscribe(sub)
}

}

Expand Down Expand Up @@ -471,21 +476,24 @@ func (c *client) ProcessPing() {
}

func (c *client) Close() {
c.smu.Lock()
if c.status == Disconnected {
c.smu.Unlock()
return
}
//wait for message complete
time.Sleep(1 * time.Second)

c.smu.Lock()
c.status = Disconnected
c.smu.Unlock()

if c.conn != nil {
c.conn.Close()
c.conn = nil
}

c.smu.Unlock()

close(c.closed)

b := c.broker
subs := c.subs
if b != nil {
Expand All @@ -505,12 +513,19 @@ func (c *client) Close() {

//do reconnect
if c.typ == REMOTE {
b.connectRouter(c.route.remoteUrl, "")
localUrl := c.info.localIP + ":" + c.broker.config.Cluster.Port
if c.route.remoteUrl != localUrl {
b.connectRouter(c.route.remoteUrl, "")
}
}
}
}

func (c *client) WriterPacket(packet packets.ControlPacket) error {
if packet == nil {
return nil
}

c.mu.Lock()
err := packet.Write(c.conn)
c.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion broker/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewDispatcher() *Dispatcher {
}

func (d *Dispatcher) dispatch() {
for i := 0; i < MessagePoolNum; i++ {
for i := 0; i < (MessagePoolNum + 2); i++ {
go func(idx int) {
for {
select {
Expand Down
Loading

0 comments on commit 1d6f6a4

Please sign in to comment.