diff --git a/tzkt/events/signalr/hub.go b/tzkt/events/signalr/hub.go index 9e4377e..4307840 100644 --- a/tzkt/events/signalr/hub.go +++ b/tzkt/events/signalr/hub.go @@ -1,6 +1,7 @@ package signalr import ( + "context" "net" "net/url" "sync" @@ -8,6 +9,7 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -22,7 +24,7 @@ type Hub struct { encoder Encoding msgs chan interface{} - stop chan struct{} + log zerolog.Logger mx sync.Mutex wg sync.WaitGroup @@ -48,22 +50,22 @@ func NewHub(address, connectionToken string) (*Hub, error) { url: u, encoder: NewJSONEncoding(), msgs: make(chan interface{}, 1024), - stop: make(chan struct{}, 1), + log: log.Logger, }, nil } // Connect - -func (hub *Hub) Connect() error { +func (hub *Hub) Connect(ctx context.Context) error { if err := hub.handshake(); err != nil { return err } - hub.listen() + hub.listen(ctx) return nil } func (hub *Hub) handshake() error { - log.Trace().Msgf("connecting to %s...", hub.url.String()) + hub.log.Debug().Msgf("connecting to %s...", hub.url.String()) c, response, err := websocket.DefaultDialer.Dial(hub.url.String(), nil) if err != nil { @@ -85,14 +87,13 @@ func (hub *Hub) handshake() error { if resp.Error != "" { return errors.Wrap(ErrHandshake, resp.Error) } - log.Trace().Msg("connected") + hub.log.Debug().Msg("connected") return nil } // Close - func (hub *Hub) Close() error { - hub.stop <- struct{}{} hub.wg.Wait() if err := hub.Send(newCloseMessage()); err != nil { @@ -103,22 +104,21 @@ func (hub *Hub) Close() error { return err } - close(hub.stop) close(hub.msgs) return nil } func (hub *Hub) reconnect() error { - log.Warn().Msg("reconnecting...") + hub.log.Warn().Msg("reconnecting...") if err := hub.Send(newCloseMessage()); err != nil { - log.Err(err).Msg("send") + hub.log.Err(err).Msg("send") } if err := hub.conn.Close(); err != nil { - log.Err(err).Msg("close") + hub.log.Err(err).Msg("close") } - log.Trace().Msg("connection closed") + hub.log.Debug().Msg("connection closed") if err := hub.handshake(); err != nil { return err } @@ -128,7 +128,7 @@ func (hub *Hub) reconnect() error { return nil } -func (hub *Hub) listen() { +func (hub *Hub) listen(ctx context.Context) { hub.wg.Add(1) go func() { @@ -136,20 +136,21 @@ func (hub *Hub) listen() { for { select { - case <-hub.stop: + case <-ctx.Done(): + hub.log.Debug().Msg("stop hub listenning...") return default: if err := hub.readAllMessages(); err != nil { switch { case errors.Is(err, ErrTimeout) || websocket.IsCloseError(err, websocket.CloseAbnormalClosure): if err := hub.reconnect(); err != nil { - log.Err(err).Msg("reconnect") - log.Warn().Msg("retry after 5 seconds") + hub.log.Err(err).Msg("reconnect") + hub.log.Warn().Msg("retry after 5 seconds") time.Sleep(time.Second * 5) } case errors.Is(err, ErrEmptyResponse): default: - log.Err(err).Msg("readAllMessages") + hub.log.Err(err).Msg("readAllMessages") } } } @@ -163,6 +164,9 @@ func (hub *Hub) Send(msg interface{}) error { if err != nil { return err } + + hub.log.Trace().Str("data", string(data)).Msg("==> TzKT server") + hub.mx.Lock() defer hub.mx.Unlock() return hub.conn.WriteMessage(websocket.TextMessage, data) @@ -186,6 +190,8 @@ func (hub *Hub) readOneMessage(msg interface{}) error { if len(data) == 0 { return ErrEmptyResponse } + hub.log.Trace().Str("data", string(data)).Msg("<== TzKT server") + if err := json.Unmarshal(data, msg); err != nil { return err } @@ -203,7 +209,7 @@ func (hub *Hub) readAllMessages() error { return err } if scanner == nil { - log.Warn().Msg("no messages during read timeout") + hub.log.Warn().Msg("no messages during read timeout") return ErrEmptyResponse } if err := scanner.Scan(); err != nil { @@ -219,11 +225,14 @@ func (hub *Hub) readAllMessages() error { break } + hub.log.Trace().Str("data", string(data)).Msg("<== TzKT server") + msg, err := hub.encoder.Decode(data) if err != nil { return err } hub.msgs <- msg + if closeMsg, ok := msg.(CloseMessage); ok { return hub.closeMessageHandler(closeMsg) } @@ -238,7 +247,7 @@ func (hub *Hub) readAllMessages() error { func (hub *Hub) closeMessageHandler(msg CloseMessage) error { if msg.Error != "" { - log.Error().Msg(msg.Error) + hub.log.Error().Msg(msg.Error) } if !msg.AllowReconnect { return ErrConnectionClose diff --git a/tzkt/events/signalr/json.go b/tzkt/events/signalr/json.go index 03b8a43..e9a607a 100644 --- a/tzkt/events/signalr/json.go +++ b/tzkt/events/signalr/json.go @@ -7,7 +7,6 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" - "github.com/rs/zerolog/log" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -28,8 +27,6 @@ func NewJSONEncoding() *JSONEncoding { // Decode - func (e *JSONEncoding) Decode(data []byte) (interface{}, error) { - log.Trace().Msg(string(data)) - var typ Type if err := json.Unmarshal(data, &typ); err != nil { return nil, err diff --git a/tzkt/events/signalr/signalr.go b/tzkt/events/signalr/signalr.go index 98086a8..226400b 100644 --- a/tzkt/events/signalr/signalr.go +++ b/tzkt/events/signalr/signalr.go @@ -1,10 +1,16 @@ package signalr +import ( + "context" + + "github.com/rs/zerolog" +) + // SignalR - type SignalR struct { hub *Hub t *Transport - + log zerolog.Logger url string } @@ -16,8 +22,14 @@ func NewSignalR(url string) *SignalR { } } +// SetLogger - +func (s *SignalR) SetLogger(log zerolog.Logger) { + s.t.log = log + s.log = log +} + // Connect - connect to server -func (s *SignalR) Connect(version Version) error { +func (s *SignalR) Connect(ctx context.Context, version Version) error { resp, err := s.t.Negotiate(version) if err != nil { return err @@ -35,8 +47,9 @@ func (s *SignalR) Connect(version Version) error { return err } s.hub = hub + s.hub.log = s.log - return s.hub.Connect() + return s.hub.Connect(ctx) } // Messages - listens message channel diff --git a/tzkt/events/signalr/transport.go b/tzkt/events/signalr/transport.go index aacfdd6..2ef758e 100644 --- a/tzkt/events/signalr/transport.go +++ b/tzkt/events/signalr/transport.go @@ -5,6 +5,7 @@ import ( "net/url" "time" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/pkg/errors" @@ -23,6 +24,8 @@ const ( type Transport struct { url string client *http.Client + + log zerolog.Logger } // NewTransport - @@ -38,6 +41,7 @@ func NewTransport(baseURL string) *Transport { Timeout: time.Minute, Transport: t, }, + log: log.Logger, } } @@ -52,7 +56,7 @@ func (t *Transport) Negotiate(version Version) (response NegotiateResponse, err q.Set("negotiateVersion", string(version)) u.RawQuery = q.Encode() - log.Trace().Str("url", u.String()).Msg("send negotiate request...") + t.log.Debug().Str("url", u.String()).Msg("send negotiate request...") req, err := http.NewRequest(http.MethodPost, u.String(), nil) if err != nil { diff --git a/tzkt/events/tzkt.go b/tzkt/events/tzkt.go index bc0e23b..00649d6 100644 --- a/tzkt/events/tzkt.go +++ b/tzkt/events/tzkt.go @@ -1,6 +1,7 @@ package events import ( + "context" "encoding/json" "fmt" "strings" @@ -10,6 +11,7 @@ import ( tzktData "github.com/dipdup-net/go-lib/tzkt/data" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -20,8 +22,9 @@ type TzKT struct { subscriptions []signalr.Invocation + log zerolog.Logger + msgs chan Message - stop chan struct{} wg sync.WaitGroup } @@ -33,31 +36,35 @@ func NewTzKT(url string) *TzKT { return &TzKT{ s: signalr.NewSignalR(url), msgs: make(chan Message, 1024), - stop: make(chan struct{}, 1), subscriptions: make([]signalr.Invocation, 0), + log: log.Logger, } } +// SetLogger - +func (tzkt *TzKT) SetLogger(logger zerolog.Logger) { + tzkt.log = logger + tzkt.s.SetLogger(logger) +} + // Connect - connect to events SignalR server -func (tzkt *TzKT) Connect() error { - if err := tzkt.s.Connect(signalr.Version1); err != nil { +func (tzkt *TzKT) Connect(ctx context.Context) error { + if err := tzkt.s.Connect(ctx, signalr.Version1); err != nil { return err } tzkt.s.SetOnReconnect(tzkt.onReconnect) - tzkt.listen() + tzkt.listen(ctx) return nil } // Close - closing all connections func (tzkt *TzKT) Close() error { - tzkt.stop <- struct{}{} tzkt.wg.Wait() if err := tzkt.s.Close(); err != nil { return err } close(tzkt.msgs) - close(tzkt.stop) return nil } @@ -144,7 +151,7 @@ func (tzkt *TzKT) subscribe(channel string, args ...interface{}) error { return tzkt.s.Send(msg) } -func (tzkt *TzKT) listen() { +func (tzkt *TzKT) listen(ctx context.Context) { tzkt.wg.Add(1) go func() { @@ -152,19 +159,20 @@ func (tzkt *TzKT) listen() { for { select { - case <-tzkt.stop: + case <-ctx.Done(): + tzkt.log.Debug().Msg("listenning was stopped") return case msg := <-tzkt.s.Messages(): switch typ := msg.(type) { case signalr.Invocation: if len(typ.Arguments) == 0 { - log.Warn().Msgf("empty arguments of invocation: %v", typ) + tzkt.log.Warn().Msgf("empty arguments of invocation: %v", typ) continue } var packet Packet if err := json.Unmarshal(typ.Arguments[0], &packet); err != nil { - log.Err(err).Msg("invalid invocation argument") + tzkt.log.Err(err).Msg("invalid invocation argument") continue } @@ -177,7 +185,7 @@ func (tzkt *TzKT) listen() { if packet.Data != nil { data, err := parseData(typ.Target, packet.Data) if err != nil { - log.Err(err).Msg("error during parsing data") + tzkt.log.Err(err).Msg("error during parsing data") continue } message.Body = data