From 0c3eb014a7d429e4a90fbfcc7c659499f4a4fa63 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Thu, 10 Oct 2024 16:59:06 +0200 Subject: [PATCH 1/2] Add timeout --- src/tasks/index.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/tasks/index.rs b/src/tasks/index.rs index c60d3b5..b914665 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -7,6 +7,7 @@ use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; +use tokio::time::timeout; use crate::app::App; use crate::broadcast_utils::gas_estimation::{ @@ -22,6 +23,8 @@ const GAS_PRICE_FOR_METRICS_FACTOR: f64 = 1e-9; const MAX_RECENT_BLOCKS_TO_CHECK: u64 = 60; +const NEXT_BLOCK_TIMEOUT: Duration = Duration::from_secs(60); + pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { loop { index_inner(app.clone(), chain_id).await?; @@ -34,18 +37,32 @@ async fn index_inner(app: Arc, chain_id: u64) -> eyre::Result<()> { let rpc = app.http_provider(chain_id).await?; tracing::info!("Subscribing to new blocks"); - // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; - // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head tracing::info!("Backfilling blocks"); if let Some(latest_block) = blocks_stream.next().await { backfill_to_block(app.clone(), chain_id, &rpc, latest_block).await?; } - // Index incoming blocks from the stream - while let Some(block) = blocks_stream.next().await { - index_block(app.clone(), chain_id, &rpc, block).await?; + loop { + let next_block = + timeout(NEXT_BLOCK_TIMEOUT, blocks_stream.next()).await; + + match next_block { + Ok(Some(block)) => { + index_block(app.clone(), chain_id, &rpc, block).await?; + } + Ok(None) => { + // Stream ended, break out of the loop + tracing::info!("Block stream ended"); + break; + } + Err(_) => { + // Timeout occurred + tracing::warn!("Timed out waiting for a block"); + break; + } + } } Ok(()) From 4ed00f2576f33a45a0c21e6aef9d16564d96dcd9 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Thu, 10 Oct 2024 17:11:40 +0200 Subject: [PATCH 2/2] Mae it configurable --- src/config.rs | 15 +++++++++++++++ src/tasks/index.rs | 9 +++++---- tests/common/service_builder.rs | 1 + 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1978ace..6974943 100644 --- a/src/config.rs +++ b/src/config.rs @@ -63,6 +63,13 @@ pub struct TxSitterConfig { )] pub hard_reorg_interval: Duration, + /// Max amount of time to wait for a new block from the RPC block stream + #[serde( + with = "humantime_serde", + default = "default::block_stream_timeout" + )] + pub block_stream_timeout: Duration, + #[serde(default, skip_serializing_if = "Option::is_none")] pub predefined: Option, @@ -219,6 +226,10 @@ mod default { Duration::from_secs(60 * 60) } + pub fn block_stream_timeout() -> Duration { + Duration::from_secs(60) + } + pub mod metrics { pub fn host() -> String { "127.0.0.1".to_string() @@ -249,6 +260,7 @@ mod tests { escalation_interval = "1h" soft_reorg_interval = "1m" hard_reorg_interval = "1h" + block_stream_timeout = "1m" [server] host = "127.0.0.1:3000" @@ -266,6 +278,7 @@ mod tests { escalation_interval = "1h" soft_reorg_interval = "1m" hard_reorg_interval = "1h" + block_stream_timeout = "1m" [server] host = "127.0.0.1:3000" @@ -289,6 +302,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), soft_reorg_interval: default::soft_reorg_interval(), hard_reorg_interval: default::hard_reorg_interval(), + block_stream_timeout: default::block_stream_timeout(), predefined: None, telemetry: None, }, @@ -317,6 +331,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), soft_reorg_interval: default::soft_reorg_interval(), hard_reorg_interval: default::hard_reorg_interval(), + block_stream_timeout: default::block_stream_timeout(), predefined: None, telemetry: None, }, diff --git a/src/tasks/index.rs b/src/tasks/index.rs index b914665..f601a92 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -23,8 +23,6 @@ const GAS_PRICE_FOR_METRICS_FACTOR: f64 = 1e-9; const MAX_RECENT_BLOCKS_TO_CHECK: u64 = 60; -const NEXT_BLOCK_TIMEOUT: Duration = Duration::from_secs(60); - pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { loop { index_inner(app.clone(), chain_id).await?; @@ -45,8 +43,11 @@ async fn index_inner(app: Arc, chain_id: u64) -> eyre::Result<()> { } loop { - let next_block = - timeout(NEXT_BLOCK_TIMEOUT, blocks_stream.next()).await; + let next_block = timeout( + app.config.service.block_stream_timeout, + blocks_stream.next(), + ) + .await; match next_block { Ok(Some(block)) => { diff --git a/tests/common/service_builder.rs b/tests/common/service_builder.rs index 05c428e..48074f8 100644 --- a/tests/common/service_builder.rs +++ b/tests/common/service_builder.rs @@ -58,6 +58,7 @@ impl ServiceBuilder { escalation_interval: self.escalation_interval, soft_reorg_interval: self.soft_reorg_interval, hard_reorg_interval: self.hard_reorg_interval, + block_stream_timeout: Duration::from_secs(60), telemetry: None, predefined: Some(Predefined { network: PredefinedNetwork {