From 965515a2756a437abe4813f47014982287c7d77e Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Fri, 27 Dec 2024 12:53:16 -0500 Subject: [PATCH 1/2] track lost packets by addr --- crates/corro-agent/src/transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/corro-agent/src/transport.rs b/crates/corro-agent/src/transport.rs index 3aa03513..ab73b9f5 100644 --- a/crates/corro-agent/src/transport.rs +++ b/crates/corro-agent/src/transport.rs @@ -256,8 +256,9 @@ impl Transport { .set(stats.path.congestion_events as f64); gauge!("corro.transport.path.black_holes_detected", "addr" => addr.to_string()) .set(stats.path.black_holes_detected as f64); + gauge!("corro.transport.path.lost_packets", "addr" => addr.to_string()) + .set(stats.path.lost_packets as f64); - acc.path.lost_packets += stats.path.lost_packets; acc.path.lost_bytes += stats.path.lost_bytes; acc.path.sent_packets += stats.path.sent_packets; acc.path.sent_plpmtud_probes += stats.path.sent_plpmtud_probes; @@ -319,7 +320,6 @@ impl Transport { acc }); - gauge!("corro.transport.path.lost_packets").set(stats.path.lost_packets as f64); gauge!("corro.transport.path.lost_bytes").set(stats.path.lost_bytes as f64); gauge!("corro.transport.path.sent_packets").set(stats.path.sent_packets as f64); gauge!("corro.transport.path.sent_plpmtud_probes") From 4e18caac52208b3d260b79836b040b2754ea0a39 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Sat, 28 Dec 2024 10:57:52 -0500 Subject: [PATCH 2/2] upgrade rusqlite and deadpool, add timeout to some immediate transactions, don't reuse connections if they're older than 5 minutes --- Cargo.lock | 146 ++++++++++++++--------- Cargo.toml | 6 +- crates/corro-agent/src/agent/handlers.rs | 4 +- crates/corro-agent/src/agent/util.rs | 6 +- crates/corro-agent/src/api/public/mod.rs | 2 +- crates/corro-agent/src/broadcast/mod.rs | 2 +- crates/corro-pg/src/lib.rs | 8 +- crates/corro-types/Cargo.toml | 1 + crates/corro-types/src/sqlite.rs | 57 ++++++++- crates/sqlite-pool/src/lib.rs | 17 +-- 10 files changed, 175 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7deb292d..293a6f46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "abort-on-drop" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dd6d700ad9af641490c1f7d67980d2de4d1433016e5b12f819448d3c832142a" +dependencies = [ + "tokio", +] + [[package]] name = "addr2line" version = "0.19.0" @@ -30,9 +39,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.6" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "const-random", @@ -374,9 +383,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.2" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbe3c979c178231552ecba20214a8272df4e09f232a87aef4320cf06539aded" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "block-buffer" @@ -523,11 +532,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.79" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "8d6dbb628b8f8555f86d0323c2eb39e3ec81901f4b83e091db8a6a76d316a333" dependencies = [ "jobserver", + "libc", + "shlex", ] [[package]] @@ -538,9 +549,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -548,7 +559,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.0", + "windows-targets 0.52.6", ] [[package]] @@ -662,23 +673,21 @@ checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "const-random" -version = "0.1.15" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" dependencies = [ "const-random-macro", - "proc-macro-hack", ] [[package]] name = "const-random-macro" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ "getrandom", "once_cell", - "proc-macro-hack", "tiny-keccak", ] @@ -965,6 +974,7 @@ dependencies = [ name = "corro-types" version = "0.1.0-alpha.1" dependencies = [ + "abort-on-drop", "arc-swap", "async-trait", "bytes", @@ -1243,7 +1253,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", - "hashbrown 0.14.2", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -1257,11 +1267,10 @@ checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" [[package]] name = "deadpool" -version = "0.10.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" dependencies = [ - "async-trait", "deadpool-runtime", "num_cpus", "tokio", @@ -1332,6 +1341,16 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derive-new" version = "0.5.9" @@ -1727,7 +1746,7 @@ version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b3ba52851e73b46a4c3df1d89343741112003f0f6f13beb0dfac9e457c3fdcd" dependencies = [ - "bitflags 2.3.2", + "bitflags 2.6.0", "libc", "libgit2-sys", "log", @@ -1795,22 +1814,25 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.11", ] [[package]] name = "hashbrown" -version = "0.14.2" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.11", +] [[package]] name = "hashlink" -version = "0.8.1" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ - "hashbrown 0.12.3", + "hashbrown 0.14.5", ] [[package]] @@ -2226,7 +2248,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.2", + "hashbrown 0.14.5", "serde", ] @@ -2337,9 +2359,9 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "jobserver" -version = "0.1.28" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -2399,9 +2421,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.27.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -2574,7 +2596,7 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77b9e10a211c839210fd7f99954bda26e5f8e26ec686ad68da6a32df7c80e782" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.11", "portable-atomic", ] @@ -2783,6 +2805,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -3148,6 +3176,12 @@ dependencies = [ "time", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -3216,12 +3250,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.92" @@ -3404,7 +3432,7 @@ version = "11.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" dependencies = [ - "bitflags 2.3.2", + "bitflags 2.6.0", ] [[package]] @@ -3495,7 +3523,7 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c2a11a646ef5d4e4a9d5cf80c7e4ecb20f9b1954292d5c5e6d6cbc8d33728ec" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.11", "bitflags 1.3.2", "instant", "num-traits", @@ -3546,11 +3574,11 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.30.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ - "bitflags 2.3.2", + "bitflags 2.6.0", "chrono", "fallible-iterator 0.3.0", "fallible-streaming-iterator", @@ -3866,6 +3894,12 @@ dependencies = [ "regex", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -4023,7 +4057,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9add252f9b70a7d493b03127524ed06cdf7480b3dc8c1b2ccfda89384d90a8b7" dependencies = [ - "bitflags 2.3.2", + "bitflags 2.6.0", "cc", "fallible-iterator 0.3.0", "indexmap 2.1.0", @@ -4293,11 +4327,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.20" +version = "0.3.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ + "deranged", "itoa", + "num-conv", + "powerfmt", "serde", "time-core", "time-macros", @@ -4305,16 +4342,17 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.8" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" dependencies = [ + "num-conv", "time-core", ] @@ -5360,18 +5398,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.26" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.26" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 62e365f3..0c7f23ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,7 @@ arc-swap = { version = "1.6.0" } assert2 = "0.3.10" async-trait = "0.1.68" axum = { version = "0.6.15", features = ["http2", "ws", "tracing", "headers"] } -deadpool = "0.10.0" -deadpool-sqlite = "0.6.0" +deadpool = "0.12.0" bincode = "1.3.3" build-info = "0.0.35" build-info-build = { version = "0.0.35" } @@ -52,7 +51,7 @@ rand = { version = "0.8.5", features = ["small_rng"] } rangemap = { version = "1.5.1", features = ["serde1"] } rcgen = { version = "0.11.1", features = ["x509-parser"] } rhai = { version = "1.15.1", features = ["sync"] } -rusqlite = { version = "0.30.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono", "series"] } +rusqlite = { version = "0.32.1", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono", "series", "hooks", "modern_sqlite"] } rustls = { version = "0.21.0", features = ["dangerous_configuration", "quic"] } rustls-pemfile = "1.0.2" seahash = "4.1.0" @@ -83,6 +82,7 @@ uuid = { version = "1.3.1", features = ["v4", "serde"] } webpki = { version = "0.22.0", features = ["std"] } http = { version = "0.2.9" } governor = { version = "0.7.0" } +abort-on-drop = { version = "0.2.2" } [patch.crates-io] quinn-proto = { git = "https://github.com/jeromegn/quinn", rev = "108f25a6" } diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 872d7bee..da8791be 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -650,7 +650,7 @@ pub async fn process_emptyset( debug!(self_actor_id = %agent.actor_id(), "processing emptyset changes, len: {}", versions.len()); block_in_place(|| { let tx = conn - .immediate_transaction() + .immediate_transaction_timeout(Duration::from_secs(30)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: None, @@ -701,7 +701,7 @@ pub async fn process_emptyset( .await; let tx = conn - .immediate_transaction() + .immediate_transaction_timeout(Duration::from_secs(30)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: None, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 8dae64ec..98fed54f 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -432,7 +432,7 @@ pub async fn clear_buffered_meta_loop( let mut conn = pool.write_low().await?; block_in_place(|| { - let tx = conn.immediate_transaction()?; + let tx = conn.immediate_transaction_timeout(Duration::from_secs(30))?; // TODO: delete buffered changes from deleted sequences only (maybe, it's kind of hard and may not be necessary) @@ -574,7 +574,7 @@ pub async fn process_fully_buffered_changes( }; let tx = conn - .immediate_transaction() + .immediate_transaction_timeout(Duration::from_secs(30)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: Some(actor_id), @@ -800,7 +800,7 @@ pub async fn process_multiple_changes( let changesets = block_in_place(|| { let start = Instant::now(); let mut tx = conn - .immediate_transaction() + .immediate_transaction_timeout(Duration::from_secs(30)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: None, diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 41d001bd..f3df2af3 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -57,7 +57,7 @@ where let start = Instant::now(); block_in_place(move || { let tx = conn - .immediate_transaction() + .immediate_transaction_timeout(Duration::from_secs(30)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: Some(actor_id), diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index bf720cfe..dcf28b0a 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -860,7 +860,7 @@ fn diff_member_states( let mut deleted = 0; let res = block_in_place(|| { - let tx = conn.immediate_transaction()?; + let tx = conn.immediate_transaction_timeout(Duration::from_secs(30))?; for (member, rtt_min) in to_update { let foca_state = serde_json::to_string(&member).unwrap(); diff --git a/crates/corro-pg/src/lib.rs b/crates/corro-pg/src/lib.rs index 8d38ab65..65ddb035 100644 --- a/crates/corro-pg/src/lib.rs +++ b/crates/corro-pg/src/lib.rs @@ -3316,7 +3316,9 @@ mod tests { println!("t2text: {:?}", row.try_get::<_, String>(2)); let now: DateTime = Utc::now(); - let now = NaiveDateTime::from_timestamp_micros(now.timestamp_micros()).unwrap(); + let now = DateTime::from_timestamp_micros(now.timestamp_micros()) + .unwrap() + .naive_utc(); println!("NOW: {now:?}"); let row = client @@ -3333,7 +3335,9 @@ mod tests { assert_eq!(now, updated_at); let future: DateTime = Utc::now() + Duration::from_secs(1); - let future = NaiveDateTime::from_timestamp_micros(future.timestamp_micros()).unwrap(); + let future = DateTime::from_timestamp_micros(future.timestamp_micros()) + .unwrap() + .naive_utc(); println!("NOW: {future:?}"); let row = client diff --git a/crates/corro-types/Cargo.toml b/crates/corro-types/Cargo.toml index bb783a3a..4e8891a3 100644 --- a/crates/corro-types/Cargo.toml +++ b/crates/corro-types/Cargo.toml @@ -51,6 +51,7 @@ tripwire = { version = "0.1.0-alpha.0", path = "../tripwire" } uhlc = { workspace = true } uuid = { workspace = true } strum = { workspace = true } +abort-on-drop = { workspace = true } [dev-dependencies] tracing-subscriber = { workspace = true } diff --git a/crates/corro-types/src/sqlite.rs b/crates/corro-types/src/sqlite.rs index dcfc140c..67d745a0 100644 --- a/crates/corro-types/src/sqlite.rs +++ b/crates/corro-types/src/sqlite.rs @@ -1,8 +1,9 @@ use std::{ ops::{Deref, DerefMut}, - time::Instant, + time::{Duration, Instant}, }; +use abort_on_drop::ChildTask; use once_cell::sync::Lazy; use rusqlite::{params, Connection, Transaction}; use sqlite_pool::SqliteConn; @@ -58,6 +59,17 @@ impl CrConn { self.0 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate) } + + pub fn immediate_transaction_timeout( + &mut self, + timeout: Duration, + ) -> rusqlite::Result { + Ok(TimeoutTransaction::new( + self.0 + .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?, + Instant::now() + timeout, + )) + } } impl SqliteConn for CrConn { @@ -88,6 +100,49 @@ impl Drop for CrConn { } } +pub struct TimeoutTransaction<'conn> { + tx: Transaction<'conn>, + _timer_task: ChildTask<()>, +} + +impl<'conn> TimeoutTransaction<'conn> { + fn new(tx: Transaction<'conn>, deadline: Instant) -> TimeoutTransaction<'conn> { + let interrupt_handle = tx.get_interrupt_handle(); + Self { + tx, + _timer_task: tokio::spawn(async move { + tokio::time::sleep_until(deadline.into()).await; + interrupt_handle.interrupt(); + }) + .into(), + } + } + + #[inline] + pub fn commit(self) -> rusqlite::Result<()> { + self.tx.commit() + } + + #[inline] + pub fn rollback(self) -> rusqlite::Result<()> { + self.tx.rollback() + } +} + +impl<'conn> Deref for TimeoutTransaction<'conn> { + type Target = Transaction<'conn>; + + fn deref(&self) -> &Self::Target { + &self.tx + } +} + +impl<'conn> DerefMut for TimeoutTransaction<'conn> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.tx + } +} + fn init_cr_conn(conn: &mut Connection) -> Result<(), rusqlite::Error> { let ext_dir = &CRSQL_EXT_DIR; trace!( diff --git a/crates/sqlite-pool/src/lib.rs b/crates/sqlite-pool/src/lib.rs index ba36c74d..843b2471 100644 --- a/crates/sqlite-pool/src/lib.rs +++ b/crates/sqlite-pool/src/lib.rs @@ -3,12 +3,10 @@ mod config; use std::{ fmt, sync::atomic::{AtomicUsize, Ordering}, + time::Duration, }; -use deadpool::{ - async_trait, - managed::{self, Object}, -}; +use deadpool::managed::{self, Object, RecycleError}; pub use deadpool::managed::reexports::*; pub use rusqlite; @@ -78,7 +76,6 @@ impl SqliteConn for rusqlite::Connection { } } -#[async_trait] impl managed::Manager for Manager where T: SqliteConn, @@ -95,9 +92,15 @@ where async fn recycle( &self, - _conn: &mut Self::Type, - _: &Metrics, + conn: &mut Self::Type, + metrics: &Metrics, ) -> managed::RecycleResult { + if conn.conn().is_interrupted() { + return Err(RecycleError::message("Connection was interrupted")); + } + if metrics.age() > Duration::from_secs(300) { + return Err(RecycleError::message("Max age reached")); + } let _ = self.recycle_count.fetch_add(1, Ordering::Relaxed); Ok(()) }