From d072ea6ff5dc120163be3c3124198f7241e2a916 Mon Sep 17 00:00:00 2001 From: Ikubimu Date: Thu, 2 Jan 2025 19:00:52 +0100 Subject: [PATCH 1/5] added class decoder which deserialize packets --- src/services/communications/Decoder_Data.py | 144 ++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/services/communications/Decoder_Data.py diff --git a/src/services/communications/Decoder_Data.py b/src/services/communications/Decoder_Data.py new file mode 100644 index 0000000..8448764 --- /dev/null +++ b/src/services/communications/Decoder_Data.py @@ -0,0 +1,144 @@ +import threading +import struct +import re + +import DatagramSocket + + +class Decoder: + def __init__(self, package_data, ds): #package_data its a array that contains string id and string with all measurements + self.dict_measurement_types = {} #key string id of each measurament, contains an array from types + self.dict_measurement_names = {} #contains array of names of each variable + self.dict_measurement_value = {} #contains the value of each measurement + self.ds = ds #datagramsocket + + for data in package_data: + id = data[0] + measurements = data[1].split(',') + arr_measuremets = [] + arr_names = [] + for measure in measurements: + pair = measure.split(':') + arr_names.append(pair[0]) + arr_measuremets.append(pair[1]) + self.dict_measurement_value[pair[0]] = None + + self.dict_measurement_types[id] = arr_measuremets + self.dict_measurement_names[id] = arr_names + + self.recv_packet_thread = threading.Thread(target=self._recv_packet, daemon=True) + self.recv_packet_running = False + + def start(self): + self.recv_packet_running = True + self.recv_packet_thread.start() + + def _recv_packet(self): + buff = bytearray() + id_packet = 0 + bytes_count = 0 + bytes_size = 0 + wait_new_packet = True + + while self.recv_packet_running: + raw_data = self.ds.get_packet() #get_packet must to be blocking + if wait_new_packet: + id_packet = struct.unpack(' Date: Thu, 2 Jan 2025 22:23:56 +0100 Subject: [PATCH 2/5] change DatagramSocket thread, now get_data is blocking --- src/services/communications/DatagramSocket.py | 35 +++++++------------ src/services/communications/Decoder_Data.py | 2 ++ 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/src/services/communications/DatagramSocket.py b/src/services/communications/DatagramSocket.py index 054e808..90f2ec7 100644 --- a/src/services/communications/DatagramSocket.py +++ b/src/services/communications/DatagramSocket.py @@ -18,25 +18,7 @@ def __init__(self, lip, lport, rip, rport): self._sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) self._sock.bind((self.local_ip,self.local_port)) self._sock.settimeout(TIMEOUT_TIME) - self._running = True - self._recv_thread = threading.Thread(target=self._receive, daemon=True) - self._recv_thread.start() - - def _receive(self): - while self._running: - try: - data,address = self._sock.recvfrom(MAX_SIZE_PACKET) - if address == (self.remote_ip,self.remote_port): #check if the receiving message comes from the ip address correct - self._queue_packet_received.put(data) - except socket.timeout: - continue - except OSError as e: - if self._running == False: - return - print(f"Error while receiving data: {e}") - break - self._running = False - + self._running = False def transmit(self, buf: bytes) -> int: bytes_sent = 0 @@ -50,9 +32,18 @@ def transmit(self, buf: bytes) -> int: return bytes_sent def get_packet(self) -> Optional[bytes]: - if self._queue_packet_received.empty(): - return None - return self._queue_packet_received.get() + while self._running: + try: + data,address = self._sock.recvfrom(MAX_SIZE_PACKET) + if address == (self.remote_ip,self.remote_port): #check if the receiving message comes from the ip address correct + return data + except socket.timeout: + continue + except OSError as e: + if self._running == False: + return + print(f"Error while receiving data: {e}") + break def stop(self): if not self._running: diff --git a/src/services/communications/Decoder_Data.py b/src/services/communications/Decoder_Data.py index 8448764..8f4aa5b 100644 --- a/src/services/communications/Decoder_Data.py +++ b/src/services/communications/Decoder_Data.py @@ -31,6 +31,7 @@ def __init__(self, package_data, ds): #package_data its a array that contains st def start(self): self.recv_packet_running = True + self.ds._running = True self.recv_packet_thread.start() def _recv_packet(self): @@ -139,6 +140,7 @@ def __getitem__(self, key): def stop(self): self.recv_packet_running = False self.recv_packet_thread.join() + self.ds.stop() def __del__(self): self.stop() From d6c5b779908b173f83008e92fbec8c2acb0a5e35 Mon Sep 17 00:00:00 2001 From: Ikubimu Date: Fri, 3 Jan 2025 11:04:36 +0100 Subject: [PATCH 3/5] testing decoder and fix errors --- src/services/communications/Decoder_Data.py | 33 ++++++++++++--------- src/test_decoder/test_decoder_recieve.py | 19 ++++++++++++ src/test_decoder/test_decoder_send.py | 23 ++++++++++++++ 3 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 src/test_decoder/test_decoder_recieve.py create mode 100644 src/test_decoder/test_decoder_send.py diff --git a/src/services/communications/Decoder_Data.py b/src/services/communications/Decoder_Data.py index 8f4aa5b..f301184 100644 --- a/src/services/communications/Decoder_Data.py +++ b/src/services/communications/Decoder_Data.py @@ -74,43 +74,43 @@ def deserialize(self, buffer, id_packet): data = None match type: case "bool": - data = struct.unpack(' Date: Fri, 17 Jan 2025 19:05:33 +0100 Subject: [PATCH 4/5] update class for receive new dict --- .../services/communications/DatagramSocket.py | 1 + .../services/communications/Decoder_Data.py | 44 +++++++++---------- src/vmcu/test_decoder/test_decoder_recieve.py | 7 +-- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/src/vmcu/services/communications/DatagramSocket.py b/src/vmcu/services/communications/DatagramSocket.py index 90f2ec7..21f23b1 100644 --- a/src/vmcu/services/communications/DatagramSocket.py +++ b/src/vmcu/services/communications/DatagramSocket.py @@ -32,6 +32,7 @@ def transmit(self, buf: bytes) -> int: return bytes_sent def get_packet(self) -> Optional[bytes]: + self._running = True while self._running: try: data,address = self._sock.recvfrom(MAX_SIZE_PACKET) diff --git a/src/vmcu/services/communications/Decoder_Data.py b/src/vmcu/services/communications/Decoder_Data.py index f301184..43d572a 100644 --- a/src/vmcu/services/communications/Decoder_Data.py +++ b/src/vmcu/services/communications/Decoder_Data.py @@ -4,20 +4,34 @@ import DatagramSocket - +def calculate_byte_size(self, measurements): + + size = 0 + for measure in measurements: + if measure == 'uint8' or measure == 'int8' or measure == 'bool' or 'enum' in measure: + size += 1 + elif measure == 'uint16' or measure == 'int16': + size += 2 + elif measure == 'uint32' or measure == 'int32' or measure == 'float32': + size += 4 + elif measure == 'uint64' or measure == 'int64' or measure == 'float64': + size += 8 + + return size class Decoder: def __init__(self, package_data, ds): #package_data its a array that contains string id and string with all measurements self.dict_measurement_types = {} #key string id of each measurament, contains an array from types self.dict_measurement_names = {} #contains array of names of each variable self.dict_measurement_value = {} #contains the value of each measurement + self.dict_measurement_size = {} #contains size for each packet self.ds = ds #datagramsocket - - for data in package_data: - id = data[0] - measurements = data[1].split(',') + + for id, measurements in package_data.items(): + + measurements_split = measurements.split(',') arr_measuremets = [] arr_names = [] - for measure in measurements: + for measure in measurements_split: pair = measure.split(':') arr_names.append(pair[0]) arr_measuremets.append(pair[1]) @@ -25,13 +39,13 @@ def __init__(self, package_data, ds): #package_data its a array that contains st self.dict_measurement_types[id] = arr_measuremets self.dict_measurement_names[id] = arr_names + self.dict_measurement_size[id] = calculate_byte_size(arr_measuremets) self.recv_packet_thread = threading.Thread(target=self._recv_packet, daemon=True) self.recv_packet_running = False def start(self): self.recv_packet_running = True - self.ds._running = True self.recv_packet_thread.start() def _recv_packet(self): @@ -47,7 +61,7 @@ def _recv_packet(self): id_packet = struct.unpack(' Date: Mon, 20 Jan 2025 16:03:42 +0100 Subject: [PATCH 5/5] fix buffer implementation to manage multiple packages --- .../services/communications/Decoder_Data.py | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/src/vmcu/services/communications/Decoder_Data.py b/src/vmcu/services/communications/Decoder_Data.py index 43d572a..2d86976 100644 --- a/src/vmcu/services/communications/Decoder_Data.py +++ b/src/vmcu/services/communications/Decoder_Data.py @@ -2,9 +2,7 @@ import struct import re -import DatagramSocket - -def calculate_byte_size(self, measurements): +def calculate_byte_size(measurements): size = 0 for measure in measurements: @@ -51,33 +49,42 @@ def start(self): def _recv_packet(self): buff = bytearray() id_packet = 0 - bytes_count = 0 bytes_size = 0 wait_new_packet = True while self.recv_packet_running: - raw_data = self.ds.get_packet() #get_packet must to be blocking - if wait_new_packet: - id_packet = struct.unpack('= bytes_size and len(buff)>0: + data_buff = buff[:bytes_size] + buff = buff[bytes_size:] + self.deserialize(data_buff, id_packet) + + if len(buff) >= 2: + id_packet = struct.unpack('