diff --git a/README.md b/README.md index 252f898..50f6162 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,48 @@ # DecCom-Python -Decentralised communication with Python made easy. +[DecCom](https://theworkerthread.com/tool/deccom) - Decentralised communication with Python made easy. -DecCom provides an easy interface to stack modular protocols on top of each other to create the application you need. The package comes with several protocols you can already use in your development. +DecCom provides an easy interface to stack modular protocols on top of each other to create the application you need. The package comes with several protocols you can already use in your development. THE PROJECT IS STILL A WORK IN PROGRESS!! ALL BINDINGS ARE SUBJECT TO CHANGE! USE AT YOUR OWN RISK! + +## Why DecCom? + +Many popular frameworks for distributed applications are often overly complex, poorly maintained, or straight up not working. [IPv8](https://github.com/Tribler/py-ipv8) has an incredibly rigid structure and poor throughput. [LibP2P](https://libp2p.io/) has a tidiously slow developmental cycle and for many languages the repositories are no longer maintained. It is incredibly difficult to build your desired application on any of these. + +This is where DecCom comes in. With just a few lines of code you too can build a complex distributed application ready for a production environment. The modular nature of the protocols means that you can modify and add functionality without needing to rewrite your entire codebase. DecCom comes prepackaged with several common protocols which take care of the boring stuff for you - processing large messages, reliable communication, peer discovery, security, etc. + +**But why "DecCom"?** - DecCom is short for Decentralised Communication. There isn't much more to it. + + +## DecCom's philosophy + +Many protocol architectures have a very rigid structure - one layer binds to all ports of a lower layer and provides some ports to the layer above it. DecCom violates this strucutre by allowing for any layer to connect to any set of bindings of layers under it. This may sometimes result in very ugly looking diagrams such as the one below. + +![DecCom protocol stack](imgs/protocolstack.png "Example Protocol Stack in DecCom") + +DecCom works with two types of bindings (see [wrappers.py](wrappers.py))- bindto and bindfrom. Bindto refers to methods a protocol calls from a lower protocol. For example, in the TCP/IP protocol stack, an application would call a "send" method to send to some other IP their message. Bindfrom are methods a lower protocol can call in an upper protocol. In the TCP/IP analogy an application would have a receive method which the lower layer would call when the entire message has been received. Thus bindto goes down, bindfrom goes up the stack. Typically your application should stand at the top. + +## Identity + +Within DecCom each node has an [identifier](peer.py) and a public identity. The identifier is a SHA256 has of their public identity. Their public identity can be a public key (currently we support eliptic curve algorithms on the Ed25519 curve) or strings of arbitrary length. Public key identities are used in security layers for encrypting or signing messages. String identities are useful if you want to test an application with a small set of known nodes with specific ids. + +## Discovery + +Discovery can be performed with any of the available methods (currently a [Kademlia DHT](kademliadiscovery.py) or a [Gossip](biggossip.py) protocol). It is important to note that depending on the nodes you choose to connect to initially you can have many different peer networks built on DecCom, which do not communicate with each other. Unlike applications built on top of IPFS which use the IPFS network at all times, with DecCom you can create your own private group without ever contacting any of the publicly available ones. + +## Protocols + +DecCom currently has the following protocols implemented: + +1. UDP transport [defaultprotocol.py](defaultprotocol.py) +2. TCP transport [streamprotocol.py](streamprotocol.py) +3. UDP hole punching [holepuncher.py](holepuncher.py) +4. TCP hole punching [tcpholepuncher.py](tcpholepuncher.py) +5. Reliable UDP [reliableudp.py](reliableudp.py) +6. Noise protocol [noiseprotocol.py](noiseprotocol.py) - establishes a common secret between two nodes +7. Kademlia DHT [kademliadiscovery.py](kademliadiscovery.py) - for discovery +8. Gossip [gossipdiscovery.py](gossipdiscovery.py) - for discovery +9. BigGossip [biggossip.py](biggossip.py) - for discovery +10. Delay protocol [delayprotocol.py](delayprotocol.py) - for testing purposes as one might want to add artificial delay between nodes +11. Keep Alive protocol [keepalive.py](keepalive.py) +12. Faulty UDP transport [faultytransport.py](faultytransport.py) - for simulating random message drops (useful for testing your programs) \ No newline at end of file diff --git a/deccom/__init__.py b/deccom/__init__.py index f102a9c..3dc1f76 100644 --- a/deccom/__init__.py +++ b/deccom/__init__.py @@ -1 +1 @@ -__version__ = "0.0.1" +__version__ = "0.1.0" diff --git a/deccom/cryptofuncs/hash.py b/deccom/cryptofuncs/hash.py index 88bea3a..12e5b6a 100644 --- a/deccom/cryptofuncs/hash.py +++ b/deccom/cryptofuncs/hash.py @@ -8,23 +8,46 @@ def _helper(inp, encoding): # print("bytes") inp = inp else: - raise Exception("Unsupported format",type(inp)) + raise AttributeError("Unsupported format",type(inp)) return inp - + + +""" +Generates a SHA256 of a given input. Input can be of type string, bytes, int, or a list of them. + +Parameters +---------- +name + Input to be hashed + +salt : bytes + The salt to add to the hash (default is None). Salt is added to the end of the input. + +encoding : str + Encoding to use in case the input is a string. Integers are considered as a 64 big-endian byte string. Defaults to utf-8 + +Returns +---------- +bytes + The SHA256 representation of the input + +Raise +---------- +AttributeError +""" + def SHA256(inp, salt: bytes = None, encoding = "utf-8"): digest = hashlib.sha256() if isinstance(inp,str) or isinstance(inp,int) or isinstance(inp,bytes): - inp = _helper(inp,encoding) + digest.update(_helper(inp,encoding)) elif isinstance(inp, list): - tmp = bytearray() for i in inp: - tmp += _helper(inp,encoding) - inp = bytes(tmp) + digest.update(_helper(inp,encoding)) else: - print("unsupported format") + raise AttributeError("Unsupported format",type(inp)) + if salt != None: - digest.update(salt + inp) - else: - digest.update(inp) + digest.update(salt) + return digest.digest() diff --git a/deccom/cryptofuncs/signatures.py b/deccom/cryptofuncs/signatures.py index 0d41445..5902ec9 100644 --- a/deccom/cryptofuncs/signatures.py +++ b/deccom/cryptofuncs/signatures.py @@ -5,16 +5,46 @@ from cryptography.hazmat.primitives import hashes from fe25519 import fe25519 from ge25519 import ge25519, ge25519_p3 + def gen_key(): return Ed25519PrivateKey.generate() def sign(key: Ed25519PrivateKey, hash: bytes): - + if not isinstance(hash,bytes): + raise AttributeError("Unsupported format to hash. Expected bytes but got ",type(hash)) return key.sign(hash) def load_key(bts): return Ed25519PrivateKey.from_private_bytes(bts) -def verify(key: Ed25519PublicKey, hash, sign): + + +""" +Verifies an ED25519 signature. + +Parameters +---------- +key + Public key of the other party + +hash : bytes + The hash of the message + +sign : bytes + The other party's signature + +Returns +---------- +bool + Whether verification was successful + +Raise +---------- +AttributeError +""" + +def verify(key: Ed25519PublicKey, hash: bytes, sign: bytes): + if not isinstance(hash,bytes): + raise AttributeError("Unsupported format to verify. Expected bytes but got ",type(hash)) if isinstance(key,bytes): key = Ed25519PublicKey.from_public_bytes(key) try: @@ -27,6 +57,32 @@ def to_bytes(key: Ed25519PublicKey): return key.public_bytes(Encoding.Raw, PublicFormat.Raw) def from_bytes(key: bytes): return Ed25519PublicKey.from_public_bytes(key) + + + +""" +Given two Ed25519, computes a shared secret between them to be used in Ec-DH. The alogirthm is symmteric - so party one with their private key and the other +party's public key will compute the same result as the other party with their private key and party one's public key. Read more at: +https://en.wikipedia.org/wiki/Elliptic-curve_Diffie%E2%80%93Hellman + +Parameters +---------- +key : Ed25519PrivateKey + Private key of one party + +remote : Ed25519PublicKey + Public key of the other party + + +Returns +---------- +shared key between the two parties + +Raise +---------- +ValueError +""" + def get_secret(key:Ed25519PrivateKey, remote: Ed25519PublicKey): bts = key.private_bytes(encoding=Encoding.Raw, format=PrivateFormat.Raw, encryption_algorithm=NoEncryption()) privatedh = X25519PrivateKey.from_private_bytes(x25519_from_ed25519_private_bytes(bts)) @@ -51,6 +107,13 @@ def x25519_from_ed25519_private_bytes(private_bytes): return h[0:32] + + +""" +Generates a public x25519 key from a public ed25519 key +""" + + def x25519_from_ed25519_public_bytes(public_bytes) -> X25519PublicKey: # This is libsodium's crypto_sign_ed25519_pk_to_curve25519 translated into @@ -65,9 +128,6 @@ def x25519_from_ed25519_public_bytes(public_bytes) -> X25519PublicKey: if A.root_check: raise ValueError("Root check failed") - if not A.is_on_main_subgroup(): - raise ValueError("It's on the main subgroup") - one_minus_y = fe25519.one() - A.Y x = A.Y + fe25519.one() x = x * one_minus_y.invert() diff --git a/deccom/nodes/streamnode.py b/deccom/nodes/streamnode.py index 5fbbae7..2f91235 100644 --- a/deccom/nodes/streamnode.py +++ b/deccom/nodes/streamnode.py @@ -19,7 +19,7 @@ def __init__(self, p: Peer, protocol: StreamProtocol, ip_addr="0.0.0.0", port=No self.peer_reads = dict() self.peer_writes = dict() self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if socket.SO_REUSEPORT != None: + if hasattr(socket, 'SO_REUSEPORT'): self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) else: self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) diff --git a/deccom/peers/peer.py b/deccom/peers/peer.py index e49d54a..f5fc256 100644 --- a/deccom/peers/peer.py +++ b/deccom/peers/peer.py @@ -7,7 +7,13 @@ from deccom.utils.common import byte_reader, byte_writer class Peer(object): - def __init__(self, addr, pub_key: Ed25519PrivateKey = None, tcp = None, id_node = None, proof_of_self = None) -> None: + + """ + Peer object to store all information about a peer in the system (their identity, ip, port, etc...) + """ + + + def __init__(self, addr, pub_key: Ed25519PrivateKey | str = None, tcp = None, id_node = None, proof_of_self = None) -> None: self.priv_key = None if pub_key == None: self.key = gen_key() @@ -23,29 +29,33 @@ def __init__(self, addr, pub_key: Ed25519PrivateKey = None, tcp = None, id_node self.addr = addr self.tcp = tcp self.external_addr = addr - self.s = None # self.heard_from = None # if proof_of_self != None: # proof_of_self = SHA256([pub_key,addr[0],addr[1],]) # print("pub_key",pub_key) - pass - - # |------------------------| - # | Control header (1B) | - # |------------------------| - # | pub_key (MAX 63B) | - # |------------------------| - # | addr[0] (4B) | - # |------------------------| - # | addr[1] (2B) | - # |------------------------| - # | tcp (2B) | - # |------------------------| - # Control header contains (LSB FIRST): - # 6 Bits size of pub_key - # 1 Bits type of pub_key - # 1 Bit if TCP is present + + + """ + Generates a byte representation of the peer: + |------------------------| + | Control header (1B) | + |------------------------| + | pub_key (MAX 63B) | + |------------------------| + | addr[0] (4B) | + |------------------------| + | addr[1] (2B) | + |------------------------| + | tcp (2B) | + |------------------------| + Control header contains (LSB FIRST): + 6 Bits size of pub_key + 1 Bits type of pub_key + 1 Bit if TCP is present + + Hence a header of 10100000 means TCP is present, Public key is an Ed25519 identity, public key length of 32 bytes (256 bits) + """ def __bytes__(self)->bytes: writer = byte_writer() @@ -71,6 +81,20 @@ def __bytes__(self)->bytes: writer.write_int(2, self.tcp) return writer.bytes() + + """ + Given a byte representation generates a peer + + Return + ---------- + + Peer + Peer object + + int + The offset of the pointer on the byte list once the peer has been read + """ + @staticmethod def from_bytes(b: bytes) -> tuple["Peer", int]: # print(len(b)) diff --git a/deccom/protocols/defaultprotocol.py b/deccom/protocols/defaultprotocol.py index c25004c..5880a56 100644 --- a/deccom/protocols/defaultprotocol.py +++ b/deccom/protocols/defaultprotocol.py @@ -33,12 +33,12 @@ def set_callback(self, callback): self.callback = callback async def send_datagram(self, msg: bytes, addr: tuple[str,int]): - await self.sendto(self.uniqueid + msg, addr) def process_datagram(self, addr, data): + loop = asyncio.get_event_loop() if len(data) < 2: @@ -128,5 +128,7 @@ def connection_lost(self, exc: Exception) -> None: async def sendto(self,msg,addr): # print("sending to",addr,msg) + if addr[0] == self.transport.get_extra_info("sockname")[0] and addr[1] == self.transport.get_extra_info("sockname")[1]: + return self.transport.sendto(msg, addr) # print("sent") diff --git a/deccom/protocols/peerdiscovery/_finder.py b/deccom/protocols/peerdiscovery/_finder.py index 372b33a..72ae595 100644 --- a/deccom/protocols/peerdiscovery/_finder.py +++ b/deccom/protocols/peerdiscovery/_finder.py @@ -10,6 +10,11 @@ class NodeaAbstraction: class Finder(object): + """ + Finder class for the Kademlia Protocol. Given a list of initial peer and an alpha parameter of number of peers to contact in parallel, find a given + peer. + """ + def __init__(self, look_for: bytes, initial: list[Peer], alpha: int = 5) -> None: self.look_for = look_for self.look_for_int = int.from_bytes(look_for, byteorder="big") @@ -21,7 +26,17 @@ def __init__(self, look_for: bytes, initial: list[Peer], alpha: int = 5) -> None heapq.heappush(self.peers, (pi.idint ^ self.look_for_int, pi)) self.contacted.add(pi.idx) - pass + + """ + Gives the next list of peers that should be contacted during the search of the peer + + + Returns + ---------- + ret + A list of peers (of maximum size alpha) which are currently the closest known to the searched for peer. + + """ def find_peer(self): if len(self.peers) == 0: @@ -33,6 +48,16 @@ def find_peer(self): ret.append(heapq.heappop(self.peers)[1].p) return ret + + """ + Adds the list of peers to the internal heap queue + + Parameters + ---------- + peers + List of peers to add + """ + def add_peer(self, peers: list[Peer]): for p in peers: prv = len(self.contacted) diff --git a/deccom/protocols/peerdiscovery/_kademlia_routing.py b/deccom/protocols/peerdiscovery/_kademlia_routing.py index bc6aa91..4dda17d 100644 --- a/deccom/protocols/peerdiscovery/_kademlia_routing.py +++ b/deccom/protocols/peerdiscovery/_kademlia_routing.py @@ -3,6 +3,12 @@ from deccom.peers.peer import Peer import time class KBucket(object): + + """ + KBuckets as described by the Kademlia Paper. Each bucket has range of [min_dist,max_dist) and stores nodes at that distance from the + own id node. + """ + def __init__(self, min_dist, max_dist, k, originator = False, success_call = lambda addr, p : ...): self.min_dist = min_dist self.max_dist = max_dist @@ -13,8 +19,25 @@ def __init__(self, min_dist, max_dist, k, originator = False, success_call = lam self.originator = originator self.toadd: list[tuple[bytes,Peer]] = [] self.updated = time.monotonic() + + """ + Updates modification date of this bucket (same as UNIX touch command) + """ + def touch(self): self.updated = time.monotonic() + + + """ + Splits the bucket in two + + Returns + ---------- + (left,right) + A tuple of the new left and right bucket + + """ + def split_bucket(self): to_split = self.mid_point left = KBucket(self.min_dist, to_split, self.k, success_call = self.success_call) @@ -216,13 +239,7 @@ def get_closest(self, id, alpha = None) -> list[Peer]: lst: list[Peer] = [] lst += list(self.buckets[idx].peers.values()) diff = 1 - stopper = max(idx, len(self.buckets)-idx) - # print("stopper", idx, len(self.buckets), stopper, len(lst), alpha) - # acc = 0 - # for b in self.buckets: - # acc += len(b.peers) - # print("we know",acc) while len(lst) < alpha: diff --git a/deccom/protocols/peerdiscovery/biggossip.py b/deccom/protocols/peerdiscovery/biggossip.py index 9d3f416..1249e6c 100644 --- a/deccom/protocols/peerdiscovery/biggossip.py +++ b/deccom/protocols/peerdiscovery/biggossip.py @@ -204,6 +204,8 @@ async def _find_peer(self, fut, id: bytes): return async def find_peer(self, id) -> Peer: + if id == self.peer.id_node: + return self.peer if self.peers.get(id) == None: if self.peer_crawls.get(id) == None: loop = asyncio.get_running_loop() diff --git a/deccom/protocols/peerdiscovery/fixedpeers.py b/deccom/protocols/peerdiscovery/fixedpeers.py index 3badb5f..3d70918 100644 --- a/deccom/protocols/peerdiscovery/fixedpeers.py +++ b/deccom/protocols/peerdiscovery/fixedpeers.py @@ -86,6 +86,8 @@ async def broadcast(self, msg): def get_al(self, addr: tuple[str, int]) -> Union[Peer, None]: return self.a_to_p.get(addr) async def find_peer(self, id: bytes) -> Peer: + if id == self.peer.id_node: + return self.peer if self.peers.get(id) == None: if self.peer_crawls.get(id) == None: loop = asyncio.get_running_loop() diff --git a/deccom/protocols/peerdiscovery/gossipdiscovery.py b/deccom/protocols/peerdiscovery/gossipdiscovery.py index 6fdd4f0..0206698 100644 --- a/deccom/protocols/peerdiscovery/gossipdiscovery.py +++ b/deccom/protocols/peerdiscovery/gossipdiscovery.py @@ -211,6 +211,8 @@ async def _find_peer(self, fut, id): return async def find_peer(self, id) -> Peer: + if id == self.peer.id_node: + return self.peer if self.peers.get(id) == None: if self.peer_crawls.get(id) == None: loop = asyncio.get_running_loop() diff --git a/deccom/protocols/peerdiscovery/kademliadiscovery.py b/deccom/protocols/peerdiscovery/kademliadiscovery.py index 60b37ca..07e6708 100644 --- a/deccom/protocols/peerdiscovery/kademliadiscovery.py +++ b/deccom/protocols/peerdiscovery/kademliadiscovery.py @@ -329,7 +329,8 @@ async def _find_peer(self, fut, id): async def find_peer(self, id) -> Peer: - # print(self.peer.pub_key, "looking for") + if id == self.peer.id_node: + return self.peer if self.get_peer(id) == None: if self.peer_crawls.get(id) == None: diff --git a/deccom/protocols/streamprotocol.py b/deccom/protocols/streamprotocol.py index 5c6e5d5..8b4e2da 100644 --- a/deccom/protocols/streamprotocol.py +++ b/deccom/protocols/streamprotocol.py @@ -1,7 +1,8 @@ import asyncio +from os import urandom from datetime import datetime from deccom.utils.common import ternary_comparison -from asyncio import exceptions +from asyncio import exceptions, IncompleteReadError from typing import Any, Callable, List, Union from deccom.peers.peer import Peer from deccom.protocols.abstractprotocol import AbstractProtocol @@ -15,6 +16,8 @@ def __init__(self,reader: asyncio.StreamReader,writer: asyncio.StreamWriter,fut: self.fut = fut self.opened_by_me = opened_by_me self.using = 0 + self.unique_id = None + self.in_use = True pass @@ -54,8 +57,6 @@ def __init__(self, always_connect: bool, submodule=None, callback: Callable[[tup self.always_connect = always_connect self.await_connections = dict() - - # self.lock = asyncio.Lock() async def stop(self): for k,v in self.connections.items(): async with self.locks[k]: @@ -92,9 +93,10 @@ async def handle_connection(self, reader: asyncio.StreamReader,writer: asyncio.S self.locks[node_id] = asyncio.Lock() # print("connection from",node_id) if self.connections.get(node_id) != None: + with open(f"log{self.peer.pub_key}.txt", "a") as log: log.write(f"duplicate connection from {addr} {node_id}\n") - #rint(self.connections.get(node_id).opened_by_me,ternary_comparison(self.peer.id_node, node_id)) + if self.connections.get(node_id).opened_by_me * ternary_comparison(self.peer.id_node, node_id) == -1: with open(f"log{self.peer.pub_key}.txt", "a") as log: log.write(f"closing previous with {addr} {node_id}\n") @@ -107,7 +109,8 @@ async def handle_connection(self, reader: asyncio.StreamReader,writer: asyncio.S with open(f"log{self.peer.pub_key}.txt", "a") as log: log.write(f"listening from {addr} {node_id}\n") self.connections[node_id] = DictItem(reader,writer,None, -1) - self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr)) + self.connections[node_id].unique_id = urandom(4) + self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr,self.connections[node_id].unique_id)) return @bindto("get_peer") @@ -136,10 +139,11 @@ async def open_connection(self, remote_ip, remote_port, node_id: bytes, port_lis if remote_port == None: return False + if self.connections.get(node_id) == None and self.await_connections.get(node_id) != None: - await self.await_connections[node_id] - + return await self.await_connections[node_id] + if self.connections.get(node_id) != None: print("duplicate connection OPENED") @@ -152,7 +156,7 @@ async def open_connection(self, remote_ip, remote_port, node_id: bytes, port_lis s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if socket.SO_REUSEPORT != None: + if hasattr(socket, 'SO_REUSEPORT'): s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) else: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -185,7 +189,8 @@ async def open_connection(self, remote_ip, remote_port, node_id: bytes, port_lis return False self.connections[node_id] = DictItem(reader,writer,None,1) - self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,(remote_ip,remote_port))) + self.connections[node_id].unique_id = urandom(4) + self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,(remote_ip,remote_port),self.connections[node_id].unique_id )) #print("introducing myself :)") async with self.locks[node_id]: writer.write(b'\xe4\xe5\xf3\xc6') @@ -219,8 +224,10 @@ async def close_stream(self, node_id: bytes, user = False) -> bool: self.remove_from_dict(node_id) return True - async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, addr = None): + async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, addr = None, uniq_id = None): + if self.connections.get(node_id) == None or self.connections[node_id].unique_id != uniq_id: + return # seqrand = random.randint(1,40000) #print("listening for data") with open(f"log{self.peer.pub_key}.txt", "a") as log: @@ -228,13 +235,15 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad try: data = await reader.readexactly(32) - except (ConnectionResetError, BrokenPipeError) as e: + except (ConnectionResetError, BrokenPipeError,IncompleteReadError) as e: with open(f"log{self.peer.pub_key}.txt", "a") as log: log.write(datetime.now().strftime("%d/%m/%Y, %H:%M:%S")) log.write(f" closed because reset from {node_id}\n") - log.write(e) + # log.write(e) log.write("\n") async with self.locks[node_id]: + if self.connections[node_id].unique_id != uniq_id: + return if node_id !=None and self.connections.get(node_id) != None: self.connections[node_id].fut = None print("closing because reset", addr,node_id) @@ -245,6 +254,8 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad if data == b'': async with self.locks[node_id]: + if self.connections[node_id].unique_id != uniq_id: + return if node_id !=None and self.connections.get(node_id) != None: self.connections[node_id].fut = None print("closing because received empty bytes", addr,node_id) @@ -261,6 +272,8 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad data = await reader.read(min(i, 9048)) if data == b'': async with self.locks[node_id]: + if self.connections[node_id].unique_id != uniq_id: + return if node_id !=None and self.connections.get(node_id) != None: self.connections[node_id].fut = None print("closing because received empty bytes", addr,node_id) @@ -275,8 +288,7 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad # print(seqrand,"read",len(buffer), "from",self.get_peer(node_id).pub_key) loop = asyncio.get_event_loop() asyncio.run_coroutine_threadsafe(self._caller(buffer,node_id,addr), loop) - - self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr)) + self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr,uniq_id)) async def send_stream(self, node_id, data, lvl = 0): if self.connections.get(node_id) == None or lvl >= 3: diff --git a/deccom/utils/common.py b/deccom/utils/common.py index cb02d05..3d6807f 100644 --- a/deccom/utils/common.py +++ b/deccom/utils/common.py @@ -1,7 +1,17 @@ from socket import socket from concurrent.futures import ThreadPoolExecutor + +""" +Find an open port + +Return +-------- +int + Avaialble open port +""" + def find_open_port(): - ret = 10010 + ret = None with socket() as s: s.bind(('',0)) @@ -9,7 +19,7 @@ def find_open_port(): s.close() return ret -def ternary_comparison(b1,b2): +def ternary_comparison(b1: bytes,b2: bytes): if b1 > b2: return 1 if b1 < b2: diff --git a/imgs/protocolstack.png b/imgs/protocolstack.png new file mode 100644 index 0000000..61dcd09 Binary files /dev/null and b/imgs/protocolstack.png differ diff --git a/setup.py b/setup.py index 919a719..4fa7715 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name="deccom", - version="0.0.1", + version="0.1.0", description="Decentralized Communication With Modular Protocol Stack.", long_description=long_description, long_description_content_type='text/markdown',