Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustering tweaks #77

Merged
merged 5 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,7 @@ async fn process_fully_buffered_changes(
.await
.for_actor(actor_id)
};

let mut bookedw = booked
.write(format!(
"process_fully_buffered(booked writer):{}",
Expand Down Expand Up @@ -1763,7 +1764,21 @@ async fn process_fully_buffered_changes(
tx.commit()?;

let inserted = if let Some(known_version) = known_version {
let db_version = if let KnownDbVersion::Current { db_version, .. } = &known_version {
Some(*db_version)
} else {
None
};

bookedw.insert(version, known_version);

drop(bookedw);

if let Some(db_version) = db_version {
// TODO: write changes into a queueing table
process_subs_by_db_version(agent, &conn, db_version);
}

true
} else {
false
Expand Down Expand Up @@ -2264,6 +2279,26 @@ pub fn process_subs(agent: &Agent, changeset: &[Change]) {
}
}

pub fn process_subs_by_db_version(agent: &Agent, conn: &Connection, db_version: i64) {
trace!("process subs by db version...");

let mut matchers_to_delete = vec![];

{
let matchers = agent.matchers().read();
for (id, matcher) in matchers.iter() {
if let Err(e) = matcher.process_changes_from_db_version(conn, db_version) {
error!("could not process change w/ matcher {id}, it is probably defunct! {e}");
matchers_to_delete.push(*id);
}
}
}

for id in matchers_to_delete {
agent.matchers().write().remove(&id);
}
}

#[derive(Debug, thiserror::Error)]
pub enum SyncClientError {
#[error("bad status code: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn build_quinn_transport_config(config: &GossipConfig) -> quinn::TransportConfig

// max idle timeout
transport_config.max_idle_timeout(Some(
Duration::from_secs(std::cmp::min(config.idle_timeout_secs as u64, 10))
Duration::from_secs(config.idle_timeout_secs as u64)
.try_into()
.unwrap(),
));
Expand Down
27 changes: 21 additions & 6 deletions crates/corro-agent/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use bytes::Bytes;
use metrics::{histogram, increment_counter};
use quinn::{
ApplicationClose, Connection, ConnectionError, Endpoint, RecvStream, SendDatagramError,
SendStream,
Expand Down Expand Up @@ -45,13 +51,11 @@ impl Transport {
debug!("sent datagram to {addr}");
return Ok(send);
}
Err(e @ SendDatagramError::ConnectionLost(ConnectionError::VersionMismatch)) => {
return Err(e.into());
}
Err(SendDatagramError::ConnectionLost(e)) => {
debug!("retryable error attempting to open unidirectional stream: {e}");
debug!("retryable error attempting to send datagram: {e}");
}
Err(e) => {
increment_counter!("corro.transport.send_datagram.errors", "addr" => addr.to_string(), "error" => e.to_string());
return Err(e.into());
}
}
Expand Down Expand Up @@ -120,7 +124,18 @@ impl Transport {
}
}

let conn = self.0.endpoint.connect(addr, server_name.as_str())?.await?;
let start = Instant::now();
let conn = match self.0.endpoint.connect(addr, server_name.as_str())?.await {
Ok(conn) => {
histogram!("corro.transport.connect.time.seconds", start.elapsed().as_secs_f64(), "addr" => server_name);
conn
}
Err(e) => {
increment_counter!("corro.transport.connect.errors", "addr" => server_name, "error" => e.to_string());
return Err(e.into());
}
};

if let Err(e) = self.0.rtt_tx.try_send((addr, conn.rtt())) {
debug!("could not send RTT for connection through sender: {e}");
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};

pub const DEFAULT_GOSSIP_PORT: u16 = 4001;
const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 30;
const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 60;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand Down
52 changes: 37 additions & 15 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use corro_api_types::{Change, ChangeId, ColumnType, RowId, SqliteValue, SqliteVa
use enquote::unquote;
use fallible_iterator::FallibleIterator;
use indexmap::IndexMap;
use itertools::Itertools;
use rusqlite::{params, params_from_iter, Connection, OptionalExtension, ToSql, Transaction};
use sqlite3_parser::{
ast::{
Expand Down Expand Up @@ -238,28 +237,27 @@ struct InnerMatcherHandle {
}

impl MatcherHandle {
pub fn process_change(&self, changes: &[Change]) -> Result<(), MatcherError> {
fn process_changes_from_iter<I, T, P>(&self, iter: I) -> Result<(), MatcherError>
where
I: Iterator<Item = rusqlite::Result<(T, P)>>,
T: AsRef<str>,
P: AsRef<[u8]>,
{
let mut candidates: IndexMap<CompactString, Vec<Vec<SqliteValue>>> = IndexMap::new();

let grouped = changes
.iter()
.filter(|change| {
self.0
.parsed
.table_columns
.contains_key(change.table.as_str())
})
.group_by(|change| (change.table.as_str(), change.pk.as_slice()));
let filtered = iter
.flatten()
.filter(|(table, _)| self.0.parsed.table_columns.contains_key(table.as_ref()));

for ((table, pk), _) in grouped.into_iter() {
let pks = unpack_columns(pk)?
for (table, pk) in filtered {
let pks = unpack_columns(pk.as_ref())?
.into_iter()
.map(|v| v.to_owned())
.collect();
if let Some(v) = candidates.get_mut(table) {
if let Some(v) = candidates.get_mut(table.as_ref()) {
v.push(pks);
} else {
candidates.insert(table.to_compact_string(), vec![pks]);
candidates.insert(table.as_ref().to_compact_string(), vec![pks]);
}
}

Expand All @@ -271,6 +269,30 @@ impl MatcherHandle {
Ok(())
}

pub fn process_changes_from_db_version(
&self,
conn: &Connection,
db_version: i64,
) -> Result<(), MatcherError> {
let mut prepped = conn.prepare_cached(
"SELECT \"table\", DISTINCT(pk) FROM crsql_changes WHERE db_version = ? ORDER BY seq GROUP BY \"table\"",
)?;

let rows = prepped.query_map([db_version], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
})?;

self.process_changes_from_iter(rows)
}

pub fn process_change(&self, changes: &[Change]) -> Result<(), MatcherError> {
self.process_changes_from_iter(
changes
.iter()
.map(|change| Ok((change.table.as_str(), change.pk.as_slice()))),
)
}

pub fn id(&self) -> Uuid {
self.0.id
}
Expand Down
Loading