Skip to content

Commit

Permalink
feat(#267): Introduced better typing to bs_rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
lxgr-linux committed Nov 13, 2024
1 parent 7442028 commit 15faa99
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
8 changes: 4 additions & 4 deletions bs_rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Client:
def __init__(self, rw, reg: Registry):
self.rw = rw
self.reg = reg
self.calls: dict[int, Channel] = {}
self.calls: dict[int, Channel[Body]] = {}

def __send(self, body: Body, call: int, method: Method):
"""Sends a request to the server
Expand All @@ -32,7 +32,7 @@ def __send(self, body: Body, call: int, method: Method):
) + END_SECTION
)

def __get_call(self, call_id: int) -> Channel:
def __get_call(self, call_id: int) -> Channel[Body]:
if (
call := self.calls.get(call_id)
) is not None:
Expand All @@ -47,7 +47,7 @@ def __new_call_id(self):
return call_id

def call_for_response(self, body: Body) -> Body:
ch = Channel()
ch = Channel[Body]()
call_id = self.__new_call_id()
self.calls[call_id] = ch
self.__send(body, call_id, Method.CALL_FOR_RESPONSE)
Expand All @@ -56,7 +56,7 @@ def call_for_response(self, body: Body) -> Body:
return call

def call_for_responses(self, body: Body) -> ChannelGenerator:
ch = Channel()
ch = Channel[Body]()
call_id = self.__new_call_id()
self.calls[call_id] = ch
self.__send(body, call_id, Method.CALL_FOR_RESPONSES)
Expand Down
10 changes: 6 additions & 4 deletions bs_rpc/channel.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import threading
from typing import Generic, TypeVar

T = TypeVar("T")

class Channel:
class Channel(Generic[T]):
def __init__(self):
self.__state: list = []
self.__state: list[T] = []
self.__event: threading.Event = threading.Event()
self.__closed = False

def close(self):
self.__closed = True

def push(self, item):
def push(self, item: T):
self.__state.append(item)
self.__event.set()
self.__event.clear()

def is_closed(self) -> bool:
return self.__closed

def listen(self):
def listen(self) -> T | None:
if len(self.__state) == 0:
if self.__closed:
return None
Expand Down
14 changes: 8 additions & 6 deletions bs_rpc/channel_generator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from typing import Generator
from typing import Callable, Generator, Generic, TypeVar

from . import msg
from .channel import Channel

CloseFn = Callable[[], None]
T = TypeVar("T")

class ChannelGenerator:
def __init__(self, ch: Channel, close_fn):
class ChannelGenerator(Generic[T]):
def __init__(self, ch: Channel[T], close_fn: CloseFn | None):
self.__ch: Channel = ch
self.__close_fn = close_fn

def __call__(self) -> Generator[msg.Body, None, None]:
def __call__(self) -> Generator[T, None, None]:
while True:
ret = self.__ch.listen()
if ret is None:
self.__close_fn()
if self.__close_fn is not None:
self.__close_fn()
return
yield ret

0 comments on commit 15faa99

Please sign in to comment.