Merge pull request #269 from superfly/somto/fix-failed-changes
Handle failed changes properly
somtochiama authored Nov 29, 2024
2 parents fca1326 + 68daa16 commit 7d4512f
Showing 5 changed files with 186 additions and 82 deletions.
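In short: `handle_changes` used to spawn a wrapper task that swallowed any error from `util::process_multiple_changes` and returned a `BTreeMap` of the failed actor/version pairs, which the caller then purged from its `seen` cache. After this commit the future is spawned directly, and a failure is simply logged when the task is joined; `seen` bookkeeping happens at enqueue/drop time instead. A minimal sketch of the new spawn/join shape, assuming a simplified `process_multiple_changes` signature (the real one takes an `Agent`, a `Bookie`, and a batch of changes):

use tokio::task::JoinSet;

// Hypothetical, simplified stand-in: the real function takes an Agent, a
// Bookie, and a Vec of (ChangeV1, ChangeSource, Instant) tuples.
async fn process_multiple_changes(batch: Vec<u64>) -> Result<(), String> {
    // ... apply the whole batch in one transaction; any error fails the batch ...
    let _ = batch;
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut join_set: JoinSet<Result<(), String>> = JoinSet::new();

    // The future is spawned directly -- no wrapper closure that logs the
    // error and folds failed versions into a BTreeMap for the caller.
    join_set.spawn(process_multiple_changes(vec![1, 2, 3]));

    // The Result surfaces at join time, where a failure is logged once.
    while let Some(res) = join_set.join_next().await {
        if let Ok(Err(e)) = res {
            eprintln!("could not process multiple changes: {e}");
        }
    }
}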
71 changes: 24 additions & 47 deletions crates/corro-agent/src/agent/handlers.rs
@@ -2,7 +2,6 @@
//!
//! This module is _big_ and maybe should be split up further.
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::RangeInclusive;

@@ -13,9 +12,12 @@ use std::{
time::{Duration, Instant},
};

use crate::agent::util::log_at_pow_10;
use crate::{
agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL},
agent::{
bi, bootstrap, uni,
util::{log_at_pow_10, process_multiple_changes},
SyncClientError, ANNOUNCE_INTERVAL,
},
api::peer::parallel_sync,
transport::Transport,
};
Expand All @@ -36,6 +38,7 @@ use corro_types::base::Version;
use corro_types::broadcast::Timestamp;
use corro_types::change::store_empty_changeset;
use foca::Notification;
use indexmap::map::Entry;
use indexmap::IndexMap;
use metrics::{counter, gauge, histogram};
use rand::{prelude::IteratorRandom, rngs::StdRng, SeedableRng};
@@ -747,8 +750,8 @@ pub async fn handle_changes(
agent.config().perf.apply_queue_timeout as u64,
));

const MAX_SEEN_CACHE_LEN: usize = 10000;
const KEEP_SEEN_CACHE_SIZE: usize = 1000;
let max_seen_cache_len: usize = max_queue_len;
let keep_seen_cache_size: usize = cmp::max(10, max_seen_cache_len / 10);
let mut seen: IndexMap<_, RangeInclusiveSet<CrsqlSeq>> = IndexMap::new();

let mut drop_log_count: u64 = 0;
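Note on the hunk above: the seen-cache bounds are no longer hard-coded at 10_000/1_000; they scale with the configured queue length, keeping a tenth of the entries on overflow with a floor of 10. A quick worked example of the sizing (the queue lengths below are assumed values, not config defaults):

use std::cmp;

// Mirrors the sizing above: cache at most max_queue_len entries, and on
// overflow keep only the newest tenth, but never fewer than 10.
fn cache_bounds(max_queue_len: usize) -> (usize, usize) {
    let max_seen_cache_len = max_queue_len;
    let keep_seen_cache_size = cmp::max(10, max_seen_cache_len / 10);
    (max_seen_cache_len, keep_seen_cache_size)
}

fn main() {
    assert_eq!(cache_bounds(10_000), (10_000, 1_000));
    assert_eq!(cache_bounds(50), (50, 10)); // the floor kicks in for tiny queues
}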
@@ -776,23 +779,7 @@ pub async fn handle_changes(
let changes = std::mem::take(&mut buf);
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(
BTreeMap::new(),
|mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>>, change| {
acc.entry(change.0.actor_id)
.or_default()
.insert(change.0.versions());
acc
},
)
} else {
BTreeMap::new()
}
});
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));

buf_cost -= tmp_cost;
}
@@ -804,13 +791,8 @@
// but we need to drain it to free up concurrency
res = join_set.join_next(), if !join_set.is_empty() => {
debug!("processed multiple changes concurrently");
if let Some(Ok(res)) = res {
for (actor_id, versions) in res {
let versions: Vec<_> = versions.into_iter().flatten().collect();
for version in versions {
seen.remove(&(actor_id, version));
}
}
if let Some(Ok(Err(e))) = res {
error!("could not process multiple changes: {e}");
}
continue;
},
@@ -833,24 +815,13 @@
let changes: Vec<_> = queue.drain(..).collect();
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(BTreeMap::new(), |mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>> , change| {
acc.entry(change.0.actor_id).or_default().insert(change.0.versions());
acc
})
} else {
BTreeMap::new()
}
});
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));
buf_cost = 0;
}

if seen.len() > MAX_SEEN_CACHE_LEN {
if seen.len() > max_seen_cache_len {
// we don't want to keep too many entries in here.
seen = seen.split_off(seen.len() - KEEP_SEEN_CACHE_SIZE);
seen = seen.split_off(seen.len() - keep_seen_cache_size);
}
continue
},
@@ -919,10 +890,16 @@ pub async fn handle_changes(

// drop old items when the queue is full.
if queue.len() >= max_queue_len {
let dropped = queue.pop_front();
if let Some(dropped) = dropped {
for v in dropped.0.versions() {
let _ = seen.remove(&(dropped.0.actor_id, v));
if let Some((dropped_change, _, _)) = queue.pop_front() {
for v in dropped_change.versions() {
if let Entry::Occupied(mut entry) = seen.entry((change.actor_id, v)) {
if let Some(seqs) = dropped_change.seqs().cloned() {
entry.get_mut().remove(seqs);
} else {
entry.remove_entry();
}
};
buf_cost -= dropped_change.processing_cost();
}
}

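One subtlety in the final hunk of this file: when the full queue drops a change, `seen` now forgets only the dropped sequence range for that `(actor_id, version)` key (and subtracts the dropped change's processing cost from `buf_cost`); the whole entry is removed only when the changeset carries no seqs. That way a partially-dropped version is not still treated as fully seen. A minimal sketch of that eviction logic, with hypothetical stand-in types for `ActorId`/`Version`/`CrsqlSeq`:

use indexmap::{map::Entry, IndexMap};
use rangemap::RangeInclusiveSet;
use std::ops::RangeInclusive;

// Hypothetical, simplified stand-ins for corrosion's newtype wrappers.
type ActorId = u64;
type Version = u64;
type Seq = u64;

fn evict_dropped(
    seen: &mut IndexMap<(ActorId, Version), RangeInclusiveSet<Seq>>,
    actor_id: ActorId,
    version: Version,
    dropped_seqs: Option<RangeInclusive<Seq>>,
) {
    if let Entry::Occupied(mut entry) = seen.entry((actor_id, version)) {
        match dropped_seqs {
            // Partial changeset: forget only the dropped seqs so they can be
            // accepted again later; the other seqs stay marked as seen.
            Some(seqs) => {
                entry.get_mut().remove(seqs);
            }
            // No seqs (e.g. an empty changeset): drop the entry entirely.
            None => {
                entry.remove_entry();
            }
        }
    }
}

fn main() {
    let mut seen: IndexMap<(ActorId, Version), RangeInclusiveSet<Seq>> = IndexMap::new();
    let mut seqs = RangeInclusiveSet::new();
    seqs.insert(0..=9);
    seen.insert((1, 1), seqs);

    evict_dropped(&mut seen, 1, 1, Some(3..=5));
    // 0..=2 and 6..=9 are still seen; 3..=5 can be re-queued.
    assert_eq!(seen[&(1, 1)].iter().count(), 2);
}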
122 changes: 121 additions & 1 deletion crates/corro-agent/src/agent/tests.rs
@@ -21,6 +21,7 @@ use tokio::{
};
use tracing::{debug, info_span};
use tripwire::Tripwire;
use uuid::Uuid;

use crate::{
agent::process_multiple_changes,
@@ -31,7 +32,6 @@ use crate::{
transport::Transport,
};
use corro_tests::*;
use corro_types::agent::Agent;
use corro_types::broadcast::Timestamp;
use corro_types::change::Change;
use corro_types::{
@@ -44,6 +44,11 @@ use corro_types::{
sqlite::CrConn,
sync::generate_sync,
};
use corro_types::{
agent::Agent,
api::{ColumnName, TableName},
pubsub::pack_columns,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn insert_rows_and_gossip() -> eyre::Result<()> {
@@ -903,6 +908,121 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn process_failed_changes() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();

let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let uuid = Uuid::parse_str("00000000-0000-0000-a716-446655440000")?;
let actor_id = ActorId(uuid);
// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
Extension(ta1.agent.clone()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
assert_eq!(status_code, StatusCode::OK);

let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let (status_code, _body) = api_v1_db_schema(
Extension(ta2.agent.clone()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
assert_eq!(status_code, StatusCode::OK);

for i in 1..=5_i64 {
let (status_code, _) = api_v1_transactions(
Extension(ta2.agent.clone()),
axum::Json(vec![Statement::WithParams(
"INSERT OR REPLACE INTO tests (id,text) VALUES (?,?)".into(),
vec![i.into(), "service-text".into()],
)]),
)
.await;
assert_eq!(status_code, StatusCode::OK);
}
let mut good_changes = get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;

let change6 = Change {
table: TableName("tests".into()),
pk: pack_columns(&vec![6i64.into()])?,
cid: ColumnName("text".into()),
val: "six".into(),
col_version: 1,
db_version: CrsqlDbVersion(6),
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
};

let bad_change = Change {
table: TableName("tests".into()),
pk: pack_columns(&vec![6i64.into()])?,
cid: ColumnName("nonexistent".into()),
val: "six".into(),
col_version: 1,
db_version: CrsqlDbVersion(6),
seq: CrsqlSeq(1),
site_id: actor_id.to_bytes(),
cl: 1,
};

let mut rows = vec![
(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
},
ChangeSource::Sync,
Instant::now(),
)
];

rows.append(&mut good_changes);

let res = process_multiple_changes(ta1.agent.clone(), ta1.bookie.clone(), rows).await;

assert!(res.is_ok());

// verify that correct versions were inserted
let conn = ta1.agent.pool().read().await?;

for i in 1..=5_i64 {
let pk = pack_columns(&[i.into()])?;
let crsql_dbv = conn.prepare_cached(r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#)?
.query_row([pk], |row| row.get::<_, CrsqlDbVersion>(0))?;

let booked_dbv = conn.prepare_cached("SELECT db_version from __corro_bookkeeping where start_version = ? and actor_id = ?")?
.query_row((i, ta2.agent.actor_id()), |row| row.get::<_, CrsqlDbVersion>(0))?;

assert_eq!(crsql_dbv, booked_dbv);

let conn = ta1.agent.pool().read().await?;
conn.prepare_cached("SELECT text from tests where id = ?")?
.query_row([i], |row| row.get::<_, String>(0))?;
}

let res = conn
.prepare_cached("SELECT text from tests where id = 6")?
.query_row([], |row| row.get::<_, String>(0));
assert!(res.is_err());
assert_eq!(res, Err(rusqlite::Error::QueryReturnedNoRows));

tripwire_tx.send(()).await.ok();
tripwire_worker.await;
wait_for_all_pending_handles().await;

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_process_multiple_changes() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
(diffs for the remaining 3 changed files were not loaded)
