Skip to content

Commit

Permalink
rebase on top of most recent main
Browse files Browse the repository at this point in the history
  • Loading branch information
Lilyjjo committed Aug 20, 2024
1 parent ee31f2d commit be99377
Show file tree
Hide file tree
Showing 12 changed files with 2,198 additions and 726 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/astria-core/src/protocol/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ impl AbciErrorCode {
pub const VALUE_NOT_FOUND: Self = Self(unsafe { NonZeroU32::new_unchecked(8) });
pub const TRANSACTION_EXPIRED: Self = Self(unsafe { NonZeroU32::new_unchecked(9) });
pub const TRANSACTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(10) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) });
}

impl AbciErrorCode {
Expand All @@ -42,6 +44,10 @@ impl AbciErrorCode {
Self::TRANSACTION_FAILED => {
"the transaction failed to execute in prepare_proposal()".into()
}
Self::TRANSACTION_INSERTION_FAILED => {
"the transaction failed insertion into the mempool".into()
}
Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(),
Self::BAD_REQUEST => "the request payload was malformed".into(),
Self(other) => {
format!("invalid error code {other}: should be unreachable (this is a bug)")
Expand Down
1 change: 0 additions & 1 deletion crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7
] }
ibc-proto = { version = "0.41.0", features = ["server"] }
matchit = "0.7.2"
priority-queue = "2.0.2"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
Expand Down
68 changes: 37 additions & 31 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ use tracing::{
};

use crate::{
accounts,
accounts::{
self,
component::AccountsComponent,
StateReadExt,
StateWriteExt as _,
},
address::StateWriteExt as _,
Expand Down Expand Up @@ -484,11 +485,21 @@ impl App {
let mut included_signed_txs = Vec::new();
let mut failed_tx_count: usize = 0;
let mut execution_results = Vec::new();
let mut txs_to_readd_to_mempool = Vec::new();
let mut excluded_txs: usize = 0;

// get copy of transactions to execute from mempool
let current_account_nonce_getter =
|address: [u8; 20]| self.state.get_account_nonce(address);
let pending_txs = self
.mempool
.builder_queue(current_account_nonce_getter)
.await
.expect("failed to fetch pending transactions");

while let Some((enqueued_tx, priority)) = self.mempool.pop().await {
let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string();
let tx = enqueued_tx.signed_tx();
let mut unused_count = pending_txs.len();
for (tx_hash, tx) in pending_txs {
unused_count = unused_count.saturating_sub(1);
let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string();
let bytes = tx.to_raw().encode_to_vec();
let tx_len = bytes.len();
info!(transaction_hash = %tx_hash_base64, "executing transaction");
Expand All @@ -503,7 +514,7 @@ impl App {
tx_data_bytes = tx_len,
"excluding remaining transactions: max cometBFT data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// break from loop, as the block is full
break;
Expand All @@ -526,7 +537,7 @@ impl App {
tx_data_bytes = tx_sequence_data_bytes,
"excluding transaction: max block sequenced data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// continue as there might be non-sequence txs that can fit
continue;
Expand Down Expand Up @@ -558,18 +569,21 @@ impl App {
);

if e.downcast_ref::<InvalidNonce>().is_some() {
// we re-insert the tx into the mempool if it failed to execute
// we don't remove the tx from mempool if it failed to execute
// due to an invalid nonce, as it may be valid in the future.
// if it's invalid due to the nonce being too low, it'll be
// removed from the mempool in `update_mempool_after_finalization`.
txs_to_readd_to_mempool.push((enqueued_tx, priority));
} else {
failed_tx_count = failed_tx_count.saturating_add(1);

// the transaction should be removed from the cometbft mempool
// remove the failing transaction from the mempool
//
// this will remove any transactions from the same sender
// as well, as the dependent nonces will not be able
// to execute
self.mempool
.track_removal_comet_bft(
enqueued_tx.tx_hash(),
.remove_tx_invalid(
tx,
RemovalReason::FailedPrepareProposal(e.to_string()),
)
.await;
Expand All @@ -586,15 +600,12 @@ impl App {
);
}
self.metrics.set_prepare_proposal_excluded_transactions(
txs_to_readd_to_mempool
.len()
.saturating_add(failed_tx_count),
excluded_txs.saturating_add(failed_tx_count),
);

self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");
self.metrics.set_transactions_in_mempool_total(mempool_len);
debug!("{unused_count} leftover pending transactions");
self.metrics
.set_transactions_in_mempool_total(self.mempool.len().await);

self.execution_results = Some(execution_results);
Ok((validated_txs, included_signed_txs))
Expand Down Expand Up @@ -805,10 +816,6 @@ impl App {

// skip the first two transactions, as they are the rollup data commitments
for tx in finalize_block.txs.iter().skip(2) {
// remove any included txs from the mempool
let tx_hash = Sha256::digest(tx).into();
self.mempool.remove(tx_hash).await;

let signed_tx = signed_transaction_from_bytes(tx)
.context("protocol error; only valid txs should be finalized")?;

Expand Down Expand Up @@ -880,6 +887,10 @@ impl App {
state_tx
.put_sequencer_block(sequencer_block)
.context("failed to write sequencer block to state")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, &state_tx).await;

// events that occur after end_block are ignored here;
// there should be none anyways.
let _ = self.apply(state_tx);
Expand All @@ -890,11 +901,6 @@ impl App {
.await
.context("failed to prepare commit")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, self.state.clone())
.await
.context("failed to update mempool after finalization")?;

Ok(abci::response::FinalizeBlock {
events: end_block.events,
validator_updates: end_block.validator_updates,
Expand Down Expand Up @@ -1121,10 +1127,10 @@ impl App {
// the mempool is large.
async fn update_mempool_after_finalization<S: accounts::StateReadExt>(
mempool: &mut Mempool,
state: S,
) -> anyhow::Result<()> {
state: &S,
) {
let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address);
mempool.run_maintenance(current_account_nonce_getter).await
mempool.run_maintenance(current_account_nonce_getter).await;
}

/// relevant data of a block being executed.
Expand Down
12 changes: 9 additions & 3 deletions crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use astria_core::{
crypto::SigningKey,
primitive::v1::RollupId,
Expand Down Expand Up @@ -138,21 +140,25 @@ pub(crate) async fn initialize_app(
app
}

pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
pub(crate) fn mock_tx(
nonce: u32,
signer: &SigningKey,
rollup_name: &str,
) -> Arc<SignedTransaction> {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes([0; 32]),
rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()),
data: Bytes::from_static(&[0x99]),
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(&get_alice_signing_key())
Arc::new(tx.into_signed(signer))
}
28 changes: 23 additions & 5 deletions crates/astria-sequencer/src/app/tests_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool.insert(signed_tx, 0).await.unwrap();
app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
Expand All @@ -473,6 +473,12 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
assert_eq!(prepare_proposal_result.txs, finalize_block.txs);
assert_eq!(app.executed_proposal_hash, Hash::default());
assert_eq!(app.validator_address.unwrap(), proposer_address);
// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

assert_eq!(app.mempool.len().await, 0);

// call process_proposal - should not re-execute anything.
Expand Down Expand Up @@ -561,8 +567,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -581,6 +587,12 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
Expand Down Expand Up @@ -634,8 +646,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -654,6 +666,12 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
Expand Down
19 changes: 13 additions & 6 deletions crates/astria-sequencer/src/grpc/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,20 @@ mod test {

let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
let nonce = 99;
let tx = crate::app::test_utils::get_mock_tx(nonce);
// insert a transaction with a nonce gap
let gapped_nonce = 99;
let tx = crate::app::test_utils::mock_tx(gapped_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a tx with lower nonce also, but we should get the highest nonce
let lower_nonce = 98;
let tx = crate::app::test_utils::get_mock_tx(lower_nonce);
// insert a transaction at the current nonce
let account_nonce = 0;
let tx = crate::app::test_utils::mock_tx(account_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a transactions one above account nonce (not gapped)
let sequential_nonce = 1;
let tx: Arc<astria_core::protocol::transaction::v1alpha1::SignedTransaction> =
crate::app::test_utils::mock_tx(sequential_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

let server = Arc::new(SequencerServer::new(storage.clone(), mempool));
Expand All @@ -279,7 +286,7 @@ mod test {
};
let request = Request::new(request);
let response = server.get_pending_nonce(request).await.unwrap();
assert_eq!(response.into_inner().inner, nonce);
assert_eq!(response.into_inner().inner, sequential_nonce);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit be99377

Please sign in to comment.