Misalign batch testcase peers and add scoring #1044

Merged
merged 1 commit into from Jan 15, 2025
166 changes: 93 additions & 73 deletions nat-lab/tests/test_batching.py
@@ -2,10 +2,10 @@
import itertools
import pytest
from contextlib import AsyncExitStack
from helpers import SetupParameters, setup_environment, setup_mesh_nodes
from itertools import zip_longest
from helpers import setup_api, setup_connections, SetupParameters, setup_mesh_nodes
from scapy.layers.inet import TCP, UDP, ICMP # type: ignore
from scapy.layers.l2 import ARP # type: ignore
from telio import Client
from timeouts import TEST_BATCHING_TIMEOUT
from typing import List
from utils.asyncio_util import run_async_context
@@ -17,10 +17,10 @@
FeatureBatching,
EndpointProvider,
RelayState,
LinkState,
NodeState,
PathType,
TelioAdapterType,
LinkState,
)
from utils.connection import DockerConnection
from utils.connection_util import ConnectionTag, DOCKER_GW_MAP, container_id
@@ -29,17 +29,19 @@
render_chart,
generate_packet_distribution_histogram,
generate_packet_delay_histogram,
get_ordered_histogram_score,
)

BATCHING_MISALIGN_S = 7
BATCHING_CAPTURE_TIME = 120 # Tied to TEST_BATCHING_TIMEOUT
BATCHING_CAPTURE_TIME = 130
DOCKER_CONE_GW_2_IP = "10.0.254.2"


def _generate_setup_parameters(
conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool
) -> SetupParameters:
features = features_with_endpoint_providers([EndpointProvider.STUN])

features = features_with_endpoint_providers(
[EndpointProvider.STUN, EndpointProvider.LOCAL]
)
features.link_detection = FeatureLinkDetection(
rtt_seconds=1, no_of_pings=1, use_for_downgrade=True
)
Expand Down Expand Up @@ -75,38 +77,37 @@ def _generate_setup_parameters(
ConnectionTag.DOCKER_CONE_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
]
# This test captures histograms of network activity to evaluate the effect of local batching in libtelio.
# Since only local batching is implemented, no client-generated traffic should occur during the test.
# External traffic (incoming data) could distort the histograms, and receive-data-triggered batching is
# not yet supported in libtelio. The test setup is simple: all clients are interconnected and remain
# idle for an extended period, which allows for visual observation of the keepalive traffic.
# Local batching only bundles multiple local keepalives into one transmission; it does nothing to
# synchronize keepalives between peers.
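# A minimal sketch of the bundling idea described above (hypothetical helper,
# not libtelio's actual implementation): keepalive deadlines that fall within
# a threshold of each other are emitted together as one batch.
def bundle_keepalive_deadlines(
    deadlines_s: List[float], threshold_s: float = 5.0
) -> List[List[float]]:
    batches: List[List[float]] = []
    for deadline in sorted(deadlines_s):
        if batches and deadline - batches[-1][0] <= threshold_s:
            batches[-1].append(deadline)  # close enough: join the current batch
        else:
            batches.append([deadline])  # too far apart: start a new batch
    return batches


# e.g. bundle_keepalive_deadlines([0.0, 3.0, 11.0]) -> [[0.0, 3.0], [11.0]]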


@pytest.mark.asyncio
@pytest.mark.timeout(TEST_BATCHING_TIMEOUT)
@pytest.mark.parametrize(
"setup_params,misalign_sleep_s,capture_duration",
"setup_params,capture_duration",
[
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, True)
_generate_setup_parameters(conn_tag, adapter, False)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
],
),
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, False)
_generate_setup_parameters(conn_tag, adapter, True)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
@@ -116,53 +117,88 @@ def _generate_setup_parameters(
)
async def test_batching(
setup_params: List[SetupParameters],
misalign_sleep_s: int,
capture_duration: int,
) -> None:
"""Batch test generates environment where all peers idle after forming direct connections
packet capture is being used to observe how traffic flows and is then processed and displayed.
"""

async with AsyncExitStack() as exit_stack:
env = await exit_stack.enter_async_context(
setup_environment(exit_stack, setup_params)
api, nodes = setup_api(
[(instance.is_local, instance.ip_stack) for instance in setup_params]
)
connection_managers = await setup_connections(
exit_stack,
[
(
instance.connection_tag,
instance.connection_tracker_config,
)
for instance in setup_params
],
)

await asyncio.gather(*[
client.wait_for_state_on_any_derp([RelayState.CONNECTED])
for client, instance in zip_longest(env.clients, setup_params)
if instance.derp_servers != []
])
clients = []
for node, conn_man, params in zip(nodes, connection_managers, setup_params):
client = Client(
conn_man.connection, node, params.adapter_type_override, params.features
)
clients.append(client)

alpha_client, beta_client, *_ = clients
alpha_node, beta_node, *_ = nodes

        # Start capture tasks

        # We capture the traffic from all nodes and gateways.
        # On gateways we can be sure the traffic has left the machine, but there is
        # no easy way to inspect the packets (they are encrypted by WireGuard).
        # For packet inspection, client-side traffic can be used instead.
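        # Sketch of why gateway captures are opaque (assumption: the WireGuard
        # transport is UDP, 51820 being WireGuard's default port): on a gateway
        # we can classify packets by header metadata only, never by payload.
        def looks_like_wireguard(pkt, wg_port=51820):
            return pkt.haslayer(UDP) and wg_port in (pkt[UDP].sport, pkt[UDP].dport)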
gateways = [DOCKER_GW_MAP[param.connection_tag] for param in setup_params]
gateway_container_names = [container_id(conn_tag) for conn_tag in gateways]
conns = [client.get_connection() for client in env.clients]
conns = [client.get_connection() for client in clients]
node_container_names = [
conn.container_name()
for conn in conns
if isinstance(conn, DockerConnection)
]

container_names = gateway_container_names + node_container_names
container_names = sorted(
list(set(gateway_container_names + node_container_names))
)

print("Will capture batching on containers: ", container_names)
cnodes = zip(env.clients, env.nodes)
pcap_capture_tasks = []
for name in container_names:
pcap_task = asyncio.create_task(
capture_traffic(
name,
capture_duration,
)
)
pcap_capture_tasks.append(pcap_task)

async def delayed_task(delay, node, client):
await asyncio.sleep(delay)
return await exit_stack.enter_async_context(
client.run(api.get_meshnet_config(node.id))
)

        # Misalign the peers by first stopping all of them and then restarting them after various delays.
        # This forces each neighboring libtelio node to add the peer to its internal keepalive
        # lists at a different point in time, which makes it easier to observe whether
        # local batching is in action.
for client in env.clients:
await client.stop_device()
tasks = []
for i, (client, node) in enumerate(zip(clients, nodes)):
tasks.append(asyncio.create_task(delayed_task(i * 3, node, client)))

        # Misalign the peers by sleeping for a while before starting each node again
async def start_node_manually(client, node, sleep_s):
await asyncio.sleep(sleep_s)
await client.simple_start()
await client.set_meshnet_config(env.api.get_meshnet_config(node.id))
        # Deliberately block the direct connection alpha <-> beta. Alpha and beta will still form
        # direct connections with the other peers, but the alpha <-> beta link itself will only form
        # after a delay, causing a misalignment that better represents a real-world keepalive flow.
async with AsyncExitStack() as exit_stack2:
await exit_stack2.enter_async_context(
alpha_client.get_router().disable_path(DOCKER_CONE_GW_2_IP),
)
await asyncio.sleep(20)

await asyncio.gather(*[
start_node_manually(client, node, i * misalign_sleep_s)
for i, (client, node) in enumerate(cnodes)
client.wait_for_state_on_any_derp([RelayState.CONNECTED])
for client in [alpha_client, beta_client]
])

await asyncio.gather(*[
@@ -173,55 +209,37 @@ async def start_node_manually(client, node, sleep_s):
)
)
)
for client, node in itertools.product(env.clients, env.nodes)
for client, node in itertools.product(clients, nodes)
if not client.is_node(node)
])

print("All peers directly interconnected")

pyro5_ports = [
int(port) for port in {client.get_proxy_port() for client in env.clients}
int(port) for port in {client.get_proxy_port() for client in clients}
]

print("Pyro ports", pyro5_ports)
        # In general it's not great to filter traffic, but for testing and observation it's
        # crucial, since unrelated traffic distorts the results. For example, Pyro traffic is
        # a constant stream of TCP packets.
allow_pcap_filters = [
(
"No Pyro5, SSDP, ARP",
"No Pyro5 and no ARP",
lambda p: (
(
(p.haslayer(UDP) or p.haslayer(TCP))
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
)
and (
not p.haslayer(ICMP)
or p.haslayer(ICMP)
and p[ICMP].type in [0, 8]
)
and (
p.haslayer(UDP)
and p[UDP].sport != 1900
and p[UDP].dport != 1900
(not p.haslayer(TCP))
or (
p.haslayer(TCP)
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
)
)
and (not p.haslayer(ARP))
),
),
]
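        # Quick sanity check of the allow-filter above (illustrative only; assumes
        # ports 9090 and 12345 are not among the dynamically assigned Pyro5 ports):
        from scapy.layers.inet import IP  # type: ignore

        _, allowed = allow_pcap_filters[0]
        assert allowed(IP() / TCP(sport=9090, dport=12345))  # plain TCP passes
        assert not allowed(ARP())  # ARP frames are filtered out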

pcap_capture_tasks = []
for name in container_names:
pcap_task = asyncio.create_task(
capture_traffic(
name,
capture_duration,
)
)
pcap_capture_tasks.append(pcap_task)
is_batching_enabled = clients[0].get_features().batching is not None

pcap_paths = await asyncio.gather(*pcap_capture_tasks)

is_batching_enabled = env.clients[0].get_features().batching is not None
for container, pcap_path in zip(container_names, pcap_paths):
distribution_hs = generate_packet_distribution_histogram(
pcap_path, capture_duration, allow_pcap_filters
@@ -243,6 +261,8 @@ async def start_node_manually(client, node, sleep_s):
print("Delay chart below")
print(delay_chart)

print("Score: ", get_ordered_histogram_score(delay_hs))


def proxying_peer_parameters(clients: List[ConnectionTag]):
def features():
2 changes: 1 addition & 1 deletion nat-lab/tests/timeouts.py
@@ -12,4 +12,4 @@
TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180
TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180
TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300
TEST_BATCHING_TIMEOUT = 1000
TEST_BATCHING_TIMEOUT = 600
9 changes: 9 additions & 0 deletions nat-lab/tests/utils/traffic.py
@@ -118,6 +118,15 @@ def generate_packet_delay_histogram(
return generate_histogram(timestamps, buckets)


def get_ordered_histogram_score(data: typing.List[int]) -> int:
    # Assumes the histogram order matters: each bucket further to the right
    # contributes more to the score. Useful to quantify things like the
    # periods between packets.
    score = 0
    for i, value in enumerate(data, start=1):
        score += i * value
    return score
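# Worked examples (illustrative): the same packet mass scores higher the further
# right (i.e. the longer the inter-packet-period bucket) it sits in the histogram.
#   >>> get_ordered_histogram_score([10, 0, 0])
#   10
#   >>> get_ordered_histogram_score([0, 0, 10])
#   30
#   >>> get_ordered_histogram_score([2, 3, 5])  # 1*2 + 2*3 + 3*5
#   23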


def generate_packet_distribution_histogram(
pcap_path: str,
buckets: int,