diff --git a/.unreleased/LLT-5876_better_batching_testcase b/.unreleased/LLT-5876_better_batching_testcase new file mode 100644 index 000000000..e69de29bb diff --git a/nat-lab/tests/test_batching.py b/nat-lab/tests/test_batching.py index 6ed0a5e74..6e30fb891 100644 --- a/nat-lab/tests/test_batching.py +++ b/nat-lab/tests/test_batching.py @@ -2,10 +2,10 @@ import itertools import pytest from contextlib import AsyncExitStack -from helpers import SetupParameters, setup_environment, setup_mesh_nodes -from itertools import zip_longest +from helpers import setup_api, setup_connections, SetupParameters, setup_mesh_nodes from scapy.layers.inet import TCP, UDP, ICMP # type: ignore from scapy.layers.l2 import ARP # type: ignore +from telio import Client from timeouts import TEST_BATCHING_TIMEOUT from typing import List from utils.asyncio_util import run_async_context @@ -17,10 +17,10 @@ FeatureBatching, EndpointProvider, RelayState, - LinkState, NodeState, PathType, TelioAdapterType, + LinkState, ) from utils.connection import DockerConnection from utils.connection_util import ConnectionTag, DOCKER_GW_MAP, container_id @@ -29,17 +29,19 @@ render_chart, generate_packet_distribution_histogram, generate_packet_delay_histogram, + get_ordered_histogram_score, ) -BATCHING_MISALIGN_S = 7 -BATCHING_CAPTURE_TIME = 120 # Tied to TEST_BATCHING_TIMEOUT +BATCHING_CAPTURE_TIME = 130 +DOCKER_CONE_GW_2_IP = "10.0.254.2" def _generate_setup_parameters( conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool ) -> SetupParameters: - features = features_with_endpoint_providers([EndpointProvider.STUN]) - + features = features_with_endpoint_providers( + [EndpointProvider.STUN, EndpointProvider.LOCAL] + ) features.link_detection = FeatureLinkDetection( rtt_seconds=1, no_of_pings=1, use_for_downgrade=True ) @@ -75,27 +77,27 @@ def _generate_setup_parameters( ConnectionTag.DOCKER_CONE_CLIENT_2, TelioAdapterType.LINUX_NATIVE_TUN, ), + ( + ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1, + TelioAdapterType.LINUX_NATIVE_TUN, + ), + ( + ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2, + TelioAdapterType.LINUX_NATIVE_TUN, + ), ] -# This test captures histograms of network activity to evaluate the effect of local batching in libtelio. -# Since only local batching is implemented, no client-generated traffic should occur during the test. -# External traffic (incoming data) could distort the histograms, and receive-data-triggered batching is -# not yet supported in libtelio. The test setup is simple: all clients are interconnected and remain idle -# for an extended period. This idle period allows for a visual observation. -# Local batching will only have an effect of batching multiple local keepalives into one bundle but will -# not do anything about syncing the keepalives between the peers. @pytest.mark.asyncio @pytest.mark.timeout(TEST_BATCHING_TIMEOUT) @pytest.mark.parametrize( - "setup_params,misalign_sleep_s,capture_duration", + "setup_params,capture_duration", [ pytest.param( [ - _generate_setup_parameters(conn_tag, adapter, True) + _generate_setup_parameters(conn_tag, adapter, False) for conn_tag, adapter in ALL_NODES ], - BATCHING_MISALIGN_S, BATCHING_CAPTURE_TIME, marks=[ pytest.mark.batching, @@ -103,10 +105,9 @@ def _generate_setup_parameters( ), pytest.param( [ - _generate_setup_parameters(conn_tag, adapter, False) + _generate_setup_parameters(conn_tag, adapter, True) for conn_tag, adapter in ALL_NODES ], - BATCHING_MISALIGN_S, BATCHING_CAPTURE_TIME, marks=[ pytest.mark.batching, @@ -116,19 +117,38 @@ def _generate_setup_parameters( ) async def test_batching( setup_params: List[SetupParameters], - misalign_sleep_s: int, capture_duration: int, ) -> None: + """Batch test generates environment where all peers idle after forming direct connections + packet capture is being used to observe how traffic flows and is then processed and displayed. + """ + async with AsyncExitStack() as exit_stack: - env = await exit_stack.enter_async_context( - setup_environment(exit_stack, setup_params) + api, nodes = setup_api( + [(instance.is_local, instance.ip_stack) for instance in setup_params] + ) + connection_managers = await setup_connections( + exit_stack, + [ + ( + instance.connection_tag, + instance.connection_tracker_config, + ) + for instance in setup_params + ], ) - await asyncio.gather(*[ - client.wait_for_state_on_any_derp([RelayState.CONNECTED]) - for client, instance in zip_longest(env.clients, setup_params) - if instance.derp_servers != [] - ]) + clients = [] + for node, conn_man, params in zip(nodes, connection_managers, setup_params): + client = Client( + conn_man.connection, node, params.adapter_type_override, params.features + ) + clients.append(client) + + alpha_client, beta_client, *_ = clients + alpha_node, beta_node, *_ = nodes + + # Start capture tasks # We capture the traffic from all nodes and gateways. # On gateways we are sure the traffic has left the machine, however no easy way to @@ -136,33 +156,49 @@ async def test_batching( # client traffic can be inspected. gateways = [DOCKER_GW_MAP[param.connection_tag] for param in setup_params] gateway_container_names = [container_id(conn_tag) for conn_tag in gateways] - conns = [client.get_connection() for client in env.clients] + conns = [client.get_connection() for client in clients] node_container_names = [ conn.container_name() for conn in conns if isinstance(conn, DockerConnection) ] - container_names = gateway_container_names + node_container_names + container_names = sorted( + list(set(gateway_container_names + node_container_names)) + ) + print("Will capture batching on containers: ", container_names) - cnodes = zip(env.clients, env.nodes) + pcap_capture_tasks = [] + for name in container_names: + pcap_task = asyncio.create_task( + capture_traffic( + name, + capture_duration, + ) + ) + pcap_capture_tasks.append(pcap_task) + + async def delayed_task(delay, node, client): + await asyncio.sleep(delay) + return await exit_stack.enter_async_context( + client.run(api.get_meshnet_config(node.id)) + ) - # Misalign the peers by first stopping all of them and then restarting after various delays. - # This will have an effect of forcing neighboring libtelio node to add the peer to internal lists - # for keepalives at various points in time thus allowing us to observe better - # if the local batching is in action. - for client in env.clients: - await client.stop_device() + tasks = [] + for i, (client, node) in enumerate(zip(clients, nodes)): + tasks.append(asyncio.create_task(delayed_task(i * 3, node, client))) - # misalign the peers by sleeping some before starting each node again - async def start_node_manually(client, node, sleep_s): - await asyncio.sleep(sleep_s) - await client.simple_start() - await client.set_meshnet_config(env.api.get_meshnet_config(node.id)) + # deliberately block direct connection alpha <-> beta. This will make alpha and beta still form direct connections with other peers + # but alpha <-> beta itself will form after a delay causing misalignment which represents real world keepalive flow better + async with AsyncExitStack() as exit_stack2: + await exit_stack2.enter_async_context( + alpha_client.get_router().disable_path(DOCKER_CONE_GW_2_IP), + ) + await asyncio.sleep(20) await asyncio.gather(*[ - start_node_manually(client, node, i * misalign_sleep_s) - for i, (client, node) in enumerate(cnodes) + client.wait_for_state_on_any_derp([RelayState.CONNECTED]) + for client in [alpha_client, beta_client] ]) await asyncio.gather(*[ @@ -173,55 +209,37 @@ async def start_node_manually(client, node, sleep_s): ) ) ) - for client, node in itertools.product(env.clients, env.nodes) + for client, node in itertools.product(clients, nodes) if not client.is_node(node) ]) + print("All peers directly interconnected") + pyro5_ports = [ - int(port) for port in {client.get_proxy_port() for client in env.clients} + int(port) for port in {client.get_proxy_port() for client in clients} ] print("Pyro ports", pyro5_ports) - # In general it's not great to filter traffic but for testing and observing - # it's crucial since it distorts the results. For example Pyro traffic is a constant stream of - # TCP packets allow_pcap_filters = [ ( - "No Pyro5, SSDP, ARP", + "No Pyro5 and no ARP", lambda p: ( ( - (p.haslayer(UDP) or p.haslayer(TCP)) - and p.sport not in pyro5_ports - and p.dport not in pyro5_ports - ) - and ( - not p.haslayer(ICMP) - or p.haslayer(ICMP) - and p[ICMP].type in [0, 8] - ) - and ( - p.haslayer(UDP) - and p[UDP].sport != 1900 - and p[UDP].dport != 1900 + (not p.haslayer(TCP)) + or ( + p.haslayer(TCP) + and p.sport not in pyro5_ports + and p.dport not in pyro5_ports + ) ) and (not p.haslayer(ARP)) ), ), ] - pcap_capture_tasks = [] - for name in container_names: - pcap_task = asyncio.create_task( - capture_traffic( - name, - capture_duration, - ) - ) - pcap_capture_tasks.append(pcap_task) + is_batching_enabled = clients[0].get_features().batching is not None pcap_paths = await asyncio.gather(*pcap_capture_tasks) - - is_batching_enabled = env.clients[0].get_features().batching is not None for container, pcap_path in zip(container_names, pcap_paths): distribution_hs = generate_packet_distribution_histogram( pcap_path, capture_duration, allow_pcap_filters @@ -243,6 +261,8 @@ async def start_node_manually(client, node, sleep_s): print("Delay chart below") print(delay_chart) + print("Score: ", get_ordered_histogram_score(delay_hs)) + def proxying_peer_parameters(clients: List[ConnectionTag]): def features(): diff --git a/nat-lab/tests/timeouts.py b/nat-lab/tests/timeouts.py index 1dfb344c0..3b693906b 100644 --- a/nat-lab/tests/timeouts.py +++ b/nat-lab/tests/timeouts.py @@ -12,4 +12,4 @@ TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180 TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180 TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300 -TEST_BATCHING_TIMEOUT = 1000 +TEST_BATCHING_TIMEOUT = 600 diff --git a/nat-lab/tests/utils/traffic.py b/nat-lab/tests/utils/traffic.py index 1c73e1441..7fd608cd9 100644 --- a/nat-lab/tests/utils/traffic.py +++ b/nat-lab/tests/utils/traffic.py @@ -118,6 +118,15 @@ def generate_packet_delay_histogram( return generate_histogram(timestamps, buckets) +def get_ordered_histogram_score(data: typing.List[int]) -> int: + # Assumes the histogram order matters and each item going to the right adds more to the score + # Useful to quantity a score for things like periods between packets + score = 0 + for i, value in enumerate(data, start=1): + score += i * value + return score + + def generate_packet_distribution_histogram( pcap_path: str, buckets: int,