diff --git a/Cargo.lock b/Cargo.lock index 9304f028..8fb509e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1360,9 +1360,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "foca" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73879bd4f1b8e2a1a39886488ac24641b20ade95238726325d516ae98d1ed00a" +checksum = "1ae5801563a919f58548b626bbf37ab589cfbf8c18007ddae589207338abd05e" dependencies = [ "anyhow", "bincode", @@ -3877,18 +3877,17 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "uhlc" -version = "0.5.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d291a7454d390b753ef68df8145da18367e32883ec2fa863959f0aefb915cdb" +checksum = "d1eadef1fa26cbbae1276c46781e8f4d888bdda434779c18ae6c2a0e69991885" dependencies = [ "defmt", - "hex", "humantime", "lazy_static", "log", + "rand", "serde", "spin 0.9.8", - "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 625e7e32..6e44fc59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ crc32fast = "1.3.2" enquote = "1.1.0" eyre = "0.6.8" fallible-iterator = "0.2.0" -foca = { version = "0.15.0", features = ["std", "tracing", "bincode-codec", "serde"] } +foca = { version = "0.16.0", features = ["std", "tracing", "bincode-codec", "serde"] } futures = "0.3.28" futures-util = "0.3.28" hex = "0.4.3" @@ -69,7 +69,7 @@ tracing = "0.1.37" tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] } tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] } trust-dns-resolver = "0.22.0" -uhlc = { version = "0.5.2", features = ["defmt"] } +uhlc = { version = "0.6.3", features = ["defmt"] } uuid = { version = "1.3.1", features = ["v4", "serde"] } webpki = { version = "0.22.0", features = ["std"] } http = { version = "0.2.9" } diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index 2f816570..d05ddd7d 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -1,7 +1,11 @@ use std::{fmt::Display, time::Duration}; use camino::Utf8PathBuf; -use corro_types::{agent::Agent, sqlite::SqlitePoolError, sync::generate_sync}; +use corro_types::{ + agent::{Agent, LockKind, LockMeta, LockState}, + sqlite::SqlitePoolError, + sync::generate_sync, +}; use futures::{SinkExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use spawn::spawn_counted; @@ -75,6 +79,7 @@ pub fn start_server( pub enum Command { Ping, Sync(SyncCommand), + Locks { top: usize }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -125,6 +130,25 @@ type FramedStream = Framed< Json<Command, Response>, >; +#[derive(Serialize, Deserialize)] +pub struct LockMetaElapsed { + pub label: String, + pub kind: LockKind, + pub state: LockState, + pub duration: Duration, +} + +impl From<LockMeta> for LockMetaElapsed { + fn from(value: LockMeta) -> Self { + LockMetaElapsed { + label: value.label.into(), + kind: value.kind, + state: value.state, + duration: value.started_at.elapsed(), + } + } +} + async fn handle_conn( agent: Agent, _config: AdminConfig, @@ -148,6 +172,33 @@ async fn handle_conn( Err(e) => send_error(&mut stream, e).await, } } + Command::Locks { top } => { + info_log(&mut stream, "gathering top locks").await; + let registry = { + agent + .bookie() + .read("admin:registry") + .await + .registry() + .clone() + }; + + let topn: Vec<LockMetaElapsed> = { + registry + .map + .read() + .values() + .take(top) + .cloned() + .map(LockMetaElapsed::from) + 
.collect() + }; + + match serde_json::to_value(&topn) { + Ok(json) => send(&mut stream, Response::Json(json)).await, + Err(e) => send_error(&mut stream, e).await, + } + } }, Ok(None) => { debug!("done with admin conn"); diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs index 42cbf8cf..28912d2e 100644 --- a/crates/corro-agent/src/agent.rs +++ b/crates/corro-agent/src/agent.rs @@ -25,12 +25,10 @@ use crate::{ use arc_swap::ArcSwap; use corro_types::{ actor::{Actor, ActorId}, - agent::{ - Agent, AgentConfig, Booked, BookedVersions, Bookie, ChangeError, KnownDbVersion, SplitPool, - }, + agent::{Agent, AgentConfig, BookedVersions, Bookie, ChangeError, KnownDbVersion, SplitPool}, broadcast::{ - BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeV1, Changeset, ChangesetParts, - FocaInput, Timestamp, UniPayload, UniPayloadV1, + BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeSource, ChangeV1, Changeset, + ChangesetParts, FocaInput, Timestamp, UniPayload, UniPayloadV1, }, change::Change, config::{AuthzConfig, Config, DEFAULT_GOSSIP_PORT}, @@ -50,7 +48,7 @@ use axum::{ }; use bytes::{Bytes, BytesMut}; use foca::{Member, Notification}; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt}; use hyper::{server::conn::AddrIncoming, StatusCode}; use itertools::Itertools; use metrics::{counter, gauge, histogram, increment_counter}; @@ -72,7 +70,7 @@ use tokio_util::codec::{Decoder, FramedRead, LengthDelimitedCodec}; use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer}; use tower_http::trace::TraceLayer; use tracing::{debug, error, info, trace, warn}; -use tripwire::{PreemptibleFutureExt, Tripwire}; +use tripwire::{Outcome, PreemptibleFutureExt, TimeoutFutureExt, Tripwire}; use trust_dns_resolver::{ error::ResolveErrorKind, proto::rr::{RData, RecordType}, @@ -91,6 +89,7 @@ pub struct AgentOptions { pub rx_bcast: Receiver<BroadcastInput>, pub rx_apply: Receiver<(ActorId, i64)>, pub rx_empty: Receiver<(ActorId, RangeInclusive<i64>)>, + pub rx_changes: Receiver<(ChangeV1, ChangeSource)>, pub rtt_rx: Receiver<(SocketAddr, Duration)>, pub tripwire: Tripwire, } @@ -151,7 +150,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let mut prepped = conn.prepare_cached( "SELECT actor_id, start_version, end_version, db_version, last_seq, ts - FROM __corro_bookkeeping AS bk", + FROM __corro_bookkeeping", )?; let mut rows = prepped.query([])?; @@ -163,7 +162,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let ranges = bk.entry(row.get(0)?).or_default(); let start_v = row.get(1)?; let end_v: Option<i64> = row.get(2)?; - ranges.insert( + ranges.insert_many( start_v..=end_v.unwrap_or(start_v), match row.get(3)? { Some(db_version) => KnownDbVersion::Current { @@ -221,10 +220,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age } let gaps_count = seqs.gaps(&(0..=last_seq)).count(); - ranges.insert( - version..=version, - KnownDbVersion::Partial { seqs, last_seq, ts }, - ); + ranges.insert(version, KnownDbVersion::Partial { seqs, last_seq, ts }); if gaps_count == 0 { info!(%actor_id, version, "found fully buffered, unapplied, changes! 
scheduling apply"); @@ -235,11 +231,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age debug!("done building bookkeeping"); - let bookie = Bookie::new(Arc::new(tokio::sync::RwLock::new( - bk.into_iter() - .map(|(k, v)| (k, Booked::new(Arc::new(tokio::sync::RwLock::new(v))))) - .collect(), - ))); + let bookie = Bookie::new(bk); let gossip_server_endpoint = gossip_server_endpoint(&conf.gossip).await?; let gossip_addr = gossip_server_endpoint.local_addr()?; @@ -255,14 +247,14 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let clock = Arc::new( uhlc::HLCBuilder::default() - .with_id(actor_id.into()) + .with_id(actor_id.try_into().unwrap()) .with_max_delta(Duration::from_millis(300)) .build(), ); let (tx_bcast, rx_bcast) = channel(10240); - let (tx_empty, rx_empty) = channel(10240); + let (tx_changes, rx_changes) = channel(10240); let opts = AgentOptions { actor_id, @@ -272,6 +264,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age rx_bcast, rx_apply, rx_empty, + rx_changes, rtt_rx, tripwire: tripwire.clone(), }; @@ -288,6 +281,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age tx_bcast, tx_apply, tx_empty, + tx_changes, schema: RwLock::new(schema), tripwire, }); @@ -313,6 +307,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { rx_bcast, rx_apply, rx_empty, + rx_changes, rtt_rx, } = opts; @@ -380,7 +375,11 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let (member_events_tx, member_events_rx) = tokio::sync::broadcast::channel::(512); runtime_loop( - Actor::new(actor_id, agent.gossip_addr()), + Actor::new( + actor_id, + agent.gossip_addr(), + agent.clock().new_timestamp().into(), + ), agent.clone(), transport.clone(), foca_rx, @@ -426,12 +425,22 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { } }); - tokio::spawn({ + spawn_counted({ let agent = agent.clone(); - let tripwire = tripwire.clone(); + let mut tripwire = tripwire.clone(); let foca_tx = foca_tx.clone(); async move { - while let Some(connecting) = gossip_server_endpoint.accept().await { + loop { + let connecting = match gossip_server_endpoint + .accept() + .preemptible(&mut tripwire) + .await + { + Outcome::Completed(Some(connecting)) => connecting, + Outcome::Completed(None) => return, + Outcome::Preempted(_) => break, + }; + let process_uni_tx = process_uni_tx.clone(); let agent = agent.clone(); let tripwire = tripwire.clone(); @@ -516,9 +525,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let mut codec = LengthDelimitedCodec::new(); let mut buf = BytesMut::new(); - let mut stream_ended = false; - - loop { + let stream_ended = loop { loop { match codec.decode(&mut buf) { Ok(Some(b)) => { @@ -554,31 +561,21 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { } } } - match timeout( - Duration::from_secs(5), - rx.read_buf(&mut buf), - ) - .await - { - Ok(Ok(0)) => { - stream_ended = true; - break; + + match rx.read_buf(&mut buf).await { + Ok(0) => { + break true; } - Ok(Ok(n)) => { + Ok(n) => { counter!("corro.peer.stream.bytes.recv.total", n as u64, "type" => "uni"); trace!("read {n} bytes"); } - Ok(Err(e)) => { + Err(e) => { error!("error reading bytes into buffer: {e}"); - stream_ended = true; - break; - } - Err(_e) => { - warn!("timed out reading from unidirectional stream"); - break; + break true; } } - } + }; if !stream_ended { if let Err(e) = 
rx.stop(0u32.into()) { @@ -686,6 +683,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { }); }); } + + // graceful shutdown + gossip_server_endpoint.reject_new_connections(); + _ = gossip_server_endpoint + .wait_idle() + .with_timeout(Duration::from_secs(5)) + .await; + gossip_server_endpoint.close(0u32.into(), b"shutting down"); } }); @@ -776,7 +781,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { }; if !states.is_empty() { - // let mut foca_states = Vec::with_capacity(states.len()); + let mut foca_states = Vec::with_capacity(states.len()); { // block to drop the members write lock @@ -791,11 +796,11 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { if matches!(foca_state.state(), foca::State::Suspect) { continue; } - // foca_states.push(foca_state); + foca_states.push(foca_state); } } - // foca_tx.send(FocaInput::ApplyMany(foca_states)).await.ok(); + foca_tx.send(FocaInput::ApplyMany(foca_states)).await.ok(); } let api = Router::new() @@ -898,6 +903,8 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { .inspect(|_| info!("corrosion api is done")), ); + spawn_counted(handle_changes(agent.clone(), rx_changes, tripwire.clone())); + spawn_counted( sync_loop( agent.clone(), @@ -920,28 +927,13 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { )); tokio::spawn(metrics_loop(agent.clone())); - let gossip_chunker = - ReceiverStream::new(bcast_rx).chunks_timeout(10, Duration::from_millis(500)); - tokio::pin!(gossip_chunker); + tokio::spawn(handle_broadcasts(agent.clone(), bcast_rx)); + + // tokio::spawn loop { tokio::select! { biased; - msg = StreamExt::next(&mut gossip_chunker) => match msg { - Some(msg) => { - spawn_counted( - handle_gossip(agent.clone(), msg, false) - .inspect_err(|e| error!("error handling gossip: {e}")).preemptible(tripwire.clone()).complete_or_else(|_| { - warn!("preempted a gossip"); - eyre::eyre!("preempted a gossip") - }) - ); - }, - None => { - error!("NO MORE PARSED MESSAGES"); - break; - } - }, _ = db_cleanup_interval.tick() => { tokio::spawn(handle_db_cleanup(agent.pool().clone()).preemptible(tripwire.clone())); }, @@ -997,10 +989,18 @@ async fn clear_overwritten_versions(agent: Agent) { let mut to_check: BTreeMap<i64, (ActorId, i64)> = BTreeMap::new(); { - let booked = bookie.read().await; + let booked = bookie.read("clear_overwritten_versions").await; for (actor_id, booked) in booked.iter() { let versions = { - match timeout(Duration::from_secs(1), booked.read()).await { + match timeout( + Duration::from_secs(1), + booked.read(format!( + "clear_overwritten_versions:{}", + actor_id.as_simple() + )), + ) + .await + { Ok(booked) => booked.current_versions(), Err(_) => { info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); @@ -1057,7 +1057,12 @@ async fn clear_overwritten_versions(agent: Agent) { for (actor_id, to_clear) in to_clear_by_actor { info!(%actor_id, "clearing actor {} versions", to_clear.len()); - let booked = bookie.for_actor(actor_id).await; + let booked = { + bookie + .write(format!("to_clear_get_booked:{}", actor_id.as_simple())) + .await + .for_actor(actor_id) + }; let mut conn = match pool.write_low().await { Ok(conn) => conn, @@ -1067,7 +1072,9 @@ async fn clear_overwritten_versions(agent: Agent) { } }; - let mut bookedw = booked.write().await; + let mut bookedw = booked + .write(format!("clearing:{}", actor_id.as_simple())) + .await; let res = block_in_place(|| { let tx = conn.transaction()?; @@ -1091,20 
+1098,11 @@ async fn clear_overwritten_versions(agent: Agent) { { match known { KnownDbVersion::Cleared => { - inserted += tx - .prepare_cached( - " - INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) - VALUES (?,?,?) - ON CONFLICT (actor_id, start_version) DO UPDATE SET - end_version = excluded.end_version, - db_version = NULL, - last_seq = NULL, - ts = NULL - WHERE end_version < excluded.end_version - ", - )? - .execute(params![actor_id, range.start(), range.end()])?; + inserted += store_empty_changeset( + &tx, + actor_id, + *range.start()..=*range.end(), + )?; } known => { warn!(%actor_id, "unexpected known db version when attempting to clear: {known:?}"); @@ -1116,8 +1114,8 @@ async fn clear_overwritten_versions(agent: Agent) { debug!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}"); - **bookedw.inner_mut() = new_copy; - debug!("compacted in-memory cache by clearing db versions for actor {actor_id}, new total: {}", bookedw.inner().len()); + **bookedw = BookedVersions(new_copy); + debug!("compacted in-memory cache by clearing db versions for actor {actor_id}, new total: {}", bookedw.len()); Ok::<_, rusqlite::Error>(()) }); @@ -1253,8 +1251,18 @@ pub async fn handle_change( trace!("handling {} changes", change.len()); if bookie - .contains(&change.actor_id, change.versions(), change.seqs()) + .write(format!( + "handle_change(for_actor):{}", + change.actor_id.as_simple() + )) .await + .for_actor(change.actor_id) + .read(format!( + "handle_change(contains?):{}", + change.actor_id.as_simple() + )) + .await + .contains_all(change.versions(), change.seqs()) { trace!("already seen, stop disseminating"); return; @@ -1324,16 +1332,9 @@ async fn handle_gossip_to_send(transport: Transport, mut to_send_rx: Receiver<(Actor, Bytes)>) { spawn_counted(async move { let len = data.len(); - match timeout(Duration::from_secs(5), transport.send_datagram(addr, data)).await { - Err(_e) => { - warn!("timed out writing gossip as datagram {addr}"); - return; - } - Ok(Err(e)) => { - error!("could not write datagram {addr}: {e}"); - return; - } - _ => {} + if let Err(e) = transport.send_datagram(addr, data).await { + error!("could not write datagram {addr}: {e}"); + return; } increment_counter!("corro.peer.datagram.sent.total", "actor_id" => actor.id().to_string()); counter!("corro.peer.datagram.bytes.sent.total", len as u64); @@ -1341,27 +1342,22 @@ async fn handle_gossip_to_send(transport: Transport, mut to_send_rx: Receiver<(Actor, Bytes)>) { } } -async fn handle_gossip( - agent: Agent, - messages: Vec<BroadcastV1>, - high_priority: bool, -) -> eyre::Result<()> { - let priority_label = if high_priority { "high" } else { "normal" }; - counter!("corro.broadcast.recv.count", messages.len() as u64, "priority" => priority_label); - - let rebroadcast = process_messages(&agent, messages).await?; - - for msg in rebroadcast { - if let Err(e) = agent - .tx_bcast() - .send(BroadcastInput::Rebroadcast(msg)) - .await - { - error!("could not send message rebroadcast: {e}"); +async fn handle_broadcasts(agent: Agent, mut bcast_rx: Receiver<BroadcastV1>) { + while let Some(bcast) = bcast_rx.recv().await { + increment_counter!("corro.broadcast.recv.count"); + match bcast { + BroadcastV1::Change(change) => { + if let Err(_e) = agent + .tx_changes() + .send((change, ChangeSource::Broadcast)) + .await + { + error!("changes channel is closed"); + break; + } + } } } - - Ok(()) } async fn handle_notifications( @@ -1603,12 +1599,12 @@ fn store_empty_changeset( tx: &Transaction, actor_id: ActorId, versions: RangeInclusive<i64>, ) 
-> Result<(), rusqlite::Error> { - // TODO: make sure this makes sense - tx.prepare_cached( - " - INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, ts) - VALUES (?, ?, ?, ?, ?) +) -> Result<usize, rusqlite::Error> { + let inserted = tx + .prepare_cached( + " + INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, last_seq, ts) + VALUES (?, ?, ?, NULL, NULL, NULL) ON CONFLICT (actor_id, start_version) DO UPDATE SET end_version = excluded.end_version, db_version = NULL, @@ -1616,20 +1612,14 @@ fn store_empty_changeset( ts = NULL WHERE end_version < excluded.end_version; ", - )? - .execute(params![ - actor_id, - versions.start(), - versions.end(), - rusqlite::types::Null, - rusqlite::types::Null - ])?; + )? + .execute(params![actor_id, versions.start(), versions.end()])?; for version in versions { clear_buffered_meta(tx, actor_id, version)?; } - Ok(()) + Ok(inserted) } fn clear_buffered_meta(tx: &Transaction, actor_id: ActorId, version: i64) -> rusqlite::Result<()> { @@ -1656,13 +1646,27 @@ async fn process_fully_buffered_changes( let mut conn = agent.pool().write_normal().await?; debug!(%actor_id, version, "acquired write (normal) connection to process fully buffered changes"); - let booked = agent.bookie().for_actor(actor_id).await; - let mut bookedw = booked.write().await; + let booked = { + agent + .bookie() + .write(format!( + "process_fully_buffered(for_actor):{}", + actor_id.as_simple() + )) + .await + .for_actor(actor_id) + }; + let mut bookedw = booked + .write(format!( + "process_fully_buffered(booked writer):{}", + actor_id.as_simple() + )) + .await; debug!(%actor_id, version, "acquired Booked write lock to process fully buffered changes"); let inserted = block_in_place(|| { let (last_seq, ts) = { - match bookedw.inner().get(&version) { + match bookedw.get(&version) { Some(KnownDbVersion::Partial { seqs, last_seq, ts }) => { if seqs.gaps(&(0..=*last_seq)).count() != 0 { error!(%actor_id, version, "found sequence gaps: {:?}, aborting!", seqs.gaps(&(0..=*last_seq)).collect::<Vec<_>>()); @@ -1686,31 +1690,6 @@ async fn process_fully_buffered_changes( info!(%actor_id, version, "processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})"); - // { - // // TODO: remove, this is for debugging purposes - // let mut prepped = tx.prepare_cached( - // "SELECT site_id, db_version, seq FROM __corro_buffered_changes - // WHERE site_id = ? - // AND version = ? - // ORDER BY db_version ASC, seq ASC", - // )?; - // let mut mapped = prepped.query_map(params![actor_id.as_bytes(), version], |row| { - // Ok(( - // (row.get::<_, ActorId>(0)?, row.get::<_, i64>(1)?), - // row.get::<_, i64>(2)?, - // )) - // })?; - // let mut map: BTreeMap<_, RangeInclusiveSet<i64>> = BTreeMap::new(); - - // while let Some(Ok(((actor_id, db_version), seq))) = mapped.next() { - // map.entry((actor_id, db_version)) - // .or_default() - // .insert(seq..=seq); - // } - - // info!(%actor_id, version, "buffered changes contents: {map:?}"); - // } - let max_db_version: Option<i64> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? 
AND version = ?")?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional()?; let start = Instant::now(); @@ -1747,7 +1726,25 @@ async fn process_fully_buffered_changes( tx.query_row("SELECT crsql_next_db_version()", [], |row| row.get(0))?; debug!("db version: {db_version}"); - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; + tx.prepare_cached( + " + INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) + VALUES ( + :actor_id, + :version, + :db_version, + :last_seq, + :ts + );", + )? + .execute(named_params! { + ":actor_id": actor_id, + ":version": version, + ":db_version": db_version, + // ":start_version": 0, + ":last_seq": last_seq, + ":ts": ts + })?; info!(%actor_id, version, "inserted bookkeeping row after buffered insert"); @@ -1757,15 +1754,7 @@ async fn process_fully_buffered_changes( ts, }) } else { - let _inserted = tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, last_seq, ts) VALUES (?, ?, ?, ?);")?.execute(params![actor_id, version, last_seq, ts])?; - - // if inserted > 0 { - // info!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert"); - // Some(KnownDbVersion::Cleared) - // } else { - // warn!(%actor_id, version, "bookkeeping row already existed, it shouldn't matter but it would be nice to fix this issue"); - // None - // } + store_empty_changeset(&tx, actor_id, version..=version)?; info!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert"); Some(KnownDbVersion::Cleared) @@ -1788,51 +1777,75 @@ async fn process_fully_buffered_changes( pub async fn process_multiple_changes( agent: &Agent, - changes: Vec, -) -> Result, ChangeError> { + changes: Vec<(ChangeV1, ChangeSource)>, +) -> Result<(), ChangeError> { + info!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.len()); + let bookie = agent.bookie(); let mut seen = HashSet::new(); let mut unknown_changes = Vec::with_capacity(changes.len()); - for change in changes { + for (change, src) in changes { let versions = change.versions(); let seqs = change.seqs(); if !seen.insert((change.actor_id, versions, seqs.cloned())) { continue; } if bookie - .contains(&change.actor_id, change.versions(), change.seqs()) + .write(format!( + "process_multiple_changes(for_actor):{}", + change.actor_id.as_simple() + )) .await + .for_actor(change.actor_id) + .read(format!( + "process_multiple_changes(contains?):{}", + change.actor_id.as_simple() + )) + .await + .contains_all(change.versions(), change.seqs()) { continue; } - unknown_changes.push(change); + unknown_changes.push((change, src)); } - // NOTE: should we use `Vec::with_capacity(unknown_changes.len())?` - let mut res = vec![]; - - unknown_changes.sort_by_key(|change| change.actor_id); + unknown_changes.sort_by_key(|(change, _src)| change.actor_id); let mut conn = agent.pool().write_normal().await?; - for (actor_id, changes) in unknown_changes - .into_iter() - .group_by(|change| change.actor_id) - .into_iter() - { - block_in_place(|| { - let mut knowns = vec![]; - let mut changesets = vec![]; + block_in_place(|| { + let tx = conn.transaction()?; + + let mut knowns: BTreeMap> = BTreeMap::new(); + let mut changesets = vec![]; + + let mut last_db_version = None; + for (actor_id, changes) in unknown_changes + .into_iter() + .group_by(|(change, _src)| change.actor_id) + .into_iter() + { + // 
get a lock on the actor id's booked writer if we didn't already { - let booked = bookie.for_actor_blocking(actor_id); - let mut booked_write = booked.blocking_write(); + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .for_actor(actor_id) + }; + let booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer):{}", + actor_id.as_simple() + )); let mut seen = RangeInclusiveMap::new(); - for change in changes { + for (change, src) in changes { let seqs = change.seqs(); if booked_write.contains_all(change.versions(), change.seqs()) { trace!( @@ -1867,8 +1880,6 @@ pub async fn process_multiple_changes( { error!("could not send empty changed versions into channel: {e}"); } - // insert into in-memory bookkeeping right away - booked_write.insert_many(change.versions(), KnownDbVersion::Cleared); ( KnownDbVersion::Cleared, Changeset::Empty { @@ -1876,58 +1887,131 @@ pub async fn process_multiple_changes( }, ) } else { - let tx = conn.transaction()?; - - let (known, changeset) = match process_single_version(&tx, change) { - Ok(res) => res, + let (known, changeset) = match process_single_version( + &tx, + last_db_version, + change, + ) { + Ok((known, changeset)) => { + if let KnownDbVersion::Current { db_version, .. } = &known { + last_db_version = Some(*db_version); + } + (known, changeset) + } Err(e) => { error!(%actor_id, ?versions, "could not process single change: {e}"); continue; } }; - - tx.commit()?; + debug!(%actor_id, self_actor_id = %agent.actor_id(), versions = ?changeset.versions(), "got known to insert: {known:?}"); (known, changeset) }; seen.insert(versions.clone(), known.clone()); - changesets.push((actor_id, changeset)); - knowns.push((versions, known)); + changesets.push((actor_id, changeset, src)); + knowns.entry(actor_id).or_default().push((versions, known)); } + } + } - for (versions, known) in knowns { - if let KnownDbVersion::Partial { seqs, last_seq, .. } = &known { - let full_seqs_range = 0..=*last_seq; - let gaps_count = seqs.gaps(&full_seqs_range).count(); - let version = *versions.start(); - if gaps_count == 0 { - // if we have no gaps, then we can schedule applying all these changes. - info!(%actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); - let tx_apply = agent.tx_apply().clone(); - tokio::spawn(async move { - if let Err(e) = tx_apply.send((actor_id, version)).await { - error!("could not send trigger for applying fully buffered changes later: {e}"); - } - }); - } else { - debug!(%actor_id, version, "still have {gaps_count} gaps in partially buffered seqs"); - } + let mut count = 0; + + for (actor_id, knowns) in knowns.iter_mut() { + debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); + for (versions, known) in knowns.iter_mut() { + match known { + KnownDbVersion::Partial { .. } => { + continue; + } + KnownDbVersion::Current { + db_version, + last_seq, + ts, + } => { + count += 1; + let version = versions.start(); + debug!(%actor_id, self_actor_id = %agent.actor_id(), version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); + tx.prepare_cached(" + INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) + VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);")? 
+ .execute(named_params!{ + ":actor_id": actor_id, + ":start_version": *version, + ":db_version": *db_version, + ":last_seq": *last_seq, + ":ts": *ts + })?; + } + KnownDbVersion::Cleared => { + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); + store_empty_changeset(&tx, *actor_id, versions.clone())?; } - known => { warn!(%actor_id, "unexpected known db version when attempting to clear: {known:?}"); } - booked_write.insert_many(versions, known); } + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); } + } + + debug!("inserted {count} new changesets"); - for (actor_id, changeset) in changesets { - process_subs(agent, changeset.changes()); - res.push((actor_id, changeset)); + tx.commit()?; + + for (actor_id, knowns) in knowns { + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .for_actor(actor_id) + }; + let mut booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer):{}", + actor_id.as_simple() + )); + + for (versions, known) in knowns { + if let KnownDbVersion::Partial { seqs, last_seq, .. } = &known { + let full_seqs_range = 0..=*last_seq; + let gaps_count = seqs.gaps(&full_seqs_range).count(); + let version = *versions.start(); + if gaps_count == 0 { + // if we have no gaps, then we can schedule applying all these changes. + info!(%actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); + let tx_apply = agent.tx_apply().clone(); + tokio::spawn(async move { + if let Err(e) = tx_apply.send((actor_id, version)).await { + error!("could not send trigger for applying fully buffered changes later: {e}"); + } + }); + } else { + debug!(%actor_id, version, "still have {gaps_count} gaps in partially buffered seqs"); + } + } + booked_write.insert_many(versions, known); } + } - for (actor_id, changeset) in changesets { - process_subs(agent, changeset.changes()); - res.push((actor_id, changeset)); + for (actor_id, changeset, src) in changesets { + process_subs(agent, changeset.changes()); + if matches!(src, ChangeSource::Broadcast) && !changeset.is_empty() { + if let Err(_e) = + agent + .tx_bcast() + .try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(ChangeV1 { + actor_id, + changeset, + }))) + { + debug!("broadcasts are full or done!"); + } + } + } - Ok(res) + Ok::<_, ChangeError>(()) + })?; + + Ok(()) } fn process_incomplete_version( @@ -2023,6 +2107,7 @@ fn process_complete_version( tx: &Transaction, actor_id: ActorId, + last_db_version: Option<i64>, versions: RangeInclusive<i64>, parts: ChangesetParts, ) -> rusqlite::Result<(KnownDbVersion, Changeset)> { @@ -2034,18 +2119,28 @@ fn process_complete_version( ts, } = parts; - info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}"); + let len = changes.len(); + + let max_db_version = changes.iter().map(|c| c.db_version).max().unwrap_or(0); + + info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}, changes len: {len}, max db version: {max_db_version}"); + + debug_assert!(len <= (seqs.end() - seqs.start() + 1) as usize); let mut impactful_changeset = vec![]; let mut last_rows_impacted = 0; - let changes_len = changes.len(); - let mut changes_per_table = BTreeMap::new(); + // we need to manually increment the next db version for each changeset + tx + .prepare_cached("SELECT CASE WHEN COALESCE(?, crsql_db_version()) >= ? THEN crsql_next_db_version(crsql_next_db_version() + 1) END")? 
+ .query_row(params![last_db_version, max_db_version], |_row| Ok(()))?; + for change in changes { trace!("inserting change! {change:?}"); + tx.prepare_cached( r#" INSERT INTO crsql_changes @@ -2063,6 +2158,7 @@ fn process_complete_version( change.db_version, &change.site_id, change.cl, + // increment the seq by the start_seq or else we'll have multiple change rows with the same seq change.seq, ])?; let rows_impacted: i64 = tx @@ -2083,22 +2179,13 @@ fn process_complete_version( last_rows_impacted = rows_impacted; } - let db_version: i64 = tx - .prepare_cached("SELECT crsql_next_db_version()")? - .query_row((), |row| row.get(0))?; - let (known_version, new_changeset) = if impactful_changeset.is_empty() { - debug!(%actor_id, version, - "inserting CLEARED bookkeeping row db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", - ); - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) VALUES (?, ?, ?);")? - .execute(params![actor_id, version, version])?; (KnownDbVersion::Cleared, Changeset::Empty { versions }) } else { - debug!(%actor_id, version, - "inserting bookkeeping row db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", - ); - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; + // TODO: find a way to avoid this... + let db_version: i64 = tx + .prepare_cached("SELECT crsql_next_db_version()")? + .query_row([], |row| row.get(0))?; ( KnownDbVersion::Current { db_version, @@ -2115,8 +2202,6 @@ fn process_complete_version( ) }; - debug!(%actor_id, version, "inserted bookkeeping row"); - // in case we got both buffered data and a complete set of changes clear_buffered_meta(tx, actor_id, version)?; @@ -2129,6 +2214,7 @@ fn process_complete_version( fn process_single_version( tx: &Transaction, + last_db_version: Option<i64>, change: ChangeV1, ) -> rusqlite::Result<(KnownDbVersion, Changeset)> { let ChangeV1 { @@ -2142,6 +2228,7 @@ fn process_single_version( process_complete_version( tx, actor_id, + last_db_version, versions, changeset .into_parts() @@ -2157,29 +2244,6 @@ fn process_single_version( Ok((known, changeset)) } -async fn process_messages( - agent: &Agent, - bcast: Vec<BroadcastV1>, -) -> Result<Vec<BroadcastV1>, ChangeError> { - let changes = bcast - .into_iter() - .map(|bcast| match bcast { - BroadcastV1::Change(change) => change, - }) - .collect(); - - Ok(process_multiple_changes(agent, changes) - .await? - .into_iter() - .map(|(actor_id, changeset)| { - BroadcastV1::Change(ChangeV1 { - actor_id, - changeset, - }) - }) - .collect()) -} - pub fn process_subs(agent: &Agent, changeset: &[Change]) { trace!("process subs..."); @@ -2247,6 +2311,8 @@ pub enum SyncRecvError { ExpectedClockMessage, #[error("timed out waiting for sync message")] TimedOut, + #[error("changes channel is closed")] + ChangesChannelClosed, } async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncClientError> { @@ -2332,6 +2398,73 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli Ok(()) } +const MIN_CHANGES_CHUNK: usize = 1000; + +async fn handle_changes( + agent: Agent, + mut rx_changes: Receiver<(ChangeV1, ChangeSource)>, + mut tripwire: Tripwire, +) { + let mut buf = vec![]; + let mut count = 0; + + let mut max_wait = tokio::time::interval(Duration::from_millis(500)); + + loop { + tokio::select! 
{ + Some((change, src)) = rx_changes.recv() => { + count += std::cmp::max(change.len(), 1); + buf.push((change, src)); + if count < MIN_CHANGES_CHUNK { + continue; + } + }, + _ = max_wait.tick() => { + // got a wait interval tick... + if buf.is_empty() { + continue; + } + }, + _ = &mut tripwire => { + break; + } + else => { + break; + } + } + + // drain and process current changes! + if let Err(e) = process_multiple_changes(&agent, buf.drain(..).collect()).await { + error!("could not process multiple changes: {e}"); + } + + // reset count + count = 0; + } + + info!("draining changes receiver..."); + + // drain! + while let Ok((change, src)) = rx_changes.try_recv() { + count += std::cmp::max(change.len(), 1); + buf.push((change, src)); + if count >= MIN_CHANGES_CHUNK { + // drain and process current changes! + if let Err(e) = process_multiple_changes(&agent, buf.drain(..).collect()).await { + error!("could not process multiple changes: {e}"); + } + + // reset count + count = 0; + } + } + + // process the last changes we got! + if let Err(e) = process_multiple_changes(&agent, buf).await { + error!("could not process multiple changes: {e}"); + } +} + const CHECK_EMPTIES_TO_INSERT_AFTER: Duration = Duration::from_secs(120); async fn sync_loop( @@ -2472,25 +2605,44 @@ async fn process_completed_empties( agent: &Agent, empties: &mut BTreeMap<ActorId, Vec<RangeInclusive<i64>>>, ) -> eyre::Result<()> { + info!( + "processing empty versions (count: {})", + empties.values().map(Vec::len).sum::<usize>() + ); let mut conn = agent.pool().write_normal().await?; block_in_place(|| { let tx = conn.transaction()?; + + let mut inserted = 0; while let Some((actor_id, empties)) = empties.pop_first() { - let booked = agent.bookie().for_actor_blocking(actor_id); - let bookedw = booked.blocking_write(); + let booked = { + agent + .bookie() + .blocking_write(format!( + "process_completed_empties(for_actor_blocking):{}", + actor_id.as_simple() + )) + .for_actor(actor_id) + }; + let bookedw = booked.blocking_write(format!( + "process_completed_empties(booked writer):{}", + actor_id.as_simple() + )); for (range, _) in empties .iter() .filter_map(|range| bookedw.get_key_value(range.start())) .dedup() { - store_empty_changeset(&tx, actor_id, range.clone())?; + inserted += store_empty_changeset(&tx, actor_id, range.clone())?; } } tx.commit()?; + info!("upserted {inserted} empty versions"); + Ok(()) }) } @@ -2590,7 +2742,9 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> { fn v0_2_0_migration(tx: &Transaction) -> rusqlite::Result<()> { tx.execute_batch( - "CREATE INDEX __corro_bookkeeping_db_version ON __corro_bookkeeping (db_version)", + " + CREATE INDEX __corro_bookkeeping_db_version ON __corro_bookkeeping (db_version); + ", ) } @@ -2725,24 +2879,30 @@ pub mod tests { println!("body: {body:?}"); - let bk: Vec<(ActorId, i64, i64)> = ta1 + #[allow(clippy::type_complexity)] + let bk: Vec<(ActorId, i64, Option<i64>, i64, Option<i64>)> = ta1 .agent .pool() .read() .await? - .prepare("SELECT actor_id, start_version, db_version FROM __corro_bookkeeping")? + .prepare("SELECT actor_id, start_version, end_version, db_version, last_seq FROM __corro_bookkeeping")? .query_map((), |row| { Ok(( row.get::<_, ActorId>(0)?, row.get::<_, i64>(1)?, - row.get::<_, i64>(2)?, + row.get::<_, Option<i64>>(2)?, + row.get::<_, i64>(3)?, + row.get::<_, Option<i64>>(4)?, )) })? 
.collect::<rusqlite::Result<Vec<_>>>()?; assert_eq!( bk, - vec![(ta1.agent.actor_id(), 1, 1), (ta1.agent.actor_id(), 2, 2)] + vec![ + (ta1.agent.actor_id(), 1, None, 1, Some(0)), + (ta1.agent.actor_id(), 2, None, 2, Some(0)) + ] ); let svc: TestRecord = ta1.agent.pool().read().await?.query_row( @@ -2986,24 +3146,30 @@ pub mod tests { })? .collect::<rusqlite::Result<_>>()?; - info!("versions count: {counts:?}"); + debug!("versions count: {counts:?}"); let actual_count: i64 = conn.query_row("SELECT count(*) FROM crsql_changes;", (), |row| row.get(0))?; - debug!("actual count: {actual_count}"); + info!("actual count: {actual_count}"); let bookie = ta.agent.bookie(); - debug!( + info!( "last version: {:?}", - bookie.last(&ta.agent.actor_id()).await + bookie + .write("test") + .await + .for_actor(ta.agent.actor_id()) + .read("test") + .await + .last() ); let sync = generate_sync(bookie, ta.agent.actor_id()).await; let needed = sync.need_len(); debug!("generated sync: {sync:?}"); - debug!("needed: {needed}"); + info!("needed: {needed}"); v.push((counts.values().sum::<i64>(), needed)); } diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index d21cccd7..d820fc90 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -6,8 +6,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use bytes::{Buf, BufMut, BytesMut}; +use compact_str::format_compact; use corro_types::agent::{Agent, KnownDbVersion, SplitPool}; -use corro_types::broadcast::{ChangeV1, Changeset, Timestamp}; +use corro_types::broadcast::{ChangeSource, ChangeV1, Changeset, Timestamp}; use corro_types::change::{row_to_change, Change}; use corro_types::config::{GossipConfig, TlsClientConfig}; use corro_types::sync::{SyncMessage, SyncMessageEncodeError, SyncMessageV1, SyncStateV1}; @@ -23,7 +24,7 @@ use tokio_stream::StreamExt; use tokio_util::codec::{Encoder, FramedRead, LengthDelimitedCodec}; use tracing::{debug, error, info, trace, warn}; -use crate::agent::{process_multiple_changes, SyncRecvError}; +use crate::agent::SyncRecvError; use crate::api::public::ChunkedChanges; use corro_types::{ @@ -336,7 +337,10 @@ async fn process_range( let overlapping: Vec<(_, KnownDbVersion)> = { booked - .read() + .read(format!( + "process_range(overlapping):{}", + actor_id.as_simple() + )) .await .overlapping(range) .map(|(k, v)| (k.clone(), v.clone())) @@ -359,7 +363,13 @@ async fn process_range( } for version in versions { - let known = { booked.read().await.get(&version).cloned() }; + let known = { + booked + .read(format!("process_range[{version}]:{}", actor_id.as_simple())) + .await + .get(&version) + .cloned() + }; if let Some(known_version) = known { process_version( pool, @@ -395,11 +405,15 @@ fn handle_known_version( ts: Timestamp, sender: &Sender<SyncMessage>, ) -> eyre::Result<()> { + debug!(%actor_id, %version, "handle known version! known: {init_known:?}, seqs_needed: {seqs_needed:?}"); let mut seqs_iter = seqs_needed.into_iter(); while let Some(range_needed) = seqs_iter.by_ref().next() { match &init_known { KnownDbVersion::Current { db_version, .. 
} => { - let bw = booked.blocking_write(); + let bw = booked.blocking_write(format_compact!( + "sync_handle_known[{version}]:{}", + actor_id.as_simple() + )); match bw.get(&version) { Some(known) => { // a current version cannot go back to a partial version @@ -476,8 +490,11 @@ fn handle_known_version( debug!("partial, effective range: {start_seq}..={end_seq}"); - let bw = booked.blocking_write(); - let maybe_db_version = match bw.get(&version) { + let bw = booked.blocking_write(format_compact!( + "sync_handle_known(partial)[{version}]:{}", + actor_id.as_simple() + )); + let maybe_current_version = match bw.get(&version) { Some(known) => match known { KnownDbVersion::Partial { seqs, .. } => { if seqs != &partial_seqs { @@ -491,7 +508,7 @@ fn handle_known_version( } None } - KnownDbVersion::Current { db_version, .. } => Some(*db_version), + known @ KnownDbVersion::Current { .. } => Some(known.clone()), KnownDbVersion::Cleared => { debug!(%actor_id, version, "in-memory bookkeeping has been cleared, aborting."); break; @@ -503,24 +520,25 @@ fn handle_known_version( } }; - if let Some(db_version) = maybe_db_version { + if let Some(known) = maybe_current_version { info!(%actor_id, version, "switched from partial to current version"); + + // drop write lock drop(bw); + + // restart the seqs_needed here! let mut seqs_needed: Vec<RangeInclusive<i64>> = seqs_iter.collect(); if let Some(new_start_seq) = last_sent_seq.take() { range_needed = (new_start_seq + 1)..=*range_needed.end(); } seqs_needed.insert(0, range_needed); + return handle_known_version( conn, actor_id, is_local, version, - KnownDbVersion::Current { - db_version, - last_seq, - ts, - }, + known, booked, seqs_needed, last_seq, @@ -693,7 +711,7 @@ async fn process_sync( ) -> eyre::Result<()> { let booked_actors: HashMap<ActorId, Booked> = { bookie - .read() + .read("process_sync") .await .iter() .map(|(k, v)| (*k, v.clone())) @@ -720,7 +738,13 @@ async fn process_sync( // 2. process partial needs if let Some(partially_needed) = sync_state.partial_need.get(&actor_id) { for (version, seqs_needed) in partially_needed.iter() { - let known = { booked.read().await.get(version).cloned() }; + let known = { + booked + .read(format!("process_sync(partials)[{version}]")) + .await + .get(version) + .cloned() + }; if let Some(known) = known { process_version( &pool, @@ -739,9 +763,16 @@ async fn process_sync( // 3. process newer-than-heads let their_last_version = sync_state.heads.get(&actor_id).copied().unwrap_or(0); - let our_last_version = booked.last().await.unwrap_or(0); + let our_last_version = booked + .read(format!( + "process_sync(our_last_version):{}", + actor_id.as_simple() + )) + .await + .last() + .unwrap_or(0); - debug!(actor_id = %local_actor_id, "their last version: {their_last_version} vs ours: {our_last_version}"); + trace!(actor_id = %local_actor_id, "their last version: {their_last_version} vs ours: {our_last_version}"); if their_last_version >= our_last_version { // nothing to teach the other node! @@ -870,14 +901,19 @@ pub async fn bidirectional_sync( write.flush().await.map_err(SyncSendError::from)?; match read_sync_msg(&mut read).await? 
{ - Some(SyncMessage::V1(SyncMessageV1::Clock(ts))) => { - if let Err(e) = agent - .clock() - .update_with_timestamp(&uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into())) - { - warn!("could not update clock from actor {their_actor_id}: {e}"); + Some(SyncMessage::V1(SyncMessageV1::Clock(ts))) => match their_actor_id.try_into() { + Ok(id) => { + if let Err(e) = agent + .clock() + .update_with_timestamp(&uhlc::Timestamp::new(ts.to_ntp64(), id)) + { + warn!("could not update clock from actor {their_actor_id}: {e}"); + } } - } + Err(e) => { + error!("could not convert ActorId to uhlc ID: {e}"); + } + }, Some(_) => return Err(SyncRecvError::ExpectedClockMessage.into()), None => return Err(SyncRecvError::UnexpectedEndOfStream.into()), } @@ -893,6 +929,8 @@ pub async fn bidirectional_sync( .inspect_err(|e| error!("could not process sync request: {e}")), ); + let tx_changes = agent.tx_changes().clone(); + let (_sent_count, recv_count) = tokio::try_join!( async move { let mut count = 0; @@ -917,7 +955,6 @@ pub async fn bidirectional_sync( } }, _ = check_buf.tick() => { - println!("checking if buf is not empty and sending if necessary, len: {}", send_buf.len()); if !send_buf.is_empty() { write_sync_buf(&mut send_buf, &mut write).await?; } @@ -942,9 +979,6 @@ pub async fn bidirectional_sync( async move { let mut count = 0; - // changes buffer - let mut buf = Vec::with_capacity(4); - loop { match read_sync_msg(&mut read).await { Ok(None) => { @@ -956,9 +990,11 @@ pub async fn bidirectional_sync( } Ok(Some(msg)) => match msg { SyncMessage::V1(SyncMessageV1::Changeset(change)) => { - let len = change.len(); - buf.push(change); - count += len; + count += change.len(); + tx_changes + .send((change, ChangeSource::Sync)) + .await + .map_err(|_| SyncRecvError::ChangesChannelClosed)?; } SyncMessage::V1(SyncMessageV1::State(_)) => { warn!("received sync state message more than once, ignoring"); @@ -970,18 +1006,8 @@ pub async fn bidirectional_sync( } }, } - - if buf.len() == buf.capacity() { - process_multiple_changes(agent, buf.drain(..).collect()) - .await - .map_err(SyncRecvError::from)?; - } } - process_multiple_changes(agent, buf.drain(..).collect()) - .await - .map_err(SyncRecvError::from)?; - debug!(actor_id = %agent.actor_id(), "done reading sync messages"); counter!("corro.sync.changes.recv", count as u64, "actor_id" => their_actor_id.to_string()); @@ -995,15 +1021,228 @@ pub async fn bidirectional_sync( #[cfg(test)] mod tests { + use axum::{Extension, Json}; use camino::Utf8PathBuf; + use corro_tests::TEST_SCHEMA; use corro_types::{ - config::TlsConfig, + api::{ColumnName, TableName}, + config::{Config, TlsConfig}, + pubsub::pack_columns, tls::{generate_ca, generate_client_cert, generate_server_cert}, }; + use hyper::StatusCode; use tempfile::TempDir; + use tokio::sync::mpsc; + use tripwire::Tripwire; + + use crate::{ + agent::{process_multiple_changes, setup}, + api::public::api_v1_db_schema, + }; use super::*; + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_known_version() -> eyre::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + + let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple(); + + let dir = tempfile::tempdir()?; + + let (agent, _agent_options) = setup( + Config::builder() + .db_path(dir.path().join("corrosion.db").display().to_string()) + .gossip_addr("127.0.0.1:0".parse()?) + .api_addr("127.0.0.1:0".parse()?) 
+ .build()?, + tripwire, + ) + .await?; + + let (status_code, _res) = + api_v1_db_schema(Extension(agent.clone()), Json(vec![TEST_SCHEMA.to_owned()])).await; + + assert_eq!(status_code, StatusCode::OK); + + let actor_id = ActorId(uuid::Uuid::new_v4()); + + let ts = agent.clock().new_timestamp().into(); + + let change1 = Change { + table: TableName("tests".into()), + pk: pack_columns(&vec![1i64.into()])?, + cid: ColumnName("text".into()), + val: "one".into(), + col_version: 1, + db_version: 1, + seq: 0, + site_id: actor_id.to_bytes(), + cl: 1, + }; + + let change2 = Change { + table: TableName("tests".into()), + pk: pack_columns(&vec![2i64.into()])?, + cid: ColumnName("text".into()), + val: "two".into(), + col_version: 1, + db_version: 2, + seq: 0, + site_id: actor_id.to_bytes(), + cl: 1, + }; + + process_multiple_changes( + &agent, + vec![ + ( + ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: 1, + changes: vec![change1.clone()], + seqs: 0..=0, + last_seq: 0, + ts, + }, + }, + ChangeSource::Sync, + ), + ( + ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: 2, + changes: vec![change2.clone()], + seqs: 0..=0, + last_seq: 0, + ts, + }, + }, + ChangeSource::Sync, + ), + ], + ) + .await?; + + let known1 = KnownDbVersion::Current { + db_version: 1, + last_seq: 0, + ts, + }; + + let known2 = KnownDbVersion::Current { + db_version: 2, + last_seq: 0, // original last seq + ts, + }; + + let booked = agent + .bookie() + .read("test") + .await + .get(&actor_id) + .cloned() + .unwrap(); + + { + let read = booked.read("test").await; + + assert_eq!(read.get(&1).unwrap().clone(), known1); + assert_eq!(read.get(&2).unwrap().clone(), known2); + } + + { + let (tx, mut rx) = mpsc::channel(5); + let mut conn = agent.pool().read().await?; + + { + let mut prepped = conn.prepare("SELECT * FROM crsql_changes;")?; + let mut rows = prepped.query([])?; + + loop { + let row = rows.next()?; + if row.is_none() { + break; + } + + println!("ROW: {row:?}"); + } + } + + block_in_place(|| { + handle_known_version( + &mut conn, + actor_id, + false, + 1, + known1, + &booked, + vec![0..=0], + 0, + ts, + &tx, + ) + })?; + + let msg = rx.recv().await.unwrap(); + assert_eq!( + msg, + SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: 1, + changes: vec![change1], + seqs: 0..=0, + last_seq: 0, + ts, + } + })) + ); + + block_in_place(|| { + handle_known_version( + &mut conn, + actor_id, + false, + 2, + known2, + &booked, + vec![0..=0], + 0, + ts, + &tx, + ) + })?; + + let msg = rx.recv().await.unwrap(); + assert_eq!( + msg, + SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: 2, + changes: vec![change2], + seqs: 0..=0, + last_seq: 0, + ts, + } + })) + ); + } + + // make_broadcastable_change(&agent, |tx| { + // tx.execute("INSERT INTO test (id, text) VALUES (1, \"one\")", []) + // })?; + + // make_broadcastable_change(&agent, |tx| { + // tx.execute("INSERT INTO test (id, text) VALUES (2, \"two\")", []) + // })?; + + Ok(()) + } + #[tokio::test] async fn test_mutual_tls() -> eyre::Result<()> { let ca_cert = generate_ca()?; diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index e68d6c36..81fa64b2 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -9,16 +9,16 @@ use bytes::{BufMut, BytesMut}; use compact_str::ToCompactString; use corro_types::{ agent::{Agent, ChangeError, KnownDbVersion}, - 
api::{ExecResponse, ExecResult, QueryEvent, Statement}, + api::{row_to_change, ExecResponse, ExecResult, QueryEvent, Statement}, broadcast::{ChangeV1, Changeset, Timestamp}, - change::{row_to_change, SqliteValue}, - schema::{make_schema_inner, parse_sql}, + change::SqliteValue, + schema::{apply_schema, parse_sql}, sqlite::SqlitePoolError, }; use hyper::StatusCode; use itertools::Itertools; use metrics::counter; -use rusqlite::{params, params_from_iter, ToSql, Transaction}; +use rusqlite::{named_params, params_from_iter, ToSql, Transaction}; use spawn::spawn_counted; use tokio::{ sync::{ @@ -160,10 +160,18 @@ where trace!("got conn"); let actor_id = agent.actor_id(); - let booked = agent.bookie().for_actor(actor_id).await; + let booked = { + agent + .bookie() + .write("make_broadcastable_changes(for_actor)") + .await + .for_actor(actor_id) + }; // maybe we should do this earlier, but there can only ever be 1 write conn at a time, // so it probably doesn't matter too much, except for reads of internal state - let mut book_writer = booked.write().await; + let mut book_writer = booked + .write("make_broadcastable_changes(booked writer)") + .await; let start = Instant::now(); block_in_place(move || { @@ -189,25 +197,33 @@ where return Ok((ret, start.elapsed())); } + let last_version = book_writer.last().unwrap_or(0); + trace!("last_version: {last_version}"); + let version = last_version + 1; + trace!("version: {version}"); + let last_seq: i64 = tx .prepare_cached( "SELECT MAX(seq) FROM crsql_changes WHERE site_id IS NULL AND db_version = ?", )? .query_row([db_version], |row| row.get(0))?; - let last_version = book_writer.last().unwrap_or(0); - trace!("last_version: {last_version}"); - let version = last_version + 1; - trace!("version: {version}"); - let elapsed = { tx.prepare_cached( r#" INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) - VALUES (?, ?, ?, ?, ?); + VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts); "#, )? - .execute(params![actor_id, version, db_version, last_seq, ts])?; + .execute(named_params! 
{ + ":actor_id": actor_id, + ":start_version": version, + ":db_version": db_version, + ":last_seq": last_seq, + ":ts": ts + })?; + + debug!(%actor_id, %version, %db_version, "inserted local bookkeeping row!"); tx.commit()?; start.elapsed() @@ -630,7 +646,7 @@ async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result< block_in_place(|| { let tx = conn.transaction()?; - make_schema_inner(&tx, &schema_write, &mut new_schema)?; + apply_schema(&tx, &schema_write, &mut new_schema)?; for tbl_name in partial_schema.tables.keys() { tx.execute("DELETE FROM __corro_schema WHERE tbl_name = ?", [tbl_name])?; @@ -772,7 +788,17 @@ mod tests { })) )); - assert_eq!(agent.bookie().last(&agent.actor_id()).await, Some(1)); + assert_eq!( + agent + .bookie() + .write("test") + .await + .for_actor(agent.actor_id()) + .read("test") + .await + .last(), + Some(1) + ); println!("second req..."); diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index a04a9045..83dddfa5 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -206,37 +206,39 @@ pub fn runtime_loop( let states: Vec<_> = { let members = agent.members().read(); - foca.iter_members() + foca.iter_membership_state() .filter_map(|member| { - members.get(&member.id().id()).and_then(|state| { - match serde_json::to_string(member) { - Ok(foca_state) => Some(( - member.id().id(), - state.addr, - "up", - foca_state, - serde_json::Value::Array( - members - .rtts - .get(&state.addr) - .map(|rtt| { - rtt.buf - .iter() - .copied() - .map(serde_json::Value::from) - .collect::<Vec<_>>() - }) - .unwrap_or(vec![]), - ), - )), - Err(e) => { - error!( - "could not serialize foca member state: {e}" - ); - None - } + let id = member.id().id(); + let addr = member.id().addr(); + match serde_json::to_string(member) { + Ok(foca_state) => Some(( + id, + addr, + if members.get(&id).is_some() { + "up" + } else { + "down" + }, + foca_state, + serde_json::Value::Array( + members + .rtts + .get(&addr) + .map(|rtt| { + rtt.buf + .iter() + .copied() + .map(serde_json::Value::from) + .collect::<Vec<_>>() + }) + .unwrap_or(vec![]), + ), + )), + Err(e) => { + error!("could not serialize foca member state: {e}"); + None } - }) + } }) .collect() }; @@ -361,7 +363,7 @@ pub fn runtime_loop( let foca_state = { // need to bind this... let foca_state = foca - .iter_members() + .iter_membership_state() .find(|member| member.id().id() == actor.id()) .and_then(|member| match serde_json::to_string(member) { Ok(foca_state) => Some(foca_state), @@ -417,12 +419,12 @@ pub fn runtime_loop( ); let upserted = tx.prepare_cached("INSERT INTO __corro_members (actor_id, address, state, foca_state, rtts) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (actor_id) DO UPDATE SET - address = excluded.address, - state = excluded.state, - foca_state = excluded.foca_state, - rtts = excluded.rtts;")? + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (actor_id) DO UPDATE SET + address = excluded.address, + state = excluded.state, + foca_state = excluded.foca_state, + rtts = excluded.rtts;")? 
.execute(params![ id, address.to_string(), diff --git a/crates/corro-types/src/actor.rs b/crates/corro-types/src/actor.rs index 751b39f9..d19c1c26 100644 --- a/crates/corro-types/src/actor.rs +++ b/crates/corro-types/src/actor.rs @@ -1,4 +1,10 @@ -use std::{fmt, hash::Hash, net::SocketAddr, ops::Deref}; +use std::{ + fmt, + hash::Hash, + net::SocketAddr, + ops::Deref, + time::{Duration, SystemTime}, +}; use corro_api_types::SqliteValue; use foca::Identity; @@ -8,8 +14,11 @@ use rusqlite::{ }; use serde::{Deserialize, Serialize}; use speedy::{Context, Readable, Reader, Writable, Writer}; +use uhlc::NTP64; use uuid::Uuid; +use crate::broadcast::Timestamp; + #[derive( Debug, Default, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, )] @@ -30,9 +39,11 @@ impl ActorId { } } -impl From<ActorId> for uhlc::ID { - fn from(val: ActorId) -> Self { - val.0.into() +impl TryFrom<ActorId> for uhlc::ID { + type Error = uhlc::SizeError; + + fn try_from(value: ActorId) -> Result<Self, Self::Error> { + value.as_bytes().try_into() } } @@ -123,8 +134,7 @@ impl FromSql for ActorId { pub struct Actor { id: ActorId, addr: SocketAddr, - // An extra field to allow fast rejoin - bump: u16, + ts: Timestamp, } impl Hash for Actor { @@ -135,12 +145,8 @@ impl Actor { } impl Actor { - pub fn new(id: ActorId, addr: SocketAddr) -> Self { - Self { - id, - addr, - bump: rand::random(), - } + pub fn new(id: ActorId, addr: SocketAddr, ts: Timestamp) -> Self { + Self { id, addr, ts } } pub fn id(&self) -> ActorId { @@ -149,11 +155,14 @@ impl Actor { pub fn addr(&self) -> SocketAddr { self.addr } + pub fn ts(&self) -> Timestamp { + self.ts + } } impl From<SocketAddr> for Actor { fn from(value: SocketAddr) -> Self { - Self::new(ActorId(Uuid::nil()), value) + Self::new(ActorId(Uuid::nil()), value, Timestamp::zero()) } } @@ -179,7 +188,13 @@ impl Identity for Actor { Some(Self { id: self.id, addr: self.addr, - bump: self.bump.wrapping_add(1), + ts: NTP64::from(duration_since_epoch()).into(), }) } } + +fn duration_since_epoch() -> Duration { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("could not generate duration since unix epoch") +} diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index dad12ab4..45e987f7 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -4,16 +4,22 @@ use std::{ net::SocketAddr, ops::{Deref, DerefMut, RangeInclusive}, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }; use arc_swap::ArcSwap; use camino::Utf8PathBuf; +use compact_str::CompactString; +use indexmap::IndexMap; use metrics::{gauge, histogram}; use parking_lot::RwLock; use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{Connection, InterruptHandle}; +use serde::{Deserialize, Serialize}; use tokio::{ runtime::Handle, sync::{ @@ -23,8 +29,8 @@ use tokio::{ }; use tokio::{ sync::{ - RwLock as TokioRwLock, RwLockReadGuard as TokioRwLockReadGuard, - RwLockWriteGuard as TokioRwLockWriteGuard, + OwnedRwLockWriteGuard as OwnedTokioRwLockWriteGuard, RwLock as TokioRwLock, + RwLockReadGuard as TokioRwLockReadGuard, RwLockWriteGuard as TokioRwLockWriteGuard, }, task::block_in_place, }; @@ -34,7 +40,7 @@ use tripwire::Tripwire; use crate::{ actor::ActorId, - broadcast::{BroadcastInput, Timestamp}, + broadcast::{BroadcastInput, ChangeSource, ChangeV1, Timestamp}, config::Config, pubsub::MatcherHandle, schema::NormalizedSchema, @@ -61,6 +67,7 @@ pub struct AgentConfig { pub tx_bcast: Sender<BroadcastInput>, pub 
tx_apply: Sender<(ActorId, i64)>,
     pub tx_empty: Sender<(ActorId, RangeInclusive<i64>)>,
+    pub tx_changes: Sender<(ChangeV1, ChangeSource)>,

     pub schema: RwLock<NormalizedSchema>,
     pub tripwire: Tripwire,
@@ -79,6 +86,7 @@ pub struct AgentInner {
     tx_bcast: Sender<BroadcastInput>,
     tx_apply: Sender<(ActorId, i64)>,
     tx_empty: Sender<(ActorId, RangeInclusive<i64>)>,
+    tx_changes: Sender<(ChangeV1, ChangeSource)>,
     schema: RwLock<NormalizedSchema>,
 }

@@ -97,6 +105,7 @@ impl Agent {
             tx_bcast: config.tx_bcast,
             tx_apply: config.tx_apply,
             tx_empty: config.tx_empty,
+            tx_changes: config.tx_changes,
             schema: config.schema,
         }))
     }
@@ -133,6 +142,10 @@ impl Agent {
         &self.0.tx_apply
     }

+    pub fn tx_changes(&self) -> &Sender<(ChangeV1, ChangeSource)> {
+        &self.0.tx_changes
+    }
+
     pub fn tx_empty(&self) -> &Sender<(ActorId, RangeInclusive<i64>)> {
         &self.0.tx_empty
     }
@@ -397,7 +410,7 @@ async fn timeout_wait(
     // }
     // }
     // handle.interrupt();
     // increment_counter!("corro.sqlite.pool.execution.timeout");
     // FIXME: do we need to cancel the token?
 }

@@ -434,13 +447,19 @@ impl DerefMut for WriteConn {
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum KnownDbVersion {
     Partial {
+        // range of sequences recorded
         seqs: RangeInclusiveSet<i64>,
+        // actual last sequence originally produced
         last_seq: i64,
+        // timestamp when the change was produced by the source
         ts: Timestamp,
     },
     Current {
+        // cr-sqlite db version
         db_version: i64,
+        // actual last sequence originally produced
         last_seq: i64,
+        // timestamp when the change was produced by the source
         ts: Timestamp,
     },
     Cleared,
@@ -452,111 +471,273 @@ impl KnownDbVersion {
     }
 }

-pub type BookedVersions = RangeInclusiveMap<i64, KnownDbVersion>;
-pub type BookedInner = Arc<TokioRwLock<BookedVersions>>;
+pub struct CountedTokioRwLock<T> {
+    registry: LockRegistry,
+    lock: Arc<TokioRwLock<T>>,
+}

-#[derive(Default, Clone)]
-pub struct Booked(BookedInner);
+impl<T> CountedTokioRwLock<T> {
+    fn new(registry: LockRegistry, value: T) -> Self {
+        Self {
+            registry,
+            lock: Arc::new(TokioRwLock::new(value)),
+        }
+    }

-impl Booked {
-    pub fn new(inner: BookedInner) -> Self {
-        Self(inner)
+    pub async fn write<C: Into<CompactString>>(
+        &self,
+        label: C,
+    ) -> CountedTokioRwLockWriteGuard<'_, T> {
+        self.registry.acquire_write(label, &self.lock).await
     }

-    pub async fn contains(&self, version: i64, seqs: Option<&RangeInclusive<i64>>) -> bool {
-        match seqs {
-            Some(check_seqs) => {
-                let read = self.0.read().await;
-                match read.get(&version) {
-                    Some(known) => match known {
-                        KnownDbVersion::Partial { seqs, .. } => {
-                            check_seqs.clone().all(|seq| seqs.contains(&seq))
-                        }
-                        KnownDbVersion::Current { ..
} | KnownDbVersion::Cleared => true, - }, - None => false, - } - } - None => self.0.read().await.contains_key(&version), - } + pub fn blocking_write>( + &self, + label: C, + ) -> CountedTokioRwLockWriteGuard<'_, T> { + self.registry.acquire_blocking_write(label, &self.lock) } - pub async fn last(&self) -> Option { - self.0.read().await.iter().map(|(k, _v)| *k.end()).max() + pub fn blocking_write_owned>( + &self, + label: C, + ) -> CountedOwnedTokioRwLockWriteGuard { + self.registry + .acquire_blocking_write_owned(label, self.lock.clone()) } - pub async fn read(&self) -> BookReader { - BookReader(self.0.read().await) + pub async fn read>( + &self, + label: C, + ) -> CountedTokioRwLockReadGuard<'_, T> { + self.registry.acquire_read(label, &self.lock).await } +} + +pub struct CountedTokioRwLockWriteGuard<'a, T> { + lock: TokioRwLockWriteGuard<'a, T>, + _tracker: LockTracker, +} + +impl<'a, T> Deref for CountedTokioRwLockWriteGuard<'a, T> { + type Target = TokioRwLockWriteGuard<'a, T>; - pub async fn write(&self) -> BookWriter { - BookWriter(self.0.write().await) + fn deref(&self) -> &Self::Target { + &self.lock } +} - pub fn blocking_write(&self) -> BookWriter { - BookWriter(self.0.blocking_write()) +impl<'a, T> DerefMut for CountedTokioRwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.lock } } -pub struct BookReader<'a>(TokioRwLockReadGuard<'a, BookedVersions>); +pub struct CountedOwnedTokioRwLockWriteGuard { + lock: OwnedTokioRwLockWriteGuard, + _tracker: LockTracker, +} -impl<'a> BookReader<'a> { - pub fn contains(&self, version: i64, seqs: Option<&RangeInclusive>) -> bool { - match seqs { - Some(check_seqs) => match self.0.get(&version) { - Some(known) => match known { - KnownDbVersion::Partial { seqs, .. } => { - check_seqs.clone().all(|seq| seqs.contains(&seq)) - } - KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, - }, - None => false, - }, - None => self.0.contains_key(&version), - } - } +impl Deref for CountedOwnedTokioRwLockWriteGuard { + type Target = OwnedTokioRwLockWriteGuard; - pub fn contains_all( - &self, - mut versions: RangeInclusive, - seqs: Option<&RangeInclusive>, - ) -> bool { - versions.all(|version| self.contains(version, seqs)) + fn deref(&self) -> &Self::Target { + &self.lock } +} - pub fn current_versions(&self) -> BTreeMap { - self.0 - .iter() - .filter_map(|(range, known)| { - if let KnownDbVersion::Current { db_version, .. 
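
Note: the bookkeeping types lean on rangemap's coalescing range containers, and the same `gaps` primitive drives both sync (finding versions a peer still needs) and partial-changeset tracking (checking whether every sequence of a version has arrived, as in the `seqs.gaps(&(0..=*last_seq)).count() == 0` check later in this diff). A self-contained illustration of both checks; `is_complete` and the `need` computation are invented helpers mirroring those expressions:

use rangemap::{RangeInclusiveMap, RangeInclusiveSet};

// A Partial version is complete once no sequence gaps remain up to the
// last sequence the source produced.
fn is_complete(seqs: &RangeInclusiveSet<i64>, last_seq: i64) -> bool {
    seqs.gaps(&(0..=last_seq)).next().is_none()
}

fn main() {
    let mut seqs = RangeInclusiveSet::new();
    seqs.insert(0..=3);
    seqs.insert(5..=10);
    assert!(!is_complete(&seqs, 10)); // seq 4 still missing
    seqs.insert(4..=4);
    assert!(is_complete(&seqs, 10)); // ranges coalesce to 0..=10

    // Same idea across versions: gaps in the bookkeeping map are exactly
    // what a sync partner would be asked for ("need"). `()` stands in for
    // KnownDbVersion to keep this sketch self-contained.
    let mut booked: RangeInclusiveMap<i64, ()> = RangeInclusiveMap::new();
    booked.insert(1..=4, ());
    booked.insert(7..=9, ());
    let need: Vec<_> = booked.gaps(&(1..=9)).collect();
    assert_eq!(need, vec![5..=6]);
}
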
} = known { - Some((*db_version, *range.start())) - } else { - None - } - }) - .collect() +impl DerefMut for CountedOwnedTokioRwLockWriteGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.lock } } -impl<'a> Deref for BookReader<'a> { - type Target = TokioRwLockReadGuard<'a, BookedVersions>; +pub struct CountedTokioRwLockReadGuard<'a, T> { + lock: TokioRwLockReadGuard<'a, T>, + _tracker: LockTracker, +} + +impl<'a, T> Deref for CountedTokioRwLockReadGuard<'a, T> { + type Target = TokioRwLockReadGuard<'a, T>; fn deref(&self) -> &Self::Target { - &self.0 + &self.lock + } +} + +impl<'a, T> DerefMut for CountedTokioRwLockReadGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.lock } } -pub struct BookWriter<'a>(TokioRwLockWriteGuard<'a, BookedVersions>); +type LockId = usize; -impl<'a> BookWriter<'a> { - pub fn insert(&mut self, version: i64, known_version: KnownDbVersion) { - self.insert_many(version..=version, known_version); +#[derive(Debug, Clone)] +pub struct LockMeta { + pub label: CompactString, + pub kind: LockKind, + pub state: LockState, + pub started_at: Instant, +} + +#[derive(Default, Clone)] +pub struct LockRegistry { + id_gen: Arc, + pub map: Arc>>, +} + +impl LockRegistry { + fn remove(&self, id: &LockId) { + self.map.write().remove(id); } - pub fn insert_many(&mut self, versions: RangeInclusive, known_version: KnownDbVersion) { - self.0.insert(versions, known_version); + async fn acquire_write<'a, T, C: Into>( + &self, + label: C, + lock: &'a TokioRwLock, + ) -> CountedTokioRwLockWriteGuard<'a, T> { + let id = self.gen_id(); + self.insert_lock( + id, + LockMeta { + label: label.into(), + kind: LockKind::Write, + state: LockState::Acquiring, + started_at: Instant::now(), + }, + ); + let _tracker = LockTracker { + id, + registry: self.clone(), + }; + let w = lock.write().await; + self.set_lock_state(&id, LockState::Locked); + CountedTokioRwLockWriteGuard { lock: w, _tracker } + } + + fn acquire_blocking_write<'a, T, C: Into>( + &self, + label: C, + lock: &'a TokioRwLock, + ) -> CountedTokioRwLockWriteGuard<'a, T> { + let id = self.gen_id(); + self.insert_lock( + id, + LockMeta { + label: label.into(), + kind: LockKind::Write, + state: LockState::Acquiring, + started_at: Instant::now(), + }, + ); + let _tracker = LockTracker { + id, + registry: self.clone(), + }; + let w = lock.blocking_write(); + self.set_lock_state(&id, LockState::Locked); + CountedTokioRwLockWriteGuard { lock: w, _tracker } } + fn acquire_blocking_write_owned>( + &self, + label: C, + lock: Arc>, + ) -> CountedOwnedTokioRwLockWriteGuard { + let id = self.gen_id(); + self.insert_lock( + id, + LockMeta { + label: label.into(), + kind: LockKind::Write, + state: LockState::Acquiring, + started_at: Instant::now(), + }, + ); + let _tracker = LockTracker { + id, + registry: self.clone(), + }; + let w = loop { + if let Ok(w) = lock.clone().try_write_owned() { + break w; + } + // don't instantly loop + std::thread::sleep(Duration::from_millis(1)); + }; + self.set_lock_state(&id, LockState::Locked); + CountedOwnedTokioRwLockWriteGuard { lock: w, _tracker } + } + + async fn acquire_read<'a, T, C: Into>( + &self, + label: C, + lock: &'a TokioRwLock, + ) -> CountedTokioRwLockReadGuard<'a, T> { + let id = self.gen_id(); + self.insert_lock( + id, + LockMeta { + label: label.into(), + kind: LockKind::Read, + state: LockState::Acquiring, + started_at: Instant::now(), + }, + ); + let _tracker = LockTracker { + id, + registry: self.clone(), + }; + let w = lock.read().await; + 
self.set_lock_state(&id, LockState::Locked); + CountedTokioRwLockReadGuard { lock: w, _tracker } + } + + fn set_lock_state(&self, id: &LockId, state: LockState) { + if let Some(meta) = self.map.write().get_mut(id) { + meta.state = state + } + } + + fn insert_lock(&self, id: LockId, meta: LockMeta) { + self.map.write().insert(id, meta); + } + + fn gen_id(&self) -> LockId { + self.id_gen.fetch_add(1, Ordering::Release) + 1 + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LockState { + Acquiring, + Locked, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LockKind { + Read, + Write, +} + +struct LockTracker { + id: LockId, + registry: LockRegistry, +} + +impl Drop for LockTracker { + fn drop(&mut self) { + self.registry.remove(&self.id) + } +} + +#[derive(Default)] +pub struct BookedVersions(pub RangeInclusiveMap); + +impl BookedVersions { pub fn contains(&self, version: i64, seqs: Option<&RangeInclusive>) -> bool { match seqs { Some(check_seqs) => match self.0.get(&version) { @@ -572,10 +753,6 @@ impl<'a> BookWriter<'a> { } } - pub fn contains_current(&self, version: &i64) -> bool { - matches!(self.0.get(version), Some(KnownDbVersion::Current { .. })) - } - pub fn contains_all( &self, mut versions: RangeInclusive, @@ -584,8 +761,8 @@ impl<'a> BookWriter<'a> { versions.all(|version| self.contains(version, seqs)) } - pub fn last(&self) -> Option { - self.0.iter().map(|(k, _v)| *k.end()).max() + pub fn contains_current(&self, version: &i64) -> bool { + matches!(self.0.get(version), Some(KnownDbVersion::Current { .. })) } pub fn current_versions(&self) -> BTreeMap { @@ -601,66 +778,134 @@ impl<'a> BookWriter<'a> { .collect() } - pub fn inner(&self) -> &TokioRwLockWriteGuard<'a, BookedVersions> { - &self.0 + pub fn last(&self) -> Option { + self.0.iter().map(|(k, _v)| *k.end()).max() + } + + pub fn insert(&mut self, version: i64, known_version: KnownDbVersion) { + self.insert_many(version..=version, known_version); } - pub fn inner_mut(&mut self) -> &mut TokioRwLockWriteGuard<'a, BookedVersions> { - &mut self.0 + pub fn insert_many(&mut self, versions: RangeInclusive, known_version: KnownDbVersion) { + self.0.insert(versions, known_version); } } -impl<'a> Deref for BookWriter<'a> { - type Target = TokioRwLockWriteGuard<'a, BookedVersions>; +impl Deref for BookedVersions { + type Target = RangeInclusiveMap; fn deref(&self) -> &Self::Target { &self.0 } } -pub type BookieInner = Arc>>; +pub type BookedInner = Arc>; -#[derive(Default, Clone)] -pub struct Bookie(BookieInner); +#[derive(Clone)] +pub struct Booked(BookedInner); -impl Bookie { - pub fn new(inner: BookieInner) -> Self { - Self(inner) +impl Booked { + fn new(versions: BookedVersions, registry: LockRegistry) -> Self { + Self(Arc::new(CountedTokioRwLock::new(registry, versions))) } - pub async fn for_actor(&self, actor_id: ActorId) -> Booked { - let mut w = self.0.write().await; - w.entry(actor_id).or_default().clone() + pub async fn read>( + &self, + label: L, + ) -> CountedTokioRwLockReadGuard<'_, BookedVersions> { + self.0.read(label).await } - pub fn for_actor_blocking(&self, actor_id: ActorId) -> Booked { - let mut w = self.0.blocking_write(); - w.entry(actor_id).or_default().clone() + pub async fn write>( + &self, + label: L, + ) -> CountedTokioRwLockWriteGuard<'_, BookedVersions> { + self.0.write(label).await } - pub async fn contains( + pub fn blocking_write>( &self, - actor_id: &ActorId, - mut versions: 
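
Note: LockRegistry above is essentially an instrumented RwLock. Each acquisition registers a label, kind, and state under a fresh id, and the LockTracker's Drop impl removes the entry, which is what lets the new `corrosion locks` admin command dump everything currently held or being awaited. A reduced, std-only sketch of the same RAII pattern (Registry, Tracker, and track are illustrative names, not the crate's API):

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Every acquisition registers metadata; Drop guarantees deregistration
// even on early return or panic unwind. The real code wraps tokio's
// RwLock and also records kind (read/write) and state (acquiring/locked).
#[derive(Default)]
struct Registry {
    next_id: AtomicUsize,
    held: Mutex<HashMap<usize, &'static str>>,
}

struct Tracker {
    id: usize,
    registry: Arc<Registry>,
}

impl Drop for Tracker {
    fn drop(&mut self) {
        self.registry.held.lock().unwrap().remove(&self.id);
    }
}

fn track(registry: &Arc<Registry>, label: &'static str) -> Tracker {
    let id = registry.next_id.fetch_add(1, Ordering::Relaxed) + 1;
    registry.held.lock().unwrap().insert(id, label);
    Tracker { id, registry: registry.clone() }
}

fn main() {
    let registry = Arc::new(Registry::default());
    {
        let _t = track(&registry, "sync:apply");
        assert_eq!(registry.held.lock().unwrap().len(), 1);
    } // _t dropped here, entry removed
    assert!(registry.held.lock().unwrap().is_empty());
}

Tying deregistration to Drop keeps the map accurate on every exit path; the one sharp edge in the real code is blocking_write_owned's try_write_owned loop, which trades a 1ms busy-wait for usability from synchronous contexts.
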
RangeInclusive, - seqs: Option<&RangeInclusive>, - ) -> bool { - if let Some(booked) = self.0.read().await.get(actor_id) { - let read = booked.read().await; - versions.all(|v| read.contains(v, seqs)) - } else { - false - } + label: L, + ) -> CountedTokioRwLockWriteGuard<'_, BookedVersions> { + self.0.blocking_write(label) } - pub async fn last(&self, actor_id: &ActorId) -> Option { - if let Some(booked) = self.0.read().await.get(actor_id) { - booked.last().await - } else { - None - } + pub fn blocking_write_owned>( + &self, + label: L, + ) -> CountedOwnedTokioRwLockWriteGuard { + self.0.blocking_write_owned(label) + } +} + +#[derive(Default)] +pub struct BookieInner { + map: HashMap, + registry: LockRegistry, +} + +impl BookieInner { + pub fn for_actor(&mut self, actor_id: ActorId) -> Booked { + self.map + .entry(actor_id) + .or_insert_with(|| { + Booked(Arc::new(CountedTokioRwLock::new( + self.registry.clone(), + Default::default(), + ))) + }) + .clone() } - pub async fn read(&self) -> TokioRwLockReadGuard> { - self.0.read().await + pub fn registry(&self) -> &LockRegistry { + &self.registry + } +} + +impl Deref for BookieInner { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.map + } +} + +#[derive(Clone)] +pub struct Bookie(Arc>); + +impl Bookie { + pub fn new(map: HashMap) -> Self { + let registry = LockRegistry::default(); + Self(Arc::new(CountedTokioRwLock::new( + registry.clone(), + BookieInner { + map: map + .into_iter() + .map(|(k, v)| (k, Booked::new(v, registry.clone()))) + .collect(), + registry, + }, + ))) + } + + pub async fn read>( + &self, + label: L, + ) -> CountedTokioRwLockReadGuard { + self.0.read(label).await + } + + pub async fn write>( + &self, + label: L, + ) -> CountedTokioRwLockWriteGuard { + self.0.write(label).await + } + + pub fn blocking_write>( + &self, + label: L, + ) -> CountedTokioRwLockWriteGuard { + self.0.blocking_write(label) } } diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index b7ea756b..83191e07 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -14,7 +14,7 @@ use rusqlite::{ ToSql, }; use serde::{Deserialize, Serialize}; -use speedy::{Readable, Writable}; +use speedy::{Context, Readable, Reader, Writable, Writer}; use time::OffsetDateTime; use tokio::sync::mpsc::Sender; use tracing::{error, trace}; @@ -63,8 +63,14 @@ pub enum BroadcastV1 { Change(ChangeV1), } +#[derive(Debug, Clone, Copy)] +pub enum ChangeSource { + Broadcast, + Sync, +} + // TODO: shrink this by mapping primary keys to integers instead of repeating them -#[derive(Debug, Clone, Readable, Writable)] +#[derive(Debug, Clone, PartialEq, Readable, Writable)] pub struct ChangeV1 { pub actor_id: ActorId, pub changeset: Changeset, @@ -78,7 +84,7 @@ impl Deref for ChangeV1 { } } -#[derive(Debug, Clone, Readable, Writable)] +#[derive(Debug, Clone, PartialEq, Readable, Writable)] pub enum Changeset { Empty { versions: RangeInclusive, @@ -201,33 +207,48 @@ pub enum TimestampParseError { Parse(ParseNTP64Error), } -#[derive( - Debug, Default, Clone, Copy, Serialize, Deserialize, Readable, Writable, PartialEq, Eq, -)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd)] #[serde(transparent)] -pub struct Timestamp(pub u64); +pub struct Timestamp(pub NTP64); impl Timestamp { pub fn to_time(&self) -> OffsetDateTime { - let t = NTP64(self.0); - OffsetDateTime::from_unix_timestamp(t.as_secs() as i64).unwrap() - + time::Duration::nanoseconds(t.subsec_nanos() 
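
Note: Booked and Bookie now require a label for every acquisition, and those labels are what surface in the registry and in `corrosion locks` output. Usage follows the updated test earlier in this diff; a sketch assuming the corro-types workspace crate is in scope (not runnable standalone, and `last_version` is an invented helper):

use corro_types::{actor::ActorId, agent::Bookie};

// Labeled, two-level access through the new Bookie -> Booked ->
// BookedVersions chain: a write lock on the actor map (creating the
// actor's entry if needed), then a read lock on that actor's versions.
async fn last_version(bookie: &Bookie, actor_id: ActorId) -> Option<i64> {
    bookie
        .write("example:for_actor")
        .await
        .for_actor(actor_id)
        .read("example:last")
        .await
        .last()
}
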
as i64) + OffsetDateTime::from_unix_timestamp(self.0.as_secs() as i64).unwrap() + + time::Duration::nanoseconds(self.0.subsec_nanos() as i64) } pub fn to_ntp64(&self) -> NTP64 { - NTP64(self.0) + self.0 + } + + pub fn zero() -> Self { + Timestamp(NTP64(0)) + } +} + +impl Deref for Timestamp { + type Target = NTP64; + + fn deref(&self) -> &Self::Target { + &self.0 } } impl fmt::Display for Timestamp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - NTP64(self.0).fmt(f) + self.0.fmt(f) } } impl From for Timestamp { fn from(ts: uhlc::Timestamp) -> Self { - Self(ts.get_time().as_u64()) + Self(*ts.get_time()) + } +} + +impl From for Timestamp { + fn from(ntp64: NTP64) -> Self { + Self(ntp64) } } @@ -236,7 +257,7 @@ impl FromSql for Timestamp { match value { rusqlite::types::ValueRef::Text(b) => match std::str::from_utf8(b) { Ok(s) => match s.parse::() { - Ok(ntp) => Ok(Timestamp(ntp.as_u64())), + Ok(ntp) => Ok(Timestamp(ntp)), Err(e) => Err(FromSqlError::Other(Box::new(TimestampParseError::Parse(e)))), }, Err(e) => Err(FromSqlError::Other(Box::new(e))), @@ -249,11 +270,41 @@ impl FromSql for Timestamp { impl ToSql for Timestamp { fn to_sql(&self) -> rusqlite::Result> { Ok(rusqlite::types::ToSqlOutput::Owned( - rusqlite::types::Value::Text(NTP64(self.0).to_string()), + rusqlite::types::Value::Text(self.0.to_string()), )) } } +impl<'a, C> Readable<'a, C> for Timestamp +where + C: Context, +{ + #[inline] + fn read_from>(reader: &mut R) -> Result { + Ok(Timestamp(NTP64(u64::read_from(reader)?))) + } + + #[inline] + fn minimum_bytes_needed() -> usize { + std::mem::size_of::() + } +} + +impl Writable for Timestamp +where + C: Context, +{ + #[inline] + fn write_to>(&self, writer: &mut T) -> Result<(), C::Error> { + self.0 .0.write_to(writer) + } + + #[inline] + fn bytes_needed(&self) -> Result { + >::bytes_needed(&self.0 .0) + } +} + #[derive(Debug, thiserror::Error)] pub enum BroadcastEncodeError { #[error(transparent)] diff --git a/crates/corro-types/src/members.rs b/crates/corro-types/src/members.rs index 6c542145..908070f9 100644 --- a/crates/corro-types/src/members.rs +++ b/crates/corro-types/src/members.rs @@ -1,25 +1,27 @@ use std::{collections::BTreeMap, net::SocketAddr, ops::Range, time::Duration}; use circular_buffer::CircularBuffer; -use tracing::trace; +use tracing::{debug, trace}; -use crate::actor::{Actor, ActorId}; +use crate::{ + actor::{Actor, ActorId}, + broadcast::Timestamp, +}; #[derive(Clone, Debug)] pub struct MemberState { pub addr: SocketAddr, + pub ts: Timestamp, pub ring: Option, - - counter: u8, } impl MemberState { - pub fn new(addr: SocketAddr) -> Self { + pub fn new(addr: SocketAddr, ts: Timestamp) -> Self { Self { addr, + ts, ring: None, - counter: 0, } } @@ -52,35 +54,40 @@ impl Members { let member = self .states .entry(actor_id) - .or_insert_with(|| MemberState::new(actor.addr())); - - member.addr = actor.addr(); + .or_insert_with(|| MemberState::new(actor.addr(), actor.ts())); trace!("member: {member:?}"); - member.counter += 1; - let inserted = member.counter == 1; + if actor.ts().to_time() < member.ts.to_time() { + debug!("older timestamp, ignoring"); + return false; + } + + let newer = actor.ts() > member.ts; + + if newer { + member.addr = actor.addr(); + member.ts = actor.ts(); - if inserted { self.by_addr.insert(actor.addr(), actor.id()); self.recalculate_rings(actor.addr()); } - inserted + newer } // A result of `true` means that the effective list of // cluster member addresses has changed pub fn remove_member(&mut self, actor: &Actor) -> bool { - 
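
Note: Timestamp now wraps uhlc's NTP64 instead of a raw u64, and the hand-written speedy impls above serialize only the inner u64, so the wire encoding should stay byte-compatible with the old derived `Timestamp(pub u64)` form. A self-contained round-trip of the same pattern on a local newtype (Ts is a stand-in for the real Timestamp):

use speedy::{Context, Readable, Reader, Writable, Writer};

// Only the inner u64 goes on the wire.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Ts(u64);

impl<'a, C: Context> Readable<'a, C> for Ts {
    fn read_from<R: Reader<'a, C>>(reader: &mut R) -> Result<Self, C::Error> {
        Ok(Ts(u64::read_from(reader)?))
    }
}

impl<C: Context> Writable<C> for Ts {
    fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
        self.0.write_to(writer)
    }
}

fn main() {
    let ts = Ts(42);
    let bytes = ts.write_to_vec().expect("serialize");
    assert_eq!(bytes.len(), 8); // exactly one u64
    assert_eq!(Ts::read_from_buffer(&bytes).expect("deserialize"), ts);
}
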
let effectively_down = if let Some(member) = self.states.get_mut(&actor.id()) {
-            member.counter -= 1;
-            member.counter == 0
+        let effectively_down = if let Some(member) = self.states.get(&actor.id()) {
+            member.ts == actor.ts()
         } else {
             // Shouldn't happen
             false
         };

         if effectively_down {
+            self.by_addr.remove(&actor.addr());
             self.states.remove(&actor.id());
         }
diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs
index 8954568f..204339bd 100644
--- a/crates/corro-types/src/pubsub.rs
+++ b/crates/corro-types/src/pubsub.rs
@@ -5,7 +5,7 @@ use std::{
     time::{Duration, Instant},
 };

-use bytes::Buf;
+use bytes::{Buf, BufMut};
 use compact_str::{CompactString, ToCompactString};
 use corro_api_types::{Change, ChangeId, ColumnType, RowId, SqliteValue, SqliteValueRef};
 use enquote::unquote;
@@ -64,6 +64,96 @@ pub fn normalize_sql(sql: &str) -> Result {
     Ok(Cmd::Stmt(stmt).to_string())
 }

+#[derive(Debug, thiserror::Error)]
+pub enum PackError {
+    #[error("abort")]
+    Abort,
+}
+
+pub fn pack_columns(args: &[SqliteValue]) -> Result<Vec<u8>, PackError> {
+    let mut buf = vec![];
+    /*
+     * Format:
+     * [num_columns:u8,...[(type(0-3),num_bytes?(3-7)):u8, length?:i32, ...bytes:u8[]]]
+     *
+     * The byte used for column type also encodes the number of bytes used for the integer.
+     * e.g.: (type(0-3),num_bytes?(3-7)):u8
+     * first 3 bits are type
+     * last 5 encode how long the following integer, if there is a following integer, is. 1, 2, 3, ... 8 bytes.
+     *
+     * Not packing an integer into the minimal number of bytes required is rather wasteful.
+     * E.g., the number `0` would take 8 bytes rather than 1 byte.
+     */
+    let len_result: Result<u8, _> = args.len().try_into();
+    if let Ok(len) = len_result {
+        buf.put_u8(len);
+        for value in args {
+            match value {
+                SqliteValue::Null => {
+                    buf.put_u8(ColumnType::Null as u8);
+                }
+                SqliteValue::Integer(val) => {
+                    let num_bytes_for_int = num_bytes_needed_i64(*val);
+                    let type_byte = num_bytes_for_int << 3 | (ColumnType::Integer as u8);
+                    buf.put_u8(type_byte);
+                    buf.put_int(*val, num_bytes_for_int as usize);
+                }
+                SqliteValue::Real(v) => {
+                    buf.put_u8(ColumnType::Float as u8);
+                    buf.put_f64(v.0);
+                }
+                SqliteValue::Text(value) => {
+                    let len = value.len() as i32;
+                    let num_bytes_for_len = num_bytes_needed_i32(len);
+                    let type_byte = num_bytes_for_len << 3 | (ColumnType::Text as u8);
+                    buf.put_u8(type_byte);
+                    buf.put_int(len as i64, num_bytes_for_len as usize);
+                    buf.put_slice(value.as_bytes());
+                }
+                SqliteValue::Blob(value) => {
+                    let len = value.len() as i32;
+                    let num_bytes_for_len = num_bytes_needed_i32(len);
+                    let type_byte = num_bytes_for_len << 3 | (ColumnType::Blob as u8);
+                    buf.put_u8(type_byte);
+                    buf.put_int(len as i64, num_bytes_for_len as usize);
+                    buf.put_slice(value);
+                }
+            }
+        }
+        Ok(buf)
+    } else {
+        Err(PackError::Abort)
+    }
+}
+
+fn num_bytes_needed_i64(val: i64) -> u8 {
+    if val & 0xFF00000000000000u64 as i64 != 0 {
+        8
+    } else if val & 0x00FF000000000000 != 0 {
+        7
+    } else if val & 0x0000FF0000000000 != 0 {
+        6
+    } else if val & 0x000000FF00000000 != 0 {
+        5
+    } else {
+        num_bytes_needed_i32(val as i32)
+    }
+}
+
+fn num_bytes_needed_i32(val: i32) -> u8 {
+    if val & 0xFF000000u32 as i32 != 0 {
+        4
+    } else if val & 0x00FF0000 != 0 {
+        3
+    } else if val & 0x0000FF00 != 0 {
+        2
+    } else if val & 0x000000FF != 0 {
+        1
+    } else {
+        0
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum UnpackError {
     #[error("abort")]
@@ -1371,7 +1461,7 @@ mod tests {
     use rusqlite::params;

     use crate::{
-        schema::{make_schema_inner, parse_sql},
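
Note: pack_columns above squeezes a column's type tag and the byte-width of the following integer into one header byte: the low three bits carry the type, the high five carry the width, and values are then stored in the minimal number of bytes. A worked, self-contained example of the header math (TYPE_INTEGER is a stand-in constant, not corro-api-types' ColumnType):

// Illustrative reduction of the type-byte trick, integers only.
const TYPE_INTEGER: u8 = 1;

fn num_bytes_needed_i32(val: i32) -> u8 {
    if val & 0xFF000000u32 as i32 != 0 {
        4
    } else if val & 0x00FF0000 != 0 {
        3
    } else if val & 0x0000FF00 != 0 {
        2
    } else if val & 0x000000FF != 0 {
        1
    } else {
        0
    }
}

fn main() {
    // 500 = 0x01F4 needs two bytes; the header byte packs "2" into the
    // high five bits and the column type into the low three.
    let n = num_bytes_needed_i32(500);
    let type_byte = n << 3 | TYPE_INTEGER;
    assert_eq!(n, 2);
    assert_eq!(type_byte, 0b10_001); // 2 << 3 | 1 == 17
}

put_int then writes exactly that many bytes, so a value like `0` costs a single header byte and no payload.
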
schema::{apply_schema, parse_sql}, sqlite::{setup_conn, CrConn}, }; @@ -1409,7 +1499,7 @@ mod tests { { let tx = conn.transaction()?; - make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema)?; + apply_schema(&tx, &NormalizedSchema::default(), &mut schema)?; tx.commit()?; } @@ -1541,7 +1631,7 @@ mod tests { { let tx = conn.transaction().unwrap(); - make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); + apply_schema(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); tx.commit().unwrap(); } @@ -1583,7 +1673,7 @@ mod tests { { let tx = conn2.transaction().unwrap(); - make_schema_inner(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); + apply_schema(&tx, &NormalizedSchema::default(), &mut schema).unwrap(); tx.commit().unwrap(); } diff --git a/crates/corro-types/src/schema.rs b/crates/corro-types/src/schema.rs index c2affb69..5b4745ba 100644 --- a/crates/corro-types/src/schema.rs +++ b/crates/corro-types/src/schema.rs @@ -178,7 +178,7 @@ pub fn init_schema(conn: &Connection) -> Result { } #[allow(clippy::result_large_err)] -pub fn make_schema_inner( +pub fn apply_schema( tx: &Transaction, schema: &NormalizedSchema, new_schema: &mut NormalizedSchema, diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 3b3be99d..de1dafb1 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -11,19 +11,19 @@ use crate::{ broadcast::{ChangeV1, Timestamp}, }; -#[derive(Debug, Clone, Readable, Writable)] +#[derive(Debug, Clone, PartialEq, Readable, Writable)] pub enum SyncMessage { V1(SyncMessageV1), } -#[derive(Debug, Clone, Readable, Writable)] +#[derive(Debug, Clone, PartialEq, Readable, Writable)] pub enum SyncMessageV1 { State(SyncStateV1), Changeset(ChangeV1), Clock(Timestamp), } -#[derive(Debug, Default, Clone, Readable, Writable, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, PartialEq, Readable, Writable, Serialize, Deserialize)] pub struct SyncStateV1 { pub actor_id: ActorId, pub heads: HashMap, @@ -83,7 +83,7 @@ pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 { let actors: Vec<(ActorId, Booked)> = { bookie - .read() + .read("generate_sync") .await .iter() .map(|(k, v)| (*k, v.clone())) @@ -91,19 +91,32 @@ pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 { }; for (actor_id, booked) in actors { - let last_version = match booked.last().await { + let last_version = match { + booked + .read(format!("generate_sync(last):{}", actor_id.as_simple())) + .await + .last() + } { Some(v) => v, None => continue, }; - let need: Vec<_> = { booked.read().await.gaps(&(1..=last_version)).collect() }; + let need: Vec<_> = { + booked + .read(format!("generate_sync(need):{}", actor_id.as_simple())) + .await + .gaps(&(1..=last_version)) + .collect() + }; if !need.is_empty() { state.need.insert(actor_id, need); } { - let read = booked.read().await; + let read = booked + .read(format!("generate_sync(partials):{}", actor_id.as_simple())) + .await; for (range, known) in read.iter() { if let KnownDbVersion::Partial { seqs, last_seq, .. 
} = known {
                    if seqs.gaps(&(0..=*last_seq)).count() == 0 {
diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs
index b2e18522..580a2543 100644
--- a/crates/corrosion/src/main.rs
+++ b/crates/corrosion/src/main.rs
@@ -320,6 +320,11 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
             ))
             .await?;
         }
+        Command::Locks { top } => {
+            let mut conn = AdminConn::connect(cli.admin_path()).await?;
+            conn.send_command(corro_admin::Command::Locks { top: *top })
+                .await?;
+        }
         Command::Template { template, flags } => {
             command::tpl::run(cli.api_addr()?, template, flags).await?;
         }
@@ -427,7 +432,9 @@ enum Command {
     Agent,

     /// Backup the Corrosion DB
-    Backup { path: String },
+    Backup {
+        path: String,
+    },

     /// Restore the Corrosion DB from a backup
     Restore {
@@ -468,6 +475,10 @@ enum Command {
     #[command(subcommand)]
     Sync(SyncCommand),

+    Locks {
+        top: usize,
+    },
+
     Template {
         template: Vec<String>,
         #[command(flatten)]
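
Note: the new `locks` subcommand takes the number of entries to show as a positional argument and forwards it over the admin socket as Command::Locks { top }. A self-contained sketch of how the clap derive parses it, trimmed to the one relevant variant (Cli and Command here mirror the derives in main.rs, assuming clap's derive feature):

use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    Locks { top: usize },
}

fn main() {
    // `corrosion locks 10` should yield Command::Locks { top: 10 }.
    let cli = Cli::parse_from(["corrosion", "locks", "10"]);
    match cli.command {
        Command::Locks { top } => assert_eq!(top, 10),
    }
}
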