Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool blob feed + trace related fixes #164

Merged
merged 17 commits into from
Mar 7, 2024
13 changes: 12 additions & 1 deletion core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,17 +1217,28 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
var (
adds = make([]*types.Transaction, 0, len(txs))
// -------- BLOCKNATIVE MODIFICATION START -------------
fullAdds = make([]*types.Transaction, 0, len(txs))
// -------- BLOCKNATIVE MODIFICATION STOP -------------
errs = make([]error, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
// -------- BLOCKNATIVE MODIFICATION START -------------
fullAdds = append(adds, tx)
// -------- BLOCKNATIVE MODIFICATION STOP -------------
}
}
// -------- BLOCKNATIVE MODIFICATION START -------------
if len(fullAdds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: fullAdds})
}
// -------- BLOCKNATIVE MODIFICATION STOP -------------

if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}
Expand Down
50 changes: 27 additions & 23 deletions eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,31 @@ type rejectNotification struct {
// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction

type RPCTransaction struct {
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`
Trace *blocknative.Trace `json:"trace,omitempty"`
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`

BlobSidecar *types.BlobTxSidecar `json:"sidecar,omitempty"`
Trace *blocknative.Trace `json:"trace,omitempty"`
}

// newRPCTransaction returns a transaction that will serialize to the RPC
Expand Down Expand Up @@ -119,6 +121,8 @@ func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction {
result.GasPrice = (*hexutil.Big)(tx.GasFeeCap())
result.MaxFeePerBlobGas = (*hexutil.Big)(tx.BlobGasFeeCap())
result.BlobVersionedHashes = tx.BlobHashes()

result.BlobSidecar = tx.BlobTxSidecar()
}
return result
}
Expand Down
22 changes: 21 additions & 1 deletion eth/filters/peers_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func RPCMarshalHeader(head *types.Header) map[string]interface{} {
result["baseFeePerGas"] = (*hexutil.Big)(head.BaseFee)
}

if head.BlobGasUsed != nil {
result["blobGasUsed"] = hexutil.Uint64(*head.BlobGasUsed)
}

if head.ExcessBlobGas != nil {
result["excessBlobGas"] = hexutil.Uint64(*head.ExcessBlobGas)
}

return result
}

Expand Down Expand Up @@ -480,7 +488,19 @@ func (api *FilterAPI) NewPendingTransactionsWithPeers(ctx context.Context) (*rpc
peerid, _ := txPeerMap.Get(h)
p2pts, _ := tsMap.Get(h)
peer, _ := peerIDMap.Load(peerid)
notifier.Notify(rpcSub.ID, withPeer{Value: newRPCPendingTransaction(api.sys.backend.GetPoolTransaction(h)), Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts})

bTx := api.sys.backend.GetPoolTransaction(h)
val := newRPCPendingTransaction(bTx)
if val == nil {
val = newRPCPendingTransaction(tx)
}

if tx != nil && val != nil {
if tx.Type() == 3 && val.BlobSidecar == nil {
val.BlobSidecar = tx.BlobTxSidecar()
}
}
notifier.Notify(rpcSub.ID, withPeer{Value: val, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts})
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
Expand Down
28 changes: 13 additions & 15 deletions eth/filters/trace_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -108,7 +106,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
traceCtx.TxHash = tx.Hash()
trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts)
if err != nil {
log.Error("failed to trace tx", "err", err, "tx", tx.Hash())
log.Info("failed to trace tx", "err", err, "tx", tx.Hash())
continue
}

Expand Down Expand Up @@ -175,11 +173,14 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
}
case h := <-headers:
hashes = []common.Hash{h.Hash()}
case <-headersSub.Err():
case err := <-headersSub.Err():
log.Error("HeaderSub error", "error", err)
return
case <-reorgSub.Err():
case err := <-reorgSub.Err():
log.Error("ReorgSub error", "error", err)
return
case <-notifier.Closed():
log.Error("Nofitier closed Error")
return
}

Expand All @@ -192,19 +193,20 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON

marshalBlock, err := RPCMarshalBlock(block, true, true, api.sys.backend.ChainConfig())
if err != nil {
log.Error("failed to marshal block", "err", err, "block", block.Number())
continue
}

trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts)
if err != nil {
log.Error("failed to trace block", "err", err, "block", block.Number())
log.Info("failed to trace block", "err", err, "block", block.Number())
continue
}
marshalBlock["trace"] = trace

marshalReceipts := make(map[common.Hash]map[string]interface{})
receipts, err := api.sys.backend.GetReceipts(ctx, hash)
if err != nil {
log.Error("failed to get receipts for block", "err", err, "hash ", hash, "block", block.Number())
continue
}
for index, receipt := range receipts {
Expand Down Expand Up @@ -239,20 +241,14 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return rpcSub, nil
}

var (
txTraceLocksMu sync.RWMutex
txTraceLocks = make(map[common.Hash]chan struct{})
txTraceLocksTimeout = 1 * time.Second
)

// traceTx traces a transaction with the given contexts.
func traceTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContext, chainConfig *params.ChainConfig, statedb *state.StateDB, tracerOpts blocknative.TracerOpts) (*blocknative.Trace, error) {
tracer, err := blocknative.NewTracerWithOpts(tracerOpts)
if err != nil {
return nil, err
}
txContext := core.NewEVMTxContext(message)
vmenv := vm.NewEVM(vmctx, txContext, statedb, chainConfig, vm.Config{Tracer: tracer, NoBaseFee: true})
vmenv := vm.NewEVM(vmctx, txContext, statedb, chainConfig, vm.Config{Tracer: tracer})
statedb.SetTxContext(txCtx.TxHash, txCtx.TxIndex)

if _, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.GasLimit)); err != nil {
Expand Down Expand Up @@ -288,8 +284,9 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
)

for i, tx := range txs {
msg, err := core.TransactionToMessage(tx, signer, block.BaseFee())
msg, err := core.TransactionToMessage(tx, signer, blockCtx.BaseFee)
if err != nil {
log.Error("failed to trace block in transaction to message", "err", err, "tx", tx.Hash())
return nil, err
}
txCtx := &tracers.Context{
Expand All @@ -300,6 +297,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
}
results[i], err = traceTx(msg, txCtx, blockCtx, chainConfig, statedb, tracerOpts)
if err != nil {
log.Error("failed to trace block in transaction", "err", err, "tx", tx.Hash())
return nil, err
}
statedb.Finalise(is158)
Expand Down
Loading