Skip to content

Commit

Permalink
Better TzKT events logging
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Aug 2, 2022
1 parent 84a0fd4 commit 50b8f68
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 38 deletions.
47 changes: 28 additions & 19 deletions tzkt/events/signalr/hub.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package signalr

import (
"context"
"net"
"net/url"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand All @@ -22,7 +24,7 @@ type Hub struct {

encoder Encoding
msgs chan interface{}
stop chan struct{}
log zerolog.Logger
mx sync.Mutex
wg sync.WaitGroup

Expand All @@ -48,22 +50,22 @@ func NewHub(address, connectionToken string) (*Hub, error) {
url: u,
encoder: NewJSONEncoding(),
msgs: make(chan interface{}, 1024),
stop: make(chan struct{}, 1),
log: log.Logger,
}, nil
}

// Connect -
func (hub *Hub) Connect() error {
func (hub *Hub) Connect(ctx context.Context) error {
if err := hub.handshake(); err != nil {
return err
}

hub.listen()
hub.listen(ctx)
return nil
}

func (hub *Hub) handshake() error {
log.Trace().Msgf("connecting to %s...", hub.url.String())
hub.log.Debug().Msgf("connecting to %s...", hub.url.String())

c, response, err := websocket.DefaultDialer.Dial(hub.url.String(), nil)
if err != nil {
Expand All @@ -85,14 +87,13 @@ func (hub *Hub) handshake() error {
if resp.Error != "" {
return errors.Wrap(ErrHandshake, resp.Error)
}
log.Trace().Msg("connected")
hub.log.Debug().Msg("connected")

return nil
}

// Close -
func (hub *Hub) Close() error {
hub.stop <- struct{}{}
hub.wg.Wait()

if err := hub.Send(newCloseMessage()); err != nil {
Expand All @@ -103,22 +104,21 @@ func (hub *Hub) Close() error {
return err
}

close(hub.stop)
close(hub.msgs)
return nil
}

func (hub *Hub) reconnect() error {
log.Warn().Msg("reconnecting...")
hub.log.Warn().Msg("reconnecting...")

if err := hub.Send(newCloseMessage()); err != nil {
log.Err(err).Msg("send")
hub.log.Err(err).Msg("send")
}

if err := hub.conn.Close(); err != nil {
log.Err(err).Msg("close")
hub.log.Err(err).Msg("close")
}
log.Trace().Msg("connection closed")
hub.log.Debug().Msg("connection closed")
if err := hub.handshake(); err != nil {
return err
}
Expand All @@ -128,28 +128,29 @@ func (hub *Hub) reconnect() error {
return nil
}

func (hub *Hub) listen() {
func (hub *Hub) listen(ctx context.Context) {
hub.wg.Add(1)

go func() {
defer hub.wg.Done()

for {
select {
case <-hub.stop:
case <-ctx.Done():
hub.log.Debug().Msg("stop hub listenning...")
return
default:
if err := hub.readAllMessages(); err != nil {
switch {
case errors.Is(err, ErrTimeout) || websocket.IsCloseError(err, websocket.CloseAbnormalClosure):
if err := hub.reconnect(); err != nil {
log.Err(err).Msg("reconnect")
log.Warn().Msg("retry after 5 seconds")
hub.log.Err(err).Msg("reconnect")
hub.log.Warn().Msg("retry after 5 seconds")
time.Sleep(time.Second * 5)
}
case errors.Is(err, ErrEmptyResponse):
default:
log.Err(err).Msg("readAllMessages")
hub.log.Err(err).Msg("readAllMessages")
}
}
}
Expand All @@ -163,6 +164,9 @@ func (hub *Hub) Send(msg interface{}) error {
if err != nil {
return err
}

hub.log.Trace().Str("data", string(data)).Msg("==> TzKT server")

hub.mx.Lock()
defer hub.mx.Unlock()
return hub.conn.WriteMessage(websocket.TextMessage, data)
Expand All @@ -186,6 +190,8 @@ func (hub *Hub) readOneMessage(msg interface{}) error {
if len(data) == 0 {
return ErrEmptyResponse
}
hub.log.Trace().Str("data", string(data)).Msg("<== TzKT server")

if err := json.Unmarshal(data, msg); err != nil {
return err
}
Expand All @@ -203,7 +209,7 @@ func (hub *Hub) readAllMessages() error {
return err
}
if scanner == nil {
log.Warn().Msg("no messages during read timeout")
hub.log.Warn().Msg("no messages during read timeout")
return ErrEmptyResponse
}
if err := scanner.Scan(); err != nil {
Expand All @@ -219,11 +225,14 @@ func (hub *Hub) readAllMessages() error {
break
}

hub.log.Trace().Str("data", string(data)).Msg("<== TzKT server")

msg, err := hub.encoder.Decode(data)
if err != nil {
return err
}
hub.msgs <- msg

if closeMsg, ok := msg.(CloseMessage); ok {
return hub.closeMessageHandler(closeMsg)
}
Expand All @@ -238,7 +247,7 @@ func (hub *Hub) readAllMessages() error {

func (hub *Hub) closeMessageHandler(msg CloseMessage) error {
if msg.Error != "" {
log.Error().Msg(msg.Error)
hub.log.Error().Msg(msg.Error)
}
if !msg.AllowReconnect {
return ErrConnectionClose
Expand Down
3 changes: 0 additions & 3 deletions tzkt/events/signalr/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand All @@ -28,8 +27,6 @@ func NewJSONEncoding() *JSONEncoding {

// Decode -
func (e *JSONEncoding) Decode(data []byte) (interface{}, error) {
log.Trace().Msg(string(data))

var typ Type
if err := json.Unmarshal(data, &typ); err != nil {
return nil, err
Expand Down
19 changes: 16 additions & 3 deletions tzkt/events/signalr/signalr.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package signalr

import (
"context"

"github.com/rs/zerolog"
)

// SignalR -
type SignalR struct {
hub *Hub
t *Transport

log zerolog.Logger
url string
}

Expand All @@ -16,8 +22,14 @@ func NewSignalR(url string) *SignalR {
}
}

// SetLogger -
func (s *SignalR) SetLogger(log zerolog.Logger) {
s.t.log = log
s.log = log
}

// Connect - connect to server
func (s *SignalR) Connect(version Version) error {
func (s *SignalR) Connect(ctx context.Context, version Version) error {
resp, err := s.t.Negotiate(version)
if err != nil {
return err
Expand All @@ -35,8 +47,9 @@ func (s *SignalR) Connect(version Version) error {
return err
}
s.hub = hub
s.hub.log = s.log

return s.hub.Connect()
return s.hub.Connect(ctx)
}

// Messages - listens message channel
Expand Down
6 changes: 5 additions & 1 deletion tzkt/events/signalr/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/url"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/pkg/errors"
Expand All @@ -23,6 +24,8 @@ const (
type Transport struct {
url string
client *http.Client

log zerolog.Logger
}

// NewTransport -
Expand All @@ -38,6 +41,7 @@ func NewTransport(baseURL string) *Transport {
Timeout: time.Minute,
Transport: t,
},
log: log.Logger,
}
}

Expand All @@ -52,7 +56,7 @@ func (t *Transport) Negotiate(version Version) (response NegotiateResponse, err
q.Set("negotiateVersion", string(version))
u.RawQuery = q.Encode()

log.Trace().Str("url", u.String()).Msg("send negotiate request...")
t.log.Debug().Str("url", u.String()).Msg("send negotiate request...")

req, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil {
Expand Down
32 changes: 20 additions & 12 deletions tzkt/events/tzkt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"context"
"encoding/json"
"fmt"
"strings"
Expand All @@ -10,6 +11,7 @@ import (

tzktData "github.com/dipdup-net/go-lib/tzkt/data"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand All @@ -20,8 +22,9 @@ type TzKT struct {

subscriptions []signalr.Invocation

log zerolog.Logger

msgs chan Message
stop chan struct{}
wg sync.WaitGroup
}

Expand All @@ -33,31 +36,35 @@ func NewTzKT(url string) *TzKT {
return &TzKT{
s: signalr.NewSignalR(url),
msgs: make(chan Message, 1024),
stop: make(chan struct{}, 1),
subscriptions: make([]signalr.Invocation, 0),
log: log.Logger,
}
}

// SetLogger -
func (tzkt *TzKT) SetLogger(logger zerolog.Logger) {
tzkt.log = logger
tzkt.s.SetLogger(logger)
}

// Connect - connect to events SignalR server
func (tzkt *TzKT) Connect() error {
if err := tzkt.s.Connect(signalr.Version1); err != nil {
func (tzkt *TzKT) Connect(ctx context.Context) error {
if err := tzkt.s.Connect(ctx, signalr.Version1); err != nil {
return err
}
tzkt.s.SetOnReconnect(tzkt.onReconnect)
tzkt.listen()
tzkt.listen(ctx)
return nil
}

// Close - closing all connections
func (tzkt *TzKT) Close() error {
tzkt.stop <- struct{}{}
tzkt.wg.Wait()

if err := tzkt.s.Close(); err != nil {
return err
}
close(tzkt.msgs)
close(tzkt.stop)
return nil
}

Expand Down Expand Up @@ -144,27 +151,28 @@ func (tzkt *TzKT) subscribe(channel string, args ...interface{}) error {
return tzkt.s.Send(msg)
}

func (tzkt *TzKT) listen() {
func (tzkt *TzKT) listen(ctx context.Context) {
tzkt.wg.Add(1)

go func() {
defer tzkt.wg.Done()

for {
select {
case <-tzkt.stop:
case <-ctx.Done():
tzkt.log.Debug().Msg("listenning was stopped")
return
case msg := <-tzkt.s.Messages():
switch typ := msg.(type) {
case signalr.Invocation:
if len(typ.Arguments) == 0 {
log.Warn().Msgf("empty arguments of invocation: %v", typ)
tzkt.log.Warn().Msgf("empty arguments of invocation: %v", typ)
continue
}

var packet Packet
if err := json.Unmarshal(typ.Arguments[0], &packet); err != nil {
log.Err(err).Msg("invalid invocation argument")
tzkt.log.Err(err).Msg("invalid invocation argument")
continue
}

Expand All @@ -177,7 +185,7 @@ func (tzkt *TzKT) listen() {
if packet.Data != nil {
data, err := parseData(typ.Target, packet.Data)
if err != nil {
log.Err(err).Msg("error during parsing data")
tzkt.log.Err(err).Msg("error during parsing data")
continue
}
message.Body = data
Expand Down

0 comments on commit 50b8f68

Please sign in to comment.