Skip to content

Commit

Permalink
Merge pull request #1033 from NordSecurity/olekoliinyk/add_on_connect…
Browse files Browse the repository at this point in the history
…_callback_to_mqtt_listener

Add on_connect callback to mqtt listener
  • Loading branch information
olekoliinyk authored Dec 19, 2024
2 parents 6f2e392 + 1c4209b commit 17112c2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 24 deletions.
Empty file.
11 changes: 10 additions & 1 deletion nat-lab/bin/mqtt-listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message):
sys.exit(0)


def on_connect(client, _userdata, _flags, rc, _properties):
if rc == 0:
print("Connected to MQTT Broker")
client.subscribe("meshnet", qos=0)
else:
print(f"Failed to connect with result code: {rc}")
sys.exit(1)


def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password):

mqttc = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311
)

mqttc.on_connect = on_connect
mqttc.on_message = on_message

mqttc.username_pw_set(
Expand All @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw
cert_reqs=ssl.CERT_REQUIRED,
)
mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1)
mqttc.subscribe("meshnet", qos=0)
mqttc.loop_forever()


Expand Down
96 changes: 73 additions & 23 deletions nat-lab/tests/test_notification_center.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import base64
import json
from asyncio import sleep
from asyncio import Event
from config import CORE_API_CA_CERTIFICATE_PATH, CORE_API_URL
from contextlib import AsyncExitStack
from dataclasses import dataclass
from helpers import send_https_request, verify_uuid
from utils.connection_util import ConnectionTag, new_connection_by_tag
from utils.output_notifier import OutputNotifier


@dataclass
Expand Down Expand Up @@ -58,22 +59,39 @@ async def run_mqtt_listener(
mqtt_broker_port,
mqtt_broker_user,
mqtt_broker_password,
output_notifier,
):
stdout_buffer = []

async def stdout_stderr_callback(output):
print(f"MQTT Listener output: {output}")
stdout_buffer.append(output)
await output_notifier.handle_output(output)

mqtt_process = await exit_stack.enter_async_context(
connection.create_process([
"python3",
"-u",
"/opt/bin/mqtt-listener.py",
mqtt_broker_host,
mqtt_broker_port,
mqtt_broker_user,
mqtt_broker_password,
]).run()
]).run(
stdout_callback=stdout_stderr_callback,
stderr_callback=stdout_stderr_callback,
)
)
while not mqtt_process.is_executing():
await sleep(0.1)
print("Waiting for MQTT listener to start executing ...")
print("MQTT listener is executing ...")
return mqtt_process

try:
await mqtt_process.wait_stdin_ready(timeout=10.0)
print("MQTT listener process stdin is ready")
except TimeoutError as e:
raise TimeoutError(
f"Timed out waiting for MQTT listener stdin readiness: {e}"
) from e

return stdout_buffer


async def get_mqtt_broker_credentials(connection):
Expand All @@ -98,6 +116,14 @@ async def get_mqtt_broker_credentials(connection):
return host, port, user, password


def create_output_notifier(events):
output_notifier = OutputNotifier()
event_objects = {event: Event() for event in events}
for event, event_obj in event_objects.items():
output_notifier.notify_output(event, event_obj)
return output_notifier, event_objects


async def test_nc_register():
async with AsyncExitStack() as exit_stack:
# Setup connections
Expand All @@ -112,10 +138,25 @@ async def test_nc_register():
mqtt_host, mqtt_port, user, password = await get_mqtt_broker_credentials(
connection
)
mqtt_process = await run_mqtt_listener(
exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password

output_notifier, events = create_output_notifier(
["Connected to MQTT Broker", "message"]
)
connected_event = events["Connected to MQTT Broker"]
message_event = events["message"]

stdout_buffer = await run_mqtt_listener(
exit_stack,
mqtt_connection,
mqtt_host,
mqtt_port,
user,
password,
output_notifier,
)

await connected_event.wait()

# Register machine - this is a minimal version which should be also supported
request_json = {
"public_key": "some-public-key",
Expand Down Expand Up @@ -146,19 +187,30 @@ async def test_nc_register():
assert request_json["device_type"] == response.device_type
assert response.traffic_routing_supported is False

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")
await message_event.wait()

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = stdout_buffer[
-1
] # The last item in the buffer is expected to be a message from the meshnet topic
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

mqtt_process = await run_mqtt_listener(
exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password
output_notifier, events = create_output_notifier(
["Connected to MQTT Broker", "message"]
)
connected_event = events["Connected to MQTT Broker"]
message_event = events["message"]

stdout_buffer = await run_mqtt_listener(
exit_stack,
mqtt_connection,
mqtt_host,
mqtt_port,
user,
password,
output_notifier,
)
await connected_event.wait()

await send_https_request(
connection,
Expand All @@ -180,12 +232,10 @@ async def test_nc_register():

assert len(machines) == 0

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")
await message_event.wait()

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = stdout_buffer[
-1
] # The last item in the buffer is expected to be a message from the meshnet topic
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

0 comments on commit 17112c2

Please sign in to comment.