Skip to content

Commit

Permalink
Conntracker sync (#784)
Browse files Browse the repository at this point in the history
Convert get_out_of_limits() to async and adds a Lock
Use asyncio.Event to synchronize to a known ping event
  • Loading branch information
tomasz-grz authored Aug 30, 2024
1 parent 7b41245 commit f4d9c87
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 55 deletions.
Empty file added .unreleased/conntracker_sync
Empty file.
2 changes: 1 addition & 1 deletion nat-lab/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async def setup_environment(
print(datetime.now(), "Checking connection limits")
for conn_manager in connection_managers:
if conn_manager.tracker:
limits = conn_manager.tracker.get_out_of_limits()
limits = await conn_manager.tracker.get_out_of_limits()
assert limits is None, f"conntracker reported out of limits {limits}"
finally:
stop_tcpdump([server["container"] for server in WG_SERVERS])
Expand Down
4 changes: 2 additions & 2 deletions nat-lab/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,5 +703,5 @@ async def test_event_content_meshnet_node_upgrade_direct(
alpha_node_state.endpoint and alpha_public_ip in alpha_node_state.endpoint
)

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None
18 changes: 9 additions & 9 deletions nat-lab/tests/test_lana.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ async def run_default_scenario(
)
assert gamma_events

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert gamma_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None
assert await gamma_conn_tracker.get_out_of_limits() is None

(alpha_expected_states, beta_expected_states, gamma_expected_states) = (
[
Expand Down Expand Up @@ -1420,8 +1420,8 @@ async def test_lana_with_meshnet_exit_node(
# Validate all nodes have the same meshnet id
assert alpha_events[0].fp == beta_events[0].fp

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None

# LLT-5532: To be cleaned up...
client_alpha.allow_errors(
Expand Down Expand Up @@ -1823,8 +1823,8 @@ def get_features_with_long_qos() -> TelioFeatures:
== beta_events[0].fp
== beta_events[1].fp
)
assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None


@pytest.mark.moose
Expand Down Expand Up @@ -1918,8 +1918,8 @@ async def test_lana_with_second_node_joining_later_meshnet_id_can_change(
else:
assert False, "[PANIC] Public keys match!"

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None

# LLT-5532: To be cleaned up...
client_alpha.allow_errors(
Expand Down
4 changes: 2 additions & 2 deletions nat-lab/tests/test_mesh_exit_through_peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,5 @@ async def test_ipv6_exit_node(
ip_beta = await stun.get(connection_beta, config.STUNV6_SERVER)
assert ip_alpha == ip_beta

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None
4 changes: 2 additions & 2 deletions nat-lab/tests/test_mesh_plus_vpn.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ async def test_vpn_plus_mesh(
ip = await stun.get(connection_alpha, config.STUN_SERVER)
assert ip == wg_server["ipv4"], f"wrong public IP when connected to VPN {ip}"

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None


@pytest.mark.asyncio
Expand Down
9 changes: 5 additions & 4 deletions nat-lab/tests/test_pinging.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ async def test_session_keeper(

async def wait_for_conntracker() -> None:
while True:
alpha_limits = alpha_conntrack.get_out_of_limits()
beta_limits = beta_conntrack.get_out_of_limits()
alpha_limits = await alpha_conntrack.get_out_of_limits()
beta_limits = await beta_conntrack.get_out_of_limits()
print(datetime.now(), "Conntracker state: ", alpha_limits, beta_limits)
if alpha_limits is None and beta_limits is None:
return
Expand Down Expand Up @@ -280,8 +280,9 @@ async def test_qos(

async def wait_for_conntracker() -> None:
while True:
print("wait_for_conntracker(): ", alpha_conntrack.get_out_of_limits())
if alpha_conntrack.get_out_of_limits() is None:
alpha_limits = await alpha_conntrack.get_out_of_limits()
print("wait_for_conntracker(): ", alpha_limits)
if alpha_limits is None:
return
await asyncio.sleep(1.0)

Expand Down
4 changes: 2 additions & 2 deletions nat-lab/tests/test_telio_version_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,5 @@ async def on_stdout_stderr(output):
testing.unpack_optional(beta.get_ip_address(IPProto.IPv4)),
)

assert alpha_conn_tracker.get_out_of_limits() is None
assert beta_conn_tracker.get_out_of_limits() is None
assert await alpha_conn_tracker.get_out_of_limits() is None
assert await beta_conn_tracker.get_out_of_limits() is None
100 changes: 67 additions & 33 deletions nat-lab/tests/utils/connection_tracker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import platform
import re
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -92,33 +91,34 @@ def __init__(
self._connection: Connection = connection
self._config: Optional[List[ConnectionTrackerConfig]] = configuration
self._events: List[FiveTuple] = []

self._initialized: bool = False
self._init_connection: FiveTuple = FiveTuple(
protocol="icmp", dst_ip="127.0.0.1"
self._lock: asyncio.Lock = asyncio.Lock()
self._sync_event: asyncio.Event = asyncio.Event()
self._sync_connection: FiveTuple = FiveTuple(
protocol="icmp", dst_ip="127.0.0.2"
)

async def on_stdout(self, stdout: str) -> None:
if not self._config:
return

for line in stdout.splitlines():
connection = parse_input(line)
if connection is FiveTuple():
continue

if not self._initialized:
if self._init_connection.partial_eq(connection):
self._initialized = True
async with self._lock:
for line in stdout.splitlines():
connection = parse_input(line)
if connection is FiveTuple():
continue

matching_configs = [
cfg for cfg in self._config if cfg.target.partial_eq(connection)
]
if not matching_configs:
continue
if not self._sync_event.is_set():
if self._sync_connection.partial_eq(connection):
self._sync_event.set()
continue

self._events.append(connection)
matching_configs = [
cfg for cfg in self._config if cfg.target.partial_eq(connection)
]
if not matching_configs:
continue

self._events.append(connection)

async def execute(self) -> None:
if platform.system() == "Darwin":
Expand All @@ -128,42 +128,76 @@ async def execute(self) -> None:

await self._process.execute(stdout_callback=self.on_stdout)

def get_out_of_limits(self) -> Optional[Dict[str, int]]:
async def get_out_of_limits(self) -> Optional[Dict[str, int]]:
if platform.system() == "Darwin":
return None
if not self._config:
return None

await self.synchronize()

out_of_limit_connections: Dict[str, int] = {}

for cfg in self._config:
count = len(
[event for event in self._events if cfg.target.partial_eq(event)]
)
async with self._lock:
count = len(
[event for event in self._events if cfg.target.partial_eq(event)]
)
if cfg.limits.max is not None:
if count > cfg.limits.max:
out_of_limit_connections[cfg.key] = count
print(
datetime.now(),
"ConnectionTracker for",
cfg.target.src_ip,
cfg.key,
"is over the limit:",
count,
">",
cfg.limits.max,
)
continue
if cfg.limits.min is not None:
if count < cfg.limits.min:
out_of_limit_connections[cfg.key] = count
print(
datetime.now(),
"ConnectionTracker for",
cfg.target.src_ip,
cfg.key,
"is under the limit:",
count,
"<",
cfg.limits.min,
)
continue

return out_of_limit_connections if bool(out_of_limit_connections) else None

async def synchronize(self):
if not self._config:
return None

# wait to synchronize over a known event
async with Ping(self._connection, "127.0.0.2").run():
print(datetime.now(), "ConnectionTracker waiting for _sync_event")
try:
await asyncio.wait_for(self._sync_event.wait(), timeout=10.0)
print(datetime.now(), "ConnectionTracker got _sync_event")
except TimeoutError:
print(datetime.now(), "ConnectionTracker sync timeout")
async with self._lock:
self._sync_event.clear()

@asynccontextmanager
async def run(self) -> AsyncIterator["ConnectionTracker"]:
async with self._process.run(stdout_callback=self.on_stdout):
await self._process.wait_stdin_ready()
# initialization is just waiting for first conntrack event,
# since it has no other indication if it is truly running.
# Or wait for 1 second and pray it was initialized
async with Ping(self._connection, "127.0.0.1").run():
start_time = time.time()
while not self._initialized:
if time.time() - start_time >= 1:
self._initialized = True
break
await asyncio.sleep(0.1)

# magic sleep, for unknown reason it takes a moment before
# conntrack on_stdout is ready to process events
await asyncio.sleep(0.1)

# initialization is just waiting for first conntrack event
await self.synchronize()
yield self

0 comments on commit f4d9c87

Please sign in to comment.