Skip to content

Commit

Permalink
avoid passing state into builder_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraser999 committed Jan 17, 2025
1 parent 7b1e2a5 commit 552a1c9
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 102 deletions.
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ impl App {
};

// get copy of transactions to execute from mempool
let pending_txs = self.mempool.builder_queue(&self.state).await;
let pending_txs = self.mempool.builder_queue().await;

let mut unused_count = pending_txs.len();
for (tx_hash, tx) in pending_txs {
Expand Down
11 changes: 1 addition & 10 deletions crates/astria-sequencer/src/mempool/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,11 @@ fn builder_queue<T: MempoolSize>(bencher: divan::Bencher) {
.build()
.unwrap();

let mut mock_state = runtime.block_on(mock_state_getter());

// iterate over all signers and put their balances and nonces into the mock state
for i in 0..SIGNER_COUNT {
let signing_key = SigningKey::from([i; 32]);
let signing_address = signing_key.address_bytes();
mock_state_put_account_nonce(&mut mock_state, &signing_address, 0);
}

bencher
.with_inputs(|| init_mempool::<T>())
.bench_values(move |mempool| {
runtime.block_on(async {
mempool.builder_queue(&mock_state).await;
mempool.builder_queue().await;
});
});
}
Expand Down
46 changes: 17 additions & 29 deletions crates/astria-sequencer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,8 @@ impl Mempool {
/// Returns a copy of all transactions and their hashes ready for execution, sorted first by the
/// difference between a transaction and the account's current nonce and then by the time that
/// the transaction was first seen by the appside mempool.
#[instrument(skip_all, err(level = Level::DEBUG))]
pub(crate) async fn builder_queue<S: accounts::StateReadExt>(
&self,
state: &S,
) -> Vec<([u8; 32], Arc<Transaction>)> {
self.pending.read().await.builder_queue(state).await
pub(crate) async fn builder_queue(&self) -> Vec<([u8; 32], Arc<Transaction>)> {
self.pending.read().await.builder_queue()
}

/// Removes the target transaction and all transactions for associated account with higher
Expand Down Expand Up @@ -589,10 +585,9 @@ mod tests {
#[tokio::test]
async fn single_account_flow_extensive() {
// This test tries to hit the more complex edges of the mempool with a single account.
// The test adds the nonces [1,2,0,4], creates a builder queue with the account
// nonce at 1, and then cleans the pool to nonce 4. This tests some of the
// odder edge cases that can be hit if a node goes offline or fails to see
// some transactions that other nodes include into their proposed blocks.
// The test adds the nonces [1,2,0,4], creates a builder queue, and then cleans the pool to
// nonce 4. This tests some of the odder edge cases that can be hit if a node goes offline
// or fails to see some transactions that other nodes include into their proposed blocks.
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
Expand Down Expand Up @@ -642,21 +637,13 @@ mod tests {
// assert size
assert_eq!(mempool.len().await, 4);

// mock state with nonce at 1
let mut mock_state = mock_state_getter().await;
mock_state_put_account_nonce(
&mut mock_state,
astria_address_from_hex_string(ALICE_ADDRESS).as_bytes(),
1,
);

// grab building queue, should return transactions [1,2] since [0] was below and [4] is
// gapped
let builder_queue = mempool.builder_queue(&mock_state).await;
// grab building queue, should return transactions [0,1,2] since [4] is gapped
let builder_queue = mempool.builder_queue().await;

// see contains first two transactions that should be pending
assert_eq!(builder_queue[0].1.nonce(), 1, "nonce should be one");
assert_eq!(builder_queue[1].1.nonce(), 2, "nonce should be two");
assert_eq!(builder_queue[0].1.nonce(), 0, "nonce should be zero");
assert_eq!(builder_queue[1].1.nonce(), 1, "nonce should be one");
assert_eq!(builder_queue[2].1.nonce(), 2, "nonce should be two");

// see mempool's transactions just cloned, not consumed
assert_eq!(mempool.len().await, 4);
Expand All @@ -665,6 +652,7 @@ mod tests {
// to pending

// setup state
let mut mock_state = mock_state_getter().await;
mock_state_put_account_nonce(
&mut mock_state,
astria_address_from_hex_string(ALICE_ADDRESS).as_bytes(),
Expand All @@ -682,7 +670,7 @@ mod tests {
assert_eq!(mempool.len().await, 1);

// see transaction [4] properly promoted
let mut builder_queue = mempool.builder_queue(&mock_state).await;
let mut builder_queue = mempool.builder_queue().await;
let (_, returned_tx) = builder_queue.pop().expect("should return last transaction");
assert_eq!(returned_tx.nonce(), 4, "nonce should be four");
}
Expand Down Expand Up @@ -727,7 +715,7 @@ mod tests {
1,
);

let builder_queue = mempool.builder_queue(&mock_state).await;
let builder_queue = mempool.builder_queue().await;
assert_eq!(
builder_queue.len(),
1,
Expand All @@ -746,7 +734,7 @@ mod tests {
mempool.run_maintenance(&mock_state, false).await;

// see builder queue now contains them
let builder_queue = mempool.builder_queue(&mock_state).await;
let builder_queue = mempool.builder_queue().await;
assert_eq!(
builder_queue.len(),
3,
Expand Down Expand Up @@ -795,7 +783,7 @@ mod tests {
1,
);

let builder_queue = mempool.builder_queue(&mock_state).await;
let builder_queue = mempool.builder_queue().await;
assert_eq!(
builder_queue.len(),
4,
Expand All @@ -812,7 +800,7 @@ mod tests {
mempool.run_maintenance(&mock_state, false).await;

// see builder queue now contains single transactions
let builder_queue = mempool.builder_queue(&mock_state).await;
let builder_queue = mempool.builder_queue().await;
assert_eq!(
builder_queue.len(),
1,
Expand All @@ -832,7 +820,7 @@ mod tests {

mempool.run_maintenance(&mock_state, false).await;

let builder_queue = mempool.builder_queue(&mock_state).await;
let builder_queue = mempool.builder_queue().await;
assert_eq!(
builder_queue.len(),
3,
Expand Down
100 changes: 38 additions & 62 deletions crates/astria-sequencer/src/mempool/transactions_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use tokio::time::{
use tracing::{
error,
instrument,
Level,
};

use super::RemovalReason;
Expand Down Expand Up @@ -213,6 +212,10 @@ impl PendingTransactionsForAccount {
self.txs.last_key_value().map(|(nonce, _)| *nonce)
}

fn current_account_nonce(&self) -> Option<u32> {
self.txs.first_key_value().map(|(nonce, _)| *nonce)
}

/// Removes and returns transactions that exceed the balances in `available_balances`.
fn find_demotables(
&mut self,
Expand Down Expand Up @@ -774,11 +777,7 @@ impl PendingTransactions {

/// Returns a copy of transactions and their hashes sorted by nonce difference and then time
/// first seen.
#[instrument(skip_all, err(level = Level::DEBUG))]
pub(super) async fn builder_queue<S: accounts::StateReadExt>(
&self,
state: &S,
) -> Vec<([u8; 32], Arc<Transaction>)> {
pub(super) fn builder_queue(&self) -> Vec<([u8; 32], Arc<Transaction>)> {
// Used to hold the values in Vec for sorting.
struct QueueEntry {
tx: Arc<Transaction>,
Expand All @@ -789,25 +788,12 @@ impl PendingTransactions {
let mut queue = Vec::with_capacity(self.len());
// Add all transactions to the queue.
for (address, account_txs) in &self.txs {
let current_account_nonce = match state.get_account_nonce(address).await {
Ok(nonce) => nonce,
Err(error) => {
error!(address = %telemetry::display::base64(address), "failed to fetch account nonce for builder queue: {error:#}");
// use the lowest nonce in pending as the current nonce, this should be the
// same value as the nonce returned by the state. the pending queue shouldn't
// be empty, if it is we can continue to the next account
if let Some(nonce) = account_txs
.txs()
.values()
.map(TimemarkedTransaction::nonce)
.min()
{
nonce
} else {
error!(address = %telemetry::display::base64(address), "pending queue is empty during builder queue step");
continue;
}
}
let Some(current_account_nonce) = account_txs.current_account_nonce() else {
error!(
address = %telemetry::display::base64(address),
"pending queue is empty during builder queue step"
);
continue;
};

for ttx in account_txs.txs.values() {
Expand Down Expand Up @@ -888,7 +874,6 @@ mod tests {
denom_3,
mock_balances,
mock_state_getter,
mock_state_put_account_nonce,
mock_tx_cost,
ALICE_ADDRESS,
BOB_ADDRESS,
Expand Down Expand Up @@ -1765,9 +1750,9 @@ mod tests {
);
}

#[tokio::test]
#[test]
#[expect(clippy::too_many_lines, reason = "it's a test")]
async fn transactions_container_clean_account_stale_expired() {
fn transactions_container_clean_account_stale_expired() {
let mut pending_txs = PendingTransactions::new(TX_TTL);

// transactions to add to accounts
Expand Down Expand Up @@ -1983,8 +1968,8 @@ mod tests {
);
}

#[tokio::test]
async fn pending_transactions_builder_queue() {
#[test]
fn pending_transactions_builder_queue() {
let mut pending_txs = PendingTransactions::new(TX_TTL);

// transactions to add to accounts
Expand Down Expand Up @@ -2017,25 +2002,12 @@ mod tests {
.add(ttx_s1_3.clone(), 1, &account_balances)
.unwrap();

// should return all transactions from Alice and last two from Bob
let mut mock_state = mock_state_getter().await;
mock_state_put_account_nonce(
&mut mock_state,
astria_address_from_hex_string(ALICE_ADDRESS).as_bytes(),
1,
);
mock_state_put_account_nonce(
&mut mock_state,
astria_address_from_hex_string(BOB_ADDRESS).as_bytes(),
2,
);

// get builder queue
let builder_queue = pending_txs.builder_queue(&mock_state).await;
// get builder queue - should return all transactions from Alice and Bob
let builder_queue = pending_txs.builder_queue();
assert_eq!(
builder_queue.len(),
3,
"three transactions should've been popped"
4,
"four transactions should've been popped"
);

// check that the transactions are in the expected order
Expand All @@ -2046,13 +2018,18 @@ mod tests {
);
let (second_tx_hash, _) = builder_queue[1];
assert_eq!(
second_tx_hash, ttx_s1_2.tx_hash,
second_tx_hash, ttx_s1_1.tx_hash,
"expected other low nonce diff (0) to be second"
);
let (third_tx_hash, _) = builder_queue[2];
assert_eq!(
third_tx_hash, ttx_s1_3.tx_hash,
"expected highest nonce diff to be last"
third_tx_hash, ttx_s1_2.tx_hash,
"expected middle nonce diff (1) to be third"
);
let (fourth_tx_hash, _) = builder_queue[3];
assert_eq!(
fourth_tx_hash, ttx_s1_3.tx_hash,
"expected highest nonce diff (2) to be last"
);

// ensure transactions not removed
Expand All @@ -2063,8 +2040,8 @@ mod tests {
);
}

#[tokio::test]
async fn parked_transactions_find_promotables() {
#[test]
fn parked_transactions_find_promotables() {
let mut parked_txs = ParkedTransactions::<MAX_PARKED_TXS_PER_ACCOUNT>::new(TX_TTL, 100);

// transactions to add to accounts
Expand Down Expand Up @@ -2130,8 +2107,8 @@ mod tests {
);
}

#[tokio::test]
async fn pending_transactions_find_demotables() {
#[test]
fn pending_transactions_find_demotables() {
let mut pending_txs = PendingTransactions::new(TX_TTL);

// transactions to add to account
Expand Down Expand Up @@ -2209,8 +2186,8 @@ mod tests {
);
}

#[tokio::test]
async fn pending_transactions_remaining_account_balances() {
#[test]
fn pending_transactions_remaining_account_balances() {
let mut pending_txs = PendingTransactions::new(TX_TTL);

// transactions to add to account
Expand Down Expand Up @@ -2265,10 +2242,9 @@ mod tests {
);
}

#[tokio::test]
async fn builder_queue_should_be_sorted_by_action_group_type() {
#[test]
fn builder_queue_should_be_sorted_by_action_group_type() {
let mut pending_txs = PendingTransactions::new(TX_TTL);
let mock_state = mock_state_getter().await;

// create transactions in reverse order
let ttx_unbundleable_sudo = MockTTXBuilder::new()
Expand Down Expand Up @@ -2308,7 +2284,7 @@ mod tests {

// get the builder queue
// note: the account nonces are set to zero when not initialized in the mock state
let builder_queue = pending_txs.builder_queue(&mock_state).await;
let builder_queue = pending_txs.builder_queue();

// check that the transactions are in the expected order
let (first_tx_hash, _) = builder_queue[0];
Expand Down Expand Up @@ -2336,8 +2312,8 @@ mod tests {
);
}

#[tokio::test]
async fn parked_transactions_size_limit_works() {
#[test]
fn parked_transactions_size_limit_works() {
let mut parked_txs = ParkedTransactions::<MAX_PARKED_TXS_PER_ACCOUNT>::new(TX_TTL, 1);

// transactions to add to account
Expand Down

0 comments on commit 552a1c9

Please sign in to comment.