From 7c9d934ba53bd87789a21b1fd0c657c9a0a294a5 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 31 Aug 2022 23:53:40 -0400 Subject: [PATCH] Two phase init was not passing smartContractGW to receipt store - meaning no WS receipts (#228) Signed-off-by: Peter Broadhurst Signed-off-by: Peter Broadhurst --- cmd/ethconnect.go | 2 +- internal/rest/restgateway.go | 90 ++++++++++++++++++------------------ internal/ws/wsserver.go | 1 + 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/cmd/ethconnect.go b/cmd/ethconnect.go index 94ca4a5..fad3eba 100644 --- a/cmd/ethconnect.go +++ b/cmd/ethconnect.go @@ -188,7 +188,7 @@ func startServer() (err error) { // In this scenario, we can pass the receipt store to the Kafka bridge for it to do // additional idempotency checks that prevent res-submission of transactions. if idempotencyCheckReceiptStore == nil { - idempotencyCheckReceiptStore, err = restGateway.InitReceiptStore() + idempotencyCheckReceiptStore, err = restGateway.Init() if err != nil { return err } diff --git a/internal/rest/restgateway.go b/internal/rest/restgateway.go index 65d87a0..cf82502 100644 --- a/internal/rest/restgateway.go +++ b/internal/rest/restgateway.go @@ -216,51 +216,11 @@ func (g *RESTGateway) newAccessTokenContextHandler(parent http.Handler) http.Han } // ReceiptStorePersistence allows other components to access the receipt store persistence for idempotency checks, when co-located in the same address space -func (g *RESTGateway) InitReceiptStore() (receipts.ReceiptStorePersistence, error) { - var receiptStoreConf *receipts.ReceiptStoreConf - var receiptStorePersistence receipts.ReceiptStorePersistence - if g.conf.MongoDB.URL != "" { - receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf - mongoStore := receipts.NewMongoReceipts(&g.conf.MongoDB) - receiptStorePersistence = mongoStore - if err := mongoStore.Connect(); err != nil { - return nil, err - } - } else if g.conf.LevelDB.Path != "" { - receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf - leveldbStore, err := receipts.NewLevelDBReceipts(&g.conf.LevelDB) - if err != nil { - return nil, err - } - receiptStorePersistence = leveldbStore - } else { - receiptStoreConf = &g.conf.MemStore - memStore := receipts.NewMemoryReceipts(&g.conf.MemStore) - receiptStorePersistence = memStore - } - g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW) - return g.receipts.persistence, nil -} - -// Start kicks off the HTTP listener and router -func (g *RESTGateway) Start() (err error) { - - // Ensure the receipt store is initialized - if g.receipts == nil { - if _, err := g.InitReceiptStore(); err != nil { - return err - } - } - - if *g.printYAML { - b, err := utils.MarshalToYAML(&g.conf) - print("# YAML Configuration snippet for REST Gateway\n" + string(b)) - return err - } +func (g *RESTGateway) Init() (receipts.ReceiptStorePersistence, error) { tlsConfig, err := utils.CreateTLSConfiguration(&g.conf.HTTP.TLS) if err != nil { - return + return nil, err } router := httprouter.New() @@ -270,7 +230,7 @@ func (g *RESTGateway) Start() (err error) { if g.conf.RPC.URL != "" || g.conf.OpenAPI.StoragePath != "" { rpcClient, err = eth.RPCConnect(&g.conf.RPC) if err != nil { - return err + return nil, err } processor = tx.NewTxnProcessor(&g.conf.TxnProcessorConf, &g.conf.RPCConf) processor.Init(rpcClient) @@ -281,12 +241,35 @@ func (g *RESTGateway) Start() (err error) { if g.conf.OpenAPI.StoragePath != "" { g.smartContractGW, err = contractgateway.NewSmartContractGateway(&g.conf.OpenAPI, &g.conf.TxnProcessorConf, rpcClient, processor, g, g.ws) if err != nil { - return err + return nil, err } g.smartContractGW.AddRoutes(router) } + var receiptStoreConf *receipts.ReceiptStoreConf + var receiptStorePersistence receipts.ReceiptStorePersistence + if g.conf.MongoDB.URL != "" { + receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf + mongoStore := receipts.NewMongoReceipts(&g.conf.MongoDB) + receiptStorePersistence = mongoStore + if err := mongoStore.Connect(); err != nil { + return nil, err + } + } else if g.conf.LevelDB.Path != "" { + receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf + leveldbStore, err := receipts.NewLevelDBReceipts(&g.conf.LevelDB) + if err != nil { + return nil, err + } + receiptStorePersistence = leveldbStore + } else { + receiptStoreConf = &g.conf.MemStore + memStore := receipts.NewMemoryReceipts(&g.conf.MemStore) + receiptStorePersistence = memStore + } + router.GET("/status", g.statusHandler) + g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW) g.receipts.addRoutes(router) if len(g.conf.Kafka.Brokers) > 0 { wk := newWebhooksKafka(&g.conf.Kafka, g.receipts) @@ -304,6 +287,25 @@ func (g *RESTGateway) Start() (err error) { MaxHeaderBytes: MaxHeaderSize, } + return g.receipts.persistence, nil +} + +// Start kicks off the HTTP listener and router +func (g *RESTGateway) Start() (err error) { + + if *g.printYAML { + b, err := utils.MarshalToYAML(&g.conf) + print("# YAML Configuration snippet for REST Gateway\n" + string(b)) + return err + } + + // Check we're initialized (caller can choose to call init explicitly) + if g.receipts == nil { + if _, err = g.Init(); err != nil { + return err + } + } + readyToListen := make(chan bool) gwDone := make(chan error) svrDone := make(chan error) diff --git a/internal/ws/wsserver.go b/internal/ws/wsserver.go index 9edb2b7..28b8820 100644 --- a/internal/ws/wsserver.go +++ b/internal/ws/wsserver.go @@ -217,6 +217,7 @@ func (s *webSocketServer) processReplies() { s.mux.Lock() wsconns := getConnListFromMap(s.replyMap) s.mux.Unlock() + log.Debugf("Sending reply to %d WS connections", len(wsconns)) s.broadcastToConnections(wsconns, message) } }