Skip to content

Commit

Permalink
Clustering tweaks (#77)
Browse files Browse the repository at this point in the history
* measure how long it takes to connect to a server

* log connection errors

* track datagram send errors

* let's do a 60 secs timeout on QUIC connections

* forgot to process subs when applying buffered changes
  • Loading branch information
jeromegn authored Oct 6, 2023
1 parent 33043b1 commit 90f3b9b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 23 deletions.
35 changes: 35 additions & 0 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,7 @@ async fn process_fully_buffered_changes(
.await
.for_actor(actor_id)
};

let mut bookedw = booked
.write(format!(
"process_fully_buffered(booked writer):{}",
Expand Down Expand Up @@ -1763,7 +1764,21 @@ async fn process_fully_buffered_changes(
tx.commit()?;

let inserted = if let Some(known_version) = known_version {
let db_version = if let KnownDbVersion::Current { db_version, .. } = &known_version {
Some(*db_version)
} else {
None
};

bookedw.insert(version, known_version);

drop(bookedw);

if let Some(db_version) = db_version {
// TODO: write changes into a queueing table
process_subs_by_db_version(agent, &conn, db_version);
}

true
} else {
false
Expand Down Expand Up @@ -2264,6 +2279,26 @@ pub fn process_subs(agent: &Agent, changeset: &[Change]) {
}
}

pub fn process_subs_by_db_version(agent: &Agent, conn: &Connection, db_version: i64) {
trace!("process subs by db version...");

let mut matchers_to_delete = vec![];

{
let matchers = agent.matchers().read();
for (id, matcher) in matchers.iter() {
if let Err(e) = matcher.process_changes_from_db_version(conn, db_version) {
error!("could not process change w/ matcher {id}, it is probably defunct! {e}");
matchers_to_delete.push(*id);
}
}
}

for id in matchers_to_delete {
agent.matchers().write().remove(&id);
}
}

#[derive(Debug, thiserror::Error)]
pub enum SyncClientError {
#[error("bad status code: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn build_quinn_transport_config(config: &GossipConfig) -> quinn::TransportConfig

// max idle timeout
transport_config.max_idle_timeout(Some(
Duration::from_secs(std::cmp::min(config.idle_timeout_secs as u64, 10))
Duration::from_secs(config.idle_timeout_secs as u64)
.try_into()
.unwrap(),
));
Expand Down
27 changes: 21 additions & 6 deletions crates/corro-agent/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use bytes::Bytes;
use metrics::{histogram, increment_counter};
use quinn::{
ApplicationClose, Connection, ConnectionError, Endpoint, RecvStream, SendDatagramError,
SendStream,
Expand Down Expand Up @@ -45,13 +51,11 @@ impl Transport {
debug!("sent datagram to {addr}");
return Ok(send);
}
Err(e @ SendDatagramError::ConnectionLost(ConnectionError::VersionMismatch)) => {
return Err(e.into());
}
Err(SendDatagramError::ConnectionLost(e)) => {
debug!("retryable error attempting to open unidirectional stream: {e}");
debug!("retryable error attempting to send datagram: {e}");
}
Err(e) => {
increment_counter!("corro.transport.send_datagram.errors", "addr" => addr.to_string(), "error" => e.to_string());
return Err(e.into());
}
}
Expand Down Expand Up @@ -120,7 +124,18 @@ impl Transport {
}
}

let conn = self.0.endpoint.connect(addr, server_name.as_str())?.await?;
let start = Instant::now();
let conn = match self.0.endpoint.connect(addr, server_name.as_str())?.await {
Ok(conn) => {
histogram!("corro.transport.connect.time.seconds", start.elapsed().as_secs_f64(), "addr" => server_name);
conn
}
Err(e) => {
increment_counter!("corro.transport.connect.errors", "addr" => server_name, "error" => e.to_string());
return Err(e.into());
}
};

if let Err(e) = self.0.rtt_tx.try_send((addr, conn.rtt())) {
debug!("could not send RTT for connection through sender: {e}");
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};

pub const DEFAULT_GOSSIP_PORT: u16 = 4001;
const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 30;
const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 60;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand Down
52 changes: 37 additions & 15 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use corro_api_types::{Change, ChangeId, ColumnType, RowId, SqliteValue, SqliteVa
use enquote::unquote;
use fallible_iterator::FallibleIterator;
use indexmap::IndexMap;
use itertools::Itertools;
use rusqlite::{params, params_from_iter, Connection, OptionalExtension, ToSql, Transaction};
use sqlite3_parser::{
ast::{
Expand Down Expand Up @@ -238,28 +237,27 @@ struct InnerMatcherHandle {
}

impl MatcherHandle {
pub fn process_change(&self, changes: &[Change]) -> Result<(), MatcherError> {
fn process_changes_from_iter<I, T, P>(&self, iter: I) -> Result<(), MatcherError>
where
I: Iterator<Item = rusqlite::Result<(T, P)>>,
T: AsRef<str>,
P: AsRef<[u8]>,
{
let mut candidates: IndexMap<CompactString, Vec<Vec<SqliteValue>>> = IndexMap::new();

let grouped = changes
.iter()
.filter(|change| {
self.0
.parsed
.table_columns
.contains_key(change.table.as_str())
})
.group_by(|change| (change.table.as_str(), change.pk.as_slice()));
let filtered = iter
.flatten()
.filter(|(table, _)| self.0.parsed.table_columns.contains_key(table.as_ref()));

for ((table, pk), _) in grouped.into_iter() {
let pks = unpack_columns(pk)?
for (table, pk) in filtered {
let pks = unpack_columns(pk.as_ref())?
.into_iter()
.map(|v| v.to_owned())
.collect();
if let Some(v) = candidates.get_mut(table) {
if let Some(v) = candidates.get_mut(table.as_ref()) {
v.push(pks);
} else {
candidates.insert(table.to_compact_string(), vec![pks]);
candidates.insert(table.as_ref().to_compact_string(), vec![pks]);
}
}

Expand All @@ -271,6 +269,30 @@ impl MatcherHandle {
Ok(())
}

pub fn process_changes_from_db_version(
&self,
conn: &Connection,
db_version: i64,
) -> Result<(), MatcherError> {
let mut prepped = conn.prepare_cached(
"SELECT \"table\", DISTINCT(pk) FROM crsql_changes WHERE db_version = ? ORDER BY seq GROUP BY \"table\"",
)?;

let rows = prepped.query_map([db_version], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
})?;

self.process_changes_from_iter(rows)
}

pub fn process_change(&self, changes: &[Change]) -> Result<(), MatcherError> {
self.process_changes_from_iter(
changes
.iter()
.map(|change| Ok((change.table.as_str(), change.pk.as_slice()))),
)
}

pub fn id(&self) -> Uuid {
self.0.id
}
Expand Down

0 comments on commit 90f3b9b

Please sign in to comment.