Skip to content

Commit

Permalink
mempool rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
Lilyjjo committed Jul 31, 2024
1 parent b8f26d2 commit 7bf4753
Show file tree
Hide file tree
Showing 7 changed files with 3,065 additions and 1,003 deletions.
12 changes: 9 additions & 3 deletions crates/astria-core/src/protocol/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ impl AbciErrorCode {
pub const VALUE_NOT_FOUND: Self = Self(8);
pub const TRANSACTION_EXPIRED: Self = Self(9);
pub const TRANSACTION_FAILED: Self = Self(10);
pub const BAD_REQUEST: Self = Self(11);
pub const TRANSACTION_INSERTION_FAILED: Self = Self(11);
pub const LOWER_NONCE_INVALIDATED: Self = Self(12);
pub const BAD_REQUEST: Self = Self(13);
}

impl AbciErrorCode {
Expand All @@ -38,7 +40,9 @@ impl AbciErrorCode {
8 => "the requested value was not found".into(),
9 => "the transaction expired in the app's mempool".into(),
10 => "the transaction failed to execute in prepare_proposal()".into(),
11 => "the request payload was malformed".into(),
11 => "the transaction failed insertion into the mempool".into(),
12 => "lower nonce was invalidated in mempool".into(),
13 => "the request payload was malformed".into(),
other => format!("unknown non-zero abci error code: {other}").into(),
}
}
Expand Down Expand Up @@ -69,7 +73,9 @@ impl From<NonZeroU32> for AbciErrorCode {
8 => Self::VALUE_NOT_FOUND,
9 => Self::TRANSACTION_EXPIRED,
10 => Self::TRANSACTION_FAILED,
11 => Self::BAD_REQUEST,
11 => Self::TRANSACTION_INSERTION_FAILED,
12 => Self::LOWER_NONCE_INVALIDATED,
13 => Self::BAD_REQUEST,
other => Self(other),
}
}
Expand Down
51 changes: 28 additions & 23 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,19 @@ impl App {
let mut included_signed_txs = Vec::new();
let mut failed_tx_count: usize = 0;
let mut execution_results = Vec::new();
let mut txs_to_readd_to_mempool = Vec::new();
let mut excluded_txs: usize = 0;

while let Some((enqueued_tx, priority)) = self.mempool.pop().await {
let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string();
let tx = enqueued_tx.signed_tx();
// get copy of transactions to execute from mempool
let current_account_nonce_getter = |address: Address| self.state.get_account_nonce(address);
let mut pending_txs = self
.mempool
.builder_queue(current_account_nonce_getter)
.await
.expect("failed to fetch pending transactions");

while let Some((timemarked_tx, _)) = pending_txs.pop() {
let tx_hash_base64 = telemetry::display::base64(&timemarked_tx.tx_hash()).to_string();
let tx = timemarked_tx.signed_tx();
let bytes = tx.to_raw().encode_to_vec();
let tx_len = bytes.len();
info!(transaction_hash = %tx_hash_base64, "executing transaction");
Expand All @@ -514,7 +522,7 @@ impl App {
tx_data_bytes = tx_len,
"excluding remaining transactions: max cometBFT data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// break from loop, as the block is full
break;
Expand All @@ -537,14 +545,14 @@ impl App {
tx_data_bytes = tx_sequence_data_bytes,
"excluding transaction: max block sequenced data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// continue as there might be non-sequence txs that can fit
continue;
}

// execute tx and store in `execution_results` list on success
match self.execute_transaction(tx.clone()).await {
match self.execute_transaction(Arc::new(tx.clone())).await {
Ok(events) => {
execution_results.push(ExecTxResult {
events,
Expand All @@ -569,18 +577,22 @@ impl App {
);

if e.downcast_ref::<InvalidNonce>().is_some() {
// we re-insert the tx into the mempool if it failed to execute
// we don't remove the tx from mempool if it failed to execute
// due to an invalid nonce, as it may be valid in the future.
//
// if it's invalid due to the nonce being too low, it'll be
// removed from the mempool in `update_mempool_after_finalization`.
txs_to_readd_to_mempool.push((enqueued_tx, priority));
} else {
failed_tx_count = failed_tx_count.saturating_add(1);

// the transaction should be removed from the cometbft mempool
// remove the failing transaction from the mempool
//
// this will remove any transactions from the same sender
// as well, as the dependent nonces will not be able
// to execute
self.mempool
.track_removal_comet_bft(
enqueued_tx.tx_hash(),
.remove_tx_invalid(
tx,
RemovalReason::FailedPrepareProposal(e.to_string()),
)
.await;
Expand All @@ -597,15 +609,12 @@ impl App {
);
}
self.metrics.set_prepare_proposal_excluded_transactions(
txs_to_readd_to_mempool
.len()
.saturating_add(failed_tx_count),
excluded_txs.saturating_add(failed_tx_count),
);

self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");
self.metrics.set_transactions_in_mempool_total(mempool_len);
debug!("{} {}", pending_txs.len(), "leftover pending transactions");
self.metrics
.set_transactions_in_mempool_total(self.mempool.len().await);

self.execution_results = Some(execution_results);
Ok((validated_txs, included_signed_txs))
Expand Down Expand Up @@ -816,10 +825,6 @@ impl App {

// skip the first two transactions, as they are the rollup data commitments
for tx in finalize_block.txs.iter().skip(2) {
// remove any included txs from the mempool
let tx_hash = Sha256::digest(tx).into();
self.mempool.remove(tx_hash).await;

let signed_tx = signed_transaction_from_bytes(tx)
.context("protocol error; only valid txs should be finalized")?;

Expand Down
23 changes: 23 additions & 0 deletions crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,26 @@ pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {

tx.into_signed(&alice_signing_key)
}

pub(crate) fn get_mock_tx_parameterized(
nonce: u32,
signer: &SigningKey,
data_bytes: [u8; 32],
) -> SignedTransaction {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes(data_bytes),
data: vec![0x99],
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(signer)
}
Loading

0 comments on commit 7bf4753

Please sign in to comment.