Merge pull request #996 from NordSecurity/LLT-5795_detach_session_keeper_and_direct_feature

Detach batcher and direct feature
LukasPukenis authored Dec 17, 2024
2 parents 84b3469 + aee6ac6 commit 5384fb6
Showing 5 changed files with 332 additions and 242 deletions.
Empty file.
9 changes: 8 additions & 1 deletion nat-lab/tests/telio.py
@@ -114,6 +114,7 @@ async def notify_peer_state(
paths: List[PathType],
is_exit: bool = False,
is_vpn: bool = False,
link_state: Optional[LinkState] = None,
) -> None:
while True:
peer = self.get_peer_info(public_key)
@@ -123,6 +124,7 @@ async def notify_peer_state(
and peer.state in states
and is_exit == peer.is_exit
and is_vpn == peer.is_vpn
and (link_state is None or peer.link_state == link_state)
):
return
await asyncio.sleep(0.1)
@@ -251,9 +253,12 @@ async def wait_for_state_peer(
is_exit: bool = False,
is_vpn: bool = False,
timeout: Optional[float] = None,
link_state: Optional[LinkState] = None,
) -> None:
await asyncio.wait_for(
self._runtime.notify_peer_state(public_key, state, paths, is_exit, is_vpn),
self._runtime.notify_peer_state(
public_key, state, paths, is_exit, is_vpn, link_state
),
timeout,
)

@@ -522,6 +527,7 @@ async def wait_for_state_peer(
is_exit: bool = False,
is_vpn: bool = False,
timeout: Optional[float] = None,
link_state: Optional[LinkState] = None,
) -> None:
await self.get_events().wait_for_state_peer(
public_key,
Expand All @@ -530,6 +536,7 @@ async def wait_for_state_peer(
is_exit,
is_vpn,
timeout,
link_state,
)

async def wait_for_event_peer(
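The new optional link_state argument is threaded from the high-level wait_for_state_peer helper down to notify_peer_state, so a test can block until a peer reports a particular link state. A minimal usage sketch (mirroring the call made by the new test below; client and node stand in for the usual nat-lab fixtures):

    # Wait until the peer is CONNECTED over relay and its link is reported DOWN.
    await client.wait_for_state_peer(
        node.public_key,
        [NodeState.CONNECTED],
        [PathType.RELAY],
        timeout=30,
        link_state=LinkState.DOWN,
    )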
78 changes: 76 additions & 2 deletions nat-lab/tests/test_batching.py
@@ -2,26 +2,28 @@
import itertools
import pytest
from contextlib import AsyncExitStack
from helpers import SetupParameters, setup_environment
from helpers import SetupParameters, setup_environment, setup_mesh_nodes
from itertools import zip_longest
from scapy.layers.inet import TCP, UDP, ICMP # type: ignore
from scapy.layers.l2 import ARP # type: ignore
from timeouts import TEST_BATCHING_TIMEOUT
from typing import List
from utils.asyncio_util import run_async_context
from utils.bindings import (
default_features,
features_with_endpoint_providers,
FeatureLinkDetection,
FeaturePersistentKeepalive,
FeatureBatching,
EndpointProvider,
RelayState,
LinkState,
NodeState,
PathType,
TelioAdapterType,
)
from utils.connection import DockerConnection
from utils.connection_util import DOCKER_GW_MAP, ConnectionTag, container_id
from utils.connection_util import ConnectionTag, DOCKER_GW_MAP, container_id
from utils.traffic import (
capture_traffic,
render_chart,
@@ -240,3 +242,75 @@ async def start_node_manually(client, node, sleep_s):

print("Delay chart below")
print(delay_chart)


def proxying_peer_parameters(clients: List[ConnectionTag]):
def features():
features = default_features(enable_direct=False, enable_nurse=False)
features.wireguard.persistent_keepalive.proxying = 5
features.link_detection = FeatureLinkDetection(
rtt_seconds=2, no_of_pings=0, use_for_downgrade=False
)

features.batching = FeatureBatching(
direct_connection_threshold=5,
trigger_cooldown_duration=60,
trigger_effective_duration=10,
)
return features

return [
SetupParameters(
connection_tag=conn_tag,
adapter_type_override=TelioAdapterType.NEP_TUN,
features=features(),
fingerprint=f"{conn_tag}",
)
for conn_tag in clients
]


@pytest.mark.asyncio
@pytest.mark.parametrize(
"setup_params",
[
proxying_peer_parameters(
[ConnectionTag.DOCKER_CONE_CLIENT_1, ConnectionTag.DOCKER_CONE_CLIENT_2]
)
],
)
async def test_proxying_peer_batched_keepalive(
setup_params: List[SetupParameters],
) -> None:
    # Since batched keepalives are sent at the application level rather than by the WireGuard
    # backend, we need to ensure that proxying peers are receiving them. To test for that we
    # enable link detection, which guarantees quick detection when there is no corresponding
    # received traffic (WireGuard PassiveKeepalive). If the batcher correctly emits pings, it
    # should trigger link detection quite quickly.
async with AsyncExitStack() as exit_stack:
env = await setup_mesh_nodes(exit_stack, setup_params)

await asyncio.gather(*[
await exit_stack.enter_async_context(
run_async_context(
client.wait_for_state_peer(
node.public_key, [NodeState.CONNECTED], [PathType.RELAY]
)
)
)
for client, node in itertools.product(env.clients, env.nodes)
if not client.is_node(node)
])

alpha, beta = env.clients
await beta.stop_device()

_, beta_node = env.nodes

await alpha.wait_for_state_peer(
beta_node.public_key,
[NodeState.CONNECTED],
[PathType.RELAY],
timeout=30,
link_state=LinkState.DOWN,
)
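For rough intuition on the 30-second timeout above (an estimate from the configured values, not a documented guarantee): with persistent_keepalive.proxying = 5 the batcher should emit a keepalive roughly every 5 seconds, and with no_of_pings=0 link detection relies entirely on that received traffic, so once beta stops answering, a few unanswered keepalive periods are enough for alpha to report LinkState.DOWN well within the 30-second wait.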
87 changes: 47 additions & 40 deletions src/device.rs
@@ -22,6 +22,8 @@ use telio_task::{
io::{chan, mc_chan, mc_chan::Tx, Chan, McChan},
task_exec, BoxAction, Runtime as TaskRuntime, Task,
};

use telio_traversal::UpgradeSyncTrait;
use telio_traversal::{
connectivity_check,
cross_ping_check::{CrossPingCheck, CrossPingCheckTrait, Io as CpcIo, UpgradeController},
@@ -36,7 +38,6 @@ use telio_traversal::{
ping_pong_handler::PingPongHandler,
SessionKeeper, UpgradeRequestChangeEvent, UpgradeSync, WireGuardEndpointCandidateChangeEvent,
};
use telio_traversal::{SessionKeeperTrait, UpgradeSyncTrait};

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos"))]
use telio_sockets::native;
@@ -253,6 +254,9 @@ pub struct MeshnetEntities {

// Starcast components for multicast support.
starcast: Option<StarcastEntities>,

// Keepalive sender
session_keeper: Option<Arc<SessionKeeper>>,
}

#[derive(Default, Debug)]
@@ -324,9 +328,7 @@ impl Entities {
}

pub fn session_keeper(&self) -> Option<&Arc<SessionKeeper>> {
self.meshnet
.left()
.and_then(|m| m.direct.as_ref().map(|d| &d.session_keeper))
self.meshnet.left().and_then(|m| m.session_keeper.as_ref())
}

fn endpoint_providers(&self) -> Vec<&Arc<dyn EndpointProvider>> {
@@ -372,9 +374,6 @@ pub struct DirectEntities {

// Meshnet WG Connection upgrade synchronization
upgrade_sync: Arc<UpgradeSync>,

// Keepalive sender
session_keeper: Arc<SessionKeeper>,
}

pub struct EventListeners {
@@ -1018,8 +1017,10 @@ impl MeshnetEntities {
if let Some(upnp) = direct.upnp_endpoint_provider {
stop_arc_entity!(upnp, "UpnpEndpointProvider");
}
}

stop_arc_entity!(direct.session_keeper, "SessionKeeper");
if let Some(sk) = self.session_keeper {
stop_arc_entity!(sk, "SessionKeeper");
}

stop_arc_entity!(self.multiplexer, "Multiplexer");
@@ -1321,8 +1322,36 @@ impl Runtime {
.await;
}

let session_keeper = {
match SessionKeeper::start(
self.entities.socket_pool.clone(),
self.features.batching.unwrap_or_default(),
self.entities
.wireguard_interface
.subscribe_to_network_activity()
.await?,
) {
Ok(sk) => Some(Arc::new(sk)),
Err(e) => {
telio_log_warn!("Session keeper startup failed: {e:?} - direct connections will not be formed. Keepalive optimisations will be disabled");
None
}
}
};

        // Batching optimisations work by employing SessionKeeper. If SessionKeeper is not present,
        // functionality will break when offloading actions to it, so we disable the feature
if session_keeper.is_none() && self.features.batching.is_some() {
telio_log_warn!(
"Batching feature is enabled but SessionKeeper failed to start. Disabling batching."
);
self.features.batching = None;
}

// Start Direct entities if "direct" feature is on
let direct = if let Some(direct) = &self.features.direct {
let direct = if session_keeper.is_none() {
None
} else if let Some(direct) = &self.features.direct {
// Create endpoint providers
let has_provider = |provider| {
// Default is all providers
@@ -1465,30 +1494,14 @@ impl Runtime {
self.requested_state.device_config.private_key.public(),
)?);

match SessionKeeper::start(
self.entities.socket_pool.clone(),
self.features.batching.unwrap_or_default(),
self.entities
.wireguard_interface
.subscribe_to_network_activity()
.await?,
)
.map(Arc::new)
{
Ok(session_keeper) => Some(DirectEntities {
local_interfaces_endpoint_provider,
stun_endpoint_provider,
upnp_endpoint_provider,
endpoint_providers,
cross_ping_check,
upgrade_sync,
session_keeper,
}),
Err(e) => {
telio_log_warn!("Session keeper startup failed: {e:?} - direct connections will not be formed");
None
}
}
Some(DirectEntities {
local_interfaces_endpoint_provider,
stun_endpoint_provider,
upnp_endpoint_provider,
endpoint_providers,
cross_ping_check,
upgrade_sync,
})
} else {
None
};
Expand All @@ -1504,6 +1517,7 @@ impl Runtime {
proxy,
direct,
starcast,
session_keeper,
})
}

Expand Down Expand Up @@ -2360,13 +2374,6 @@ impl TaskRuntime for Runtime {
direct_entities.cross_ping_check.notify_failed_wg_connection(public_key)
.await?;
direct_entities.upgrade_sync.clear_accepted_session(public_key).await;

if self.features.batching.is_none() {
// When the batcher is enabled we use session keeper for all the connections
// direct, proxy, vpn and stun. This call is guarded by the batcher feature flag
// because it can disable keepalives when we don't want to.
direct_entities.session_keeper.remove_node(&public_key).await?;
}
} else {
telio_log_warn!("Connection downgraded while direct entities are disabled");
}
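Condensed, the reordered startup flow in device.rs now looks roughly like this (a simplified sketch of the hunks above; socket_pool, batching, network_activity and features are shorthand for the fields accessed through self, not the exact code):

    // Start SessionKeeper on its own, independently of the direct feature.
    let session_keeper = match SessionKeeper::start(socket_pool, batching, network_activity) {
        Ok(sk) => Some(Arc::new(sk)),
        Err(e) => {
            telio_log_warn!("Session keeper startup failed: {e:?}");
            None
        }
    };

    // Batching offloads its work to SessionKeeper, so the feature is dropped when the keeper is absent.
    if session_keeper.is_none() && features.batching.is_some() {
        features.batching = None;
    }

    // Direct entities are only built when SessionKeeper is available; they no longer own it.
    let direct = if session_keeper.is_none() {
        None
    } else if let Some(direct_cfg) = &features.direct {
        // ... build endpoint providers, cross-ping check and upgrade sync as before ...
        Some(DirectEntities { /* ... */ })
    } else {
        None
    };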