Skip to content

Commit

Permalink
Merge branch 'mempool-feed-stage' into merge/v1.13.15
Browse files Browse the repository at this point in the history
  • Loading branch information
AusIV authored May 6, 2024
2 parents ed1f7ae + 755dd73 commit 75b5452
Show file tree
Hide file tree
Showing 32 changed files with 405 additions and 152 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- name: Build geth
run: |
cd cmd/geth && /opt/go/1.19.5/bin/go build
cd cmd/geth && /opt/go/1.21.0/bin/go build
tar -czvf geth-$(basename ${GITHUB_REF})-linux-arm64.tar.gz geth
echo $(md5sum geth | awk '{print $1}') > geth-$(basename ${GITHUB_REF})-linux-arm64.tar.gz.md5
- name: Upload geth to release page
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.21.x
go-version: 1.22.x
- name: Test tracers
run: go test -v ./eth/tracers/...
33 changes: 33 additions & 0 deletions bn/prometheus/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package prometheus

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type metrics struct {
registry *prometheus.Registry
gatherers prometheus.Gatherers
}

var Metrics metrics

func init() {
reg := prometheus.NewRegistry()
Metrics = metrics{registry: reg, gatherers: prometheus.Gatherers{reg}}
}

func (m *metrics) Register(collector prometheus.Collector) error {
return m.registry.Register(collector)
}

func (m *metrics) Handler() http.Handler {
return promhttp.HandlerFor(
m.gatherers[0],
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
)
}
2 changes: 1 addition & 1 deletion cmd/utils/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestHistoryImportAndExport(t *testing.T) {
db2.Close()
})

genesis.MustCommit(db2, triedb.NewDatabase(db, triedb.HashDefaults))
genesis.MustCommit(db2, triedb.NewDatabase(db, triedb.HashDefaults))
imported, err := core.NewBlockChain(db2, nil, genesis, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("unable to initialize chain: %v", err)
Expand Down
11 changes: 10 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,16 @@ func ApplyUnsignedTransactionWithResult(config *params.ChainConfig, bc ChainCont

// Create a new context to be used in the EVM environment
blockContext := NewEVMBlockContext(header, bc, author)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, vm.Config{Tracer: tracer, NoBaseFee: true})
txContext := vm.TxContext{
Origin: msg.From,
GasPrice: msg.GasPrice,
BlobHashes: msg.BlobHashes,
BlobFeeCap: msg.BlobGasFeeCap,
}
if txContext.GasPrice == nil {
txContext.GasPrice = common.Big0
}
vmenv := vm.NewEVM(blockContext, txContext, statedb, config, vm.Config{Tracer: tracer, NoBaseFee: true})
return applyTransactionWithResult(msg, config, bc, author, gp, statedb, header, msg, usedGas, vmenv, tracer)
}

Expand Down
10 changes: 7 additions & 3 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import (
// ExecutionResult includes all output after executing given evm
// message no matter the execution itself is successful or not.
type ExecutionResult struct {
Fee *uint256.Int
UsedGas uint64 // Total used gas, not including the refunded gas
RefundedGas uint64 // Total gas refunded after execution
Err error // Any error encountered during the execution(listed in core/vm/errors.go)
ReturnData []byte // Returned data from evm(function result or data supplied with revert opcode)
Hash common.Hash
}

// Unwrap returns the internal evm error which allows us for further
Expand Down Expand Up @@ -234,7 +236,7 @@ func (st *StateTransition) to() common.Address {

func (st *StateTransition) buyGas() error {
mgval := new(big.Int).SetUint64(st.msg.GasLimit)
mgval = mgval.Mul(mgval, st.msg.GasPrice)
mgval.Mul(mgval, st.msg.GasPrice)
balanceCheck := new(big.Int).Set(mgval)
if st.msg.GasFeeCap != nil {
balanceCheck.SetUint64(st.msg.GasLimit)
Expand Down Expand Up @@ -460,17 +462,19 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
}
effectiveTipU256, _ := uint256.FromBig(effectiveTip)

fee := new(uint256.Int)
if st.evm.Config.NoBaseFee && msg.GasFeeCap.Sign() == 0 && msg.GasTipCap.Sign() == 0 {
// Skip fee payment when NoBaseFee is set and the fee fields
// are 0. This avoids a negative effectiveTip being applied to
// the coinbase when simulating calls.
} else {
fee := new(uint256.Int).SetUint64(st.gasUsed())
fee.SetUint64(st.gasUsed())
fee.Mul(fee, effectiveTipU256)
st.state.AddBalance(st.evm.Context.Coinbase, fee)
}

return &ExecutionResult{
Fee: fee,
UsedGas: st.gasUsed(),
RefundedGas: gasRefund,
Err: vmerr,
Expand All @@ -488,7 +492,7 @@ func (st *StateTransition) refundGas(refundQuotient uint64) uint64 {

// Return ETH for remaining gas, exchanged at the original rate.
remaining := uint256.NewInt(st.gasRemaining)
remaining = remaining.Mul(remaining, uint256.MustFromBig(st.msg.GasPrice))
remaining.Mul(remaining, uint256.MustFromBig(st.msg.GasPrice))
st.state.AddBalance(st.msg.From, remaining)

// Also return remaining gas to the block gas counter so it is
Expand Down
50 changes: 47 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type blobTxMeta struct {
evictionExecTip *uint256.Int // Worst gas tip across all previous nonces
evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces

// -------- BLOCKNATIVE MODIFICATION START -------------
blobHashes []common.Hash
// -------- BLOCKNATIVE MODIFICATION STOP -------------
}

// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
Expand All @@ -122,6 +126,10 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),

// -------- BLOCKNATIVE MODIFICATION START -------------
blobHashes: tx.BlobHashes(),
// -------- BLOCKNATIVE MODIFICATION STOP -------------
}
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
Expand Down Expand Up @@ -1209,17 +1217,28 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
var (
adds = make([]*types.Transaction, 0, len(txs))
// -------- BLOCKNATIVE MODIFICATION START -------------
fullAdds = make([]*types.Transaction, 0, len(txs))
// -------- BLOCKNATIVE MODIFICATION STOP -------------
errs = make([]error, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
// -------- BLOCKNATIVE MODIFICATION START -------------
fullAdds = append(adds, tx)
// -------- BLOCKNATIVE MODIFICATION STOP -------------
}
}
// -------- BLOCKNATIVE MODIFICATION START -------------
if len(fullAdds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: fullAdds})
}
// -------- BLOCKNATIVE MODIFICATION STOP -------------

if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}
Expand Down Expand Up @@ -1624,7 +1643,33 @@ func (p *BlobPool) Stats() (int, int) {
// For the blob pool, this method will return nothing for now.
// TODO(karalabe): Abstract out the returned metadata.
func (p *BlobPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction)

// -------- BLOCKNATIVE MODIFICATION START -------------

pending := make(map[common.Address][]*types.Transaction)
p.lock.RLock()
defer p.lock.RUnlock()

for addr, txs := range p.index {
var lazies []*types.Transaction
for _, tx := range txs {
lazies = append(lazies, types.NewTx(&types.BlobTx{
Gas: tx.execGas,
BlobFeeCap: tx.blobFeeCap,
Nonce: tx.nonce,
GasFeeCap: tx.execFeeCap,
GasTipCap: tx.execTipCap,
BlobHashes: tx.blobHashes,
}))
}
if len(lazies) > 0 {
pending[addr] = lazies
}

}

return pending, make(map[common.Address][]*types.Transaction)
// -------- BLOCKNATIVE MODIFICATION STOP --------------
}

// ContentFrom retrieves the data content of the transaction pool, returning the
Expand Down Expand Up @@ -1652,7 +1697,6 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
return txpool.TxStatusUnknown
}


// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
// starts sending event to the given channel.
func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {
Expand Down
4 changes: 2 additions & 2 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ func TestOpenDrops(t *testing.T) {

id, _ := store.Put(blob)
filled[id] = struct{}{}
}
// Insert a sequence of transactions with partially passed nonces to verify
}
// Insert a sequence of transactions with partially passed nonces to verify
// that the included part of the set will get dropped (case 4).
var (
overlapper, _ = crypto.GenerateKey()
Expand Down
1 change: 1 addition & 0 deletions core/vm/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type StateDB interface {

// BlockNative additions
Logs() []*types.Log
GetLogs(hash common.Hash, blockNumber uint64, blockHash common.Hash) []*types.Log
IntermediateRoot(bool) common.Hash
}

Expand Down
2 changes: 0 additions & 2 deletions eth/downloader/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error
notifier.Notify(rpcSub.ID, status)
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()
Expand Down
6 changes: 0 additions & 6 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
}
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()
Expand Down Expand Up @@ -245,8 +243,6 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()
Expand Down Expand Up @@ -282,8 +278,6 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
}
case <-rpcSub.Err(): // client send an unsubscribe request
return
case <-notifier.Closed(): // connection dropped
return
}
}
}()
Expand Down
67 changes: 37 additions & 30 deletions eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/tracers/blocknative"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand All @@ -34,29 +35,31 @@ type rejectNotification struct {
// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction

type RPCTransaction struct {
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`
Trace *blocknative.Trace `json:"trace,omitempty"`
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`

BlobSidecar *types.BlobTxSidecar `json:"sidecar,omitempty"`
Trace *blocknative.Trace `json:"trace,omitempty"`
}

// newRPCTransaction returns a transaction that will serialize to the RPC
Expand Down Expand Up @@ -119,6 +122,8 @@ func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction {
result.GasPrice = (*hexutil.Big)(tx.GasFeeCap())
result.MaxFeePerBlobGas = (*hexutil.Big)(tx.BlobGasFeeCap())
result.BlobVersionedHashes = tx.BlobHashes()

result.BlobSidecar = tx.BlobTxSidecar()
}
return result
}
Expand Down Expand Up @@ -149,9 +154,13 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
dropped := make(chan core.DropTxsEvent)
droppedSub := api.sys.backend.SubscribeDropTxsEvent(dropped)

metricsDroppedTxsNew.Inc(1)
defer metricsDroppedTxsEnd.Inc(1)

for {
select {
case d := <-dropped:
metricsDroppedTxsReceived.Inc(int64(len(d.Txs)))
for _, tx := range d.Txs {
notification := &dropNotification{
Tx: newRPCPendingTransaction(tx),
Expand All @@ -163,14 +172,15 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
peerid, _ := txPeerMap.Get(tx.Hash())
notification.Peer, _ = peerIDMap.Load(peerid)
}
notifier.Notify(rpcSub.ID, notification)
metricsDroppedTxsSent.Inc(1)
if err := notifier.Notify(rpcSub.ID, notification); err != nil {
log.Error("dropped_txs_stream: failed to notify", "err", err)
return
}
}
case <-rpcSub.Err():
droppedSub.Unsubscribe()
return
case <-notifier.Closed():
droppedSub.Unsubscribe()
return
}
}
}()
Expand Down Expand Up @@ -232,9 +242,6 @@ func (api *FilterAPI) RejectedTransactions(ctx context.Context) (*rpc.Subscripti
case <-rpcSub.Err():
rejectedSub.Unsubscribe()
return
case <-notifier.Closed():
rejectedSub.Unsubscribe()
return
}
}
}()
Expand Down
Loading

0 comments on commit 75b5452

Please sign in to comment.