Skip to content

Commit

Permalink
Two phase init was not passing smartContractGW to receipt store - mea…
Browse files Browse the repository at this point in the history
…ning no WS receipts (#228)

Signed-off-by: Peter Broadhurst <[email protected]>

Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst authored Sep 1, 2022
1 parent 0e6f5b0 commit 7c9d934
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/ethconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func startServer() (err error) {
// In this scenario, we can pass the receipt store to the Kafka bridge for it to do
// additional idempotency checks that prevent res-submission of transactions.
if idempotencyCheckReceiptStore == nil {
idempotencyCheckReceiptStore, err = restGateway.InitReceiptStore()
idempotencyCheckReceiptStore, err = restGateway.Init()
if err != nil {
return err
}
Expand Down
90 changes: 46 additions & 44 deletions internal/rest/restgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,51 +216,11 @@ func (g *RESTGateway) newAccessTokenContextHandler(parent http.Handler) http.Han
}

// ReceiptStorePersistence allows other components to access the receipt store persistence for idempotency checks, when co-located in the same address space
func (g *RESTGateway) InitReceiptStore() (receipts.ReceiptStorePersistence, error) {
var receiptStoreConf *receipts.ReceiptStoreConf
var receiptStorePersistence receipts.ReceiptStorePersistence
if g.conf.MongoDB.URL != "" {
receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf
mongoStore := receipts.NewMongoReceipts(&g.conf.MongoDB)
receiptStorePersistence = mongoStore
if err := mongoStore.Connect(); err != nil {
return nil, err
}
} else if g.conf.LevelDB.Path != "" {
receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf
leveldbStore, err := receipts.NewLevelDBReceipts(&g.conf.LevelDB)
if err != nil {
return nil, err
}
receiptStorePersistence = leveldbStore
} else {
receiptStoreConf = &g.conf.MemStore
memStore := receipts.NewMemoryReceipts(&g.conf.MemStore)
receiptStorePersistence = memStore
}
g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW)
return g.receipts.persistence, nil
}

// Start kicks off the HTTP listener and router
func (g *RESTGateway) Start() (err error) {

// Ensure the receipt store is initialized
if g.receipts == nil {
if _, err := g.InitReceiptStore(); err != nil {
return err
}
}

if *g.printYAML {
b, err := utils.MarshalToYAML(&g.conf)
print("# YAML Configuration snippet for REST Gateway\n" + string(b))
return err
}
func (g *RESTGateway) Init() (receipts.ReceiptStorePersistence, error) {

tlsConfig, err := utils.CreateTLSConfiguration(&g.conf.HTTP.TLS)
if err != nil {
return
return nil, err
}

router := httprouter.New()
Expand All @@ -270,7 +230,7 @@ func (g *RESTGateway) Start() (err error) {
if g.conf.RPC.URL != "" || g.conf.OpenAPI.StoragePath != "" {
rpcClient, err = eth.RPCConnect(&g.conf.RPC)
if err != nil {
return err
return nil, err
}
processor = tx.NewTxnProcessor(&g.conf.TxnProcessorConf, &g.conf.RPCConf)
processor.Init(rpcClient)
Expand All @@ -281,12 +241,35 @@ func (g *RESTGateway) Start() (err error) {
if g.conf.OpenAPI.StoragePath != "" {
g.smartContractGW, err = contractgateway.NewSmartContractGateway(&g.conf.OpenAPI, &g.conf.TxnProcessorConf, rpcClient, processor, g, g.ws)
if err != nil {
return err
return nil, err
}
g.smartContractGW.AddRoutes(router)
}

var receiptStoreConf *receipts.ReceiptStoreConf
var receiptStorePersistence receipts.ReceiptStorePersistence
if g.conf.MongoDB.URL != "" {
receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf
mongoStore := receipts.NewMongoReceipts(&g.conf.MongoDB)
receiptStorePersistence = mongoStore
if err := mongoStore.Connect(); err != nil {
return nil, err
}
} else if g.conf.LevelDB.Path != "" {
receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf
leveldbStore, err := receipts.NewLevelDBReceipts(&g.conf.LevelDB)
if err != nil {
return nil, err
}
receiptStorePersistence = leveldbStore
} else {
receiptStoreConf = &g.conf.MemStore
memStore := receipts.NewMemoryReceipts(&g.conf.MemStore)
receiptStorePersistence = memStore
}

router.GET("/status", g.statusHandler)
g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW)
g.receipts.addRoutes(router)
if len(g.conf.Kafka.Brokers) > 0 {
wk := newWebhooksKafka(&g.conf.Kafka, g.receipts)
Expand All @@ -304,6 +287,25 @@ func (g *RESTGateway) Start() (err error) {
MaxHeaderBytes: MaxHeaderSize,
}

return g.receipts.persistence, nil
}

// Start kicks off the HTTP listener and router
func (g *RESTGateway) Start() (err error) {

if *g.printYAML {
b, err := utils.MarshalToYAML(&g.conf)
print("# YAML Configuration snippet for REST Gateway\n" + string(b))
return err
}

// Check we're initialized (caller can choose to call init explicitly)
if g.receipts == nil {
if _, err = g.Init(); err != nil {
return err
}
}

readyToListen := make(chan bool)
gwDone := make(chan error)
svrDone := make(chan error)
Expand Down
1 change: 1 addition & 0 deletions internal/ws/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *webSocketServer) processReplies() {
s.mux.Lock()
wsconns := getConnListFromMap(s.replyMap)
s.mux.Unlock()
log.Debugf("Sending reply to %d WS connections", len(wsconns))
s.broadcastToConnections(wsconns, message)
}
}
Expand Down

0 comments on commit 7c9d934

Please sign in to comment.