diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 00000000..d8e7f86a --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,28 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +jobs: + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.19 + + - name: Build + run: go build -v ./... + + - name: Test + run: go test -v ./... diff --git a/.gitignore b/.gitignore index e21a8cb8..e44afb4f 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,7 @@ log/* .vscode/settings.json .pre-commit-config.yaml hmq.exe +*.sw* +*.swo +*.swp +*.swn diff --git a/Dockerfile b/Dockerfile index 758c0bad..6f791847 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ COPY . . RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' . -FROM alpine +FROM alpine:3.17.3 WORKDIR / COPY --from=builder /go/src/github.com/fhmq/hmq/hmq . EXPOSE 1883 diff --git a/README.md b/README.md index c8f5a8c7..79120031 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,3 @@ -![build](https://img.shields.io/github/workflow/status/fhmq/hmq/Ubuntu%20build?label=Ubuntu&style=for-the-badge) -![build](https://img.shields.io/github/workflow/status/fhmq/hmq/MacOS%20build?label=MacOS&style=for-the-badge) -![build](https://img.shields.io/github/workflow/status/fhmq/hmq/Windows%20build?label=Windows&style=for-the-badge) Free and High Performance MQTT Broker ============ diff --git a/broker/broker.go b/broker/broker.go index 53246af0..cf772c93 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,11 +2,13 @@ package broker import ( "crypto/tls" + "errors" "fmt" "net" "net/http" "sync" "time" + encJson "encoding/json" "github.com/fhmq/hmq/broker/lib/sessions" "github.com/fhmq/hmq/broker/lib/topics" @@ -203,7 +205,10 @@ func (b *Broker) StartWebsocketListening() { func (b *Broker) wsHandler(ws *websocket.Conn) { // io.Copy(ws, ws) ws.PayloadType = websocket.BinaryFrame - b.handleConnection(CLIENT, ws) + err:=b.handleConnection(CLIENT, ws) + if err!=nil{ + ws.Close() + } } func (b *Broker) StartClientListening(Tls bool) { @@ -254,7 +259,12 @@ func (b *Broker) StartClientListening(Tls bool) { } tmpDelay = ACCEPT_MIN_SLEEP - go b.handleConnection(CLIENT, conn) + go func(){ + err :=b.handleConnection(CLIENT, conn) + if err!=nil{ + conn.Close() + } + }() } } @@ -291,7 +301,12 @@ func (b *Broker) StartClusterListening() { } tmpDelay = ACCEPT_MIN_SLEEP - go b.handleConnection(ROUTER, conn) + go func(){ + err :=b.handleConnection(ROUTER, conn) + if err!=nil{ + conn.Close() + } + }() } } @@ -307,22 +322,18 @@ func (b *Broker) DisConnClientByClientId(clientId string) { conn.Close() } -func (b *Broker) handleConnection(typ int, conn net.Conn) { +func (b *Broker) handleConnection(typ int, conn net.Conn) error{ //process connect packet packet, err := packets.ReadPacket(conn) if err != nil { - log.Error("read connect packet error", zap.Error(err)) - conn.Close() - return + return errors.New(fmt.Sprintln("read connect packet error:%v",err)) } if packet == nil { - log.Error("received nil packet") - return + return errors.New("received nil packet") } msg, ok := packet.(*packets.ConnectPacket) if !ok { - log.Error("received msg that was not Connect") - return + return errors.New("received msg that was not Connect") } log.Info("read connect from ", getAdditionalLogFields(msg.ClientIdentifier, conn)...) @@ -332,29 +343,22 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { connack.ReturnCode = msg.Validate() if connack.ReturnCode != packets.Accepted { - func() { - defer conn.Close() - if err := connack.Write(conn); err != nil { - log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...) - } - }() - return + if err := connack.Write(conn); err != nil { + return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + } + return errors.New(fmt.Sprintln("connect packet validate failed with connack.ReturnCode%v",connack.ReturnCode)) } if typ == CLIENT && !b.CheckConnectAuth(msg.ClientIdentifier, msg.Username, string(msg.Password)) { connack.ReturnCode = packets.ErrRefusedNotAuthorised - func() { - defer conn.Close() - if err := connack.Write(conn); err != nil { - log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...) - } - }() - return + if err := connack.Write(conn); err != nil { + return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + } + return errors.New(fmt.Sprintln("connect packet CheckConnectAuth failed with connack.ReturnCode%v",connack.ReturnCode)) } if err := connack.Write(conn); err != nil { - log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...) - return + return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) } willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) @@ -385,8 +389,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { c.init() if err := b.getSession(c, msg, connack); err != nil { - log.Error("get session error", getAdditionalLogFields(c.info.clientID, conn, zap.Error(err))...) - return + return errors.New(fmt.Sprintln("get session error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) } cid := c.info.clientID @@ -405,7 +408,21 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { } b.clients.Store(cid, c) - b.OnlineOfflineNotification(cid, true) + var pubPack = PubPacket{} + if willmsg != nil { + pubPack.TopicName = info.willMsg.TopicName + pubPack.Payload = info.willMsg.Payload + } + + pubInfo := Info{ + ClientID: info.clientID, + Username: info.username, + Password: info.password, + Keepalive: info.keepalive, + WillMsg: pubPack, + } + + b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime) { b.Publish(&bridge.Elements{ ClientID: msg.ClientIdentifier, @@ -426,6 +443,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { } c.readLoop() + return nil } func (b *Broker) ConnectToDiscovery() { @@ -695,11 +713,33 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) { b.BroadcastSubOrUnsubMessage(unsub) } -func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { +type OnlineOfflineMsg struct { + ClientID string `json:"clientID"` + Online bool `json:"online"` + Timestamp string `json:"timestamp"` + ClientInfo Info `json:"info"` + LastMsgTime int64 `json:"lastMsg"` +} + +func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) { packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) - packet.TopicName = "$SYS/broker/connection/clients/" + clientID + packet.TopicName = "$SYS/broker/connection/clients/" + info.ClientID packet.Qos = 0 - packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339))) + + msg := OnlineOfflineMsg{ + ClientID: info.ClientID, + Online: online, + Timestamp: time.Now().UTC().Format(time.RFC3339), + ClientInfo: info, + LastMsgTime: lastMsg, + } + + if b, err := encJson.Marshal(msg); err != nil { + //This is a TERRIBLE situation, falling back to legacy format to not break API Contract + packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, info.ClientID, online, time.Now().UTC().Format(time.RFC3339))) + } else { + packet.Payload = b + } b.PublishMessage(packet) } diff --git a/broker/client.go b/broker/client.go index 83edce01..223efc92 100644 --- a/broker/client.go +++ b/broker/client.go @@ -79,6 +79,7 @@ type client struct { mqueue *queue.Queue retryTimer *time.Timer retryTimerLock sync.Mutex + lastMsgTime int64 } type InflightStatus uint8 @@ -111,6 +112,19 @@ type info struct { remoteIP string } +type PubPacket struct { + TopicName string `json:"topicName"` + Payload []byte `json:"payload"` +} + +type Info struct { + ClientID string `json:"clientId"` + Username string `json:"username"` + Password []byte `json:"password"` + Keepalive uint16 `json:"keepalive"` + WillMsg PubPacket `json:"willMsg"` +} + type route struct { remoteID string remoteUrl string @@ -122,6 +136,7 @@ var ( ) func (c *client) init() { + c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged c.status = Connected c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String()) remoteAddr := c.conn.RemoteAddr() @@ -185,6 +200,8 @@ func (c *client) readLoop() { if _, isDisconnect := packet.(*packets.DisconnectPacket); isDisconnect { c.info.willMsg = nil c.cancelFunc() + } else { + c.lastMsgTime = time.Now().Unix() } msg := &Message{ @@ -842,8 +859,22 @@ func (c *client) Close() { if c.typ == CLIENT { b.BroadcastUnSubscribe(unSubTopics) + + var pubPack = PubPacket{} + if c.info.willMsg != nil { + pubPack.TopicName = c.info.willMsg.TopicName + pubPack.Payload = c.info.willMsg.Payload + } + + pubInfo := Info{ + ClientID: c.info.clientID, + Username: c.info.username, + Password: c.info.password, + Keepalive: c.info.keepalive, + WillMsg: pubPack, + } //offline notification - b.OnlineOfflineNotification(c.info.clientID, false) + b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime) } if c.info.willMsg != nil { @@ -864,7 +895,7 @@ func (c *client) Close() { func (c *client) WriterPacket(packet packets.ControlPacket) error { defer func() { if err := recover(); err != nil { - log.Error("recover error, ", zap.Any("recover", r)) + log.Error("recover error, ", zap.Any("recover", err)) } }() if c.status == Disconnected { diff --git a/broker/http.go b/broker/http.go index 4dda0551..c99f4ff5 100644 --- a/broker/http.go +++ b/broker/http.go @@ -4,10 +4,24 @@ import ( "github.com/gin-gonic/gin" ) +const ( + CONNECTIONS = "api/v1/connections" +) + +type ConnClient struct { + Info `json:"info"` + LastMsgTime int64 `json:"lastMsg"` +} + +type resp struct { + Code int `json:"code,omitempty"` + Clients []ConnClient `json:"clients,omitempty"` +} + func InitHTTPMoniter(b *Broker) { gin.SetMode(gin.ReleaseMode) router := gin.Default() - router.DELETE("api/v1/connections/:clientid", func(c *gin.Context) { + router.DELETE(CONNECTIONS + "/:clientid", func(c *gin.Context) { clientid := c.Param("clientid") cli, ok := b.clients.Load(clientid) if ok { @@ -16,10 +30,35 @@ func InitHTTPMoniter(b *Broker) { conn.Close() } } - resp := map[string]int{ - "code": 0, - } - c.JSON(200, &resp) + r := resp{Code: 0} + c.JSON(200, &r) + }) + router.GET(CONNECTIONS, func(c *gin.Context) { + conns := make([]ConnClient, 0) + b.clients.Range(func (k, v interface{}) bool { + cl, _ := v.(*client) + var pubPack = PubPacket{} + if cl.info.willMsg != nil { + pubPack.TopicName = cl.info.willMsg.TopicName + pubPack.Payload = cl.info.willMsg.Payload + } + + msg := ConnClient{ + Info: Info{ + ClientID: cl.info.clientID, + Username: cl.info.username, + Password: cl.info.password, + Keepalive: cl.info.keepalive, + WillMsg: pubPack, + }, + LastMsgTime: cl.lastMsgTime, + } + + conns = append(conns, msg) + return true + }) + r := resp{Clients: conns} + c.JSON(200, &r) }) router.Run(":" + b.config.HTTPPort) diff --git a/go.mod b/go.mod index fd6ac7f7..4c3ff0fe 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/go-playground/validator/v10 v10.14.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect @@ -39,7 +39,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.15.15 // indirect + github.com/klauspost/compress v1.15.14 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect @@ -53,8 +53,8 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.9.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index 8e357183..e09bbc66 100644 --- a/go.sum +++ b/go.sum @@ -57,9 +57,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -79,8 +78,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -127,12 +126,12 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= diff --git a/pool/fixpool.go b/pool/fixpool.go index 6896de2d..9fdf3982 100644 --- a/pool/fixpool.go +++ b/pool/fixpool.go @@ -43,16 +43,14 @@ func (p *WorkerPool) dispatch() { } func startWorker(taskChan chan func()) { - go func() { - var task func() - var ok bool - for { - task, ok = <-taskChan - if !ok { - break - } - // Execute the task. - task() + var task func() + var ok bool + for { + task, ok = <-taskChan + if !ok { + break } - }() + // Execute the task. + task() + } }