From 6bd37eb782c71fa1aa2fc09ffa1a706e6659846f Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Sat, 21 Oct 2023 01:01:19 -0400 Subject: [PATCH 1/5] fix localnet test --- e2e/localnet/network/fixture_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/e2e/localnet/network/fixture_test.go b/e2e/localnet/network/fixture_test.go index e5976e679..8421c18b8 100644 --- a/e2e/localnet/network/fixture_test.go +++ b/e2e/localnet/network/fixture_test.go @@ -190,19 +190,29 @@ var _ = Describe("JSON RPC tests", func() { Expect(tf.WaitForNextBlock()).To(Succeed()) // send a transaction and make sure pending nonce is incremented - _, err = contract.ConsumeGas(tf.GenerateTransactOpts("alice"), big.NewInt(10000)) + var tx *coretypes.Transaction + tx, err = contract.ConsumeGas(tf.GenerateTransactOpts("alice"), big.NewInt(10000)) Expect(err).NotTo(HaveOccurred()) + + // Pending nonce should be 1 more than the current nonce alicePendingNonce, err := client.PendingNonceAt(context.Background(), tf.Address("alice")) Expect(err).NotTo(HaveOccurred()) Expect(alicePendingNonce).To(Equal(aliceCurrNonce + 1)) + + // The nonce on disk should still be equal to the nonce prior to the consume gas txn + // being included in a block. acn, err := client.NonceAt(context.Background(), tf.Address("alice"), nil) Expect(err).NotTo(HaveOccurred()) Expect(acn).To(Equal(aliceCurrNonce)) - Expect(tf.WaitForNextBlock()).To(Succeed()) + // Wait for block inclusion. + ExpectSuccessReceipt(client, tx) + // NonceAt and PendingNonce should be equal after inclusion aliceCurrNonce, err = client.NonceAt(context.Background(), tf.Address("alice"), nil) Expect(err).NotTo(HaveOccurred()) + alicePendingNonce, err = client.PendingNonceAt(context.Background(), tf.Address("alice")) + Expect(err).NotTo(HaveOccurred()) Expect(aliceCurrNonce).To(Equal(alicePendingNonce)) }) From 9ad14ff6b0e544b21429136fc0f7674d663eba26 Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Sat, 21 Oct 2023 02:42:08 -0400 Subject: [PATCH 2/5] retry handler --- cosmos/txpool/handler.go | 82 ++++++++++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/cosmos/txpool/handler.go b/cosmos/txpool/handler.go index 8c1c25dd2..e53cae2f1 100644 --- a/cosmos/txpool/handler.go +++ b/cosmos/txpool/handler.go @@ -23,6 +23,7 @@ package txpool import ( "errors" "sync/atomic" + "time" "cosmossdk.io/log" @@ -36,7 +37,11 @@ import ( // txChanSize is the size of channel listening to NewTxsEvent. The number is referenced from the // size of tx pool. -const txChanSize = 4096 +const ( + txChanSize = 4096 + maxRetries = 5 + retryDelay = 50 * time.Millisecond +) // SdkTx is used to generate mocks. type SdkTx interface { @@ -64,6 +69,12 @@ type Subscription interface { event.Subscription } +// failedTx represents a transaction that failed to broadcast. +type failedTx struct { + tx *coretypes.Transaction + retries int +} + // handler listens for new insertions into the geth txpool and broadcasts them to the CometBFT // layer for p2p and ABCI. type handler struct { @@ -78,6 +89,9 @@ type handler struct { stopCh chan struct{} txsSub Subscription running atomic.Bool + + // Queue for failed transactions + failedTxs chan *failedTx } // newHandler creates a new handler. @@ -91,6 +105,7 @@ func newHandler( txPool: txPool, txsCh: make(chan core.NewTxsEvent, txChanSize), stopCh: make(chan struct{}), + failedTxs: make(chan *failedTx, txChanSize), } return h } @@ -100,7 +115,8 @@ func (h *handler) Start() error { if h.running.Load() { return errors.New("handler already started") } - go h.eventLoop() + go h.mainLoop() + go h.failedLoop() // Start the retry policy return nil } @@ -109,17 +125,19 @@ func (h *handler) Stop() error { if !h.Running() { return errors.New("handler already stopped") } + + // Push two stop signals to the stop channel to ensure that both loops stop. + h.stopCh <- struct{}{} h.stopCh <- struct{}{} return nil } // start handles the subscription to the txpool and broadcasts transactions. -func (h *handler) eventLoop() { +func (h *handler) mainLoop() { // Connect to the subscription. h.txsSub = h.txPool.SubscribeNewTxsEvent(h.txsCh) h.logger.With("module", "txpool-handler").Info("starting txpool handler") h.running.Store(true) - // Handle events. var err error for { @@ -135,6 +153,25 @@ func (h *handler) eventLoop() { } } +// failedLoop will periodically attempt to re-broadcast failed transactions. +func (h *handler) failedLoop() { + for { + select { + case <-h.stopCh: + return + case failed := <-h.failedTxs: + if failed.retries >= maxRetries { + h.logger.Error("failed to broadcast transaction after max retries", "tx", failed.tx) + continue + } + h.broadcastTransaction(failed.tx, failed.retries) + } + + // We slightly space out the retries in order to prioritize new transactions. + time.Sleep(retryDelay) + } +} + // Running returns true if the handler is running. func (h *handler) Running() bool { return h.running.Load() @@ -156,28 +193,35 @@ func (h *handler) stop(err error) { // Close channels. close(h.txsCh) close(h.stopCh) + close(h.failedTxs) } // broadcastTransactions will propagate a batch of transactions to the CometBFT mempool. func (h *handler) broadcastTransactions(txs coretypes.Transactions) { h.logger.Debug("broadcasting transactions", "num_txs", len(txs)) for _, signedEthTx := range txs { - // Serialize the transaction to Bytes - txBytes, err := h.serializer.ToSdkTxBytes(signedEthTx, signedEthTx.Gas()) - if err != nil { - h.logger.Error("failed to serialize transaction", "err", err) - continue - } + h.broadcastTransaction(signedEthTx, 0) + } +} - // Send the transaction to the CometBFT mempool, which will gossip it to peers via - // CometBFT's p2p layer. - rsp, err := h.clientCtx.BroadcastTxSync(txBytes) +// broadcastTransaction will propagate a transaction to the CometBFT mempool. +func (h *handler) broadcastTransaction(tx *coretypes.Transaction, retries int) { + txBytes, err := h.serializer.ToSdkTxBytes(tx, tx.Gas()) + if err != nil { + h.logger.Error("failed to serialize transaction", "err", err) + return + } - // If we see an ABCI response error. - if rsp != nil && rsp.Code != 0 { - h.logger.Error("failed to broadcast transaction", "rsp", rsp, "err", err) - } else if err != nil { - h.logger.Error("error on transactions broadcast", "err", err) - } + // Send the transaction to the CometBFT mempool, which will gossip it to peers via + // CometBFT's p2p layer. + rsp, err := h.clientCtx.BroadcastTxSync(txBytes) + + // If we see an ABCI response error. + if rsp != nil && rsp.Code != 0 { + h.logger.Error("failed to broadcast transaction", "rsp", rsp, "err", err) + h.failedTxs <- &failedTx{tx: tx, retries: retries} + } else if err != nil { + h.logger.Error("error on transactions broadcast", "err", err) + h.failedTxs <- &failedTx{tx: tx, retries: retries} } } From d4d78042c5436dc6bb2177f85fc4f6b1742db28f Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Sat, 21 Oct 2023 02:44:49 -0400 Subject: [PATCH 3/5] countdown --- cosmos/txpool/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cosmos/txpool/handler.go b/cosmos/txpool/handler.go index e53cae2f1..916667f0c 100644 --- a/cosmos/txpool/handler.go +++ b/cosmos/txpool/handler.go @@ -160,11 +160,11 @@ func (h *handler) failedLoop() { case <-h.stopCh: return case failed := <-h.failedTxs: - if failed.retries >= maxRetries { - h.logger.Error("failed to broadcast transaction after max retries", "tx", failed.tx) + if failed.retries == 0 { + h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries) continue } - h.broadcastTransaction(failed.tx, failed.retries) + h.broadcastTransaction(failed.tx, failed.retries-1) } // We slightly space out the retries in order to prioritize new transactions. @@ -200,7 +200,7 @@ func (h *handler) stop(err error) { func (h *handler) broadcastTransactions(txs coretypes.Transactions) { h.logger.Debug("broadcasting transactions", "num_txs", len(txs)) for _, signedEthTx := range txs { - h.broadcastTransaction(signedEthTx, 0) + h.broadcastTransaction(signedEthTx, maxRetries) } } From a71398b8d5c9f19aefecbc99971594aace8538a7 Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Sun, 22 Oct 2023 22:57:40 -0400 Subject: [PATCH 4/5] bing --- cosmos/go.mod | 6 ++-- cosmos/go.sum | 12 +++---- cosmos/lib/ante/eject.go | 4 +-- cosmos/miner/miner.go | 60 ++++++++++++++++++++++++++-------- cosmos/runtime/runtime.go | 6 ++-- cosmos/txpool/handler.go | 16 ++++++--- e2e/testapp/polard/cmd/root.go | 2 ++ 7 files changed, 74 insertions(+), 32 deletions(-) diff --git a/cosmos/go.mod b/cosmos/go.mod index 8da0f0e3a..0af2bf5e1 100644 --- a/cosmos/go.mod +++ b/cosmos/go.mod @@ -44,9 +44,9 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e // indirect google.golang.org/grpc v1.58.1 google.golang.org/protobuf v1.31.0 - pkg.berachain.dev/polaris/contracts v0.0.0-20231010191645-a5ed99983be4 - pkg.berachain.dev/polaris/eth v0.0.0-20231010191645-a5ed99983be4 - pkg.berachain.dev/polaris/lib v0.0.0-20231010191645-a5ed99983be4 + pkg.berachain.dev/polaris/contracts v0.0.0-20231011003341-63d98bc34da2 + pkg.berachain.dev/polaris/eth v0.0.0-20231022210635-216cf48c4787 + pkg.berachain.dev/polaris/lib v0.0.0-20231011003341-63d98bc34da2 ) require ( diff --git a/cosmos/go.sum b/cosmos/go.sum index 299b78f4b..3527cad71 100644 --- a/cosmos/go.sum +++ b/cosmos/go.sum @@ -1459,12 +1459,12 @@ nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0 nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= -pkg.berachain.dev/polaris/contracts v0.0.0-20231010191645-a5ed99983be4 h1:q9gSJk2kQQbz/3cPcXGCW+JdQvxAm+7ukcDqzZiUcmw= -pkg.berachain.dev/polaris/contracts v0.0.0-20231010191645-a5ed99983be4/go.mod h1:LwuVx7lVFkIa/EXGNz2536i+YTK/mEU4A1vQJFZ8y4Q= -pkg.berachain.dev/polaris/eth v0.0.0-20231010191645-a5ed99983be4 h1:gOqKP7ZCzIU1NZrwdISnxswAatut/6Xi7JjdSpHbZk8= -pkg.berachain.dev/polaris/eth v0.0.0-20231010191645-a5ed99983be4/go.mod h1:HuT/i4PZCz3ItXAvDWHA31z4OZ10we6RH6xz+Fq0Nlw= -pkg.berachain.dev/polaris/lib v0.0.0-20231010191645-a5ed99983be4 h1:3zGPs5JJpBtysLMNWa5wKEhwC+j3p5AAK0gZ1fWuEc8= -pkg.berachain.dev/polaris/lib v0.0.0-20231010191645-a5ed99983be4/go.mod h1:OcMl9wH8872Q4++ikKXIm1FC4FbuGUWHxrjb6LCzvIY= +pkg.berachain.dev/polaris/contracts v0.0.0-20231011003341-63d98bc34da2 h1:Vd82baPNQh+6wUaj+qARezJ9dOHHdupJQncgj1UO1FQ= +pkg.berachain.dev/polaris/contracts v0.0.0-20231011003341-63d98bc34da2/go.mod h1:LwuVx7lVFkIa/EXGNz2536i+YTK/mEU4A1vQJFZ8y4Q= +pkg.berachain.dev/polaris/eth v0.0.0-20231022210635-216cf48c4787 h1:HH1KTrf4WpN0XieP26GLzKMy5YLJxj8CGVWX7olBWaY= +pkg.berachain.dev/polaris/eth v0.0.0-20231022210635-216cf48c4787/go.mod h1:Wo7D3qNsCaBcibLwCmSY2O7Rzc94sslbk+qBMA8KSl8= +pkg.berachain.dev/polaris/lib v0.0.0-20231011003341-63d98bc34da2 h1:XV+f50FxTAbJcQCIXPV6CzDiHsRj5YX+qlPjk+Hg1IA= +pkg.berachain.dev/polaris/lib v0.0.0-20231011003341-63d98bc34da2/go.mod h1:OcMl9wH8872Q4++ikKXIm1FC4FbuGUWHxrjb6LCzvIY= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/cosmos/lib/ante/eject.go b/cosmos/lib/ante/eject.go index 708c37516..a9230e47e 100644 --- a/cosmos/lib/ante/eject.go +++ b/cosmos/lib/ante/eject.go @@ -59,9 +59,9 @@ func (e *EjectOnRecheckTxDecorator) AnteHandle( if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { hash := wet.Unwrap().Hash() e.seen[hash]++ - if e.seen[hash] > 25 { //nolint:gomnd // temp fix. + if e.seen[hash] > 15 { //nolint:gomnd // temp fix. delete(e.seen, hash) // prevent leak - return ctx, errors.New("recheck tx") + return ctx, errors.New("erroring to eject transaction from the comet bft mempool") } } diff --git a/cosmos/miner/miner.go b/cosmos/miner/miner.go index 39a6ab583..9eb4f762c 100644 --- a/cosmos/miner/miner.go +++ b/cosmos/miner/miner.go @@ -23,6 +23,8 @@ package miner import ( "context" + "errors" + "time" abci "github.com/cometbft/cometbft/abci/types" @@ -50,12 +52,14 @@ type Miner struct { eth.Miner serializer EnvelopeSerializer currentPayload *miner.Payload + payloadTimeout time.Duration } // New produces a cosmos miner from a geth miner. -func New(gm eth.Miner) *Miner { +func New(gm eth.Miner, payloadTimeout time.Duration) *Miner { return &Miner{ - Miner: gm, + Miner: gm, + payloadTimeout: payloadTimeout, } } @@ -70,7 +74,9 @@ func (m *Miner) PrepareProposal( ) (*abci.ResponsePrepareProposal, error) { var payloadEnvelopeBz []byte var err error - if payloadEnvelopeBz, err = m.buildBlock(ctx); err != nil { + if payloadEnvelopeBz, err = m.buildBlock(ctx); errors.Is(err, context.DeadlineExceeded) { + return nil, err + } else if err != nil { return nil, err } return &abci.ResponsePrepareProposal{Txs: [][]byte{payloadEnvelopeBz}}, err @@ -78,12 +84,12 @@ func (m *Miner) PrepareProposal( // buildBlock builds and submits a payload, it also waits for the txs // to resolve from the underying worker. -func (m *Miner) buildBlock(ctx sdk.Context) ([]byte, error) { +func (m *Miner) buildBlock(ctx context.Context) ([]byte, error) { defer m.clearPayload() if err := m.submitPayloadForBuilding(ctx); err != nil { return nil, err } - return m.resolveEnvelope(), nil + return m.resolveEnvelope(ctx, m.payloadTimeout) } // submitPayloadForBuilding submits a payload for building. @@ -100,7 +106,6 @@ func (m *Miner) submitPayloadForBuilding(ctx context.Context) error { return err } m.currentPayload = payload - sCtx.Logger().Info("submitted payload for building") return nil } @@ -116,16 +121,45 @@ func (m *Miner) constructPayloadArgs(ctx sdk.Context) *miner.BuildPayloadArgs { } // resolveEnvelope resolves the payload. -func (m *Miner) resolveEnvelope() []byte { +func (m *Miner) resolveEnvelope(ctx context.Context, timeout time.Duration) ([]byte, error) { + sCtx := sdk.UnwrapSDKContext(ctx).Logger() + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + resultChan := make(chan []byte, 1) + errChan := make(chan error, 1) + + go m.resolvePayload(resultChan, errChan) + + select { + case <-ctx.Done(): + // If we timed out, return an empty payload. + // TODO: penalize validators for not being able to deliver the payload? + sCtx.Error("failed to resolve envelope, proposing empty payload", "err", ctx.Err()) + return m.resolveEmptyPayload() + case result := <-resultChan: + sdk.UnwrapSDKContext(ctx).Logger().Info("successfully resolved envelope") + return result, <-errChan + } +} + +// resolvePayload is a helper function to resolve the payload in a separate goroutine. +func (m *Miner) resolvePayload(resultChan chan []byte, errChan chan error) { if m.currentPayload == nil { - return nil + resultChan <- nil + errChan <- nil + return } envelope := m.currentPayload.ResolveFull() - bz, err := m.serializer.ToSdkTxBytes(envelope, envelope.ExecutionPayload.GasLimit) - if err != nil { - panic(err) - } - return bz + result, err := m.serializer.ToSdkTxBytes(envelope, envelope.ExecutionPayload.GasLimit) + resultChan <- result + errChan <- err +} + +// resolveEmptyPayload is a helper function to resolve the empty payload in a separate goroutine. +func (m *Miner) resolveEmptyPayload() ([]byte, error) { + envelope := m.currentPayload.ResolveEmpty() + return m.serializer.ToSdkTxBytes(envelope, envelope.ExecutionPayload.GasLimit) } // clearPayload clears the payload. diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 9731c97f2..a77450d15 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -90,6 +90,9 @@ func New( panic(err) } + p.WrappedMiner = miner.New(p.Miner(), cfg.Polar.Miner.NewPayloadTimeout) + p.WrappedTxPool = txpool.New(p.TxPool()) + return p } @@ -97,10 +100,7 @@ func New( // It takes a BaseApp and an EVMKeeper as arguments. // It returns an error if the setup fails. func (p *Polaris) Build(app CosmosApp, ek EVMKeeper) error { - p.WrappedTxPool = txpool.New(p.TxPool()) app.SetMempool(p.WrappedTxPool) - - p.WrappedMiner = miner.New(p.Miner()) app.SetPrepareProposal(p.WrappedMiner.PrepareProposal) if err := ek.Setup(p.Blockchain()); err != nil { diff --git a/cosmos/txpool/handler.go b/cosmos/txpool/handler.go index 916667f0c..188a8b479 100644 --- a/cosmos/txpool/handler.go +++ b/cosmos/txpool/handler.go @@ -28,6 +28,7 @@ import ( "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/ethereum/go-ethereum/event" @@ -216,12 +217,17 @@ func (h *handler) broadcastTransaction(tx *coretypes.Transaction, retries int) { // CometBFT's p2p layer. rsp, err := h.clientCtx.BroadcastTxSync(txBytes) - // If we see an ABCI response error. - if rsp != nil && rsp.Code != 0 { - h.logger.Error("failed to broadcast transaction", "rsp", rsp, "err", err) - h.failedTxs <- &failedTx{tx: tx, retries: retries} - } else if err != nil { + if err != nil { h.logger.Error("error on transactions broadcast", "err", err) h.failedTxs <- &failedTx{tx: tx, retries: retries} + } else if rsp != nil && rsp.Code != 0 { + if sdkerrors.ErrMempoolIsFull.Codespace() == rsp.Codespace && + rsp.Code == sdkerrors.ErrMempoolIsFull.ABCICode() { + h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash()) + } else { + h.logger.Error("failed to broadcast transaction", + "codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash()) + } + h.failedTxs <- &failedTx{tx: tx, retries: retries} } } diff --git a/e2e/testapp/polard/cmd/root.go b/e2e/testapp/polard/cmd/root.go index d508e78d2..25f0e2850 100644 --- a/e2e/testapp/polard/cmd/root.go +++ b/e2e/testapp/polard/cmd/root.go @@ -175,6 +175,8 @@ func initCometBFTConfig() *cmtcfg.Config { consensus.TimeoutPrecommit = time.Second * 2 consensus.TimeoutCommit = time.Second * 2 + // Increase Default Mempool size to 50000 (aggressive lol). + cfg.Mempool.Size = 50000 return cfg } From 54824b8e52fcb14fe26e537c741680bbfc0bca71 Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Sun, 22 Oct 2023 23:04:49 -0400 Subject: [PATCH 5/5] bing --- cosmos/miner/miner.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cosmos/miner/miner.go b/cosmos/miner/miner.go index 9eb4f762c..8585ba201 100644 --- a/cosmos/miner/miner.go +++ b/cosmos/miner/miner.go @@ -23,7 +23,6 @@ package miner import ( "context" - "errors" "time" abci "github.com/cometbft/cometbft/abci/types" @@ -74,9 +73,7 @@ func (m *Miner) PrepareProposal( ) (*abci.ResponsePrepareProposal, error) { var payloadEnvelopeBz []byte var err error - if payloadEnvelopeBz, err = m.buildBlock(ctx); errors.Is(err, context.DeadlineExceeded) { - return nil, err - } else if err != nil { + if payloadEnvelopeBz, err = m.buildBlock(ctx); err != nil { return nil, err } return &abci.ResponsePrepareProposal{Txs: [][]byte{payloadEnvelopeBz}}, err