feat(sequencer): rewrite mempool to have per-account transaction storage and maintenance (#1323)

## Summary
This rewrite makes our mempool similar to Geth's/Reth's with per-account
transaction storage and maintenance. This rewrite tracks 'pending'
transactions that are ready to execute, and 'parked' transactions that
could be ready for execution in the future on a per-account basis.

This rewrite adds minor new functionality relative to the previous mempool
and changes the mempool interface. It was co-written with @Fraser999.

## Background
Prior to this rewrite, the mempool was not in a state to be improved
upon. All transactions, ready to execute or not, were stored together
in a single large queue. This structure didn't allow for mempool
optimizations such as performing upkeep on a per-account basis;
instead, the only option was to loop through all transactions during
the per-block maintenance.

### `mempool/mod.rs` Changes
This file contains the rewrite of the mempool interface. Instead of
returning a modifiable queue of all transactions to the builder in
`prepare_proposal()`, now we return a copy of the transactions. Instead
of removing transactions as we process them, we only remove transactions
during the `finalize_block()` logic, updating accounts to reflect their
new nonces.
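
The copy-then-maintain flow described above can be sketched as follows. This is a minimal illustration, not the code in this commit: `AccountPending`, its `String` payloads, and the single-account scope are assumptions made for the example; only the method names `builder_queue()` and `run_maintenance()` come from the PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified stand-in for one account's pending transactions,
/// keyed by nonce. The `String` value is only a placeholder payload.
#[derive(Default)]
struct AccountPending {
    txs: BTreeMap<u32, String>,
}

impl AccountPending {
    /// Returns a copy of the pending transactions in nonce order.
    /// Nothing is removed here, so an abandoned proposal needs no re-insertion.
    fn builder_queue(&self) -> Vec<(u32, String)> {
        self.txs
            .iter()
            .map(|(nonce, tx)| (*nonce, tx.clone()))
            .collect()
    }

    /// Drops every transaction whose nonce is below the account's current
    /// nonce, i.e. those that were already included in a finalized block.
    fn run_maintenance(&mut self, current_account_nonce: u32) {
        // `split_off` returns the entries with keys >= the given nonce.
        self.txs = self.txs.split_off(&current_account_nonce);
    }
}
```

Because the builder only ever sees a copy, the stored transactions change in one place, the maintenance step that runs after a block is finalized.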

The main app-exposed mempool calls are:
- `insert()`, which will only insert a transaction if it meets the
mempool's transaction policies.
- `remove_tx_invalid()`, to be used by `prepare_proposal()` to remove
transactions that fail execution from the mempool.
- `builder_queue()`, which returns a copy of the ready-to-execute
transactions sorted in priority order.
- `run_maintenance()`, which updates the stored transactions after the
block height increases.

The mempool implements the following transaction policies: 
1. Nonce replacement is not allowed.
2. Accounts cannot have more than `PARKED_SIZE_LIMIT` transactions in
their parked queues (currently set to 15).
3. There is no account limit on pending transactions.
4. Transactions will expire and can be removed after `TX_TTL` time
(currently set to 4 minutes).
5. If an account has a transaction removed for being invalid or expired,
all transactions for that account with a higher nonce can be removed as
well. This is due to the fact that we do not execute failing
transactions, so a transaction 'failing' will mean that further account
nonces will not be able to execute either.
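
A rough sketch of how policies 1, 2, 3 and 5 could interact for a single account is shown below. All names other than `PARKED_SIZE_LIMIT` are hypothetical, and the promotion of parked transactions on insert is an assumption of this example rather than something the description above states.

```rust
use std::collections::BTreeMap;

/// Assumed to mirror the limit quoted above.
const PARKED_SIZE_LIMIT: usize = 15;

#[derive(Debug, PartialEq)]
enum InsertionError {
    NonceTooLow,
    AlreadyPresent,  // policy 1: nonce replacement is not allowed
    ParkedSizeLimit, // policy 2: parked queue is bounded per account
}

#[derive(Debug, PartialEq)]
enum Queue {
    Pending,
    Parked,
}

/// Hypothetical per-account storage; `&'static str` stands in for a transaction.
#[derive(Default)]
struct Account {
    pending: BTreeMap<u32, &'static str>,
    parked: BTreeMap<u32, &'static str>,
}

impl Account {
    fn insert(
        &mut self,
        nonce: u32,
        tx: &'static str,
        current_nonce: u32,
    ) -> Result<Queue, InsertionError> {
        if nonce < current_nonce {
            return Err(InsertionError::NonceTooLow);
        }
        if self.pending.contains_key(&nonce) || self.parked.contains_key(&nonce) {
            return Err(InsertionError::AlreadyPresent);
        }
        // The nonce that would extend the gap-free pending run.
        let next_pending = self
            .pending
            .keys()
            .next_back()
            .map_or(current_nonce, |n| n.saturating_add(1));
        if nonce == next_pending {
            // Policy 3: no size limit is applied to pending transactions.
            self.pending.insert(nonce, tx);
            // Assumption: parked transactions that are now contiguous get promoted.
            let mut next = nonce.saturating_add(1);
            while let Some(parked_tx) = self.parked.remove(&next) {
                self.pending.insert(next, parked_tx);
                next = next.saturating_add(1);
            }
            return Ok(Queue::Pending);
        }
        if self.parked.len() >= PARKED_SIZE_LIMIT {
            return Err(InsertionError::ParkedSizeLimit);
        }
        self.parked.insert(nonce, tx);
        Ok(Queue::Parked)
    }

    /// Policy 5: removing an invalid or expired transaction also removes every
    /// transaction with a higher nonce. Returns the number of removed entries.
    fn remove_from(&mut self, nonce: u32) -> usize {
        let removed_pending = self.pending.split_off(&nonce);
        let removed_parked = self.parked.split_off(&nonce);
        removed_pending.len() + removed_parked.len()
    }
}
```

In this sketch, inserting nonces 0 and 2 for an account whose current nonce is 0 places 0 in pending and 2 in parked; inserting nonce 1 afterwards fills the gap and moves 2 into pending as well.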

### `mempool/transactions_container.rs`
This is a new file containing the data structures for the per-account
transaction logic. Several new code-level constructs are added:
- `TimemarkedTransaction`: a `SignedTransaction` wrapper that also
stores the time that the transaction was first seen by the mempool. This
is used for implementing the transaction expiry policy.
- The trait `TransactionsForAccount` with types
`PendingTransactionsForAccount` and `ParkedTransactionForAccount`. This
is used to house the per-account logic of storing pending and parked
transactions, respectively.
- `TransactionsContainer<T>`: a struct generic over
`TransactionsForAccount`. The types `PendingTransactions` and
`ParkedTransactions` are defined over this as the mempool-level
containers for pending and parked transactions.
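
To illustrate how a first-seen timestamp supports the expiry policy, here is a small sketch. The field names, the `is_expired` helper, and the placeholder `SignedTransaction` struct are assumptions for this example; `TX_TTL` is set to the 4 minutes quoted earlier.

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Assumed to match the TTL quoted above (4 minutes).
const TX_TTL: Duration = Duration::from_secs(240);

/// Placeholder for the real `SignedTransaction` type.
struct SignedTransaction {
    nonce: u32,
}

/// Wraps a transaction together with the time the mempool first saw it.
struct TimemarkedTransaction {
    signed_tx: Arc<SignedTransaction>,
    time_first_seen: Instant,
}

impl TimemarkedTransaction {
    fn new(signed_tx: Arc<SignedTransaction>, time_first_seen: Instant) -> Self {
        Self {
            signed_tx,
            time_first_seen,
        }
    }

    /// Hypothetical helper: true once more than `ttl` has elapsed since the
    /// transaction was first seen. `now` is passed in to keep this testable.
    fn is_expired(&self, now: Instant, ttl: Duration) -> bool {
        now.saturating_duration_since(self.time_first_seen) > ttl
    }
}
```

Passing `now` explicitly, rather than calling `Instant::now()` inside the helper, is a choice made here so the check can be exercised without waiting for real time to pass.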

### Other notable changes
- All transaction eviction reasons are surfaced to the `CheckTx()`
CometBFT mempool logic. TBD on how to surface this to the end user.
- Fixed hidden bug in the app's `finalize_block()` logic which was
updating the mempool with the previous block's state.
- Returned transactions for block building are ordered by nonce
difference and then by time first seen. The previous implementation
did not sort by time first seen.
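
The ordering in the last bullet can be sketched as a two-part sort key. The `PendingTx` struct and `builder_order` function are hypothetical, and the direction of the sort (smaller nonce difference first, then earlier first-seen time) is an assumption of this example.

```rust
use std::time::{Duration, Instant};

/// Hypothetical flattened view of a pending transaction.
struct PendingTx {
    label: &'static str,
    nonce: u32,
    /// The sender's current nonce in app state.
    account_nonce: u32,
    time_first_seen: Instant,
}

/// Orders transactions by nonce difference (transaction nonce minus the
/// account's current nonce), breaking ties by the time first seen.
fn builder_order(mut txs: Vec<PendingTx>) -> Vec<&'static str> {
    txs.sort_by_key(|tx| {
        (
            tx.nonce.saturating_sub(tx.account_nonce),
            tx.time_first_seen,
        )
    });
    txs.into_iter().map(|tx| tx.label).collect()
}
```

With this key, each account's next executable transaction comes before any account's second transaction, and among equal nonce differences the transaction seen earlier comes first.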

### Mempool Overview 
The overall mempool structure for Astria as of this PR is shown in the
diagram below:

![image](https://github.com/user-attachments/assets/daf26a2b-4083-49ec-adb5-0f4ac5959c00)

## Testing
- Unit tests were written for all desired behaviors.
- Ran locally with a single node. 
- The mempool's benchmarks were modified to work with the new interface,
showing a 2x speedup on mempools of 100 transactions and a 10x speedup
on mempools of 10,000 transactions.

## TODO still
- Rewrite and add new metrics.
- Implement pending/parked logic that takes an account's balance into
consideration.
- Rewrite benchmarks to better represent expected user behavior. 


## Related Issues
closes #1154, #1150, #1334
Lilyjjo authored Aug 20, 2024
1 parent ee31f2d commit 2ce5fd9
Showing 12 changed files with 2,198 additions and 726 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

8 changes: 7 additions & 1 deletion crates/astria-core/src/protocol/abci.rs
@@ -16,7 +16,9 @@ impl AbciErrorCode {
pub const VALUE_NOT_FOUND: Self = Self(unsafe { NonZeroU32::new_unchecked(8) });
pub const TRANSACTION_EXPIRED: Self = Self(unsafe { NonZeroU32::new_unchecked(9) });
pub const TRANSACTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(10) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) });
}

impl AbciErrorCode {
@@ -42,6 +44,10 @@ impl AbciErrorCode {
Self::TRANSACTION_FAILED => {
"the transaction failed to execute in prepare_proposal()".into()
}
Self::TRANSACTION_INSERTION_FAILED => {
"the transaction failed insertion into the mempool".into()
}
Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(),
Self::BAD_REQUEST => "the request payload was malformed".into(),
Self(other) => {
format!("invalid error code {other}: should be unreachable (this is a bug)")
1 change: 0 additions & 1 deletion crates/astria-sequencer/Cargo.toml
@@ -30,7 +30,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7
] }
ibc-proto = { version = "0.41.0", features = ["server"] }
matchit = "0.7.2"
priority-queue = "2.0.2"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
68 changes: 37 additions & 31 deletions crates/astria-sequencer/src/app/mod.rs
@@ -63,9 +63,10 @@ use tracing::{
};

use crate::{
accounts,
accounts::{
self,
component::AccountsComponent,
StateReadExt,
StateWriteExt as _,
},
address::StateWriteExt as _,
@@ -484,11 +485,21 @@ impl App {
let mut included_signed_txs = Vec::new();
let mut failed_tx_count: usize = 0;
let mut execution_results = Vec::new();
let mut txs_to_readd_to_mempool = Vec::new();
let mut excluded_txs: usize = 0;

// get copy of transactions to execute from mempool
let current_account_nonce_getter =
|address: [u8; 20]| self.state.get_account_nonce(address);
let pending_txs = self
.mempool
.builder_queue(current_account_nonce_getter)
.await
.expect("failed to fetch pending transactions");

while let Some((enqueued_tx, priority)) = self.mempool.pop().await {
let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string();
let tx = enqueued_tx.signed_tx();
let mut unused_count = pending_txs.len();
for (tx_hash, tx) in pending_txs {
unused_count = unused_count.saturating_sub(1);
let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string();
let bytes = tx.to_raw().encode_to_vec();
let tx_len = bytes.len();
info!(transaction_hash = %tx_hash_base64, "executing transaction");
@@ -503,7 +514,7 @@ impl App {
tx_data_bytes = tx_len,
"excluding remaining transactions: max cometBFT data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// break from loop, as the block is full
break;
@@ -526,7 +537,7 @@ impl App {
tx_data_bytes = tx_sequence_data_bytes,
"excluding transaction: max block sequenced data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// continue as there might be non-sequence txs that can fit
continue;
@@ -558,18 +569,21 @@ impl App {
);

if e.downcast_ref::<InvalidNonce>().is_some() {
// we re-insert the tx into the mempool if it failed to execute
// we don't remove the tx from mempool if it failed to execute
// due to an invalid nonce, as it may be valid in the future.
// if it's invalid due to the nonce being too low, it'll be
// removed from the mempool in `update_mempool_after_finalization`.
txs_to_readd_to_mempool.push((enqueued_tx, priority));
} else {
failed_tx_count = failed_tx_count.saturating_add(1);

// the transaction should be removed from the cometbft mempool
// remove the failing transaction from the mempool
//
// this will remove any transactions from the same sender
// as well, as the dependent nonces will not be able
// to execute
self.mempool
.track_removal_comet_bft(
enqueued_tx.tx_hash(),
.remove_tx_invalid(
tx,
RemovalReason::FailedPrepareProposal(e.to_string()),
)
.await;
@@ -586,15 +600,12 @@ impl App {
);
}
self.metrics.set_prepare_proposal_excluded_transactions(
txs_to_readd_to_mempool
.len()
.saturating_add(failed_tx_count),
excluded_txs.saturating_add(failed_tx_count),
);

self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");
self.metrics.set_transactions_in_mempool_total(mempool_len);
debug!("{unused_count} leftover pending transactions");
self.metrics
.set_transactions_in_mempool_total(self.mempool.len().await);

self.execution_results = Some(execution_results);
Ok((validated_txs, included_signed_txs))
@@ -805,10 +816,6 @@ impl App {

// skip the first two transactions, as they are the rollup data commitments
for tx in finalize_block.txs.iter().skip(2) {
// remove any included txs from the mempool
let tx_hash = Sha256::digest(tx).into();
self.mempool.remove(tx_hash).await;

let signed_tx = signed_transaction_from_bytes(tx)
.context("protocol error; only valid txs should be finalized")?;

@@ -880,6 +887,10 @@ impl App {
state_tx
.put_sequencer_block(sequencer_block)
.context("failed to write sequencer block to state")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, &state_tx).await;

// events that occur after end_block are ignored here;
// there should be none anyways.
let _ = self.apply(state_tx);
@@ -890,11 +901,6 @@ impl App {
.await
.context("failed to prepare commit")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, self.state.clone())
.await
.context("failed to update mempool after finalization")?;

Ok(abci::response::FinalizeBlock {
events: end_block.events,
validator_updates: end_block.validator_updates,
@@ -1121,10 +1127,10 @@ impl App {
// the mempool is large.
async fn update_mempool_after_finalization<S: accounts::StateReadExt>(
mempool: &mut Mempool,
state: S,
) -> anyhow::Result<()> {
state: &S,
) {
let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address);
mempool.run_maintenance(current_account_nonce_getter).await
mempool.run_maintenance(current_account_nonce_getter).await;
}

/// relevant data of a block being executed.
12 changes: 9 additions & 3 deletions crates/astria-sequencer/src/app/test_utils.rs
@@ -1,3 +1,5 @@
use std::sync::Arc;

use astria_core::{
crypto::SigningKey,
primitive::v1::RollupId,
@@ -138,21 +140,25 @@ pub(crate) async fn initialize_app(
app
}

pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
pub(crate) fn mock_tx(
nonce: u32,
signer: &SigningKey,
rollup_name: &str,
) -> Arc<SignedTransaction> {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes([0; 32]),
rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()),
data: Bytes::from_static(&[0x99]),
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(&get_alice_signing_key())
Arc::new(tx.into_signed(signer))
}
28 changes: 23 additions & 5 deletions crates/astria-sequencer/src/app/tests_app.rs
@@ -452,7 +452,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool.insert(signed_tx, 0).await.unwrap();
app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
@@ -473,6 +473,12 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
assert_eq!(prepare_proposal_result.txs, finalize_block.txs);
assert_eq!(app.executed_proposal_hash, Hash::default());
assert_eq!(app.validator_address.unwrap(), proposer_address);
// run maintenance to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

assert_eq!(app.mempool.len().await, 0);

// call process_proposal - should not re-execute anything.
@@ -561,8 +567,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
@@ -581,6 +587,12 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintenance to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
@@ -634,8 +646,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -654,6 +666,12 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintenance to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
19 changes: 13 additions & 6 deletions crates/astria-sequencer/src/grpc/sequencer.rs
@@ -264,13 +264,20 @@ mod test {

let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
let nonce = 99;
let tx = crate::app::test_utils::get_mock_tx(nonce);
// insert a transaction with a nonce gap
let gapped_nonce = 99;
let tx = crate::app::test_utils::mock_tx(gapped_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a tx with lower nonce also, but we should get the highest nonce
let lower_nonce = 98;
let tx = crate::app::test_utils::get_mock_tx(lower_nonce);
// insert a transaction at the current nonce
let account_nonce = 0;
let tx = crate::app::test_utils::mock_tx(account_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a transaction one above account nonce (not gapped)
let sequential_nonce = 1;
let tx: Arc<astria_core::protocol::transaction::v1alpha1::SignedTransaction> =
crate::app::test_utils::mock_tx(sequential_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

let server = Arc::new(SequencerServer::new(storage.clone(), mempool));
@@ -279,7 +286,7 @@ mod test {
};
let request = Request::new(request);
let response = server.get_pending_nonce(request).await.unwrap();
assert_eq!(response.into_inner().inner, nonce);
assert_eq!(response.into_inner().inner, sequential_nonce);
}

#[tokio::test]