Skip to content

Commit

Permalink
Merge branch 'main' of ../private
Browse files Browse the repository at this point in the history
  • Loading branch information
lunar-devops committed Aug 25, 2024
2 parents fece259 + 805420e commit b403727
Show file tree
Hide file tree
Showing 26 changed files with 355 additions and 93 deletions.
4 changes: 3 additions & 1 deletion proxy/integration-tests/hub-mock/hub.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from typing import Dict

from tornado.log import gen_log as logger
from tornado.web import RequestHandler, Application

from web_socket.handler import WebSocketHubHandler, DISCOVERY_EVENT
Expand Down Expand Up @@ -33,6 +33,8 @@ def initialize(self, cache: Dict[str, Dict[str, str]], lock: asyncio.Lock):

async def get(self):
async with self.lock:
logger.info("Discovery get event received")
logger.info(f"Cache: {self.cache}")
data = self.cache.get(DISCOVERY_EVENT)

if data is None:
Expand Down
21 changes: 17 additions & 4 deletions proxy/integration-tests/hub-mock/web_socket/handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import asyncio
from tornado.log import gen_log as logger
from json import loads
from typing import Dict
from tornado.websocket import WebSocketHandler

DISCOVERY_EVENT = "discovery-event"
CONFIGURATION_LOAD_EVENT = "configuration-load-event"
EVENT_KEY = "event"
DATA_KEY = "data"

Expand All @@ -15,13 +16,15 @@ def initialize(self, cache: Dict[str, Dict[str, str]], lock: asyncio.Lock):
self.lock = lock

async def on_message(self, message: str | bytes):
logger.info(f"Received message: {message}")
await self.handle_message(message)

async def handle_message(self, message: str | bytes):
try:
json_data = loads(message)

logger.info(f"Parsed message: {json_data}")
event_name = json_data.get(EVENT_KEY)
logger.info(f"Event: {event_name}")
handler = self.get_event_handler(event_name)
if handler is None:
print(f"Error handling event message of type: {event_name}")
Expand All @@ -30,11 +33,21 @@ async def handle_message(self, message: str | bytes):
await handler(json_data)

except Exception as e:
logging.error(f"Error handling message: {e}")
print(f"Error handling message: {e}")

def get_event_handler(self, event: str):
return {DISCOVERY_EVENT: self._discovery_event}.get(event)
logger.info(f"Event: {event}")
return {
DISCOVERY_EVENT: self._discovery_event,
CONFIGURATION_LOAD_EVENT: self._configuration_load_event,
}.get(event)

async def _discovery_event(self, event_data: Dict[str, str]):
async with self.lock:
logger.info(f"Discovery Event: {event_data}")
self.cache[DISCOVERY_EVENT] = event_data

async def _configuration_load_event(self, event_data: Dict[str, str]):
async with self.lock:
logger.info(f"Configuration Load Event: {event_data}")
self.cache[CONFIGURATION_LOAD_EVENT] = event_data
6 changes: 3 additions & 3 deletions proxy/src/libs/toolkit-core/client/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestHealthcheckReturnsNoErrorWhenPredicatesPass(t *testing.T) {

healthcheckConfig := client.HealthcheckConfig{
URL: "mock/healthcheck",
BodyPredicate: func(bytes []byte) bool { return true },
BodyPredicate: func(_ []byte) bool { return true },
StatusPredicate: func(code int) bool { return code == 200 },
HTTPClient: &httpClient,
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestHealthcheckReturnsErrorWhenPredicatesDoNotPass(t *testing.T) {

healthcheckConfig := client.HealthcheckConfig{
URL: "mock/healthcheck",
BodyPredicate: func(bytes []byte) bool { return true },
BodyPredicate: func(_ []byte) bool { return true },
StatusPredicate: func(code int) bool { return code == 200 },
HTTPClient: &httpClient,
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestHealthcheckReturnsErrorWhenHTTPClientReturnsError(t *testing.T) {

healthcheckConfig := client.HealthcheckConfig{
URL: "mock/healthcheck",
BodyPredicate: func(bytes []byte) bool { return true },
BodyPredicate: func(_ []byte) bool { return true },
StatusPredicate: func(code int) bool { return code == 200 || code == 400 },
HTTPClient: &httpClient,
}
Expand Down
28 changes: 20 additions & 8 deletions proxy/src/libs/toolkit-core/configuration/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package configuration

import (
"os"
"reflect"

"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
)

func DecodeYAML[T any](path string) (*T, error) {
type YAMLResult[T any] struct {
UnmarshaledData T
Content []byte
}

func DecodeYAML[T any](path string) (*YAMLResult[*T], error) {
data, readErr := os.ReadFile(path)
if readErr != nil {
// If the file does not exist, return an empty object
Expand All @@ -17,16 +23,22 @@ func DecodeYAML[T any](path string) (*T, error) {
return UnmarshalPolicyRawData[T](data)
}

func UnmarshalPolicyRawData[T any](data []byte) (*T, error) {
func UnmarshalPolicyRawData[T any](data []byte) (*YAMLResult[*T], error) {
if log.Trace().Enabled() {
log.Trace().Msgf("Read raw YAML: %s", string(data))
}

var target T

if unmarshalErr := yaml.Unmarshal(data, &target); unmarshalErr != nil {
return nil, unmarshalErr
result := YAMLResult[*T]{
Content: data,
UnmarshaledData: nil,
}

return &target, nil
if unmarshalErr := yaml.Unmarshal(data, &result.UnmarshaledData); unmarshalErr != nil {
log.Warn().Err(unmarshalErr).Msg("failed to unmarshal yaml")
return &result, unmarshalErr
}
if result.UnmarshaledData == nil {
result.UnmarshaledData = reflect.New(reflect.TypeOf(result.UnmarshaledData).Elem()).
Interface().(*T)
}
return &result, nil
}
39 changes: 35 additions & 4 deletions proxy/src/libs/toolkit-core/network/message.model.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
package network

import "lunar/shared-model/discovery"
import (
"lunar/shared-model/discovery"
)

type Message struct {
Event string `json:"event"`
Data discovery.Output `json:"data"`
type MessageI interface {
GetEvent() WebSocketMessageEvent
}

type DiscoveryMessage struct {
Event WebSocketMessageEvent `json:"event"`
Data discovery.Output `json:"data"`
}

type ConfigurationMessage struct {
Event WebSocketMessageEvent `json:"event"`
Data ConfigurationData `json:"data"`
}

type ConfigurationData struct {
Data []ConfigurationPayload `json:"data"`
}

type ConfigurationPayload struct {
Type string `json:"type"`
FileName string `json:"file_name"`
Content []byte `json:"content"`
}

type (
WebSocketConnectionEvent string
WebSocketMessageEvent string
)

const (
WebSocketEventDiscovery WebSocketMessageEvent = "discovery-event"
WebSocketEventConfigurationLoad WebSocketMessageEvent = "configuration-load-event"
)
9 changes: 9 additions & 0 deletions proxy/src/libs/toolkit-core/network/message.utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package network

func (ym *ConfigurationMessage) GetEvent() WebSocketMessageEvent {
return ym.Event
}

func (dm *DiscoveryMessage) GetEvent() WebSocketMessageEvent {
return dm.Event
}
138 changes: 106 additions & 32 deletions proxy/src/libs/toolkit-core/network/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,34 @@ package network
import (
"encoding/json"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)

const waitUntilRetry = 2 * time.Second
const (
waitUntilRetry = 2 * time.Second
connectionPingInterval = 1 * time.Second
connectionEstablished = "ready"
)

type WSClient struct {
url string
handshakeHeaders http.Header
conn *websocket.Conn
sendChan chan []byte
}
type (
OnMessageFunc func([]byte)
OnDisconnectFunc func()

WSClient struct {
url string
handshakeHeaders http.Header
conn *websocket.Conn
sendChan chan []byte
onMessageCallback OnMessageFunc
onDisconnectCallback OnDisconnectFunc
connReadySignal chan struct{}
connReadyMutex sync.Mutex
}
)

func NewWSClient(url string, handshakeHeaders http.Header) *WSClient {
return &WSClient{ //nolint:exhaustruct
Expand All @@ -37,7 +51,16 @@ func (client *WSClient) ConnectAndStart() error {
return nil
}

func (client *WSClient) OnMessage(callback OnMessageFunc) {
client.onMessageCallback = callback
}

func (client *WSClient) OnDisconnect(callback OnDisconnectFunc) {
client.onDisconnectCallback = callback
}

func (client *WSClient) Connect() error {
client.setConnectionNotReady()
dialer := websocket.Dialer{ //nolint:exhaustruct
Subprotocols: []string{"token"},
}
Expand All @@ -47,8 +70,14 @@ func (client *WSClient) Connect() error {
return err
}

client.conn = conn
conn.SetPongHandler(func(string) error {
log.Debug().Msg("WSClient::SetPongHandlerReceived pong from server")
client.setConnectionReady()
return nil
})

go client.startPing()
client.conn = conn
return nil
}

Expand All @@ -62,45 +91,90 @@ func (client *WSClient) Close() error {
return client.conn.Close()
}

func (client *WSClient) Send(data *Message) error {
msg, err := json.Marshal(data)
func (client *WSClient) Send(msg MessageI) error {
msgBytes, err := json.Marshal(msg)
if err != nil {
return err
}
client.sendChan <- msg

client.sendChan <- msgBytes
return nil
}

func (client *WSClient) readLoop() {
for {
_, _, err := client.conn.ReadMessage()
_, msg, err := client.conn.ReadMessage()
if err != nil {
log.Debug().Err(err).Msg("WebSocket: read error")
log.Trace().Msg("WebSocket: Attempting to reconnect...")
time.Sleep(waitUntilRetry)
err := client.Connect()
if err != nil {
log.Error().Err(err).Msg("Reconnect failed")
} else {
log.Debug().Msg("Reconnect successful")
}
log.Debug().Err(err).Msg("WSClient: read error")
client.onConnectionError()

} else if client.onMessageCallback != nil {
client.onMessageCallback(msg)
}
}
}

func (client *WSClient) writeLoop() {
for msg := range client.sendChan {
err := client.conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Debug().Err(err).Msg("WebSocket: write error")
log.Trace().Msg("WebSocket: Attempting to reconnect...")
time.Sleep(waitUntilRetry)
err := client.Connect()
if err != nil {
log.Error().Err(err).Msg("WebSocket: write error")
} else {
log.Debug().Msg("Reconnect successful")
}
// Wait until the connection is ready
if !client.IsConnectionReady() {
<-client.connReadySignal
}
log.Debug().Msg("WSClient::writeLoop Connection is ready")
log.Trace().Msgf("Sending message: %s", string(msg))
if err := client.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
log.Debug().Err(err).Msg("WSClient: write error")
client.onConnectionError()
}
}
}

func (client *WSClient) onConnectionError() {
log.Trace().Msg("WSClient: Attempting to reconnect...")
time.Sleep(waitUntilRetry)
connErr := client.Connect()
if connErr != nil {
log.Error().Err(connErr).Msg("WSClient: write error")
} else {
log.Debug().Msg("Reconnect successful")
}
}

func (client *WSClient) setConnectionNotReady() {
log.Debug().Msg("WSClient::setConnectionNotReady")
client.connReadyMutex.Lock()
defer client.connReadyMutex.Unlock()
if client.connReadySignal != nil {
return
}
client.connReadySignal = make(chan struct{})
}

func (client *WSClient) setConnectionReady() {
log.Debug().Msg("WSClient::setConnectionReady")
client.connReadyMutex.Lock()
defer client.connReadyMutex.Unlock()

if client.connReadySignal == nil {
return
}

close(client.connReadySignal)
client.connReadySignal = nil
}

func (client *WSClient) IsConnectionReady() bool {
client.connReadyMutex.Lock()
defer client.connReadyMutex.Unlock()

return client.connReadySignal == nil
}

func (client *WSClient) startPing() {
// Note: This function will ping the server every second to keep the connection alive.
// Execute this function in a separate goroutine to avoid blocking the main thread.
for !client.IsConnectionReady() {
_ = client.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second))
time.Sleep(connectionPingInterval) // Ping every second.
}
}
Loading

0 comments on commit b403727

Please sign in to comment.