diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index 1552b849..0d35817e 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -319,14 +319,16 @@ async fn handle_conn( } Command::Sync(SyncCommand::ReconcileGaps) => { let actor_ids: Vec<_> = { - let r = bookie.read("admin sync reconcile gaps").await; + let r = bookie + .read::<&str, _>("admin sync reconcile gaps", None) + .await; r.keys().copied().collect() }; for actor_id in actor_ids { { let booked = bookie - .read(format!("admin sync reconcile gaps get actor {actor_id}")) + .read("admin sync reconcile gaps get actor", actor_id.as_simple()) .await .get(&actor_id) .unwrap() @@ -334,7 +336,7 @@ async fn handle_conn( let mut conn = agent.pool().write_low().await.unwrap(); let mut bv = booked - .write("admin sync reconcile gaps booked versions") + .write::<&str, _>("admin sync reconcile gaps booked versions", None) .await; if let Err(e) = collapse_gaps(&mut stream, &mut conn, &mut bv).await { @@ -480,7 +482,7 @@ async fn handle_conn( } Command::Actor(ActorCommand::Version { actor_id, version }) => { let json: Result = { - let bookie = bookie.read("admin actor version").await; + let bookie = bookie.read::<&str, _>("admin actor version", None).await; let booked = match bookie.get(&actor_id) { Some(booked) => booked, None => { @@ -489,7 +491,9 @@ async fn handle_conn( continue; } }; - let booked_read = booked.read("admin actor version booked").await; + let booked_read = booked + .read::<&str, _>("admin actor version booked", None) + .await; if booked_read.contains_version(&version) { match booked_read.get_partial(&version) { Some(partial) => { diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index ca3eff7f..7e5adfa9 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -576,14 +576,12 @@ pub async fn handle_emptyset( while !changes.is_empty() { let change = changes.pop_front().unwrap(); if let Some(booked) = bookie - .read(format!("process_emptyset(check ts):{actor}")) + .read("process_emptyset(check ts)",actor.as_simple()) .await .get(actor) { let booked_read = booked - .read(format!( - "process_emptyset(booked writer, ts timestamp):{actor}" - )) + .read("process_emptyset(booked writer, ts timestamp)", actor.as_simple()) .await; if let Some(seen_ts) = booked_read.last_cleared_ts() { @@ -629,17 +627,19 @@ pub async fn process_emptyset( debug!("processing emptyset from {:?}", actor_id); let booked = { bookie - .write(format!( - "process_emptyset(booked writer, updates timestamp):{actor_id}", - )) + .write( + "process_emptyset(booked writer, updates timestamp)", + actor_id.as_simple(), + ) .await .ensure(actor_id) }; let mut booked_write = booked - .write(format!( - "process_emptyset(booked writer, updates timestamp):{actor_id}" - )) + .write( + "process_emptyset(booked writer, updates timestamp)", + actor_id.as_simple(), + ) .await; let mut snap = booked_write.snapshot(); @@ -682,17 +682,19 @@ pub async fn process_emptyset( let mut conn = agent.pool().write_low().await?; let booked = { bookie - .write(format!( - "process_emptyset(booked writer, updates timestamp):{actor_id}", - )) + .write( + "process_emptyset(booked writer, updates timestamp)", + actor_id.as_simple(), + ) .await .ensure(actor_id) }; let mut booked_write = booked - .write(format!( - "process_emptyset(booked writer, updates timestamp):{actor_id}" - )) + .write( + "process_emptyset(booked writer, updates timestamp)", + actor_id.as_simple(), + ) .await; let tx = conn @@ -894,10 +896,7 @@ pub async fn handle_changes( let booked = { bookie - .read(format!( - "handle_change(get):{}", - change.actor_id.as_simple() - )) + .read("handle_change(get)", change.actor_id.as_simple()) .await .get(&change.actor_id) .cloned() @@ -905,10 +904,7 @@ pub async fn handle_changes( if let Some(booked) = booked { if booked - .read(format!( - "handle_change(contains?):{}", - change.actor_id.as_simple() - )) + .read("handle_change(contains?)", change.actor_id.as_simple()) .await .contains_all(change.versions(), change.seqs()) { @@ -1183,8 +1179,12 @@ mod tests { sleep(Duration::from_secs(2)).await; - let bookie = bookie.read("read booked").await; - let booked = bookie.get(&other_actor).unwrap().read("test").await; + let bookie = bookie.read::<&str, _>("read booked", None).await; + let booked = bookie + .get(&other_actor) + .unwrap() + .read::<&str, _>("test", None) + .await; assert!(booked.contains_all(Version(6)..=Version(10), None)); assert!(booked.contains_all(Version(1)..=Version(3), None)); assert!(!booked.contains_version(&Version(5))); diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index c6b9730e..6805f286 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -123,7 +123,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul let bookie = Bookie::new_with_registry(Default::default(), lock_registry); { - let mut w = bookie.write("init").await; + let mut w = bookie.write::<&str, _>("init", None).await; w.insert(agent.actor_id(), agent.booked().clone()); } @@ -180,7 +180,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul } bookie - .write("replace_actor") + .write::<&str, _>("replace_actor", None) .await .replace_actor(actor_id, bv); } diff --git a/crates/corro-agent/src/agent/setup.rs b/crates/corro-agent/src/agent/setup.rs index 4fa10a7c..4195a0af 100644 --- a/crates/corro-agent/src/agent/setup.rs +++ b/crates/corro-agent/src/agent/setup.rs @@ -4,6 +4,7 @@ use arc_swap::ArcSwap; use camino::Utf8PathBuf; use indexmap::IndexMap; +use metrics::counter; use parking_lot::RwLock; use rusqlite::{Connection, OptionalExtension}; use std::{ @@ -164,7 +165,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age tokio::spawn({ let pool = pool.clone(); // acquiring the lock here means everything will have to wait for it to be ready - let mut booked = booked.write_owned("init").await; + let mut booked = booked.write_owned::<&str, _>("init", None).await; async move { let conn = pool.read().await?; *booked.deref_mut().deref_mut() = @@ -196,7 +197,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age if top .values() - .any(|meta| meta.started_at.elapsed() > WARNING_THRESHOLD) + .any(|meta| meta.started_at.elapsed() >= WARNING_THRESHOLD) { warn!( "lock registry shows locks held for a long time! top {} locks:", @@ -209,6 +210,11 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age "{} (id: {id}, type: {:?}, state: {:?}) locked for: {duration:?}", lock.label, lock.kind, lock.state ); + + if duration >= WARNING_THRESHOLD { + counter!("corro.agent.lock.slow.count", "name" => lock.label) + .increment(1); + } } } } diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index 4625bda7..1f226483 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -487,10 +487,10 @@ pub async fn configurable_stress_test( debug!( "last version: {:?}", ta.bookie - .write("test") + .write::<&str, _>("test", None) .await .ensure(ta.agent.actor_id()) - .read("test") + .read::<&str, _>("test", None) .await .last() ); @@ -726,11 +726,11 @@ async fn large_tx_sync() -> eyre::Result<()> { println!( "{name}: bookie: {:?}", ta.bookie - .read("test") + .read::<&str, _>("test", None) .await .get(&ta1.agent.actor_id()) .unwrap() - .read("test") + .read::<&str, _>("test", None) .await .deref() ); @@ -874,15 +874,15 @@ async fn test_clear_empty_versions() -> eyre::Result<()> { let ta1_cleared = ta1 .agent .booked() - .read("test_clear_empty") + .read::<&str, _>("test_clear_empty", None) .await .last_cleared_ts(); let ta2_ta1_cleared = ta2 .bookie - .write("test") + .write::<&str, _>("test", None) .await .ensure(ta1.agent.actor_id()) - .read("test_clear_empty") + .read::<&str, _>("test_clear_empty", None) .await .last_cleared_ts(); @@ -1068,8 +1068,12 @@ async fn check_bookie_versions( cleared: Vec>, ) -> eyre::Result<()> { let conn = ta.agent.pool().read().await?; - let booked = ta.bookie.write("test").await.ensure(actor_id); - let bookedv = booked.read("test").await; + let booked = ta + .bookie + .write::<&str, _>("test", None) + .await + .ensure(actor_id); + let bookedv = booked.read::<&str, _>("test", None).await; for versions in complete { for version in versions.clone() { diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index fd06cebe..609b56d9 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -513,19 +513,16 @@ pub async fn process_fully_buffered_changes( let booked = { bookie - .write(format!( - "process_fully_buffered(ensure):{}", - actor_id.as_simple() - )) + .write("process_fully_buffered(ensure)", actor_id.as_simple()) .await .ensure(actor_id) }; let mut bookedw = booked - .write(format!( - "process_fully_buffered(booked writer):{}", - actor_id.as_simple() - )) + .write( + "process_fully_buffered(booked writer)", + actor_id.as_simple(), + ) .await; debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes"); @@ -675,7 +672,7 @@ pub async fn process_fully_buffered_changes( let mut agent_booked = { agent .booked() - .blocking_write("process_fully_buffered_changes(get snapshot)") + .blocking_write::<&str, _>("process_fully_buffered_changes(get snapshot)", None) }; let mut agent_snap = agent_booked.snapshot(); @@ -740,18 +737,18 @@ pub async fn process_multiple_changes( let booked_writer = { bookie - .write(format!( - "process_multiple_changes(ensure):{}", - change.actor_id.as_simple() - )) + .write( + "process_multiple_changes(ensure)", + change.actor_id.as_simple(), + ) .await .ensure(change.actor_id) }; if booked_writer - .read(format!( - "process_multiple_changes(contains?):{}", - change.actor_id.as_simple() - )) + .read( + "process_multiple_changes(contains_all?)", + change.actor_id.as_simple(), + ) .await .contains_all(change.versions(), change.seqs()) { @@ -786,16 +783,16 @@ pub async fn process_multiple_changes( for (actor_id, changes) in unknown_changes { let booked = { bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) + .blocking_write( + "process_multiple_changes(for_actor_blocking)", + actor_id.as_simple(), + ) .ensure(actor_id) }; - let booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, unknown changes):{}", - actor_id.as_simple() - )); + let booked_write = booked.blocking_write( + "process_multiple_changes(booked writer, unknown changes)", + actor_id.as_simple(), + ); let mut seen = RangeInclusiveMap::new(); @@ -911,9 +908,10 @@ pub async fn process_multiple_changes( let booked = { bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{actor_id}", - )) + .blocking_write( + "process_multiple_changes(for_actor_blocking)", + actor_id.as_simple(), + ) .ensure(*actor_id) }; @@ -921,9 +919,10 @@ pub async fn process_multiple_changes( let mut snap = match snapshots.remove(actor_id) { Some(snap) => snap, None => { - let booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, during knowns):{actor_id}", - )); + let booked_write = booked.blocking_write( + "process_multiple_changes(booked writer, during knowns)", + actor_id.as_simple(), + ); booked_write.snapshot() } }; @@ -965,7 +964,10 @@ pub async fn process_multiple_changes( let mut snap = { agent .booked() - .blocking_write("process_multiple_changes(update_cleared_ts snapshot)") + .blocking_write::<&str, _>( + "process_multiple_changes(update_cleared_ts snapshot)", + None, + ) .snapshot() }; @@ -988,7 +990,7 @@ pub async fn process_multiple_changes( if let Some(ts) = last_cleared { let mut booked_writer = agent .booked() - .blocking_write("process_multiple_changes(update_cleared_ts)"); + .blocking_write::<&str, _>("process_multiple_changes(update_cleared_ts)", None); booked_writer.update_cleared_ts(ts); } @@ -1004,14 +1006,16 @@ pub async fn process_multiple_changes( for (actor_id, knowns) in knowns { let booked = { bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{actor_id}", - )) + .blocking_write( + "process_multiple_changes(for_actor_blocking)", + actor_id.as_simple(), + ) .ensure(actor_id) }; - let mut booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, before apply needed):{actor_id}", - )); + let mut booked_write = booked.blocking_write( + "process_multiple_changes(booked writer, before apply needed)", + actor_id.as_simple(), + ); if let Some(snap) = snapshots.remove(&actor_id) { booked_write.commit_snapshot(snap); diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index f8292bc6..067edbab 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -846,7 +846,7 @@ async fn process_sync( let last_ts = agent .clone() .booked() - .read("process_sync(read_cleared_ts))") + .read::<&str, _>("process_sync(read_cleared_ts))", None) .await .last_cleared_ts(); loop { @@ -882,7 +882,7 @@ async fn process_sync( for (actor_id, needs) in agg { let booked = bookie - .read("process_sync get actor") + .read::<&str, _>("process_sync get actor", None) .await .get(&actor_id) .cloned(); @@ -890,7 +890,9 @@ async fn process_sync( Some(b) => b, None => continue, }; - let booked_read = booked.read("process_sync check needs").await; + let booked_read = booked + .read::<&str, _>("process_sync check needs", None) + .await; for need in needs { match &need { @@ -1888,10 +1890,15 @@ mod tests { ) .await?; - let booked = bookie.read("test").await.get(&actor_id).cloned().unwrap(); + let booked = bookie + .read::<&str, _>("test", None) + .await + .get(&actor_id) + .cloned() + .unwrap(); { - let read = booked.read("test").await; + let read = booked.read::<&str, _>("test", None).await; assert!(read.contains_version(&Version(1))); assert!(read.contains_version(&Version(2))); diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index af1b72fa..4363f084 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -49,7 +49,7 @@ where // so it probably doesn't matter too much, except for reads of internal state let mut book_writer = agent .booked() - .write("make_broadcastable_changes(booked writer)") + .write::<&str, _>("make_broadcastable_changes(booked writer)", None) .await; let start = Instant::now(); @@ -681,7 +681,10 @@ mod tests { })) )); - assert_eq!(agent.booked().read("test").await.last(), Some(Version(1))); + assert_eq!( + agent.booked().read::<&str, _>("test", None).await.last(), + Some(Version(1)) + ); println!("second req..."); diff --git a/crates/corro-pg/src/lib.rs b/crates/corro-pg/src/lib.rs index 68c225be..8d38ab65 100644 --- a/crates/corro-pg/src/lib.rs +++ b/crates/corro-pg/src/lib.rs @@ -2089,7 +2089,7 @@ impl Session { let mut book_writer = self .agent .booked() - .blocking_write("handle_write_tx(book_writer)"); + .blocking_write::<&str, _>("handle_write_tx(book_writer)", None); let actor_id = self.agent.actor_id(); diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index e199e53a..4aeb1175 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -1,7 +1,7 @@ use std::{ cmp, collections::{btree_map, BTreeMap, HashMap, HashSet}, - io, + fmt, io, net::SocketAddr, ops::{Deref, DerefMut, RangeInclusive}, path::{Path, PathBuf}, @@ -14,7 +14,7 @@ use std::{ use arc_swap::ArcSwap; use camino::Utf8PathBuf; -use compact_str::CompactString; +use compact_str::{CompactString, ToCompactString}; use indexmap::IndexMap; use metrics::{gauge, histogram}; use parking_lot::RwLock; @@ -774,55 +774,63 @@ impl CountedTokioRwLock { } } - #[tracing::instrument(skip(self, label), level = "debug")] - pub async fn write>( + #[tracing::instrument(skip_all, level = "debug")] + pub async fn write>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard<'_, T> { - self.registry.acquire_write(label, &self.lock).await + self.registry.acquire_write(label, extra, &self.lock).await } - #[tracing::instrument(skip(self, label), level = "debug")] - pub async fn write_owned>( + #[tracing::instrument(skip_all, level = "debug")] + pub async fn write_owned>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedOwnedTokioRwLockWriteGuard { self.registry - .acquire_write_owned(label, self.lock.clone()) + .acquire_write_owned(label, extra, self.lock.clone()) .await } - #[tracing::instrument(skip(self, label), level = "debug")] - pub fn blocking_write>( + #[tracing::instrument(skip_all, level = "debug")] + pub fn blocking_write>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard<'_, T> { - self.registry.acquire_blocking_write(label, &self.lock) + self.registry + .acquire_blocking_write(label, extra, &self.lock) } - #[tracing::instrument(skip(self, label), level = "debug")] - pub fn blocking_write_owned>( + #[tracing::instrument(skip_all, level = "debug")] + pub fn blocking_write_owned>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedOwnedTokioRwLockWriteGuard { self.registry - .acquire_blocking_write_owned(label, self.lock.clone()) + .acquire_blocking_write_owned(label, extra, self.lock.clone()) } - #[tracing::instrument(skip(self, label), level = "debug")] - pub fn blocking_read>( + #[tracing::instrument(skip_all, level = "debug")] + pub fn blocking_read>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedTokioRwLockReadGuard<'_, T> { - self.registry.acquire_blocking_read(label, &self.lock) + self.registry + .acquire_blocking_read(label, extra, &self.lock) } - #[tracing::instrument(skip(self, label), level = "debug")] - pub async fn read>( + #[tracing::instrument(skip_all, level = "debug")] + pub async fn read>>( &self, - label: C, + label: &'static str, + extra: E, ) -> CountedTokioRwLockReadGuard<'_, T> { - self.registry.acquire_read(label, &self.lock).await + self.registry.acquire_read(label, extra, &self.lock).await } pub fn registry(&self) -> &LockRegistry { @@ -891,7 +899,8 @@ type LockId = usize; #[derive(Debug, Clone)] pub struct LockMeta { - pub label: CompactString, + pub label: &'static str, + pub extra: Option, pub kind: LockKind, pub state: LockState, pub started_at: Instant, @@ -908,16 +917,18 @@ impl LockRegistry { self.map.write().remove(id); } - async fn acquire_write<'a, T, C: Into>( + async fn acquire_write<'a, T, C: fmt::Display, E: Into>>( &self, - label: C, + label: &'static str, + extra: E, lock: &'a TokioRwLock, ) -> CountedTokioRwLockWriteGuard<'a, T> { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra.into().map(|d| d.to_compact_string()), kind: LockKind::Write, state: LockState::Acquiring, started_at: Instant::now(), @@ -932,16 +943,18 @@ impl LockRegistry { CountedTokioRwLockWriteGuard { lock: w, _tracker } } - async fn acquire_write_owned>( + async fn acquire_write_owned>>( &self, - label: C, + label: &'static str, + extra: E, lock: Arc>, ) -> CountedOwnedTokioRwLockWriteGuard { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra.into().map(|d| d.to_compact_string()), kind: LockKind::Write, state: LockState::Acquiring, started_at: Instant::now(), @@ -956,16 +969,18 @@ impl LockRegistry { CountedOwnedTokioRwLockWriteGuard { lock: w, _tracker } } - fn acquire_blocking_write<'a, T, C: Into>( + fn acquire_blocking_write<'a, T, C: fmt::Display, E: Into>>( &self, - label: C, + label: &'static str, + extra: E, lock: &'a TokioRwLock, ) -> CountedTokioRwLockWriteGuard<'a, T> { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra.into().map(|d| d.to_compact_string()), kind: LockKind::Write, state: LockState::Acquiring, started_at: Instant::now(), @@ -980,16 +995,18 @@ impl LockRegistry { CountedTokioRwLockWriteGuard { lock: w, _tracker } } - fn acquire_blocking_write_owned>( + fn acquire_blocking_write_owned>>( &self, - label: C, + label: &'static str, + extra: E, lock: Arc>, ) -> CountedOwnedTokioRwLockWriteGuard { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra.into().map(|d| d.to_compact_string()), kind: LockKind::Write, state: LockState::Acquiring, started_at: Instant::now(), @@ -1010,16 +1027,21 @@ impl LockRegistry { CountedOwnedTokioRwLockWriteGuard { lock: w, _tracker } } - async fn acquire_read<'a, T, C: Into>( + async fn acquire_read<'a, T, C: fmt::Display, E: Into>>( &self, - label: C, + label: &'static str, + extra: E, lock: &'a TokioRwLock, ) -> CountedTokioRwLockReadGuard<'a, T> { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra + .into() + .map(|d| d.to_compact_string()) + .map(|d| d.to_compact_string()), kind: LockKind::Read, state: LockState::Acquiring, started_at: Instant::now(), @@ -1034,16 +1056,18 @@ impl LockRegistry { CountedTokioRwLockReadGuard { lock: w, _tracker } } - fn acquire_blocking_read<'a, T, C: Into>( + fn acquire_blocking_read<'a, T, C: fmt::Display, E: Into>>( &self, - label: C, + label: &'static str, + extra: E, lock: &'a TokioRwLock, ) -> CountedTokioRwLockReadGuard<'a, T> { let id = self.gen_id(); self.insert_lock( id, LockMeta { - label: label.into(), + label, + extra: extra.into().map(|d| d.to_compact_string()), kind: LockKind::Read, state: LockState::Acquiring, started_at: Instant::now(), @@ -1523,46 +1547,52 @@ impl Booked { Self(Arc::new(CountedTokioRwLock::new(registry, versions))) } - pub async fn read>( + pub async fn read>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockReadGuard<'_, BookedVersions> { - self.0.read(label).await + self.0.read(label, extra).await } - pub async fn write>( + pub async fn write>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard<'_, BookedVersions> { - self.0.write(label).await + self.0.write(label, extra).await } - pub async fn write_owned>( + pub async fn write_owned>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedOwnedTokioRwLockWriteGuard { - self.0.write_owned(label).await + self.0.write_owned(label, extra).await } - pub fn blocking_write>( + pub fn blocking_write>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard<'_, BookedVersions> { - self.0.blocking_write(label) + self.0.blocking_write(label, extra) } - pub fn blocking_read>( + pub fn blocking_read>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockReadGuard<'_, BookedVersions> { - self.0.blocking_read(label) + self.0.blocking_read(label, extra) } - pub fn blocking_write_owned>( + pub fn blocking_write_owned>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedOwnedTokioRwLockWriteGuard { - self.0.blocking_write_owned(label) + self.0.blocking_write_owned(label, extra) } } @@ -1632,25 +1662,28 @@ impl Bookie { ))) } - pub async fn read>( + pub async fn read>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockReadGuard { - self.0.read(label).await + self.0.read(label, extra).await } - pub async fn write>( + pub async fn write>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard { - self.0.write(label).await + self.0.write(label, extra).await } - pub fn blocking_write>( + pub fn blocking_write>>( &self, - label: L, + label: &'static str, + extra: E, ) -> CountedTokioRwLockWriteGuard { - self.0.blocking_write(label) + self.0.blocking_write(label, extra) } pub fn registry(&self) -> &LockRegistry { diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 5aa546f3..d96f899a 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -289,7 +289,7 @@ pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncState let actors: Vec<(ActorId, Booked)> = { bookie - .read("generate_sync") + .read::<&str, _>("generate_sync", None) .await .iter() .map(|(k, v)| (*k, v.clone())) @@ -299,9 +299,7 @@ pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncState let mut last_ts = None; for (actor_id, booked) in actors { - let bookedr = booked - .read(format!("generate_sync:{}", actor_id.as_simple())) - .await; + let bookedr = booked.read("generate_sync", actor_id.as_simple()).await; let last_version = match { bookedr.last() } { None => continue,