Skip to content

Commit

Permalink
change(mempool): Return verification result after attempting to inser…
Browse files Browse the repository at this point in the history
…t transactions in the mempool (#8901)

* respond with mempool verification result after a transaction has been inserted or has failed to be inserted into the mempool

* returns mempool verification errors early, and fixes handling for cancellations or timeouts.

* Adds a comment in test warning against code reuse with buffered services.
  • Loading branch information
arya2 authored Dec 2, 2024
1 parent e0861ec commit 9eb8a99
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 78 deletions.
22 changes: 11 additions & 11 deletions zebra-consensus/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,17 @@ where
)?;

if let Some(mut mempool) = mempool {
if !transaction.transaction.transaction.outputs().is_empty() {
tokio::spawn(async move {
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}
tokio::spawn(async move {
// Best-effort poll of the mempool to provide a timely response to
// `sendrawtransaction` RPC calls or `AwaitOutput` mempool calls.
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}

Response::Mempool { transaction, spent_mempool_outpoints }
Expand Down
5 changes: 1 addition & 4 deletions zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ use zebra_chain::{
use crate::BoxError;

mod gossip;

mod transaction_dependencies;

pub use transaction_dependencies::TransactionDependencies;

pub use self::gossip::Gossip;
pub use self::{gossip::Gossip, transaction_dependencies::TransactionDependencies};

/// A mempool service request.
///
Expand Down
25 changes: 17 additions & 8 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::{

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

use zebra_chain::{
Expand All @@ -43,7 +42,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};
use crate::components::sync::SyncStatus;

pub mod config;
mod crawler;
Expand Down Expand Up @@ -586,10 +585,8 @@ impl Service<Request> for Mempool {
let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) =
pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
{
match r {
while let Poll::Ready(Some((result, rsp_tx))) = pin!(&mut *tx_downloads).poll_next(cx) {
match result {
Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => {
// # Correctness:
//
Expand All @@ -609,27 +606,39 @@ impl Service<Request> for Mempool {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx
.send(insert_result.map(|_| ()).map_err(|err| err.into()));
}
} else {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result = tx_downloads
.download_if_needed_and_verify(tx.transaction.into(), None);
.download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
}
}
Ok(Err((tx_id, error))) => {
tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

storage.reject_if_needed(tx_id, error);
}
Err(_elapsed) => {
Err(elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
// so there is no specific transaction ID.

tracing::info!("mempool transaction failed to verify due to timeout");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(Err(elapsed.into()));
}
}
};
}
Expand Down
127 changes: 81 additions & 46 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::{self as zs, CloneError};

use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::components::{
mempool::crawler::RATE_LIMIT_DELAY,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};

use super::MempoolError;

Expand Down Expand Up @@ -152,16 +155,20 @@ where
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<
JoinHandle<
JoinHandle<(
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
>,
tokio::time::error::Elapsed,
>,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
)>,
>,

/// A list of channels that can be used to cancel pending transaction download and
Expand All @@ -178,14 +185,20 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>;
type Item = (
Result<
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>,
tokio::time::error::Elapsed,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -198,20 +211,28 @@ where
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok((tx, spent_mempool_outpoints, tip_height)) => {
let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
let (result, rsp_tx) =
join_result.expect("transaction download and verify tasks must not panic");

let result = match result {
Ok(Ok((tx, spent_mempool_outpoints, tip_height))) => {
this.cancel_handles.remove(&tx.transaction.id);
Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height))))
Ok(Ok((tx, spent_mempool_outpoints, tip_height)))
}
Err((e, hash)) => {
Ok(Err((e, hash))) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Err((hash, e))))
Ok(Err((hash, e)))
}
}
Err(elapsed) => Err(elapsed),
};

Some((result, rsp_tx))
} else {
Poll::Ready(None)
}
None
};

Poll::Ready(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down Expand Up @@ -255,7 +276,7 @@ where
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();

Expand Down Expand Up @@ -381,37 +402,51 @@ where
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
.map_err(move |e| (e, txid))
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.in_current_span();

let task = tokio::spawn(async move {
let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);

// Prefer the cancel handle if both are ready.
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
Err((TransactionDownloadVerifyError::Cancelled, txid))
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("verification cancelled".into()));
}

Ok(Err((TransactionDownloadVerifyError::Cancelled, txid)))
}
verification = fut => verification,
verification = fut => {
verification
.inspect_err(|_elapsed| {
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
}
})
.inspect(|inner_result| {
let _ = inner_result
.as_ref()
.inspect_err(|(tx_verifier_error, tx_id)| {
if let Some(rsp_tx) = rsp_tx.take() {
let error_msg = format!(
"failed to validate tx: {tx_id}, error: {tx_verifier_error}"
);
let _ = rsp_tx.send(Err(error_msg.into()));
}
});
})
},
};

// Send the result to responder channel if one was provided.
// TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
}

result
(result, rsp_tx)
});

self.pending.push(task);
Expand Down
18 changes: 9 additions & 9 deletions zebrad/src/components/mempool/tests/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,22 +978,22 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> {
let result_rx = results.remove(0).expect("should pass initial checks");
assert!(results.is_empty(), "should have 1 result for 1 queued tx");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

// Wait for next steps in mempool's Downloads to finish
// TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after
// waiting to respond with a transaction's verification result until after it's been inserted into the mempool.
// Wait for post-verification steps in mempool's Downloads
tokio::time::sleep(Duration::from_secs(1)).await;

// Note: Buffered services shouldn't be polled without being called.
// See `mempool::Request::CheckForVerifiedTransactions` for more details.
mempool
.ready()
.await
.expect("polling mempool should succeed");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

assert_eq!(
mempool.storage().transaction_count(),
1,
Expand Down

0 comments on commit 9eb8a99

Please sign in to comment.