Skip to content

Commit

Permalink
Merge branch 'main' of ../private
Browse files Browse the repository at this point in the history
  • Loading branch information
lunar-devops committed Jan 6, 2025
2 parents 1dc28b7 + 0bd4c78 commit e5f690a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
15 changes: 11 additions & 4 deletions proxy/src/libs/toolkit-core/network/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
waitUntilRetry = 2 * time.Second
connectionPingInterval = 1 * time.Second
connectionReady = "ready"
pingTimeout = 15 * time.Second
maxBackoff = 2 * time.Minute
maxMissedPongs = 5
)
Expand Down Expand Up @@ -64,7 +65,13 @@ func (client *WSClient) ConnectAndStart() error {

client.start()

return nil
// Wait for connection to be ready (authorized)
select {
case <-client.connReadySignal:
return nil
case <-time.After(pingTimeout):
return errors.New("connected but didn't receive `ready` message")
}
}

func (client *WSClient) OnMessage(callback OnMessageFunc) {
Expand Down Expand Up @@ -112,7 +119,7 @@ func (client *WSClient) Close() error {
}

func (client *WSClient) Send(msg MessageI) error {
if !client.IsConnectionReady() {
if !client.IsConnectionReadyAndAuthorized() {
client.logger.Debug().Msg("connection not ready, discarding message")
return errors.New("connection not ready")
}
Expand Down Expand Up @@ -165,7 +172,7 @@ func (client *WSClient) readLoop() {
func (client *WSClient) writeLoop() {
for msg := range client.sendChan {
// Wait until the connection is ready
if !client.IsConnectionReady() {
if !client.IsConnectionReadyAndAuthorized() {
<-client.connReadySignal
client.logger.Debug().Str("func", "writeLoop").Msg("Connection is ready")
}
Expand Down Expand Up @@ -256,7 +263,7 @@ func (client *WSClient) setConnectionReady() {
client.connReadySignal = nil
}

func (client *WSClient) IsConnectionReady() bool {
func (client *WSClient) IsConnectionReadyAndAuthorized() bool {
client.connReadyMutex.RLock()
defer client.connReadyMutex.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion proxy/src/libs/toolkit-core/urltree/url_tree_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (urlTree *URLTree[T]) insertWithConvergenceIndication(
}
}
currentNode = currentNode.ParametricChild.Child
log.Debug().Msgf("created path parameter %v", paramName)
log.Debug().Str("original-url", url).Msgf("created path parameter %v", paramName)
continue
}
log.Trace().
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/services/lunar-engine/communication/lunar_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (hub *HubCommunication) SendDataToHub(message network.MessageI) bool {
if err := hub.client.Send(message); err != nil {
log.Debug().
Err(err).
Bool("wsClientIsConnectionReady", hub.client.IsConnectionReady()).
Bool("wsClientIsConnectionReady", hub.client.IsConnectionReadyAndAuthorized()).
Msg("HubCommunication::SendDataToHub Error sending data to Lunar Hub")
return false
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (hub *HubCommunication) attemptToConnectInLoop() {
hub.updateCommunicationStatus()
hub.setIsConnected(true)
hub.fanOutConnectionEstablished()
log.Debug().Msg("Connected to Lunar Hub")
log.Debug().Int("retries", retries).Msg("Connected to Lunar Hub")
break
}
}
Expand Down
1 change: 1 addition & 0 deletions proxy/src/services/lunar-engine/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (s *Stream) GetSupportedFilters() map[publictypes.ComparableFilter][]public

func (s *Stream) notifyHub() {
if s.lunarHub == nil {
log.Debug().Msg("No Hub communication, skipping notification")
return
}
log.Debug().Msg("Notifying Hub about loaded config")
Expand Down

0 comments on commit e5f690a

Please sign in to comment.