Skip to content

Commit

Permalink
Batch insertions (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn authored Oct 5, 2023
1 parent 4b723ea commit 33043b1
Show file tree
Hide file tree
Showing 15 changed files with 1,476 additions and 561 deletions.
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ crc32fast = "1.3.2"
enquote = "1.1.0"
eyre = "0.6.8"
fallible-iterator = "0.2.0"
foca = { version = "0.15.0", features = ["std", "tracing", "bincode-codec", "serde"] }
foca = { version = "0.16.0", features = ["std", "tracing", "bincode-codec", "serde"] }
futures = "0.3.28"
futures-util = "0.3.28"
hex = "0.4.3"
Expand Down Expand Up @@ -69,7 +69,7 @@ tracing = "0.1.37"
tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] }
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
trust-dns-resolver = "0.22.0"
uhlc = { version = "0.5.2", features = ["defmt"] }
uhlc = { version = "0.6.3", features = ["defmt"] }
uuid = { version = "1.3.1", features = ["v4", "serde"] }
webpki = { version = "0.22.0", features = ["std"] }
http = { version = "0.2.9" }
Expand Down
53 changes: 52 additions & 1 deletion crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::{fmt::Display, time::Duration};

use camino::Utf8PathBuf;
use corro_types::{agent::Agent, sqlite::SqlitePoolError, sync::generate_sync};
use corro_types::{
agent::{Agent, LockKind, LockMeta, LockState},
sqlite::SqlitePoolError,
sync::generate_sync,
};
use futures::{SinkExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use spawn::spawn_counted;
Expand Down Expand Up @@ -75,6 +79,7 @@ pub fn start_server(
pub enum Command {
Ping,
Sync(SyncCommand),
Locks { top: usize },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -125,6 +130,25 @@ type FramedStream = Framed<
Json<Command, Response>,
>;

#[derive(Serialize, Deserialize)]
pub struct LockMetaElapsed {
pub label: String,
pub kind: LockKind,
pub state: LockState,
pub duration: Duration,
}

impl From<LockMeta> for LockMetaElapsed {
fn from(value: LockMeta) -> Self {
LockMetaElapsed {
label: value.label.into(),
kind: value.kind,
state: value.state,
duration: value.started_at.elapsed(),
}
}
}

async fn handle_conn(
agent: Agent,
_config: AdminConfig,
Expand All @@ -148,6 +172,33 @@ async fn handle_conn(
Err(e) => send_error(&mut stream, e).await,
}
}
Command::Locks { top } => {
info_log(&mut stream, "gathering top locks").await;
let registry = {
agent
.bookie()
.read("admin:registry")
.await
.registry()
.clone()
};

let topn: Vec<LockMetaElapsed> = {
registry
.map
.read()
.values()
.take(top)
.cloned()
.map(LockMetaElapsed::from)
.collect()
};

match serde_json::to_value(&topn) {
Ok(json) => send(&mut stream, Response::Json(json)).await,
Err(e) => send_error(&mut stream, e).await,
}
}
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
Loading

0 comments on commit 33043b1

Please sign in to comment.