From b4dac80bbd7f99bb1c667a5050f258a496e2404a Mon Sep 17 00:00:00 2001
From: Jerome Gravel-Niquet
Date: Fri, 29 Sep 2023 08:25:08 -0400
Subject: [PATCH] Remove possible blockages (#74)

This aims to reduce the possibility of stalling with the following:

- Don't interrupt long-running SQLite executions. They're pretty much always necessary. The only way that would be problematic is if we have a deadlock in Corrosion within a transaction.
- Rewrite the compaction logic to do a single-ish query to get all cleared versions
- Keep synchronizing in a loop until a synchronization completes successfully.
---
 crates/corro-agent/src/agent.rs | 560 +++++++++++++++++++-------------
 crates/corro-types/src/agent.rs |  40 ++-
 2 files changed, 355 insertions(+), 245 deletions(-)

diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs
index 98705847..007a212e 100644
--- a/crates/corro-agent/src/agent.rs
+++ b/crates/corro-agent/src/agent.rs
@@ -1,7 +1,6 @@
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap, HashSet},
     convert::Infallible,
-    hash::{Hash, Hasher},
     net::SocketAddr,
     ops::RangeInclusive,
     sync::{atomic::AtomicI64, Arc},
@@ -33,7 +32,7 @@ use corro_types::{
         BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeV1, Changeset, ChangesetParts,
         FocaInput, Timestamp, UniPayload, UniPayloadV1,
     },
-    change::{Change, SqliteValue},
+    change::Change,
     config::{AuthzConfig, Config, DEFAULT_GOSSIP_PORT},
     members::{MemberEvent, Members, Rtt},
     pubsub::{migrate_subs, Matcher},
@@ -51,7 +50,7 @@ use axum::{
 };
 use bytes::{Bytes, BytesMut};
 use foca::{Member, Notification};
-use futures::{FutureExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt};
 use hyper::{server::conn::AddrIncoming, StatusCode};
 use itertools::Itertools;
 use metrics::{counter, gauge, histogram, increment_counter};
@@ -68,7 +67,7 @@ use tokio::{
     task::block_in_place,
     time::{sleep, timeout},
 };
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt},
 use tokio_util::codec::{Decoder, FramedRead, LengthDelimitedCodec};
 use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer};
 use tower_http::trace::TraceLayer;
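The import rename above exists because `futures::StreamExt` and `tokio_stream::StreamExt` both provide stream combinators; with both in scope, a plain `stream.next()` is ambiguous, so the patch aliases one trait and makes calls fully qualified. A minimal sketch of the pattern used throughout this file (illustrative only, not part of the patch):

    // Illustrative only, not part of the patch.
    use std::time::Duration;

    use futures::StreamExt; // `next` is resolved through this trait below
    use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt};

    async fn drain(rx: tokio::sync::mpsc::Receiver<u64>) {
        // `chunks_timeout` only exists on tokio-stream's `StreamExt`...
        let chunker =
            TokioStreamExt::chunks_timeout(ReceiverStream::new(rx), 1024, Duration::from_secs(1));
        tokio::pin!(chunker);
        // ...while `next` is called fully qualified so the compiler never has
        // to choose between the two `StreamExt` traits in scope.
        while let Some(chunk) = StreamExt::next(&mut chunker).await {
            println!("got {} items", chunk.len());
        }
    }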
@@ -393,7 +392,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
             // we can handle a lot of them I think...
             let chunker = stream.chunks_timeout(1024, Duration::from_secs(1));
             tokio::pin!(chunker);
-            while let Some(chunks) = chunker.next().await {
+            while let Some(chunks) = StreamExt::next(&mut chunker).await {
                 let mut members = agent.members().write();
                 for (addr, rtt) in chunks {
                     members.add_rtt(addr, rtt);
@@ -618,7 +617,12 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
                             FramedRead::new(rx, LengthDelimitedCodec::new());
                         loop {
-                            match timeout(Duration::from_secs(5), framed.next()).await {
+                            match timeout(
+                                Duration::from_secs(5),
+                                StreamExt::next(&mut framed),
+                            )
+                            .await
+                            {
                                 Err(_e) => {
                                     warn!("timed out receiving bidirectional frame");
                                     return;
@@ -723,117 +727,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
         }
     });

-    tokio::spawn({
-        let agent = agent.clone();
-        async move {
-            let pool = agent.pool();
-            let bookie = agent.bookie();
-            loop {
-                sleep(COMPACT_BOOKED_INTERVAL).await;
-
-                let to_check: Vec<ActorId> = { bookie.read().await.keys().copied().collect() };
-
-                let mut inserted = 0;
-                let mut deleted = 0;
-
-                for actor_id in to_check {
-                    let booked = bookie.for_actor(actor_id).await;
-
-                    {
-                        if booked.read().await.current_versions().is_empty() {
-                            continue;
-                        }
-                    }
-
-                    let mut conn = match pool.write_low().await {
-                        Ok(conn) => conn,
-                        Err(e) => {
-                            error!("could not acquire low priority write connection for compaction: {e}");
-                            continue;
-                        }
-                    };
-
-                    let mut bookedw = booked.write().await;
-                    let versions = bookedw.current_versions();
-
-                    if versions.is_empty() {
-                        continue;
-                    }
-
-                    let res = block_in_place(|| {
-                        let tx = conn.transaction()?;
-
-                        let db_versions = versions.keys().copied().collect();
-
-                        let to_clear = {
-                            match find_cleared_db_versions_for_actor(&tx, &db_versions) {
-                                Ok(to_clear) => {
-                                    if to_clear.is_empty() {
-                                        return Ok(());
-                                    }
-                                    to_clear
-                                }
-                                Err(e) => {
-                                    error!("could not compute difference between known live and still alive versions for actor {actor_id}: {e}");
-                                    return Err(e);
-                                }
-                            }
-                        };
-
-                        deleted += tx
-                            .prepare_cached("DELETE FROM __corro_bookkeeping WHERE actor_id = ?")?
-                            .execute([actor_id])?;
-
-                        let mut new_copy = bookedw.clone();
-
-                        let cleared_len = to_clear.len();
-
-                        for db_version in to_clear.iter() {
-                            if let Some(version) = versions.get(db_version) {
-                                new_copy.insert(*version..=*version, KnownDbVersion::Cleared);
-                            }
-                        }
-
-                        for (range, known) in new_copy.iter() {
-                            match known {
-                                KnownDbVersion::Current {
-                                    db_version,
-                                    last_seq,
-                                    ts,
-                                } => {
-                                    inserted += tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?,?,?,?,?)")?.execute(params![actor_id, range.start(), db_version, last_seq, ts])?;
-                                }
-                                KnownDbVersion::Cleared => {
-                                    inserted += tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) VALUES (?,?,?)")?.execute(params![actor_id, range.start(), range.end()])?;
-                                }
-                                KnownDbVersion::Partial { .. } => {
-                                    // do nothing, not stored in that table!
-                                }
-                            }
-                        }
-
-                        tx.commit()?;
-
-                        debug!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}");
-
-                        **bookedw.inner_mut() = new_copy;
-                        debug!("compacted in-memory cache by clearing {cleared_len} db versions for actor {actor_id}, new total: {}", bookedw.inner().len());
-
-                        Ok::<_, eyre::Report>(())
-                    });
-
-                    if let Err(e) = res {
-                        error!("could not compact versions for actor {actor_id}: {e}");
-                    }
-                }
-
-                info!(
-                    "compaction done, cleared {} db bookkeeping table rows",
-                    deleted - inserted
-                );
-            }
-        }
-    });
+    tokio::spawn(clear_overwritten_versions(agent.clone()));
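    // Illustrative sketch, not part of the patch: both the removed block
    // above and its replacement below record cleared versions by inserting
    // single-version ranges into a rangemap, relying on the map coalescing
    // adjacent ranges that hold equal values. Types simplified here.
    fn rangemap_coalescing_sketch() {
        use rangemap::RangeInclusiveMap;

        #[derive(Clone, Eq, PartialEq, Debug)]
        enum Known {
            Current { db_version: i64 },
            Cleared,
        }

        let mut book: RangeInclusiveMap<i64, Known> = RangeInclusiveMap::new();
        book.insert(1..=1, Known::Cleared);
        book.insert(2..=2, Known::Cleared);
        book.insert(3..=3, Known::Current { db_version: 99 });
        // 1..=1 and 2..=2 merge into a single 1..=2 entry because their
        // values compare equal; version 3 stays a separate entry.
        assert_eq!(book.iter().count(), 2);
    }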

     let states = match agent.pool().read().await {
         Ok(conn) => block_in_place(|| {
@@ -876,7 +770,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
     };

     if !states.is_empty() {
-        let mut foca_states = Vec::with_capacity(states.len());
+        // let mut foca_states = Vec::with_capacity(states.len());

         {
             // block to drop the members write lock
@@ -891,11 +785,11 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
                 if matches!(foca_state.state(), foca::State::Suspect) {
                     continue;
                 }
-                foca_states.push(foca_state);
+                // foca_states.push(foca_state);
             }
         }

-        foca_tx.send(FocaInput::ApplyMany(foca_states)).await.ok();
+        // foca_tx.send(FocaInput::ApplyMany(foca_states)).await.ok();
     }

     let api = Router::new()
@@ -1021,7 +915,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
         loop {
             tokio::select! {
                 biased;
-                msg = gossip_chunker.next() => match msg {
+                msg = StreamExt::next(&mut gossip_chunker) => match msg {
                     Some(msg) => {
                         spawn_counted(
                             handle_gossip(agent.clone(), msg, false)
@@ -1072,12 +966,161 @@ async fn require_authz(
     Ok(next.run(request).await)
 }

-const CHECKSUM_SEEDS: [u64; 4] = [
-    0x16f11fe89b0d677c,
-    0xb480a793d8e6c86c,
-    0x6fe2e5aaf078ebc9,
-    0x14f994a4c5259381,
-];
+async fn clear_overwritten_versions(agent: Agent) {
+    let pool = agent.pool();
+    let bookie = agent.bookie();
+
+    let mut interval = Duration::new(0, 0);
+
+    loop {
+        sleep(interval).await;
+
+        if interval != COMPACT_BOOKED_INTERVAL {
+            interval = COMPACT_BOOKED_INTERVAL;
+        }
+
+        info!("starting compaction...");
+        let start = Instant::now();
+
+        let mut to_check: BTreeMap<i64, (ActorId, i64)> = BTreeMap::new();
+
+        {
+            let booked = bookie.read().await;
+            for (actor_id, booked) in booked.iter() {
+                let versions = booked.read().await.current_versions();
+                if versions.is_empty() {
+                    continue;
+                }
+                for (db_v, v) in versions {
+                    to_check.insert(db_v, (*actor_id, v));
+                }
+            }
+        }
+
+        info!("got actors and their versions");
+
+        let cleared_versions: BTreeSet<i64> = {
+            match pool.read().await {
+                Ok(mut conn) => {
+                    let res = block_in_place(|| {
+                        let tx = conn.transaction()?;
+                        find_cleared_db_versions(&tx)
+                    });
+                    match res {
+                        Ok(cleared) => cleared,
+                        Err(e) => {
+                            error!("could not get cleared versions: {e}");
+                            continue;
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("could not get read connection: {e}");
+                    continue;
+                }
+            }
+        };
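        // Illustrative sketch, not part of the patch: rusqlite calls block
        // the thread, so the pattern above wraps them in `block_in_place`,
        // which lets a tokio worker run blocking code without stalling other
        // tasks (it requires the multi-threaded runtime). A standalone
        // version of the same shape, with an assumed query:
        fn blocking_query_sketch(conn: &mut rusqlite::Connection) -> rusqlite::Result<i64> {
            tokio::task::block_in_place(|| {
                let tx = conn.transaction()?;
                let count: i64 = tx.query_row(
                    "SELECT COUNT(*) FROM __corro_bookkeeping WHERE db_version IS NOT NULL",
                    [],
                    |row| row.get(0),
                )?;
                tx.commit()?;
                Ok(count)
            })
        }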
+
+        let mut to_clear_by_actor: BTreeMap<ActorId, Vec<(i64, i64)>> = BTreeMap::new();
+
+        for db_v in cleared_versions {
+            if let Some((actor_id, v)) = to_check.remove(&db_v) {
+                to_clear_by_actor
+                    .entry(actor_id)
+                    .or_default()
+                    .push((db_v, v));
+            }
+        }
+
+        let mut deleted = 0;
+        let mut inserted = 0;
+
+        for (actor_id, to_clear) in to_clear_by_actor {
+            info!(%actor_id, "clearing actor {} versions", to_clear.len());
+            let booked = bookie.for_actor(actor_id).await;
+
+            let mut conn = match pool.write_low().await {
+                Ok(conn) => conn,
+                Err(e) => {
+                    error!("could not acquire low priority write connection: {e}");
+                    continue;
+                }
+            };
+
+            let mut bookedw = booked.write().await;
+
+            let res = block_in_place(|| {
+                let tx = conn.transaction()?;
+
+                let mut new_copy = bookedw.clone();
+
+                for (db_v, v) in to_clear
+                    .iter()
+                    .filter(|(_db_v, v)| bookedw.contains_current(v))
+                {
+                    deleted += tx
+                        .prepare_cached("DELETE FROM __corro_bookkeeping WHERE db_version = ?")?
+                        .execute([db_v])?;
+                    new_copy.insert(*v..=*v, KnownDbVersion::Cleared);
+                }
+
+                for (range, known) in to_clear
+                    .iter()
+                    .filter_map(|(_, v)| new_copy.get_key_value(v))
+                    .dedup()
+                {
+                    match known {
+                        KnownDbVersion::Cleared => {
+                            inserted += tx
+                                .prepare_cached(
+                                    "
+                INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version)
+                    VALUES (?,?,?)
+                    ON CONFLICT (actor_id, start_version) DO UPDATE SET
+                        end_version = excluded.end_version,
+                        db_version = NULL,
+                        last_seq = NULL,
+                        ts = NULL
+                    WHERE end_version != excluded.end_version
+            ",
+                                )?
+                                .execute(params![actor_id, range.start(), range.end()])?;
+                        }
+                        known => {
+                            warn!(%actor_id, "unexpected known db version when attempting to clear: {known:?}");
+                        }
+                    }
+                }
+
+                tx.commit()?;
+
+                debug!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}");
+
+                **bookedw.inner_mut() = new_copy;
+                debug!("compacted in-memory cache by clearing db versions for actor {actor_id}, new total: {}", bookedw.inner().len());
+
+                Ok::<_, rusqlite::Error>(())
+            });
+
+            if let Err(e) = res {
+                error!(%actor_id, "could not compact bookkeeping: {e}");
+            }
+        }
+
+        info!(
+            "compaction done, cleared {} db bookkeeping table rows in {:?}",
+            deleted - inserted,
+            start.elapsed()
+        );
+    }
+}
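// A worked example of the upsert above (illustrative, not part of the patch):
// if `__corro_bookkeeping` already holds a cleared range for actor A covering
// versions 1..=3 and versions 4..=5 have since been overwritten, re-running
// the insert with the coalesced in-memory range (A, 1, 5) hits the
// `ON CONFLICT (actor_id, start_version)` arm and simply extends the row:
//
//   before: actor_id=A  start_version=1  end_version=3  db_version=NULL
//   after:  actor_id=A  start_version=1  end_version=5  db_version=NULL
//
// The `WHERE end_version != excluded.end_version` guard makes the update a
// no-op when nothing changed, and nulling db_version/last_seq/ts turns a
// previously `Current` row into a pure `Cleared` marker.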

+// const CHECKSUM_SEEDS: [u64; 4] = [
+//     0x16f11fe89b0d677c,
+//     0xb480a793d8e6c86c,
+//     0x6fe2e5aaf078ebc9,
+//     0x14f994a4c5259381,
+// ];

 async fn metrics_loop(agent: Agent) {
     let mut metrics_interval = tokio::time::interval(Duration::from_secs(10));
@@ -1089,7 +1132,7 @@ async fn metrics_loop(agent: Agent) {
     }
 }

-const MAX_COUNT_TO_HASH: i64 = 500_000;
+// const MAX_COUNT_TO_HASH: i64 = 500_000;

 fn collect_metrics(agent: &Agent) {
     agent.pool().emit_metrics();
@@ -1104,7 +1147,7 @@ fn collect_metrics(agent: &Agent) {
         }
     };

-    let mut low_count_tables = vec![];
+    // let mut low_count_tables = vec![];

     for table in schema.tables.keys() {
         match conn
@@ -1113,9 +1156,9 @@ fn collect_metrics(agent: &Agent) {
         {
             Ok(count) => {
                 gauge!("corro.db.table.rows.total", count as f64, "table" => table.clone());
-                if count <= MAX_COUNT_TO_HASH {
-                    low_count_tables.push(table);
-                }
+                // if count <= MAX_COUNT_TO_HASH {
+                //     low_count_tables.push(table);
+                // }
             }
             Err(e) => {
                 error!("could not query count for table {table}: {e}");
@@ -1143,38 +1186,38 @@ fn collect_metrics(agent: &Agent) {
         }
     }

-    for name in low_count_tables {
-        if let Some(table) = schema.tables.get(name) {
-            let pks = table.pk.iter().cloned().collect::<Vec<_>>().join(",");
-            match conn
-                .prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
-                .and_then(|mut prepped| {
-                    let col_count = prepped.column_count();
-                    prepped.query(()).and_then(|mut rows| {
-                        let mut hasher = seahash::SeaHasher::with_seeds(
-                            CHECKSUM_SEEDS[0],
-                            CHECKSUM_SEEDS[1],
-                            CHECKSUM_SEEDS[2],
-                            CHECKSUM_SEEDS[3],
-                        );
-                        while let Ok(Some(row)) = rows.next() {
-                            for idx in 0..col_count {
-                                let v: SqliteValue = row.get(idx)?;
-                                v.hash(&mut hasher);
-                            }
-                        }
-                        Ok(hasher.finish())
-                    })
-                }) {
-                Ok(hash) => {
-                    gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
-                }
-                Err(e) => {
-                    error!("could not query clock table values for hashing {table}: {e}");
-                }
-            }
-        }
-    }
+    // for name in low_count_tables {
+    //     if let Some(table) = schema.tables.get(name) {
+    //         let pks = table.pk.iter().cloned().collect::<Vec<_>>().join(",");
+    //         match conn
+    //             .prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
+    //             .and_then(|mut prepped| {
+    //                 let col_count = prepped.column_count();
+    //                 prepped.query(()).and_then(|mut rows| {
+    //                     let mut hasher = seahash::SeaHasher::with_seeds(
+    //                         CHECKSUM_SEEDS[0],
+    //                         CHECKSUM_SEEDS[1],
+    //                         CHECKSUM_SEEDS[2],
+    //                         CHECKSUM_SEEDS[3],
+    //                     );
+    //                     while let Ok(Some(row)) = rows.next() {
+    //                         for idx in 0..col_count {
+    //                             let v: SqliteValue = row.get(idx)?;
+    //                             v.hash(&mut hasher);
+    //                         }
+    //                     }
+    //                     Ok(hasher.finish())
+    //                 })
+    //             }) {
+    //             Ok(hash) => {
+    //                 gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
+    //             }
+    //             Err(e) => {
+    //                 error!("could not query clock table values for hashing {table}: {e}");
+    //             }
+    //         }
+    //     }
+    // }
 }

 pub async fn handle_change(
@@ -1207,33 +1250,43 @@ pub async fn handle_change(
     }
 }

-fn find_cleared_db_versions_for_actor(
-    tx: &Transaction,
-    versions: &BTreeSet<i64>,
-) -> eyre::Result<BTreeSet<i64>> {
-    let (first, last) = match (versions.first().copied(), versions.last().copied()) {
-        (Some(first), Some(last)) => (first, last),
-        _ => return Ok(BTreeSet::new()),
-    };
-
+// fn chunk_range(
+//     range: RangeInclusive<i64>,
+//     chunk_size: i64,
+// ) -> impl Iterator<Item = RangeInclusive<i64>> {
+//     range
+//         .clone()
+//         .step_by(chunk_size as usize)
+//         .map(move |block_start| {
+//             let block_end = (block_start + chunk_size).min(*range.end());
+//             block_start..=block_end
+//         })
+// }

+fn find_cleared_db_versions(tx: &Transaction) -> rusqlite::Result<BTreeSet<i64>> {
     let tables = tx
         .prepare_cached(
             "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE '%__crsql_clock'",
         )?
         .query_map([], |row| row.get(0))?
-        .collect::<Result<Vec<String>, _>>()?;
-
-    let still_live: BTreeSet<i64> = tx
-        .prepare(&format!(
-            "SELECT db_version FROM ({});",
-            tables.iter().map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table} WHERE db_version >= {first} AND db_version <= {last}")).collect::<Vec<_>>().join(" UNION ")
-        ))?
-        .query_map([],
-            |row| row.get(0),
-        )?
+        .collect::<Result<Vec<String>, _>>()?;
+
+    let to_clear_query = format!(
+        "SELECT DISTINCT(db_version) FROM __corro_bookkeeping WHERE db_version IS NOT NULL
+            EXCEPT SELECT db_version FROM ({});",
+        tables
+            .iter()
+            .map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}"))
+            .collect::<Vec<_>>()
+            .join(" UNION ")
+    );
+
+    let cleared_db_version = tx
+        .prepare_cached(&to_clear_query)?
+        .query_map([], |row| row.get(0))?
         .collect::<rusqlite::Result<_>>()?;

-    Ok(versions.difference(&still_live).copied().collect())
+    Ok(cleared_db_version)
 }
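// A worked example of the query built above (illustrative, not part of the
// patch). Suppose bookkeeping records db_versions {1, 2, 3} while the union
// of all *__crsql_clock tables still references {2, 3} (every version-1 row
// has since been overwritten or deleted):
//
//   SELECT DISTINCT(db_version) FROM __corro_bookkeeping
//       WHERE db_version IS NOT NULL          -- {1, 2, 3}
//   EXCEPT
//   SELECT db_version FROM (...clock tables...) -- {2, 3}
//
// yields {1}: exactly the versions that can be compacted into `Cleared`
// ranges in one pass, replacing the per-actor range diffing that the removed
// find_cleared_db_versions_for_actor performed.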

 async fn handle_gossip_to_send(transport: Transport, mut to_send_rx: Receiver<(Actor, Bytes)>) {
@@ -2280,7 +2333,12 @@ async fn sync_loop(
             warn!("aborted sync by tripwire");
             break;
         }
-        tripwire::Outcome::Completed(_res) => {}
+        tripwire::Outcome::Completed(res) => {
+            if res.is_err() {
+                // keep syncing until we successfully sync
+                continue;
+            }
+        }
     }
     next_sync_at
         .as_mut()
@@ -2305,9 +2363,10 @@ async fn sync_loop(
 }
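The sync_loop hunk above implements the commit's "keep synchronizing until a synchronization completes successfully" bullet: a failed sync now retries immediately instead of waiting for the next scheduled tick. A minimal sketch of the same control flow, with `attempt_sync` as an assumed stand-in for the real sync round trip:

    // Illustrative only, not part of the patch.
    async fn sync_until_success_sketch() {
        loop {
            match attempt_sync().await {
                Ok(()) => break,  // synced; go wait for the next interval
                Err(_) => continue, // retry right away, as sync_loop now does
            }
        }
    }

    async fn attempt_sync() -> Result<(), ()> {
        Ok(())
    }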

 pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
-    let migrations: Vec<Box<dyn Migration>> = vec![Box::new(
-        init_migration as fn(&Transaction) -> rusqlite::Result<()>,
-    )];
+    let migrations: Vec<Box<dyn Migration>> = vec![
+        Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
+        Box::new(v0_2_0_migration as fn(&Transaction) -> rusqlite::Result<()>),
+    ];

     corro_types::sqlite::migrate(conn, migrations)
 }
@@ -2396,6 +2455,12 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> {
     Ok(())
 }

+fn v0_2_0_migration(tx: &Transaction) -> rusqlite::Result<()> {
+    tx.execute_batch(
+        "CREATE INDEX __corro_bookkeeping_db_version ON __corro_bookkeeping (db_version)",
+    )
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::{
@@ -2847,6 +2912,8 @@ pub mod tests {
     fn test_in_memory_versions_compaction() -> eyre::Result<()> {
         let mut conn = CrConn::init(rusqlite::Connection::open_in_memory()?)?;

+        migrate(&mut conn)?;
+
         conn.execute_batch(
             "
             CREATE TABLE foo (a INTEGER PRIMARY KEY, b INTEGER);
@@ -2859,41 +2926,80 @@ pub mod tests {

         // db version 1
         conn.execute("INSERT INTO foo (a) VALUES (1)", ())?;
+
+        // invalid, but whatever
+        conn.execute("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version) SELECT crsql_site_id(), 1, crsql_db_version()", [])?;
+
         // db version 2
         conn.execute("DELETE FROM foo;", ())?;

+        // invalid, but whatever
+        conn.execute("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version) SELECT crsql_site_id(), 2, crsql_db_version()", [])?;
+
         let db_version: i64 = conn.query_row("SELECT crsql_db_version();", (), |row| row.get(0))?;
         assert_eq!(db_version, 2);

+        {
+            let mut prepped = conn.prepare("SELECT * FROM __corro_bookkeeping")?;
+            let mut rows = prepped.query([])?;
+
+            while let Ok(Some(row)) = rows.next() {
+                println!("row: {row:?}");
+            }
+        }
+
+        {
+            let mut prepped = conn.prepare("SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo2__crsql_clock UNION SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo__crsql_clock;")?;
+            let mut rows = prepped.query([])?;
+
+            while let Ok(Some(row)) = rows.next() {
+                println!("row: {row:?}");
+            }
+        }
+
         let tx = conn.transaction()?;

-        let to_clear = find_cleared_db_versions_for_actor(&tx, &[1].into())?;
+        let to_clear = find_cleared_db_versions(&tx)?;
+
+        println!("to_clear: {to_clear:?}");

         assert!(to_clear.contains(&1));
         assert!(!to_clear.contains(&2));

-        let to_clear = find_cleared_db_versions_for_actor(&tx, &[2].into())?;
+        tx.execute("DELETE FROM __corro_bookkeeping WHERE db_version = 1", [])?;
+        tx.execute("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) SELECT crsql_site_id(), 1, 1", [])?;
+
+        let to_clear = find_cleared_db_versions(&tx)?;

         assert!(to_clear.is_empty());

         tx.execute("INSERT INTO foo2 (a) VALUES (2)", ())?;
+
+        // invalid, but whatever
+        tx.execute("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version) SELECT crsql_site_id(), 3, crsql_db_version()", [])?;
+
         tx.commit()?;

         let tx = conn.transaction()?;
-        let to_clear = find_cleared_db_versions_for_actor(&tx, &[2, 3].into())?;
+        let to_clear = find_cleared_db_versions(&tx)?;

         assert!(to_clear.is_empty());

         tx.execute("INSERT INTO foo (a) VALUES (1)", ())?;
         tx.commit()?;

         let tx = conn.transaction()?;
-        let to_clear = find_cleared_db_versions_for_actor(&tx, &[2, 3, 4].into())?;
+        let to_clear = find_cleared_db_versions(&tx)?;

         assert!(to_clear.contains(&2));
         assert!(!to_clear.contains(&3));
         assert!(!to_clear.contains(&4));

-        let to_clear = find_cleared_db_versions_for_actor(&tx, &[3, 4].into())?;
+        tx.execute("DELETE FROM __corro_bookkeeping WHERE db_version = 2", [])?;
+        tx.execute(
+            "UPDATE __corro_bookkeeping SET end_version = 2 WHERE start_version = 1;",
+            [],
+        )?;
+
+        let to_clear = find_cleared_db_versions(&tx)?;

         assert!(to_clear.is_empty());
@@ -3038,44 +3144,42 @@ pub mod tests {
         _ = tracing_subscriber::fmt::try_init();
         let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();

-        let agents = futures::stream::iter(0..10)
-            .chunks(50)
-            .fold(vec![], {
-                let tripwire = tripwire.clone();
-                move |mut agents: Vec<TestAgent>, to_launch| {
-                    let tripwire = tripwire.clone();
-                    async move {
-                        for n in to_launch {
-                            println!("LAUNCHING AGENT #{n}");
-                            let mut rng = StdRng::from_entropy();
-                            let bootstrap = agents
-                                .iter()
-                                .map(|ta| ta.agent.gossip_addr())
-                                .choose_multiple(&mut rng, 10);
-                            agents.push(
-                                launch_test_agent(
-                                    |conf| {
-                                        conf.gossip_addr("127.0.0.1:0".parse().unwrap())
-                                            .bootstrap(
-                                                bootstrap
-                                                    .iter()
-                                                    .map(SocketAddr::to_string)
-                                                    .collect::<Vec<String>>(),
-                                            )
-                                            .build()
-                                    },
-                                    tripwire.clone(),
-                                )
-                                .await
-                                .unwrap(),
-                            );
-                        }
-                        tokio::time::sleep(Duration::from_secs(1)).await;
-                        agents
-                    }
-                }
-            })
-            .await;
+        let agents = futures::StreamExt::fold(futures::stream::iter(0..10).chunks(50), vec![], {
+            let tripwire = tripwire.clone();
+            move |mut agents: Vec<TestAgent>, to_launch| {
+                let tripwire = tripwire.clone();
+                async move {
+                    for n in to_launch {
+                        println!("LAUNCHING AGENT #{n}");
+                        let mut rng = StdRng::from_entropy();
+                        let bootstrap = agents
+                            .iter()
+                            .map(|ta| ta.agent.gossip_addr())
+                            .choose_multiple(&mut rng, 10);
+                        agents.push(
+                            launch_test_agent(
+                                |conf| {
+                                    conf.gossip_addr("127.0.0.1:0".parse().unwrap())
+                                        .bootstrap(
+                                            bootstrap
+                                                .iter()
+                                                .map(SocketAddr::to_string)
+                                                .collect::<Vec<String>>(),
+                                        )
+                                        .build()
+                                },
+                                tripwire.clone(),
+                            )
+                            .await
+                            .unwrap(),
+                        );
+                    }
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                    agents
+                }
+            }
+        })
+        .await;

         let mut start_id = 0;

diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs
index e133fbfd..1be9cd45 100644
--- a/crates/corro-types/src/agent.rs
+++ b/crates/corro-types/src/agent.rs
@@ -10,7 +10,7 @@ use std::{

 use arc_swap::ArcSwap;
 use camino::Utf8PathBuf;
-use metrics::{gauge, histogram, increment_counter};
+use metrics::{gauge, histogram};
 use parking_lot::RwLock;
 use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
 use rusqlite::{Connection, InterruptHandle};
@@ -29,7 +29,7 @@ use tokio::{
     task::block_in_place,
 };
 use tokio_util::sync::{CancellationToken, DropGuard};
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info};
 use tripwire::Tripwire;

 use crate::{
@@ -370,24 +370,26 @@ impl SplitPool {

 async fn timeout_wait(
     token: CancellationToken,
-    handle: InterruptHandle,
-    timeout: Duration,
+    _handle: InterruptHandle,
+    _timeout: Duration,
     queue: &'static str,
 ) {
     let start = Instant::now();
-    tokio::select! {
-        biased;
-        _ = token.cancelled() => {
-            trace!("conn dropped before timeout");
-            histogram!("corro.sqlite.pool.execution.seconds", start.elapsed().as_secs_f64(), "queue" => queue);
-            return;
-        },
-        _ = tokio::time::sleep(timeout) => {
-            warn!("conn execution timed out, interrupting!");
-        }
-    }
-    handle.interrupt();
-    increment_counter!("corro.sqlite.pool.execution.timeout");
+    token.cancelled().await;
+    histogram!("corro.sqlite.pool.execution.seconds", start.elapsed().as_secs_f64(), "queue" => queue);
+    // tokio::select! {
+    //     biased;
+    //     _ = token.cancelled() => {
+    //         trace!("conn dropped before timeout");
+    //         histogram!("corro.sqlite.pool.execution.seconds", start.elapsed().as_secs_f64(), "queue" => queue);
+    //         return;
+    //     },
+    //     _ = tokio::time::sleep(timeout) => {
+    //         warn!("conn execution timed out, interrupting!");
+    //     }
+    // }
+    // handle.interrupt();
+    // increment_counter!("corro.sqlite.pool.execution.timeout");

     // FIXME: do we need to cancel the token?
 }
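This hunk is the "don't interrupt long-running SQLite executions" bullet from the commit message: the old body raced the connection's cancellation token against a timer and called `interrupt()` when the timer won; the new body only waits for the token that fires when the connection is returned, so a slow but legitimate statement is never aborted mid-flight. A side-by-side sketch (illustrative only, simplified arguments):

    // Illustrative only, not part of the patch.
    use tokio_util::sync::CancellationToken;

    // Old shape: abort whatever statement is running if it outlives the timer.
    async fn interrupting_wait(token: CancellationToken, handle: rusqlite::InterruptHandle) {
        tokio::select! {
            _ = token.cancelled() => return, // connection came back in time
            _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
                handle.interrupt(); // kills the in-flight SQLite execution
            }
        }
    }

    // New shape: just observe the connection's lifetime and record metrics.
    async fn observing_wait(token: CancellationToken) {
        token.cancelled().await;
    }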

@@ -562,6 +564,10 @@ impl<'a> BookWriter<'a> {
         }
     }

+    pub fn contains_current(&self, version: &i64) -> bool {
+        matches!(self.0.get(version), Some(KnownDbVersion::Current { .. }))
+    }
+
     pub fn contains_all(
         &self,
         mut versions: RangeInclusive<i64>,