Skip to content

Commit

Permalink
refactor lock metadata tracking to allocate less and be more metrics-friendly, but it means a lot of small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Nov 28, 2024
1 parent 3edb515 commit 4309f7f
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 167 deletions.
14 changes: 9 additions & 5 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,24 @@ async fn handle_conn(
}
Command::Sync(SyncCommand::ReconcileGaps) => {
let actor_ids: Vec<_> = {
let r = bookie.read("admin sync reconcile gaps").await;
let r = bookie
.read::<&str, _>("admin sync reconcile gaps", None)
.await;
r.keys().copied().collect()
};

for actor_id in actor_ids {
{
let booked = bookie
.read(format!("admin sync reconcile gaps get actor {actor_id}"))
.read("admin sync reconcile gaps get actor", actor_id.as_simple())
.await
.get(&actor_id)
.unwrap()
.clone();

let mut conn = agent.pool().write_low().await.unwrap();
let mut bv = booked
.write("admin sync reconcile gaps booked versions")
.write::<&str, _>("admin sync reconcile gaps booked versions", None)
.await;

if let Err(e) = collapse_gaps(&mut stream, &mut conn, &mut bv).await {
Expand Down Expand Up @@ -480,7 +482,7 @@ async fn handle_conn(
}
Command::Actor(ActorCommand::Version { actor_id, version }) => {
let json: Result<serde_json::Value, rusqlite::Error> = {
let bookie = bookie.read("admin actor version").await;
let bookie = bookie.read::<&str, _>("admin actor version", None).await;
let booked = match bookie.get(&actor_id) {
Some(booked) => booked,
None => {
Expand All @@ -489,7 +491,9 @@ async fn handle_conn(
continue;
}
};
let booked_read = booked.read("admin actor version booked").await;
let booked_read = booked
.read::<&str, _>("admin actor version booked", None)
.await;
if booked_read.contains_version(&version) {
match booked_read.get_partial(&version) {
Some(partial) => {
Expand Down
52 changes: 26 additions & 26 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,14 +576,12 @@ pub async fn handle_emptyset(
while !changes.is_empty() {
let change = changes.pop_front().unwrap();
if let Some(booked) = bookie
.read(format!("process_emptyset(check ts):{actor}"))
.read("process_emptyset(check ts)",actor.as_simple())
.await
.get(actor)
{
let booked_read = booked
.read(format!(
"process_emptyset(booked writer, ts timestamp):{actor}"
))
.read("process_emptyset(booked writer, ts timestamp)", actor.as_simple())
.await;

if let Some(seen_ts) = booked_read.last_cleared_ts() {
Expand Down Expand Up @@ -629,17 +627,19 @@ pub async fn process_emptyset(
debug!("processing emptyset from {:?}", actor_id);
let booked = {
bookie
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}",
))
.write(
"process_emptyset(booked writer, updates timestamp)",
actor_id.as_simple(),
)
.await
.ensure(actor_id)
};

let mut booked_write = booked
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}"
))
.write(
"process_emptyset(booked writer, updates timestamp)",
actor_id.as_simple(),
)
.await;

let mut snap = booked_write.snapshot();
Expand Down Expand Up @@ -682,17 +682,19 @@ pub async fn process_emptyset(
let mut conn = agent.pool().write_low().await?;
let booked = {
bookie
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}",
))
.write(
"process_emptyset(booked writer, updates timestamp)",
actor_id.as_simple(),
)
.await
.ensure(actor_id)
};

let mut booked_write = booked
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}"
))
.write(
"process_emptyset(booked writer, updates timestamp)",
actor_id.as_simple(),
)
.await;

let tx = conn
Expand Down Expand Up @@ -894,21 +896,15 @@ pub async fn handle_changes(

let booked = {
bookie
.read(format!(
"handle_change(get):{}",
change.actor_id.as_simple()
))
.read("handle_change(get)", change.actor_id.as_simple())
.await
.get(&change.actor_id)
.cloned()
};

if let Some(booked) = booked {
if booked
.read(format!(
"handle_change(contains?):{}",
change.actor_id.as_simple()
))
.read("handle_change(contains?)", change.actor_id.as_simple())
.await
.contains_all(change.versions(), change.seqs())
{
Expand Down Expand Up @@ -1183,8 +1179,12 @@ mod tests {

sleep(Duration::from_secs(2)).await;

let bookie = bookie.read("read booked").await;
let booked = bookie.get(&other_actor).unwrap().read("test").await;
let bookie = bookie.read::<&str, _>("read booked", None).await;
let booked = bookie
.get(&other_actor)
.unwrap()
.read::<&str, _>("test", None)
.await;
assert!(booked.contains_all(Version(6)..=Version(10), None));
assert!(booked.contains_all(Version(1)..=Version(3), None));
assert!(!booked.contains_version(&Version(5)));
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul

let bookie = Bookie::new_with_registry(Default::default(), lock_registry);
{
let mut w = bookie.write("init").await;
let mut w = bookie.write::<&str, _>("init", None).await;
w.insert(agent.actor_id(), agent.booked().clone());
}

Expand Down Expand Up @@ -180,7 +180,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
}

bookie
.write("replace_actor")
.write::<&str, _>("replace_actor", None)
.await
.replace_actor(actor_id, bv);
}
Expand Down
10 changes: 8 additions & 2 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use arc_swap::ArcSwap;
use camino::Utf8PathBuf;
use indexmap::IndexMap;
use metrics::counter;
use parking_lot::RwLock;
use rusqlite::{Connection, OptionalExtension};
use std::{
Expand Down Expand Up @@ -164,7 +165,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
tokio::spawn({
let pool = pool.clone();
// acquiring the lock here means everything will have to wait for it to be ready
let mut booked = booked.write_owned("init").await;
let mut booked = booked.write_owned::<&str, _>("init", None).await;
async move {
let conn = pool.read().await?;
*booked.deref_mut().deref_mut() =
Expand Down Expand Up @@ -196,7 +197,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

if top
.values()
.any(|meta| meta.started_at.elapsed() > WARNING_THRESHOLD)
.any(|meta| meta.started_at.elapsed() >= WARNING_THRESHOLD)
{
warn!(
"lock registry shows locks held for a long time! top {} locks:",
Expand All @@ -209,6 +210,11 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
"{} (id: {id}, type: {:?}, state: {:?}) locked for: {duration:?}",
lock.label, lock.kind, lock.state
);

if duration >= WARNING_THRESHOLD {
counter!("corro.agent.lock.slow.count", "name" => lock.label)
.increment(1);
}
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,10 @@ pub async fn configurable_stress_test(
debug!(
"last version: {:?}",
ta.bookie
.write("test")
.write::<&str, _>("test", None)
.await
.ensure(ta.agent.actor_id())
.read("test")
.read::<&str, _>("test", None)
.await
.last()
);
Expand Down Expand Up @@ -726,11 +726,11 @@ async fn large_tx_sync() -> eyre::Result<()> {
println!(
"{name}: bookie: {:?}",
ta.bookie
.read("test")
.read::<&str, _>("test", None)
.await
.get(&ta1.agent.actor_id())
.unwrap()
.read("test")
.read::<&str, _>("test", None)
.await
.deref()
);
Expand Down Expand Up @@ -874,15 +874,15 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
let ta1_cleared = ta1
.agent
.booked()
.read("test_clear_empty")
.read::<&str, _>("test_clear_empty", None)
.await
.last_cleared_ts();
let ta2_ta1_cleared = ta2
.bookie
.write("test")
.write::<&str, _>("test", None)
.await
.ensure(ta1.agent.actor_id())
.read("test_clear_empty")
.read::<&str, _>("test_clear_empty", None)
.await
.last_cleared_ts();

Expand Down Expand Up @@ -1068,8 +1068,12 @@ async fn check_bookie_versions(
cleared: Vec<RangeInclusive<Version>>,
) -> eyre::Result<()> {
let conn = ta.agent.pool().read().await?;
let booked = ta.bookie.write("test").await.ensure(actor_id);
let bookedv = booked.read("test").await;
let booked = ta
.bookie
.write::<&str, _>("test", None)
.await
.ensure(actor_id);
let bookedv = booked.read::<&str, _>("test", None).await;

for versions in complete {
for version in versions.clone() {
Expand Down
Loading

0 comments on commit 4309f7f

Please sign in to comment.