Skip to content

Commit

Permalink
chore: add routes_stats() to RouteProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jan 17, 2025
1 parent 446bea4 commit bd59d8e
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 0 deletions.
22 changes: 22 additions & 0 deletions ic-agent/src/agent/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// appearing first. The returned vector can contain fewer than `n` URLs if
/// fewer are available.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;

/// Returns the total number of routes and the number of healthy routes as a `(total, healthy)` tuple.
///
/// - The first element is the total number of routes available (both healthy and unhealthy).
/// - The second element is the number of currently healthy routes.
///
/// A healthy route is one that is available and ready to receive traffic.
/// The specific criteria for what constitutes a "healthy" route are implementation-dependent.
fn routes_stats(&self) -> (usize, usize);
}

/// A simple implementation of the [`RouteProvider`] which produces an even distribution of the urls from the input ones.
Expand Down Expand Up @@ -94,6 +103,10 @@ impl RouteProvider for RoundRobinRouteProvider {

Ok(urls)
}

/// Reports `(total, healthy)` route counts.
///
/// Both elements are the length of `self.routes`, i.e. every configured route
/// is counted as healthy.
fn routes_stats(&self) -> (usize, usize) {
    let total = self.routes.len();
    (total, total)
}
}

impl RoundRobinRouteProvider {
Expand Down Expand Up @@ -133,6 +146,9 @@ impl RouteProvider for Url {
/// Returns a one-element vector holding the result of `self.route()`.
///
/// The requested count is ignored; any error from `route()` is propagated.
fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
    let url = self.route()?;
    Ok(vec![url])
}
/// Reports `(total, healthy)` route counts for a lone URL: always one route, counted as healthy.
fn routes_stats(&self) -> (usize, usize) {
(1, 1)
}
}

/// A [`RouteProvider`] that will attempt to discover new boundary nodes and cycle through them, optionally prioritizing those with low latency.
Expand Down Expand Up @@ -215,6 +231,9 @@ impl RouteProvider for DynamicRouteProvider {
/// Forwards the request for `n` ordered routes to the wrapped `inner` provider and returns its result unchanged.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
self.inner.n_ordered_routes(n)
}
/// Forwards to the wrapped `inner` provider and returns its `(total, healthy)` route counts unchanged.
fn routes_stats(&self) -> (usize, usize) {
self.inner.routes_stats()
}
}

/// Strategy for [`DynamicRouteProvider`]'s routing mechanism.
Expand Down Expand Up @@ -270,6 +289,9 @@ impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
self.url.route()
}
}
/// Reports `(total, healthy)` route counts as a constant: one route, counted as healthy.
///
/// NOTE(review): this value does not consult the wrapped `R: RouteProvider`. If routing is
/// handed over to that provider at some point, its real stats are not reflected here —
/// confirm this is intended.
fn routes_stats(&self) -> (usize, usize) {
(1, 1)
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ where
let urls = nodes.iter().map(|n| n.to_routing_url()).collect();
Ok(urls)
}

/// Loads the current routing snapshot and returns its `nodes_stats()` as the
/// `(total, healthy)` route counts.
fn routes_stats(&self) -> (usize, usize) {
    self.routing_snapshot.load().nodes_stats()
}
}

impl<S> DynamicRouteProvider<S>
Expand Down Expand Up @@ -417,6 +422,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain()], 6);
assert_eq!(route_provider.routes_stats(), (1, 1));

// Test 2: multiple route() calls return 3 different domains with equal fairness (repetition).
// Two healthy nodes are added to the topology.
Expand All @@ -431,13 +437,15 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (3, 3));

// Test 3: multiple route() calls return 2 different domains with equal fairness (repetition).
// One node is set to unhealthy.
checker.overwrite_healthy_nodes(vec![node_1.clone(), node_3.clone()]);
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain(), node_3.domain()], 3);
assert_eq!(route_provider.routes_stats(), (3, 2));

// Test 4: multiple route() calls return 3 different domains with equal fairness (repetition).
// Unhealthy node is set back to healthy.
Expand All @@ -449,6 +457,7 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (3, 3));

// Test 5: multiple route() calls return 3 different domains with equal fairness (repetition).
// One healthy node is added, but another one goes unhealthy.
Expand All @@ -467,6 +476,7 @@ mod tests {
vec![node_2.domain(), node_3.domain(), node_4.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (4, 3));

// Test 6: multiple route() calls return a single domain=api1.com.
// One node is set to unhealthy and one is removed from the topology.
Expand All @@ -475,6 +485,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_2.domain()], 3);
assert_eq!(route_provider.routes_stats(), (3, 1));
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ fn compute_score(
pub struct LatencyRoutingSnapshot {
nodes_with_metrics: Vec<NodeWithMetrics>,
existing_nodes: HashSet<Node>,
healthy_nodes: HashSet<Node>,
window_weights: Vec<f64>,
window_weights_sum: f64,
use_availability_penalty: bool,
Expand All @@ -174,6 +175,7 @@ impl LatencyRoutingSnapshot {
Self {
nodes_with_metrics: vec![],
existing_nodes: HashSet::new(),
healthy_nodes: HashSet::new(),
use_availability_penalty: true,
window_weights,
window_weights_sum,
Expand Down Expand Up @@ -302,6 +304,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
self.nodes_with_metrics.len() - 1
});

if health.is_healthy() {
self.healthy_nodes.insert(node.clone());
} else {
self.healthy_nodes.remove(node);
}

self.nodes_with_metrics[idx].add_latency_measurement(health.latency());

self.nodes_with_metrics[idx].score = compute_score(
Expand All @@ -314,6 +322,10 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {

true
}

/// Returns `(total, healthy)`: the sizes of `existing_nodes` and `healthy_nodes`, in that order.
fn nodes_stats(&self) -> (usize, usize) {
    let total = self.existing_nodes.len();
    let healthy = self.healthy_nodes.len();
    (total, healthy)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -344,6 +356,7 @@ mod tests {
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert!(snapshot.next_n_nodes(1).is_empty());
assert_eq!(snapshot.nodes_stats(), (0, 0));
}

#[test]
Expand All @@ -359,6 +372,7 @@ mod tests {
assert!(snapshot.nodes_with_metrics.is_empty());
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.nodes_stats(), (0, 0));
}

#[test]
Expand All @@ -370,13 +384,15 @@ mod tests {
let node = Node::new("api1.com").unwrap();
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
snapshot.existing_nodes.insert(node.clone());
assert_eq!(snapshot.nodes_stats(), (1, 0));
// Check first update
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
let node_with_metrics = snapshot.nodes_with_metrics.first().unwrap();
assert_eq!(node_with_metrics.score, (2.0 / 1.0) / 2.0);
assert_eq!(snapshot.next_node().unwrap(), node);
assert_eq!(snapshot.nodes_stats(), (1, 1));
// Check second update
let health = HealthCheckStatus::new(Some(Duration::from_secs(2)));
let is_updated = snapshot.update_node(&node, health);
Expand All @@ -399,6 +415,7 @@ mod tests {
assert_eq!(snapshot.nodes_with_metrics.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.nodes_stats(), (1, 0));
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
self.healthy_nodes.remove(node)
}
}

/// Returns `(total, healthy)`: the sizes of `existing_nodes` and `healthy_nodes`, in that order.
fn nodes_stats(&self) -> (usize, usize) {
    let total = self.existing_nodes.len();
    let healthy = self.healthy_nodes.len();
    (total, healthy)
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
fn sync_nodes(&mut self, nodes: &[Node]) -> bool;
/// Updates the health status of a specific node, returning `true` if the node was found and updated.
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool;
/// Returns node counts as a `(total, healthy)` tuple: the first element is the total number of nodes,
/// the second is the number of healthy nodes.
fn nodes_stats(&self) -> (usize, usize);
}

0 comments on commit bd59d8e

Please sign in to comment.