From 613ca6a38301bc0cf4e63acf231503fc69773136 Mon Sep 17 00:00:00 2001 From: Lukas Pukenis Date: Mon, 16 Dec 2024 11:00:35 +0200 Subject: [PATCH] Refactor batcher and inline into SessionKeeper This commit does these things: - Inlines batching flag to SessionKeeper, removing batcher entirely as a separate entity with all of its logic(threshold, traffic trigger). - Emits all actions from SessionKeeper if batching is enabled when an action is added. This works nicely if we can guarantee that actions are gonna be a multiple of some T which is the case as currently we target T=70s for proxy, direct, vpn, stun keepalives. This also makes thresholds and the whole logic around them irrelevant. Traffic triggered batching implementation was doing more harm than good since it triggers on _any_ traffic, meaning even when alignment was achieved, it might misalign since any traffic triggers it. Feature flags are not removed in this commit so not to push for version update of libtelio, so now they are no-op. Signed-off-by: Lukas Pukenis Signed-off-by: Lukas Pukenis --- .unreleased/LLT-5857 | 1 + crates/telio-traversal/src/batcher.rs | 1292 ------------------ crates/telio-traversal/src/lib.rs | 1 - crates/telio-traversal/src/session_keeper.rs | 248 +--- crates/telio-utils/src/repeated_actions.rs | 103 +- crates/telio-wg/src/wg.rs | 43 +- src/device.rs | 18 +- src/device/wg_controller.rs | 72 +- 8 files changed, 159 insertions(+), 1619 deletions(-) create mode 100644 .unreleased/LLT-5857 delete mode 100644 crates/telio-traversal/src/batcher.rs diff --git a/.unreleased/LLT-5857 b/.unreleased/LLT-5857 new file mode 100644 index 000000000..9a86b1cb6 --- /dev/null +++ b/.unreleased/LLT-5857 @@ -0,0 +1 @@ +Refactor batcher diff --git a/crates/telio-traversal/src/batcher.rs b/crates/telio-traversal/src/batcher.rs deleted file mode 100644 index 651b5b03a..000000000 --- a/crates/telio-traversal/src/batcher.rs +++ /dev/null @@ -1,1292 +0,0 @@ -// Batcher is an optimizer of aligning actions to be done in batches -// with the goal to conserve resources, thus Batcher's main goals are: -// 1) Don't break existing functionality done by usual means -// 2) Align actions - -use futures::future::BoxFuture; -use std::fmt::Debug; -use std::hash::Hash; -use std::{collections::HashMap, sync::Arc}; - -use async_trait::async_trait; -use telio_utils::{telio_log_debug, telio_log_error, telio_log_warn}; - -use thiserror::Error as ThisError; -use tokio::time::{sleep_until, Duration, Instant}; - -type Action> = - Arc Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>; - -pub struct Batcher { - actions: HashMap)>, - last_trigger_checkpoint: Instant, - options: BatchingOptions, -} - -struct BatchEntry { - deadline: Instant, - interval: Duration, - threshold: Duration, -} - -/// Possible [Batcher] errors. -#[derive(ThisError, Debug)] -pub enum BatcherError { - /// No actions in batcher - #[error("No actions present")] - NoActions, -} - -#[cfg_attr(any(test, feature = "mockall"), mockall::automock)] -#[async_trait] -pub trait BatcherTrait: Send + Sync -where - K: Eq + Hash + Debug + Clone + Send + Sync, - V: Send + Sync, -{ - fn add(&mut self, key: K, interval: Duration, threshold: Duration, action: Action); - fn remove(&mut self, key: &K); - async fn get_actions( - &mut self, - last_network_activity: Option, - ) -> Result)>, BatcherError>; - fn get_interval(&self, key: &K) -> Option; -} - -#[derive(Copy, Clone, Debug)] -pub struct BatchingOptions { - pub trigger_effective_duration: Duration, - pub trigger_cooldown_duration: Duration, -} - -impl Default for BatchingOptions { - fn default() -> Self { - Self { - trigger_effective_duration: Duration::from_secs(10), - trigger_cooldown_duration: Duration::from_secs(60), - } - } -} - -impl Batcher -where - K: Eq + Hash + Send + Sync + Debug + Clone, - V: Send + Sync, -{ - // It's necessary not to allow too short intervals since they can drain resources. - // Historically, libtelio had highest frequency action of 5seconds for direct - // connection keepalive, thus we cap any action to have at least that. - const MIN_INTERVAL: Duration = Duration::from_secs(5); - - pub fn new(options: BatchingOptions) -> Self { - telio_log_debug!("Options for batcher: {:?}", options); - - Self { - actions: HashMap::new(), - options, - last_trigger_checkpoint: Instant::now(), - } - } - - pub fn is_trigger_outstanding(&self, latest_network_activity: Instant) -> bool { - if self.last_trigger_checkpoint.elapsed() <= self.options.trigger_cooldown_duration { - telio_log_debug!( - "Batcher trigger in cooldown: {}", - self.last_trigger_checkpoint.elapsed().as_secs() - ); - return false; - } - - latest_network_activity.elapsed() < self.options.trigger_effective_duration - } -} - -#[async_trait] -impl BatcherTrait for Batcher -where - K: Eq + Hash + Send + Sync + Debug + Clone, - V: Send + Sync, -{ - async fn get_actions( - &mut self, - last_network_activity: Option, - ) -> Result)>, BatcherError> { - if self.actions.is_empty() { - return Err(BatcherError::NoActions); - } - - fn collect_batch_jobs( - actions: &mut HashMap)>, - ) -> Vec<(K, Action)> { - let now = Instant::now(); - - let mut batched_actions: Vec<(K, Action)> = vec![]; - - for (key, action) in actions.iter_mut() { - let adjusted_deadline = now + action.0.threshold; - - if action.0.deadline <= adjusted_deadline { - action.0.deadline = now + action.0.interval; - batched_actions.push((key.clone(), action.1.clone())); - } - } - - batched_actions - } - - if let Some(closest_deadline) = self - .actions - .values() - .min_by_key(|entry| entry.0.deadline) - .map(|v| v.0.deadline) - { - let early_batch = last_network_activity.map(|na| self.is_trigger_outstanding(na)); - - if let Some(true) = early_batch { - let batched_actions = collect_batch_jobs(&mut self.actions); - - if !batched_actions.is_empty() { - // TODO(LLT-5807): if there's an eligible trigger, maybe it's worth just - // emitting all the actions there are. Given a big enough cooldown it should - // pose no harm at the cost of near perfect alignment in one go. This should be - // experimented with. - self.last_trigger_checkpoint = Instant::now(); - return Ok(batched_actions); - } - } - - // we failed to early batch any actions, so lets wait until the closest one resolves - _ = sleep_until(closest_deadline).await; - - let batched_actions = collect_batch_jobs(&mut self.actions); - - if batched_actions.is_empty() { - telio_log_error!("Batcher resolves with empty list of jobs"); - return Err(BatcherError::NoActions); - } - self.last_trigger_checkpoint = Instant::now(); - return Ok(batched_actions); - } else { - return Err(BatcherError::NoActions); - } - } - - /// Remove batcher action. Action is no longer eligible for batching - fn remove(&mut self, key: &K) { - telio_log_debug!("removing item from batcher with key({:?})", key); - self.actions.remove(key); - } - - /// Add batcher action. Batcher itself doesn't run the tasks and depends - /// on actions being polled and invoked. It is a passive component on itself. - /// Adding an action has immediate deadline thus action is invoked as soon - /// as it's polled. - /// - /// Higher threshold means more opportunities to batch the action, however too high - /// means wasting resources since the job will be batched alongside other jobs more - /// frequently than necessary. For example an infrequent job with high - /// threshold will batch together with higher frequency job(shorter interval) with no added - /// value. - /// - /// A threshold of 0 makes batcher act effectively as `RepeatedActions` and will have - /// no opportunities to be batched. A small threshold will limit batching opportunities. - /// - /// Proving that the threshold of half the period is enough between two actions with same interval - /// when being added at different times can be done visually. In the scenario below - /// we have two actions A and B that have same interval and threshold: - /// - /// Legends: - /// - `d` - next deadline - /// - `D` - current deadline - /// - `-` - time tick - /// - `*` - jobs threshold - /// - '_' - indicates that action is not yet added - /// - /// ```text - /// Step #1 - /// A: D------******d------******d------******d------******d-----> time - /// B: __d------******d------******d------******d------******d---> time - /// ``` - /// at the beginning, job A was added and thus is immediately emitted. B doesn't yet exist - /// as it's added a bit later. Once B is added, it's deadline is immediate and it's next - /// deadline is shifted after one interval. Once B is added it cannot batch A since it's - /// not within the threshold. - /// - /// however a nice thing happens on the next A deadline: - /// - /// ```text - /// Step #2 - /// A: d------******D------******d------******d------******d-----> time - /// B: __d------******d------******d------******d------******d---> time - /// ``` - /// - /// At this point A will batch action B as well, since it's `deadline-threshold` is within the reach - /// and the timelines suddenly align afterwards: - /// - /// ```text - /// Step #3 - /// A: d------******d------******d------******d------******d-----> time - /// B: __d------****d------******d------******d------******d-----> time - /// ``` - /// - /// From now on A and B will always emit together(order is not guaranteed nor maintained). - /// - /// This is however a simplified approach and is not as easy to prove exactly what happens - /// when: - /// - jobs have same interval but different threshold - /// - jobs have different intervals. - /// - jobs have different intervals and different thresholds - /// - /// Thus it's best to aim for: - /// - as high interval as possible - /// - threshold being close to half the interval - fn add(&mut self, key: K, interval: Duration, threshold: Duration, action: Action) { - telio_log_debug!( - "adding item to batcher with key({:?}), interval({:?}), threshold({:?})", - key, - interval, - threshold, - ); - - let interval = { - if interval < Self::MIN_INTERVAL { - telio_log_warn!( - "Interval of {:?} is too short, overriding to {:?}", - interval, - Self::MIN_INTERVAL - ); - Self::MIN_INTERVAL - } else { - interval - } - }; - - let max_threshold = interval / 2; - - let threshold = { - if threshold > max_threshold { - telio_log_warn!( - "Threshold of {:?} is higher than maximum allowed threshold calculated: {:?}, overriding", - threshold, - max_threshold - ); - max_threshold - } else { - threshold - } - }; - - let entry = BatchEntry { - deadline: Instant::now(), - interval, - threshold, - }; - - self.actions.insert(key, (entry, action)); - } - - fn get_interval(&self, key: &K) -> Option { - self.actions.get(key).map(|(entry, _)| entry.interval) - } -} - -#[cfg(test)] -mod tests { - use crate::batcher::{Batcher, BatcherError, BatcherTrait, BatchingOptions}; - use std::sync::Arc; - use tokio::sync::{watch, Mutex}; - use tokio::time::*; - - struct TestChecker { - values: Vec<(String, Instant)>, - } - - #[derive(Copy, Clone)] - struct FakeNetwork { - latency: Duration, - } - - impl FakeNetwork { - async fn send( - &self, - peer_a_activity: Arc>, - peer_b_activity: Arc>, - ) { - // We're concerned about any activity, thus we just send to channel of one peer - // to simulate the send-trigger and after latency we simulate receival on another - // channel - peer_a_activity.send(Instant::now()).unwrap(); - - // latency sim - sleep(self.latency).await; - - // Simulate peer receiving data after some latency - peer_b_activity.send(Instant::now()).unwrap(); - } - } - - #[tokio::test(start_paused = true)] - async fn test_triggered_batching() { - struct TestHelperStruct { - trigger_enabled: bool, - latency: Duration, - start_emit_traffic_at: Instant, - stop_emit_traffic_at: Instant, - emit_packet_every: Duration, - test_duration: Duration, - delay_to_add_second_peer: Duration, - left_batching_options: BatchingOptions, - right_batching_options: BatchingOptions, - } - - #[derive(Debug)] - struct TestHelperResult { - left_peer_avg_interval: Duration, - right_peer_avg_interval: Duration, - avg_aligned_interval: Duration, - } - - // testing is split into two parts and results are stored separately. This allows for - // flexibility when asserting results more accurately and expectedly - async fn test_helper(params: TestHelperStruct) -> (TestHelperResult, TestHelperResult) { - let (left_send, left_recv) = watch::channel(Instant::now()); - let (right_send, right_recv) = watch::channel(Instant::now()); - - let left_send = Arc::new(left_send); - let right_send = Arc::new(right_send); - - let fake_network = Arc::new(FakeNetwork { - latency: params.latency, - }); - - let mut batcher_left = - Batcher::>>::new(params.left_batching_options); - let mut batcher_right = - Batcher::>>::new(params.right_batching_options); - - let test_checker_left = Arc::new(Mutex::new(TestChecker { values: Vec::new() })); - let test_checker_right = Arc::new(Mutex::new(TestChecker { values: Vec::new() })); - - let first_half_left_values = Arc::new(Mutex::new(vec![])); - let first_half_right_values = Arc::new(Mutex::new(vec![])); - - // at half the testcase, save the values and erase existing ones to remove the bias - // when calculating test metrics - // - { - let test_duration = params.test_duration.clone(); - let test_checker_left = test_checker_left.clone(); - let test_checker_right = test_checker_right.clone(); - let first_half_left_values = first_half_left_values.clone(); - let first_half_right_values = first_half_right_values.clone(); - - tokio::spawn(async move { - sleep(test_duration / 2).await; - - let mut left = test_checker_left.lock().await; - let mut right = test_checker_right.lock().await; - - let mut l = first_half_left_values.lock().await; - let mut r = first_half_right_values.lock().await; - - *l = (*left.values).to_vec(); - *r = (*right.values).to_vec(); - - left.values.clear(); - right.values.clear() - }); - } - - // simulate emission of some organic traffic - { - let fake_network = fake_network.clone(); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - let start = params.start_emit_traffic_at; - let deadline = params.stop_emit_traffic_at; - - let emit_packet_every = params.emit_packet_every; - - tokio::spawn(async move { - sleep_until(start).await; - - loop { - tokio::select! { - _ = sleep_until(deadline) => { - break - } - _ = sleep(emit_packet_every) => { - let _ = fake_network - .send(left_send.clone(), right_send.clone()) - .await; - - } - } - } - }); - } - - { - let fake_network = fake_network.clone(); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - batcher_left.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(move |s: _| { - let fake_net = Arc::clone(&fake_network); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - Box::pin(async move { - { - let mut s = s.lock().await; - s.values.push(("key".to_owned(), Instant::now())); - } - - let _ = fake_net.send(left_send, right_send).await; - Ok(()) - }) - }), - ); - } - - { - let left_send = left_send.clone(); - let right_send = right_send.clone(); - let fake_network = fake_network.clone(); - - batcher_right.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(move |s: _| { - let fake_net = Arc::clone(&fake_network); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - Box::pin(async move { - { - let mut s = s.lock().await; - s.values.push(("key".to_owned(), Instant::now())); - } - let _ = fake_net.send(right_send, left_send).await; - Ok(()) - }) - }), - ); - } - - fn spawn_batcher_poller( - trigger: bool, - mut network_activity_sub: watch::Receiver, - mut batcher: Batcher>>, - mut checker: Arc>, - ) { - tokio::spawn(async move { - let mut last_activity = { - if trigger { - Some(Instant::now()) - } else { - None - } - }; - - loop { - tokio::select! { - Ok( _) = network_activity_sub.changed() => { - if trigger { - last_activity = Some(*network_activity_sub.borrow_and_update()); - } else { - last_activity = None; - } - - } - Ok(ac) = batcher.get_actions(last_activity) => { - for a in ac { - a.1(&mut checker).await.unwrap(); - } - } - } - } - }); - } - - spawn_batcher_poller( - params.trigger_enabled, - left_recv, - batcher_left, - test_checker_left.clone(), - ); - - // add another action after a delay to the peer so they would be misaligned - sleep(params.delay_to_add_second_peer).await; - - spawn_batcher_poller( - params.trigger_enabled, - right_recv, - batcher_right, - test_checker_right.clone(), - ); - - sleep(params.test_duration).await; - - let first_half_left_values = first_half_left_values - .lock() - .await - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - let first_half_right_values = first_half_right_values - .lock() - .await - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - - let second_half_left_values = test_checker_left - .lock() - .await - .values - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - let second_half_right_values = test_checker_right - .lock() - .await - .values - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - - fn average_alignment(left: &[Duration], right: &[Duration]) -> Duration { - if left.is_empty() || right.is_empty() { - panic!("no data present"); - } - - fn find_closest_value(v: Duration, data: &[Duration]) -> Duration { - data.iter() - .min_by_key(|&&ts| (ts.as_millis().abs_diff(v.as_millis()))) - .copied() - .unwrap_or(v) - } - - let mut total_diff = 0; - let mut count = 0; - - for &t1 in left { - let closest_t2 = find_closest_value(t1, right); - total_diff += t1.as_millis().abs_diff(closest_t2.as_millis()); - count += 1; - } - - Duration::from_millis((total_diff / count) as u64) - } - - fn average_difference(numbers: &[Duration]) -> Duration { - if numbers.len() < 2 { - panic!("not enough elements"); - } - - let total_diff: u128 = numbers - .windows(2) - .map(|w| (w[0].as_millis().abs_diff(w[1].as_millis() as u128))) - .sum(); - - let count = numbers.len() - 1; - Duration::from_millis((total_diff / count as u128) as u64) - } - - let res_first_half = TestHelperResult { - left_peer_avg_interval: average_difference(first_half_left_values.as_slice()), - right_peer_avg_interval: average_difference(first_half_right_values.as_slice()), - - avg_aligned_interval: average_alignment( - first_half_left_values.as_slice(), - first_half_right_values.as_slice(), - ), - }; - - let res_second_half = TestHelperResult { - left_peer_avg_interval: average_difference(second_half_left_values.as_slice()), - right_peer_avg_interval: average_difference(second_half_right_values.as_slice()), - - avg_aligned_interval: average_alignment( - second_half_left_values.as_slice(), - second_half_right_values.as_slice(), - ), - }; - - return (res_first_half, res_second_half); - } - - fn durations_close_enough(d1: Duration, d2: Duration, tolerance: Duration) -> bool { - let diff = if d1 > d2 { d1 - d2 } else { d2 - d1 }; - diff <= tolerance - } - - fn within_1s(d1: Duration, d2: Duration) -> bool { - durations_close_enough(d1, d2, Duration::from_secs(1)) - } - - fn within_1ms(d1: Duration, d2: Duration) -> bool { - durations_close_enough(d1, d2, Duration::from_millis(1)) - } - - // actual testcases - { - // due to traffic being triggered every second for the duration of the testcase, we do - // not expect effective batching to happen. This is because trigger doesn't differentiate the - // traffic and thus constant activity just means attempts to batch all the time which - // just means that intervals will be shortened to `T - threshold` but no alignment - // between peers happens. The trigger cooldown is also high enough that it doesn't - // interfere with average interval - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now(), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(600), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(600), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - // 17seconds in the test we add a secod peer and expect this amount of misalignment - // between the peers - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(17) - )); - assert!(within_1s( - res_second_half.avg_aligned_interval, - Duration::from_secs(20) - )); - } - - { - // due to traffic being triggered every second for the duration of the testcase, we do - // not expect effective batching to happen. This is because trigger doesn't differentiate the - // traffic and thus constant activity just means attempts to batch all the time which - // just means that intervals will be shortened to `T - threshold` but no alignment - // between peers happens. The trigger cooldown is short enough to showcase that it just - // reduces the action intervals but doesn't achieve alignment - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now(), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - // 17seconds in the test we add a secod peer and expect this amount of misalignment - // between the peers - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(17) - )); - assert!(within_1s( - res_second_half.avg_aligned_interval, - Duration::from_secs(18) - )); - } - - { - // Start emitting traffic in the middle of the testcase. It should have no effect since - // batcher already aligned both peers and triggering more often has no effect on that except - // on local interval shortening(side effect or triggering) - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now() + Duration::from_secs(1800), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(1) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1ms( - res_second_half.avg_aligned_interval, - Duration::from_millis(333), - )); - } - } - // - #[tokio::test] - async fn no_actions() { - let mut batcher = Batcher::::new(BatchingOptions::default()); - assert!(matches!( - batcher.get_actions(None).await, - Err(BatcherError::NoActions) - )); - } - - #[tokio::test(start_paused = true)] - async fn no_threshold_expect_no_batching() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - let mut test_checker = TestChecker { values: Vec::new() }; - - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - // simulate some delay before adding a second task so they would be misaligned - advance(Duration::from_secs(20)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // Await for few actions to resolve - for _ in 0..6 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - // expect no batched behaviour since the thresholds were zero - assert!(test_checker.values[0].1.duration_since(start_time) == Duration::from_secs(0)); - assert!(test_checker.values[1].1.duration_since(start_time) == Duration::from_secs(20)); - assert!(test_checker.values[2].1.duration_since(start_time) == Duration::from_secs(100)); - assert!(test_checker.values[3].1.duration_since(start_time) == Duration::from_secs(120)); - assert!(test_checker.values[4].1.duration_since(start_time) == Duration::from_secs(200)); - assert!(test_checker.values[5].1.duration_since(start_time) == Duration::from_secs(220)); - assert!(test_checker.values[6].1.duration_since(start_time) == Duration::from_secs(300)); - } - - #[tokio::test(start_paused = true)] - async fn batch_one_no_threshold_expect_no_batch() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 1); - - // pick up the second event - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 2); - - assert!(test_checker.values[0].1.duration_since(start_time) == Duration::from_secs(0)); - assert!(test_checker.values[1].1.duration_since(start_time) == Duration::from_secs(100)); - - // after a pause of retrieving actions there will be a delay in signals as well as they are not active on their own - tokio::time::advance(Duration::from_secs(550)).await; - assert!(test_checker.values.len() == 2); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 3); - assert!(test_checker.values[2].1.duration_since(start_time) == Duration::from_secs(650)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_and_delay() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // At this point there are two actions added, one at t(0) and another at t(30). - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fires - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 2); - - // Do again and batching should be in action since the threshold of the second signal is bigger(50) than the delay between the packets(30) - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 4); - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // Immediate fires were supressed because we need to poll them and we did so only after 30seconds - // Thus everything will be aligned at 30seconds - - assert!(key0_entries[0] == Duration::from_secs(30)); - assert!(key1_entries[0] == Duration::from_secs(30)); - - assert!(key0_entries[1] == Duration::from_secs(130)); - assert!(key1_entries[1] == Duration::from_secs(130)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_check_threshold_limit_enforcement() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(30), - Duration::from_secs(30), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - tokio::time::advance(Duration::from_secs(5)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(5), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for _ in 0..8 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // At this point we expect the first job to be batched alongside the - // second, very frequent job at half the interval(capped) - assert!(key0_entries[0] == Duration::from_secs(0)); - assert!(key0_entries[1] == Duration::from_secs(15)); - assert!(key0_entries[2] == Duration::from_secs(30)); - - assert!(key1_entries[0] == Duration::from_secs(5)); - assert!(key1_entries[1] == Duration::from_secs(10)); - assert!(key1_entries[2] == Duration::from_secs(15)); - assert!(key1_entries[3] == Duration::from_secs(20)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_check_interval_limit_enforcement() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - // Make interval and threshold too small and expect override - Duration::from_secs(1), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - tokio::time::advance(Duration::from_secs(1)).await; - - batcher.add( - "key1".to_owned(), - // Make interval too small and expect override - Duration::from_secs(2), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for _ in 0..8 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // Expect forcing minimal interval of 5seconds - assert!(key0_entries[0] == Duration::from_secs(0)); - assert!(key0_entries[1] == Duration::from_secs(5)); - assert!(key0_entries[2] == Duration::from_secs(10)); - assert!(key0_entries[3] == Duration::from_secs(15)); - - assert!(key1_entries[0] == Duration::from_secs(1)); - assert!(key1_entries[1] == Duration::from_secs(6)); - assert!(key1_entries[2] == Duration::from_secs(11)); - assert!(key1_entries[3] == Duration::from_secs(16)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_no_threshold_delayed_check() { - let _start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // At this point there are two actions added, one at t(0) and another at t(30). - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!( - test_checker.values.len() == 2, - "Both actions should be emitted immediately" - ); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 4, "Even though there was no threshold and actions were added at different times, we query only after adding the second one which resets both deadlines and aligns them"); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_no_threshold_nodelay_check() { - let start_time = Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values.push(("key0".to_owned(), Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 1); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values.push(("key1".to_owned(), Instant::now())); - Ok(()) - }) - }), - ); - - // pick up the immediate fire from the second action - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 2); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!( - test_checker.values.len() == 3, - "No threshold but a delay was given, thus one signal should have resolved" - ); - - test_checker.values = vec![]; - for _ in 0..12 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0sum: tokio::time::Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - let key1sum: tokio::time::Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - // Because of misalignment, each call produces only one action instead of both - assert!(key0sum == Duration::from_secs(500)); - assert!(key1sum == Duration::from_secs(500)); - - // Now let's wait a bit so upon iterating again signals would be aligned - test_checker.values = vec![]; - tokio::time::advance(Duration::from_secs(500)).await; - - for _i in 0..11 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0sum: Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - let key1sum: Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - assert!(key0sum == Duration::from_secs(1000)); - assert!(key1sum == Duration::from_secs(1000)); - } -} diff --git a/crates/telio-traversal/src/lib.rs b/crates/telio-traversal/src/lib.rs index c1f72450e..73057365a 100644 --- a/crates/telio-traversal/src/lib.rs +++ b/crates/telio-traversal/src/lib.rs @@ -1,6 +1,5 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -pub mod batcher; pub mod connectivity_check; pub mod endpoint_providers; pub mod endpoint_state; diff --git a/crates/telio-traversal/src/session_keeper.rs b/crates/telio-traversal/src/session_keeper.rs index ef17fb4b0..577eedf3c 100644 --- a/crates/telio-traversal/src/session_keeper.rs +++ b/crates/telio-traversal/src/session_keeper.rs @@ -1,4 +1,3 @@ -use crate::batcher::{Batcher, BatcherTrait, BatchingOptions}; use async_trait::async_trait; use socket2::Type; use std::future::Future; @@ -10,18 +9,11 @@ use surge_ping::{ SurgeError, ICMP, }; use telio_crypto::PublicKey; -use telio_model::features::FeatureBatching; use telio_sockets::SocketPool; use telio_task::{task_exec, BoxAction, Runtime, Task}; use telio_utils::{ dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions, }; -use tokio::sync::watch; -use tokio::time::Instant; - -use futures::future::{pending, BoxFuture}; -use futures::FutureExt; - const PING_PAYLOAD_SIZE: usize = 56; /// Possible [SessionKeeper] errors. @@ -57,28 +49,19 @@ pub trait SessionKeeperTrait { public_key: PublicKey, target: dual_target::Target, interval: Duration, - threshold: Option, ) -> Result<()>; async fn remove_node(&self, key: &PublicKey) -> Result<()>; async fn get_interval(&self, key: &PublicKey) -> Option; } pub struct SessionKeeper { + batch_all: bool, task: Task, } impl SessionKeeper { - pub fn start( - sock_pool: Arc, - batching_feature: FeatureBatching, - network_activity: Option>, - - #[cfg(test)] batcher: Box>, - ) -> Result { - telio_log_debug!( - "Starting SessionKeeper with network subscriber: {}", - network_activity.is_some() - ); + pub fn start(sock_pool: Arc, batch_all: bool) -> Result { + telio_log_debug!("Starting with batch_all({})", batch_all); let (client_v4, client_v6) = ( PingerClient::new(&Self::make_builder(ICMP::V4).build()) .map_err(|e| Error::PingerCreationError(ICMP::V4, e))?, @@ -90,21 +73,14 @@ impl SessionKeeper { sock_pool.make_internal(client_v6.get_socket().get_native_sock())?; Ok(Self { + batch_all, task: Task::start(State { pingers: Pingers { pinger_client_v4: client_v4, pinger_client_v6: client_v6, }, - #[cfg(test)] - batched_actions: batcher, - - #[cfg(not(test))] - batched_actions: Box::new(Batcher::new(batching_feature.into())), - - nonbatched_actions: RepeatedActions::default(), - - network_activity, + actions: RepeatedActions::default(), }), }) } @@ -194,71 +170,55 @@ impl SessionKeeperTrait for SessionKeeper { public_key: PublicKey, target: dual_target::Target, interval: Duration, - threshold: Option, ) -> Result<()> { let dual_target = DualTarget::new(target).map_err(Error::DualTargetError)?; - match threshold { - Some(t) => task_exec!(&self.task, async move |s| { - s.batched_actions.add( - public_key, - interval, - t, - Arc::new(move |c: &mut State| { - Box::pin(async move { - telio_log_debug!("Batch-Pinging: {:?}", public_key); - if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { - telio_log_warn!( - "Failed to batch-ping, peer with key: {:?}, error: {:?}", - public_key, - e - ); - } - Ok(()) - }) - }), - ); - - Ok(()) - }) - .await - .map_err(Error::Task)?, - None => task_exec!(&self.task, async move |s| { - if s.nonbatched_actions.contains_action(&public_key) { - let _ = s.nonbatched_actions.remove_action(&public_key); - } + let batch_all = self.batch_all; + telio_log_debug!( + "Add action for {} and interval {:?}. batch_all({})", + public_key, + interval, + batch_all + ); - Ok(s.nonbatched_actions.add_action( - public_key, - interval, - Arc::new(move |c| { - Box::pin(async move { - if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { - telio_log_warn!( - "Failed to ping, peer with key: {:?}, error: {:?}", - public_key, - e - ); - } - Ok(()) - }) - }), - )) - }) - .await - .map_err(Error::Task)? - .map_err(Error::RepeatedActionError) - .map(|_| ())?, - } + task_exec!(&self.task, async move |s| { + if s.actions.contains_action(&public_key) { + let _ = s.actions.remove_action(&public_key); + } + if batch_all { + s.actions.set_all_immediate(); + } + + Ok(s.actions.add_action( + public_key, + interval, + Arc::new(move |c| { + Box::pin(async move { + if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { + telio_log_warn!( + "Failed to ping, peer with key: {:?}, error: {:?}", + public_key, + e + ); + } + Ok(()) + }) + }), + )) + }) + .await + .map_err(Error::Task)? + .map_err(Error::RepeatedActionError)?; + + telio_log_debug!("Added {}", public_key); Ok(()) } async fn remove_node(&self, key: &PublicKey) -> Result<()> { let pk = *key; task_exec!(&self.task, async move |s| { - let _ = s.nonbatched_actions.remove_action(&pk); - let _ = s.batched_actions.remove(&pk); + let _ = s.actions.remove_action(&pk); Ok(()) }) .await?; @@ -266,31 +226,16 @@ impl SessionKeeperTrait for SessionKeeper { Ok(()) } - // TODO: SK calls batched and nonbatched actions interchangibly, however call sites in general - // should be aware which one to call async fn get_interval(&self, key: &PublicKey) -> Option { let pk = *key; task_exec!(&self.task, async move |s| { - if let Some(interval) = s.batched_actions.get_interval(&pk) { - Ok(Some(interval.as_secs() as u32)) - } else { - Ok(s.nonbatched_actions.get_interval(&pk)) - } + Ok(s.actions.get_interval(&pk)) }) .await .unwrap_or(None) } } -impl From for BatchingOptions { - fn from(f: FeatureBatching) -> Self { - Self { - trigger_effective_duration: Duration::from_secs(f.trigger_effective_duration.into()), - trigger_cooldown_duration: Duration::from_secs(f.trigger_cooldown_duration.into()), - } - } -} - struct Pingers { pinger_client_v4: PingerClient, pinger_client_v6: PingerClient, @@ -298,9 +243,7 @@ struct Pingers { struct State { pingers: Pingers, - batched_actions: Box>, - nonbatched_actions: RepeatedActions>, - network_activity: Option>, + actions: RepeatedActions>, } #[async_trait] @@ -312,26 +255,8 @@ impl Runtime for State { where F: Future>> + Send, { - let last_network_activity = self - .network_activity - .as_mut() - .map(|receiver| *receiver.borrow_and_update()); - - let network_change_fut: BoxFuture< - '_, - std::result::Result<(), telio_utils::sync::watch::error::RecvError>, - > = { - match self.network_activity { - Some(ref mut na) => na.changed().boxed(), - None => pending::<()>().map(|_| Ok(())).boxed(), - } - }; - tokio::select! { - _ = network_change_fut => { - return Ok(()); - } - Ok((pk, action)) = self.nonbatched_actions.select_action() => { + Ok((pk, action)) = self.actions.select_action() => { let pk = *pk; action(self) .await @@ -340,15 +265,6 @@ impl Runtime for State { Ok(()) }, |_| Ok(()))?; } - Ok(batched_actions) = self.batched_actions.get_actions(last_network_activity) => { - for (pk, action) in batched_actions { - action(self).await.map_or_else(|e| { - telio_log_warn!("({}) Error sending batch-keepalive to {}: {:?}", Self::NAME, pk, e); - Ok(()) - }, |_| Ok(()))?; - } - } - update = update => { return update(self).await; } @@ -364,7 +280,6 @@ impl Runtime for State { #[cfg(test)] mod tests { use super::*; - use crate::batcher::{BatcherError, MockBatcherTrait}; use std::net::{Ipv4Addr, Ipv6Addr}; use telio_crypto::PublicKey; use telio_sockets::NativeProtector; @@ -383,13 +298,7 @@ mod tests { ) .unwrap(), )); - let sess_keep = SessionKeeper::start( - socket_pool, - FeatureBatching::default(), - None, - Box::new(Batcher::new(FeatureBatching::default().into())), - ) - .unwrap(); + let sess_keep = SessionKeeper::start(socket_pool, false).unwrap(); let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" .parse::() @@ -400,7 +309,6 @@ mod tests { pk, (Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)), PERIOD, - None, ) .await .unwrap(); @@ -456,66 +364,4 @@ mod tests { .await .unwrap(); } - - #[tokio::test] - async fn test_batcher_invocation() { - const PERIOD: Duration = Duration::from_secs(20); - - const THRESHOLD: Duration = Duration::from_secs(10); - let socket_pool = Arc::new(SocketPool::new( - NativeProtector::new( - #[cfg(target_os = "macos")] - false, - ) - .unwrap(), - )); - - let mut batcher = Box::new(MockBatcherTrait::::new()); - - let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" - .parse::() - .unwrap(); - - use mockall::predicate::{always, eq}; - batcher - .expect_add() - .once() - .with(eq(pk), eq(PERIOD), eq(THRESHOLD), always()) - .return_once(|_, _, _, _| ()); - batcher - .expect_remove() - .once() - .with(eq(pk)) - .return_once(|_| ()); - - // it's hard to mock the exact return since it involves a complex type, however we - // can at least verify that the batcher's actions were queried - batcher - .expect_get_actions() - .times(..) - .returning(|_| Err(BatcherError::NoActions)); - - let sess_keep = SessionKeeper::start( - socket_pool, - FeatureBatching::default().into(), - None, - batcher, - ) - .unwrap(); - - sess_keep - .add_node( - pk, - (Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)), - PERIOD, - Some(THRESHOLD), - ) - .await - .unwrap(); - - sess_keep.remove_node(&pk).await.unwrap(); - - // courtesy wait to be sure the runtime polls everything - sess_keep.stop().await; - } } diff --git a/crates/telio-utils/src/repeated_actions.rs b/crates/telio-utils/src/repeated_actions.rs index c30a62922..18bfc5774 100644 --- a/crates/telio-utils/src/repeated_actions.rs +++ b/crates/telio-utils/src/repeated_actions.rs @@ -29,7 +29,6 @@ pub enum RepeatedActionError { /// Single action type pub type RepeatedAction = Arc Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>; -type Action = (K, (Interval, RepeatedAction)); type Result = std::result::Result; /// Main struct container, that hold all actions @@ -57,6 +56,13 @@ where } } + /// Set all actions to be executed when polled next time + pub fn set_all_immediate(&mut self) { + self.actions + .values_mut() + .for_each(|v| v.0.reset_immediately()); + } + /// Add single action (first tick is immediate) pub fn add_action( &mut self, @@ -128,17 +134,6 @@ where } } -impl From<[Action; N]> for RepeatedActions -where - K: Eq + Hash + Send + Sync, -{ - fn from(arr: [Action; N]) -> Self { - RepeatedActions:: { - actions: HashMap::from(arr), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -171,6 +166,90 @@ mod tests { self.test = str; Ok(()) } + + pub fn get(&self) -> &str { + &self.test + } + } + + #[tokio::test(start_paused = true)] + async fn test_set_all_immediate() { + let mut ctx = Context::new("test".to_owned()); + + let start = Instant::now(); + + ctx.actions + .add_action( + "action_0".to_owned(), + Duration::from_secs(10), + Arc::new({ + let start = start.clone(); + move |s: _| { + Box::pin({ + let start = start.clone(); + async move { + s.change(format!("ts_{}", start.elapsed().as_secs()).to_owned()) + .await + } + }) + } + }), + ) + .unwrap(); + + // immediate action + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + tokio::time::advance(Duration::from_secs(3)).await; + + ctx.actions + .add_action( + "action_1".to_owned(), + Duration::from_secs(10), + Arc::new({ + let start = start.clone(); + move |s: _| { + Box::pin({ + let start = start.clone(); + async move { + s.change(format!("ts_{}", start.elapsed().as_secs()).to_owned()) + .await + } + }) + } + }), + ) + .unwrap(); + + { + let values_to_expect = + vec!["ts_3", "ts_10", "ts_13", "ts_20", "ts_23", "ts_30", "ts_33"]; + + for v in values_to_expect { + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + assert_eq!(ctx.get(), v); + } + } + + // we have proven that two actions are misaligned + ctx.actions.set_all_immediate(); + + { + let aligned_values = vec!["ts_33", "ts_33", "ts_43", "ts_43"]; + + for v in aligned_values { + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + assert_eq!(ctx.get(), v); + } + } } #[tokio::test] diff --git a/crates/telio-wg/src/wg.rs b/crates/telio-wg/src/wg.rs index a6b562f3e..7aacde10f 100644 --- a/crates/telio-wg/src/wg.rs +++ b/crates/telio-wg/src/wg.rs @@ -240,8 +240,6 @@ struct State { stats: HashMap>>, ip_stack: Option, - - network_activity: Option>, } const POLL_MILLIS: u64 = 1000; @@ -328,7 +326,6 @@ impl DynamicWg { /// firewall_reset_connections: None, /// }, /// None, - /// None, /// true, /// ); /// } @@ -337,7 +334,6 @@ impl DynamicWg { io: Io, cfg: Config, link_detection: Option, - batching: Option, ipv6_enabled: bool, ) -> Result where @@ -349,25 +345,17 @@ impl DynamicWg { io, adapter, link_detection, - batching, cfg, ipv6_enabled, )); #[cfg(windows)] - return Ok(Self::start_with( - io, - adapter, - link_detection, - batching, - ipv6_enabled, - )); + return Ok(Self::start_with(io, adapter, link_detection, ipv6_enabled)); } fn start_with( io: Io, adapter: Box, link_detection: Option, - batching: Option, #[cfg(unix)] cfg: Config, ipv6_enabled: bool, ) -> Self { @@ -386,7 +374,6 @@ impl DynamicWg { libtelio_event: io.libtelio_wide_event_publisher, stats: HashMap::new(), ip_stack: None, - network_activity: batching.map(|_| watch::channel(Instant::now()).0), }), } } @@ -414,19 +401,6 @@ impl DynamicWg { Err(Error::RestartFailed) } } - - /// Returns a channel that can be used to observe WireGuard network activity - pub async fn subscribe_to_network_activity( - &self, - ) -> Result>, Error> { - Ok(task_exec!(&self.task, async move |s| { - match s.network_activity { - Some(ref mut na) => Ok(Some(na.subscribe())), - None => Ok(None), - } - }) - .await?) - } } #[async_trait] @@ -941,21 +915,13 @@ impl State { mut to: uapi::Interface, reason: UpdateReason, ) -> Result { - let mut new_network_activity = false; - for (pk, peer) in &mut to.peers { match self.stats.get_mut(pk) { Some(stats) => match stats.lock().as_mut() { Ok(s) => { - let before_rx = s.rx_bytes; - let before_tx = s.tx_bytes; - let new_rx = peer.rx_bytes.unwrap_or_default(); let new_tx = peer.tx_bytes.unwrap_or_default(); - if new_rx > before_rx || new_tx > before_tx { - new_network_activity = true; - } s.update(new_rx, new_tx); } Err(e) => { @@ -976,12 +942,6 @@ impl State { } self.stats.retain(|pk, _| to.peers.contains_key(pk)); - if new_network_activity { - if let Some(ref mut na) = self.network_activity { - let _ = na.send(Instant::now()); - } - } - // Diff and report events // Adapter doesn't keep track of mesh addresses, or endpoint changes, @@ -1305,7 +1265,6 @@ pub mod tests { }, Box::new(adapter.clone()), None, - None, #[cfg(all(unix, test))] Config::new().unwrap(), #[cfg(all(unix, not(test)))] diff --git a/src/device.rs b/src/device.rs index 00c2cdb56..50215ad06 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1120,7 +1120,6 @@ impl Runtime { firewall_reset_connections, }, features.link_detection, - features.batching, features.ipv6, )?); let wg_events = wg_events.rx; @@ -1325,11 +1324,7 @@ impl Runtime { let session_keeper = { match SessionKeeper::start( self.entities.socket_pool.clone(), - self.features.batching.unwrap_or_default(), - self.entities - .wireguard_interface - .subscribe_to_network_activity() - .await?, + self.features.batching.is_some(), ) { Ok(sk) => Some(Arc::new(sk)), Err(e) => { @@ -1341,10 +1336,13 @@ impl Runtime { // Batching optimisations work by employing SessionKeeper. If SessionKeeper is not present // functionality will break when offloading actions to it, thus we disable the feature - if session_keeper.is_none() && self.features.batching.is_some() { - telio_log_warn!( - "Batching feature is enabled but SessionKeeper failed to start. Disabling batching." - ); + if session_keeper.is_none() { + if self.features.batching.is_some() { + // Batching feature enables batching for various kinds of keepalives provided by + // SessionKeeper. As a fallback, disable batcher in case SessionKeeper fails to + // start to preerve maximum operatibility of libtelio + telio_log_warn!("Batching feature is enabled but SessionKeeper failed to start. Disabling batching."); + } self.features.batching = None; } diff --git a/src/device/wg_controller.rs b/src/device/wg_controller.rs index 9d74307e5..51d33295c 100644 --- a/src/device/wg_controller.rs +++ b/src/device/wg_controller.rs @@ -390,11 +390,6 @@ async fn consolidate_wg_peers< } } - let batcher_threshold = features - .batching - .as_ref() - .map(|b| Duration::from_secs(b.direct_connection_threshold as u64)); - for key in insert_keys { telio_log_info!("Inserting peer: {:?}", requested_peers.get(key)); let peer = requested_peers.get(key).ok_or(Error::PeerNotFound)?; @@ -405,21 +400,17 @@ async fn consolidate_wg_peers< // Add peer to session keeper if needed match (session_keeper, peer.batching_keepalive_interval) { (Some(sk), Some(keepalive_interval)) => { + telio_log_debug!("Adding to SessionKeeper"); let target = build_ping_endpoint(&ip_addresses, features.ipv6); if target.0.is_some() || target.1.is_some() { - sk.add_node( - *key, - target, - Duration::from_secs(keepalive_interval as u64), - batcher_threshold, - ) - .await?; + sk.add_node(*key, target, Duration::from_secs(keepalive_interval as u64)) + .await?; } else { telio_log_warn!("Peer {:?} has no ip address", key); } } (None, _) => telio_log_debug!("The session keeper is missing!"), - _ => (), + _ => {} } if let Some(stun) = stun_ep_provider { @@ -465,13 +456,8 @@ async fn consolidate_wg_peers< let target = build_ping_endpoint(&requested_peer.peer.ip_addresses, features.ipv6); if target.0.is_some() || target.1.is_some() { - sk.add_node( - *key, - target, - Duration::from_secs(interval as u64), - batcher_threshold, - ) - .await?; + sk.add_node(*key, target, Duration::from_secs(interval as u64)) + .await?; } else { telio_log_warn!("Peer {:?} has no ip address", key); } @@ -548,7 +534,6 @@ async fn consolidate_wg_peers< .unwrap_or(requested_state.keepalive_periods.direct) .into(), ), - None, ) .await?; } else { @@ -766,7 +751,6 @@ async fn build_requested_peers_list< } (_, pq_keys) => { let preshared_key = pq_keys.map(|pq| pq.pq_shared); - requested_peers.insert( exit_node.public_key, RequestedPeer { @@ -2112,10 +2096,7 @@ mod tests { } } - fn then_keeper_add_node( - &mut self, - input: Vec<(PublicKey, IpAddr, Option, u32, Option)>, - ) { + fn then_keeper_add_node(&mut self, input: Vec<(PublicKey, IpAddr, Option, u32)>) { for i in input { let ip4 = { match i.1 { @@ -2141,9 +2122,8 @@ mod tests { eq(i.0), eq((Some(ip4), ip6)), eq(Duration::from_secs(i.3.into())), - eq(i.4), ) - .return_once(|_, _, _, _| Ok(())); + .return_once(|_, _, _| Ok(())); } } @@ -2268,13 +2248,7 @@ mod tests { )]); if batching { - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - proxying_keepalive_time, - Some(Duration::default()), - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), proxying_keepalive_time)]); } f.consolidate_peers().await; @@ -2333,17 +2307,7 @@ mod tests { allowed_ips, )]); - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - direct_keepalive_period, - if batching { - Some(Duration::default()) - } else { - None - }, - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), direct_keepalive_period)]); f.session_keeper .expect_get_interval() @@ -2690,17 +2654,7 @@ mod tests { Some(Duration::from_secs(DEFAULT_PEER_UPGRADE_WINDOW)), )]); - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - direct_keepalive_period, - if batching { - Some(Duration::default()) - } else { - None - }, - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), direct_keepalive_period)]); f.consolidate_peers().await; } @@ -2911,7 +2865,6 @@ mod tests { allowed_ips[0], None, proxying_keepalive_period, - Some(Duration::from_secs(0)), )]); } else { f.then_keeper_del_node(vec![(pub_key)]); @@ -2985,7 +2938,6 @@ mod tests { IpAddr::from(VPN_INTERNAL_IPV4), Some(IpAddr::from(VPN_INTERNAL_IPV6)), vpn_persistent_keepalive, - Some(Duration::from_secs(0)), )]); } @@ -3136,7 +3088,6 @@ mod tests { IpAddr::from([100, 64, 0, 4]), Some(IpAddr::from([0xfd74, 0x656c, 0x696f, 0, 0, 0, 0, 4])), stun_persistent_keepalive, - Some(Duration::from_secs(0)), )]); } @@ -3202,7 +3153,6 @@ mod tests { ip1, Some(ip1v6), TEST_DIRECT_PERSISTENT_KEEPALIVE_PERIOD, - None, )]); f.consolidate_peers().await;