WIP: introduce event_reporter module
Jauler committed Dec 11, 2024
1 parent 0ab514e commit bd42e11
Showing 3 changed files with 168 additions and 9 deletions.
5 changes: 5 additions & 0 deletions crates/telio-model/src/constants.rs
@@ -3,6 +3,11 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use telio_utils::const_ipnet::{ConstIpv4Net, ConstIpv6Net};

/// Reserved IPv4 Range
pub const RESERVED_IPV4_IPS: ConstIpv4Net = ConstIpv4Net::new(Ipv4Addr::new(100, 64, 0, 0), 29);
/// Reserved IPv6 Range
pub const RESERVED_IPV6_IPS: ConstIpv6Net = ConstIpv6Net::new(Ipv6Addr::new(0xfd74, 0x656c, 0x696f, 0, 0, 0, 0, 0x5), 125);

/// VPN IPv4 Meshnet Address
pub const VPN_INTERNAL_IPV4: Ipv4Addr = Ipv4Addr::new(100, 64, 0, 1);
/// VPN IPv6 Meshnet Address
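
These reserved ranges back the internal-peer detection in the new event_reporter module below: any peer whose allowed IPs all fall inside them is treated as an internal virtual peer and skipped by the reporter. A minimal sketch, assuming only the Ipv4Net::from conversion already used later in this commit, of how the IPv4 range classifies an address:

    use ipnet::Ipv4Net;
    use std::net::Ipv4Addr;
    use telio_model::constants::RESERVED_IPV4_IPS;

    fn main() {
        // 100.64.0.0/29 spans 100.64.0.0 - 100.64.0.7, i.e. the internal virtual peers only.
        let reserved = Ipv4Net::from(RESERVED_IPV4_IPS);
        assert!(reserved.contains(&Ipv4Addr::new(100, 64, 0, 3)));
        assert!(!reserved.contains(&Ipv4Addr::new(100, 64, 0, 10)));
    }
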
32 changes: 23 additions & 9 deletions src/device.rs
@@ -1,3 +1,4 @@
mod event_reporter;
mod wg_controller;

use async_trait::async_trait;
@@ -50,7 +51,7 @@ use telio_wg as wg;
use thiserror::Error as TError;
use tokio::{
runtime::{Builder, Runtime as AsyncRuntime},
sync::{broadcast::error::RecvError, Mutex},
sync::{broadcast::error::RecvError, broadcast::error::SendError, Mutex},
time::Interval,
};

@@ -82,7 +83,7 @@ use telio_utils::{
use telio_model::{
config::{Config, Peer, PeerBase, Server as DerpServer},
event::{Event, Set},
features::{FeaturePersistentKeepalive, Features, PathType},
features::{Features, PathType},
mesh::{ExitNode, LinkState, Node},
validation::validate_nickname,
EndpointMap,
@@ -183,6 +184,10 @@ pub enum Error {
TransportError(#[from] telio_starcast::transport::Error),
#[error("Events processing thread failed to start: {0}")]
EventsProcessingThreadStartError(std::io::Error),
#[error("Failed to build event structure")]
EventBuildingError,
#[error("Failed to publish libtelio event: {0}")]
EventPublishingError(#[from] SendError<Box<Event>>),
}

pub type Result<T = ()> = std::result::Result<T, Error>;
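
The #[from] on EventPublishingError also derives a From<SendError<Box<Event>>> impl, which is what lets event_reporter.rs propagate a failed broadcast send with a plain ?. A minimal sketch (the publish helper is hypothetical, not part of this commit; Result is the alias defined just above):

    use telio_model::event::Event;
    use telio_task::io::mc_chan::Tx;

    // Hypothetical helper: `?` converts SendError<Box<Event>> into
    // Error::EventPublishingError via the derived From impl.
    fn publish(tx: &Tx<Box<Event>>, event: Event) -> Result {
        tx.send(Box::new(event))?;
        Ok(())
    }
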
@@ -449,6 +454,11 @@ struct Runtime {
/// Some of the events are time based, so just poll the whole state from time to time
polling_interval: Interval,

/// Event reporter.
///
/// This instance is responsible for all libtelio events reported to the integrators
event_reporter: event_reporter::EventReporter,

#[cfg(test)]
/// MockedAdapter (tests)
test_env: telio_wg::tests::Env,
@@ -1034,6 +1044,8 @@ impl Runtime {
features: Features,
protect: Option<Arc<dyn Protector>>,
) -> Result<Self> {
let event_reporter = event_reporter::EventReporter::new(libtelio_wide_event_publisher.clone());

let neptun_reset_conns =
features.firewall.neptun_reset_conns || features.firewall.boringtun_reset_conns;

@@ -1270,6 +1282,7 @@ impl Runtime {
derp_events_publisher: derp_events.tx,
},
polling_interval,
event_reporter,
#[cfg(test)]
test_env: wg::tests::Env {
analytics: analytics_ch,
@@ -2366,6 +2379,7 @@ impl TaskRuntime for Runtime {
}
}

// TODO: Events: state change event occurred. Need to notify the libtelio event handler
let node = self.peer_to_node(&mesh_event.peer, Some(mesh_event.state), mesh_event.link_state).await;

if let Some(node) = node {
@@ -2381,13 +2395,11 @@
Ok(())
},

Ok(derp_event) = self.event_listeners.derp_event_subscriber.recv() => {
let event = Event::builder::<DerpServer>().set(*derp_event).build();
if let Some(event) = event {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(event)
);
}
Ok(_) = self.event_listeners.derp_event_subscriber.recv() => {
let res = self.event_reporter.report_events(&self.requested_state, &self.entities, &self.features).await;
if res.is_err() {
telio_log_error!("Failed to report events to library integrators {:?}", res);
}
Ok(())
},

@@ -2421,6 +2433,7 @@

Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => {
telio_log_debug!("WG consolidation triggered by PQ event");
//TODO: Events: Potentially PQ state has changed

self.entities.postquantum_wg.on_event(pq_event);

@@ -2435,6 +2448,7 @@

_ = self.polling_interval.tick() => {
telio_log_debug!("WG consolidation triggered by tick event, total logs dropped: {}", logs_dropped_until_now());
//TODO: Events: just in case - trigger telio event handling code
let dropped = logs_dropped_since_last_checked();
if dropped > 0 {
telio_log_warn!("New logs dropped: {dropped}");
140 changes: 140 additions & 0 deletions src/device/event_reporter.rs
@@ -0,0 +1,140 @@
use std::collections::HashMap;
use super::{Entities, Error, RequestedState, Result};
use std::sync::Arc;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use telio_model::config::{RelayState, Server as RelayEvent};
use telio_model::event::Set;
use telio_model::{event::Event, features::Features, mesh::Node, PublicKey};
use telio_model::constants::{RESERVED_IPV4_IPS, RESERVED_IPV6_IPS};
use telio_relay::DerpRelay;
use telio_task::io::mc_chan::Tx;
use telio_wg::uapi::Peer;
use telio_wg::{DynamicWg, WireGuard};

pub struct EventReporter {
event_publisher: Tx<Box<Event>>,
last_reported_derp_state: Option<RelayEvent>,
last_reported_node_state: HashMap<PublicKey, Node>,
}

impl EventReporter {
pub fn new(event_publisher: Tx<Box<Event>>) -> Self {
Self {
event_publisher,
last_reported_derp_state: Default::default(),
last_reported_node_state: Default::default(),
}
}

/// Report any state changes to the libtelio library users
///
/// This function mainly takes care of the following use cases:
/// * Report an event for any state change of any VPN/Meshnet node. VPN nodes may retrieve
/// their status from telio-pq as well as telio-wg as sources of truth for the current state.
/// * Report DERP connection status changes.
/// * Notify apps about non-recoverable issues, for example in case WireGuard starts spewing
/// errors over UAPI.
///
/// The list above may get extended in the future.
///
/// This module depends on external monitoring of state changes; it will not monitor for them
/// itself. Hence this function should be called whenever any relevant state change occurs.
///
/// Note that panic reporting is *not* handled here.
pub async fn report_events(
&mut self,
_requested_state: &RequestedState,
entities: &Entities,
_features: &Features,
) -> Result {
self.report_derp_events(entities.meshnet.left().map(|e| &e.derp))
.await?;
self.report_node_events(&entities.wireguard_interface).await?;
Ok(())
}

pub async fn report_derp_events(&mut self, relay: Option<&Arc<DerpRelay>>) -> Result {
// Meshnet may be disabled, in which case relay will be None
// Otherwise we may have no server selected
// Or we may be attempting to connect to some server
// Or we may be connected to some server
let current_relay_state = match relay {
Some(r) => r.get_connected_server().await,
_ => None,
};

// Evaluate whether anything has changed and build an event if necessary
let event = match (
self.last_reported_derp_state.clone(),
current_relay_state.clone(),
) {
// Nothing Changed
(None, None) => return Ok(()),
(Some(a), Some(b)) if a == b => return Ok(()),

// Either freshly connected or connection info has changed
(None, Some(current_state)) | (Some(_), Some(current_state)) => {
Event::builder::<RelayEvent>()
.set(current_state)
.build()
.ok_or(Error::EventBuildingError)?
}

// Disconnected
(Some(last_reported_derp_state), None) => {
let relay_event = RelayEvent {
conn_state: RelayState::Disconnected,
..last_reported_derp_state
};
Event::builder::<RelayEvent>()
.set(relay_event)
.build()
.ok_or(Error::EventBuildingError)?
}
};

// Fire event
self.event_publisher.send(Box::new(event))?;

// Record what has been reported so future iterations can compare against it
self.last_reported_derp_state = current_relay_state;
Ok(())
}

pub async fn report_node_events(&mut self, wireguard: &Arc<DynamicWg>) -> Result {
// Retrieve relevant WireGuard peers
let wg_peers: HashMap<PublicKey, Peer> = wireguard.get_interface().await?
.peers
.into_iter()
.filter(|(_, peer)| Self::should_report_peer(peer))
.collect();

//TODO: acquire some sort of representation of telio-pq current state

//TODO: For each node:
// compare against reported state
//TODO: report any differences
//TODO: save reported state
Ok(())
}

fn should_report_peer(peer: &Peer) -> bool {
// Note: We have some internal virtual peers (like DNS/starcast and similar) which, as
// agreed with the integrators, we would like to leave unreported: peers which were not
// explicitly requested by them should not trigger events.
// The current convention is that such peers have their allowed IPs set to the reserved
// ranges defined in telio_model::constants.
// An explicit indication that a peer is internal would be nicer, but until one is
// introduced we have to rely on allowed IPs and the reserved ranges.
// One case worth mentioning: VPN and exit node peers have 0.0.0.0/0 in their allowed IPs,
// which is not contained in the reserved range, so such peers are reported as expected.
// A peer is considered internal (and therefore not reported) only when all of its
// allowed IPs fall within the reserved ranges.
!peer.allowed_ips.iter()
.all(|net| match net {
IpNet::V4(ipv4net) => Ipv4Net::from(RESERVED_IPV4_IPS).contains(ipv4net),
IpNet::V6(ipv6net) => Ipv6Net::from(RESERVED_IPV6_IPS).contains(ipv6net),
})
}
}
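
The TODOs in report_node_events outline the remaining work: compare the nodes derived from the current peers against last_reported_node_state and publish one event per difference. A rough sketch of that comparison, assuming Node implements PartialEq and Clone (the diff_nodes helper and its flow are illustrative only, not part of this commit):

    use std::collections::HashMap;
    use telio_model::{mesh::Node, PublicKey};

    // Illustrative only: collect nodes whose state differs from what was last reported.
    fn diff_nodes(
        current: &HashMap<PublicKey, Node>,
        last_reported: &HashMap<PublicKey, Node>,
    ) -> Vec<Node> {
        current
            .iter()
            .filter(|(key, node)| last_reported.get(*key) != Some(*node))
            .map(|(_, node)| node.clone())
            .collect()
        // Peers that disappeared since the last report would additionally need a
        // "disconnected" node event; how that is represented is left open by the TODOs.
    }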
