
Don't use block_in_place for long running blocking tasks #281

Merged 8 commits on Jan 22, 2025
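For context on the title: `tokio::task::block_in_place` keeps the blocking work on the current runtime worker thread, so a long-running SQLite job ties up that worker for its whole duration, while `tokio::task::spawn_blocking` moves the work to the dedicated blocking pool. A minimal sketch of the difference, assuming a multi-threaded runtime and a hypothetical `long_running_sqlite_work` placeholder (not from this PR):

```rust
use std::time::Duration;

// Hypothetical stand-in for a long-running SQLite transaction.
fn long_running_sqlite_work() -> u64 {
    std::thread::sleep(Duration::from_secs(30));
    42
}

#[tokio::main]
async fn main() {
    // block_in_place stays on the current worker thread, so a 30-second job
    // deprives the scheduler of one worker for its whole duration.
    let n = tokio::task::block_in_place(long_running_sqlite_work);

    // spawn_blocking hands the job to the dedicated blocking thread pool,
    // freeing the async worker to keep driving other tasks; this is the
    // preferred choice for long-running blocking work.
    let m = tokio::task::spawn_blocking(long_running_sqlite_work)
        .await
        .expect("blocking task panicked");

    println!("{n} {m}");
}
```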
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions crates/corro-admin/src/lib.rs
@@ -336,9 +336,12 @@ async fn handle_conn(
.clone();

let mut conn = agent.pool().write_low().await.unwrap();
debug_log(&mut stream, format!("got write conn for actor id: {actor_id}")).await;

let mut bv = booked
.write::<&str, _>("admin sync reconcile gaps booked versions", None)
.await;
debug_log(&mut stream, format!("got bookie for actor id: {actor_id}")).await;

if let Err(e) = collapse_gaps(&mut stream, &mut conn, &mut bv).await {
_ = send_error(&mut stream, e).await;
8 changes: 4 additions & 4 deletions crates/corro-agent/src/agent/handlers.rs
@@ -760,10 +760,10 @@ pub async fn handle_changes(
// complicated loop to process changes efficiently w/ a max concurrency
// and a minimum chunk size for bigger and faster SQLite transactions
loop {
while buf_cost >= max_changes_chunk && join_set.len() < MAX_CONCURRENT {
// we're already bigger than the minimum size of changes batch
// so we want to accumulate at least that much and process them
// concurrently based on MAX_CONCURRENCY
while (buf_cost >= max_changes_chunk || (!queue.is_empty() && join_set.is_empty()))
&& join_set.len() < MAX_CONCURRENT
{
// Process once we hit the chunk size, or whenever items are queued and nothing is in flight, while staying under MAX_CONCURRENT
let mut tmp_cost = 0;
while let Some((change, src, queued_at)) = queue.pop_front() {
tmp_cost += change.processing_cost();
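The reworked condition in `handle_changes` drains the queue either when the accumulated cost reaches the chunk size, or when there is pending work and nothing in flight, while still capping concurrency. A simplified sketch of just that gating logic; the constant values and the `should_dispatch` helper are stand-ins, not taken from the PR:

```rust
use std::collections::VecDeque;

const MAX_CONCURRENT: usize = 5;      // assumed value, not from the diff
const MAX_CHANGES_CHUNK: usize = 800; // assumed value, not from the diff

/// Decide whether another batch should be dispatched right now. Mirrors the
/// while-condition in the diff: dispatch when the buffered cost reaches the
/// chunk size, or when the queue has items and nothing is in flight, as long
/// as we stay under the concurrency cap.
fn should_dispatch(buf_cost: usize, queue_len: usize, in_flight: usize) -> bool {
    (buf_cost >= MAX_CHANGES_CHUNK || (queue_len > 0 && in_flight == 0))
        && in_flight < MAX_CONCURRENT
}

fn main() {
    let queue: VecDeque<usize> = VecDeque::from([10, 20, 30]);
    // With an empty join set, even a small buffer gets flushed instead of
    // waiting indefinitely for the chunk-size threshold.
    assert!(should_dispatch(60, queue.len(), 0));
    // With work already in flight, a small buffer keeps accumulating.
    assert!(!should_dispatch(60, queue.len(), 2));
}
```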
36 changes: 33 additions & 3 deletions crates/corro-agent/src/agent/util.rs
@@ -18,7 +18,7 @@ use corro_types::{
actor::{Actor, ActorId},
agent::{
find_overwritten_versions, Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion,
PartialVersion,
PartialVersion, PoolError,
},
api::TableName,
base::{CrsqlDbVersion, CrsqlSeq, Version},
@@ -55,7 +55,7 @@ use metrics::{counter, histogram};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
use rusqlite::{named_params, params, Connection, OptionalExtension, Savepoint, Transaction};
use spawn::spawn_counted;
use tokio::{net::TcpListener, task::block_in_place};
use tokio::{net::TcpListener, task::block_in_place, time::timeout};
use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer};
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, trace, warn};
@@ -758,6 +758,7 @@ pub async fn process_multiple_changes(
counter!("corro.agent.changes.processing.started").increment(changes.len() as u64);
debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::<usize>());

const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);
let mut seen = HashSet::new();
let mut unknown_changes: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (change, src, queued_at) in changes {
@@ -794,8 +795,14 @@
.or_default()
.push((change, src));
}
let elapsed = start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes: removing duplicates took too long - {elapsed:?}");
}

let mut conn = agent.pool().write_normal().await?;
let mut conn = timeout(Duration::from_secs(5 * 60), agent.pool().write_normal())
.await
.map_err(PoolError::from)??;

let changesets = block_in_place(|| {
let start = Instant::now();
@@ -814,6 +821,7 @@

// let mut writers: BTreeMap<ActorId, _> = Default::default();

let sub_start = Instant::now();
for (actor_id, changes) in unknown_changes {
let booked = {
bookie
@@ -899,9 +907,15 @@
// }
}

let elapsed = sub_start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes:: process_single_version took too long - {elapsed:?}");
}

let mut count = 0;
let mut snapshots = BTreeMap::new();

let sub_start = Instant::now();
for (actor_id, knowns) in knowns.iter_mut() {
debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len());

@@ -1016,11 +1030,21 @@ pub async fn process_multiple_changes(
std::mem::forget(snap);
}

let elapsed = sub_start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes: processing bookkeeping took too long - {elapsed:?}");
}

let sub_start = Instant::now();
tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
let elapsed = sub_start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes: commiting transaction took too long - {elapsed:?}");
}

if let Some(ts) = last_cleared {
let mut booked_writer = agent
@@ -1038,6 +1062,7 @@

debug!("committed {count} changes in {:?}", start.elapsed());

let sub_start = Instant::now();
for (actor_id, knowns) in knowns {
let booked = {
bookie
@@ -1080,6 +1105,11 @@
}
}

let elapsed = sub_start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes: commiting snapshots took too long - {elapsed:?}");
}

Ok::<_, ChangeError>(changesets)
})?;

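The changes to `util.rs` follow one pattern throughout: bound a potentially slow async acquisition with `tokio::time::timeout`, and time each sub-step, emitting a `warn!` when it exceeds `PROCESSING_WARN_THRESHOLD`. A standalone sketch of that instrumentation, using a hypothetical `acquire_write_conn` in place of `agent.pool().write_normal()` and a sleep as placeholder work:

```rust
use std::time::{Duration, Instant};
use tokio::time::timeout;
use tracing::warn;

const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);

// Hypothetical stand-in for agent.pool().write_normal().
async fn acquire_write_conn() -> Result<(), &'static str> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bound how long we are willing to wait for the write connection; a
    // timeout surfaces as an error instead of the task hanging forever.
    let _conn = timeout(Duration::from_secs(5 * 60), acquire_write_conn())
        .await
        .map_err(|_| "timed out waiting for write connection")??;

    // Time an individual sub-step and warn if it runs long, like the
    // PROCESSING_WARN_THRESHOLD checks added across process_multiple_changes.
    let sub_start = Instant::now();
    tokio::time::sleep(Duration::from_millis(10)).await; // placeholder work
    let elapsed = sub_start.elapsed();
    if elapsed >= PROCESSING_WARN_THRESHOLD {
        warn!("sub-step took too long - {elapsed:?}");
    }
    Ok(())
}
```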
2 changes: 2 additions & 0 deletions crates/corro-agent/src/api/public/mod.rs
@@ -447,7 +447,9 @@ async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result<

let partial_schema = parse_sql(&new_sql)?;

info!("getting write connection to update schema");
let mut conn = agent.pool().write_priority().await?;
info!("got write connection to update schema");

// hold onto this lock so nothing else makes changes
let mut schema_write = agent.schema().write();
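The `execute_schema` change only brackets the connection acquisition with log lines, which makes it visible in the logs when a schema update is stuck waiting on a priority write connection. A small hedged sketch of that bracketing, with a hypothetical `acquire_priority_conn` standing in for `agent.pool().write_priority()`:

```rust
use tracing::info;

// Hypothetical stand-in for agent.pool().write_priority().
async fn acquire_priority_conn() -> Result<(), std::io::Error> {
    Ok(())
}

async fn update_schema() -> Result<(), std::io::Error> {
    // Logging on both sides of the await makes it obvious when the schema
    // update is blocked waiting for the write connection.
    info!("getting write connection to update schema");
    let _conn = acquire_priority_conn().await?;
    info!("got write connection to update schema");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    update_schema().await
}
```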
1 change: 1 addition & 0 deletions crates/corro-pg/Cargo.toml
@@ -25,6 +25,7 @@ tracing = { workspace = true }
tripwire = { path = "../tripwire" }
sqlparser = { version = "0.39.0" }
chrono = { version = "0.4.31" }
socket2 = { version = "0.5" }

[dev-dependencies]
corro-tests = { path = "../corro-tests" }