From 9aff1ded334c6b460a4890d9eacee73d2a10bc52 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 12 Dec 2024 18:11:33 +0900 Subject: [PATCH] fix: migrate to coinex v2 --- .../providers/coinex/coinex.go | 35 ++++++++- .../websocketfetcher/providers/coinex/type.go | 44 ++++++----- .../providers/coinex/utils.go | 54 ++++++------- node/pkg/websocketfetcher/tests/utils_test.go | 75 +++++++++---------- 4 files changed, 120 insertions(+), 88 deletions(-) diff --git a/node/pkg/websocketfetcher/providers/coinex/coinex.go b/node/pkg/websocketfetcher/providers/coinex/coinex.go index 64ca4c892..52694dcda 100644 --- a/node/pkg/websocketfetcher/providers/coinex/coinex.go +++ b/node/pkg/websocketfetcher/providers/coinex/coinex.go @@ -2,10 +2,12 @@ package coinex import ( "context" + "encoding/json" "bisonai.com/miko/node/pkg/websocketfetcher/common" "bisonai.com/miko/node/pkg/wss" "github.com/rs/zerolog/log" + "nhooyr.io/websocket" ) type CoinexFetcher common.Fetcher @@ -28,14 +30,18 @@ func New(ctx context.Context, opts ...common.FetcherOption) (common.FetcherInter subscription := Subscription{ Method: "state.subscribe", - Params: params, - ID: 1, + Params: SubscribeParams{ + MarketList: params, + }, + ID: 1, } ws, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint(URL), wss.WithSubscriptions([]any{subscription}), - wss.WithProxyUrl(config.Proxy)) + wss.WithProxyUrl(config.Proxy), + wss.WithCustomReadFunc(fetcher.customReadFunc), + ) if err != nil { log.Error().Str("Player", "Coinex").Err(err).Msg("error in coinex.New") return nil, err @@ -69,3 +75,26 @@ func (f *CoinexFetcher) handleMessage(ctx context.Context, message map[string]an func (f *CoinexFetcher) Run(ctx context.Context) { f.Ws.Run(ctx, f.handleMessage) } + +func (f *CoinexFetcher) customReadFunc(ctx context.Context, conn *websocket.Conn) (map[string]interface{}, error) { + var result map[string]interface{} + _, data, err := conn.Read(ctx) + if err != nil { + log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to read from websocket") + return nil, err + } + + decompressed, err := common.DecompressGzip(data) + if err != nil { + log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to decompress data") + return nil, err + } + + err = json.Unmarshal(decompressed, &result) + if err != nil { + log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to unmarshal data") + return nil, err + } + + return result, nil +} diff --git a/node/pkg/websocketfetcher/providers/coinex/type.go b/node/pkg/websocketfetcher/providers/coinex/type.go index 3144bafae..49cb25344 100644 --- a/node/pkg/websocketfetcher/providers/coinex/type.go +++ b/node/pkg/websocketfetcher/providers/coinex/type.go @@ -1,27 +1,37 @@ package coinex -const URL = "wss://socket.coinex.com/" +const URL = "wss://socket.coinex.com/v2/spot" + +type SubscribeParams struct { + MarketList []string `json:"market_list"` +} type Subscription struct { - Method string `json:"method"` - Params []string `json:"params"` - ID int `json:"id"` + Method string `json:"method"` + Params SubscribeParams `json:"params"` + ID int `json:"id"` +} + +type State struct { + Market string `json:"market"` + Last string `json:"last"` + Open string `json:"open"` + Close string `json:"close"` + High string `json:"high"` + Low string `json:"low"` + Volume string `json:"volume"` + VolumeSell string `json:"volume_sell"` + VolumeBuy string `json:"volume_buy"` + Value string `json:"value"` + Period int `json:"period"` } -type Ticker struct { - Open string `json:"open"` - Last string `json:"last"` - High string `json:"high"` - Low string `json:"low"` - Deal string `json:"deal"` - Volume string `json:"volume"` - SellTotal string `json:"sell_total"` - BuyTotal string `json:"buy_total"` - Period int `json:"period"` +type Data struct { + StateList []State `json:"state_list"` } type Response struct { - Method string `json:"method"` - Params []map[string]Ticker `json:"params"` - ID *int `json:"id"` + Method string `json:"method"` + Data Data `json:"data"` + ID *int `json:"id"` } diff --git a/node/pkg/websocketfetcher/providers/coinex/utils.go b/node/pkg/websocketfetcher/providers/coinex/utils.go index 6abb084c1..92e7b8296 100644 --- a/node/pkg/websocketfetcher/providers/coinex/utils.go +++ b/node/pkg/websocketfetcher/providers/coinex/utils.go @@ -10,34 +10,34 @@ import ( func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { feedDataList := []*common.FeedData{} - for _, item := range data.Params { - for key, value := range item { - id, exists := feedMap[key] - if !exists { - log.Warn().Str("Player", "Coinex").Str("key", key).Msg("feed not found") - continue - } - price, err := common.PriceStringToFloat64(value.Last) - if err != nil { - log.Error().Str("Player", "Coinex").Err(err).Msg("error in PriceStringToFloat64") - continue - } - volume, err := common.VolumeStringToFloat64(value.Volume) - if err != nil { - log.Error().Str("Player", "Coinex").Err(err).Msg("error in VolumeStringToFloat64") - continue - } - timestamp := time.Now() - - for _, id := range id { - feedData := new(common.FeedData) - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - feedData.Volume = volume - feedDataList = append(feedDataList, feedData) - } + for _, item := range data.Data.StateList { + + id, exists := feedMap[item.Market] + if !exists { + log.Warn().Str("Player", "Coinex").Str("key", item.Market).Msg("feed not found") + continue + } + price, err := common.PriceStringToFloat64(item.Last) + if err != nil { + log.Error().Str("Player", "Coinex").Err(err).Msg("error in PriceStringToFloat64") + continue + } + volume, err := common.VolumeStringToFloat64(item.Volume) + if err != nil { + log.Error().Str("Player", "Coinex").Err(err).Msg("error in VolumeStringToFloat64") + continue } + timestamp := time.Now() + + for _, id := range id { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + feedData.Volume = volume + feedDataList = append(feedDataList, feedData) + } + } return feedDataList, nil diff --git a/node/pkg/websocketfetcher/tests/utils_test.go b/node/pkg/websocketfetcher/tests/utils_test.go index 215fa034b..3546fe923 100644 --- a/node/pkg/websocketfetcher/tests/utils_test.go +++ b/node/pkg/websocketfetcher/tests/utils_test.go @@ -489,46 +489,39 @@ func TestMessageToStruct(t *testing.T) { t.Run("TestMessageToStructCoinex", func(t *testing.T) { jsonStr := `{ - "method": "state.update", - "params": [ - { - "BTCUSDT": { - "open": "68217.41", - "last": "67649.41", - "high": "69037.30", - "low": "66678.34", - "deal": "12707367.97477132400000000000", - "volume": "187.73201806", - "sell_total": "4.47703848", - "buy_total": "2.37157686", - "period": 86400 - }, - "ETHUSDT": { - "open": "3781.50", - "last": "3786.55", - "high": "3844.89", - "low": "3725.78", - "deal": "5307095.72019325730000000000", - "volume": "1402.85217886", - "sell_total": "68.74665541", - "buy_total": "76.70077360", - "period": 86400 - }, - "MATICUSDT": { - "open": "0.6971", - "last": "0.6975", - "high": "0.709000000000", - "low": "0.6832", - "deal": "184896.63142549148200000000", - "volume": "265725.85573677", - "sell_total": "8251.71403154", - "buy_total": "8521.89898481", - "period": 86400 - } - } - ], - "id": null - }` + "method": "state.update", + "data": { + "state_list": [ + { + "market": "LATUSDT", + "last": "0.008157", + "open": "0.008286", + "close": "0.008157", + "high": "0.008390", + "low": "0.008106", + "volume": "807714.49139758", + "volume_sell": "286170.69645599", + "volume_buy": "266161.23236408", + "value": "6689.21644207", + "period": 86400 + }, + { + "market": "ELONUSDT", + "last": "0.000000152823", + "open": "0.000000158650", + "close": "0.000000152823", + "high": "0.000000159474", + "low": "0.000000147026", + "volume": "88014042237.15", + "volume_sell": "11455578769.13", + "volume_buy": "17047669612.10", + "value": "13345.65122447", + "period": 86400 + } + ] + }, + "id": null +}` var txResult map[string]any err := json.Unmarshal([]byte(jsonStr), &txResult) if err != nil { @@ -541,7 +534,7 @@ func TestMessageToStruct(t *testing.T) { } assert.Equal(t, "state.update", txData.Method) - assert.Equal(t, "67649.41", txData.Params[0]["BTCUSDT"].Last) + assert.Equal(t, "0.008157", txData.Data.StateList[0].Last) }) t.Run("TestMessageToStructBitstamp", func(t *testing.T) {