Skip to content

Commit

Permalink
Merge pull request #29 from lukaheric/server-default-broadcast
Browse files Browse the repository at this point in the history
broadcast messages via event bus
  • Loading branch information
davidmohar authored Aug 19, 2021
2 parents 3eee0c4 + 36e2683 commit 5c6f5f2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
1 change: 1 addition & 0 deletions server/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func Unsubscribe(topic string, handler interface{}) error {
func SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.SubscribeAsync(topic, fn, transactional)
}

func WaitAsync() {
bus.WaitAsync()
}
5 changes: 3 additions & 2 deletions server/internal/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.uber.org/zap"

"github.com/ptr-geeks/ptrun/server/internal/events"
"github.com/ptr-geeks/ptrun/server/internal/messages"
)

Expand Down Expand Up @@ -94,13 +95,13 @@ func (c *clientImpl) ReadPump() {
c.logger.Debugw("exiting client read pump", "id", c.id, "remoteAddr", c.addr)
break
}

//tu je ta message bus
// Everything seems fine, just unmarshal & forward
c.logger.Debugw("recieved message", "id", c.id, "remoteAdr", c.addr)
message := &messages.Message{}
proto.Unmarshal(msg, message)

c.server.Broadcast(c.id, message)
events.Publish("server.broadcast", c.id, message)
}
}

Expand Down
11 changes: 10 additions & 1 deletion server/internal/ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.uber.org/zap"

"github.com/ptr-geeks/ptrun/server/internal/consts"
"github.com/ptr-geeks/ptrun/server/internal/events"
"github.com/ptr-geeks/ptrun/server/internal/messages"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ func NewServer(logger *zap.Logger) Server {
//Run is used for connecting and disconnetcing clients from the server
func (s *serverImpl) Run() {
s.logger.Debug("server started and listening for events")

s.handleBroadcast()
for {
select {
case connect := <-s.connect:
Expand All @@ -79,6 +80,7 @@ func (s *serverImpl) Run() {
}

case broadcast := <-s.broadcast:
s.logger.Debugw("Broadcasting", "id", broadcast.excludeClient)
for clientID, client := range s.clients {
if broadcast.excludeClient == clientID {
continue
Expand Down Expand Up @@ -116,3 +118,10 @@ func (s *serverImpl) Disconnect(client Client) {
func (s *serverImpl) Broadcast(excludeClient int32, msg *messages.Message) {
s.broadcast <- broadcastMessage{msg: msg, excludeClient: excludeClient}
}

func (s *serverImpl) handleBroadcast() {
err := events.Subscribe("server.broadcast", s.Broadcast)
if err != nil {
s.logger.Warnw("cannot read from the client")
}
}

0 comments on commit 5c6f5f2

Please sign in to comment.