Skip to content

Commit

Permalink
hook up selector.close to loop
Browse files Browse the repository at this point in the history
try to avoid leaking loop closers
  • Loading branch information
minrk committed May 10, 2021
1 parent b490223 commit b53e158
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
4 changes: 4 additions & 0 deletions zmq/_asyncio_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
self._closed = False
self._thread = threading.Thread(
name="Tornado selector",
daemon=True,
Expand Down Expand Up @@ -119,6 +120,8 @@ def __del__(self) -> None:
self._waker_w.close()

def close(self) -> None:
if self._closed:
return
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
Expand All @@ -127,6 +130,7 @@ def close(self) -> None:
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._closed = True

def _wake_selector(self) -> None:
try:
Expand Down
15 changes: 12 additions & 3 deletions zmq/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
from zmq import _future

# registry of asyncio loop : selector thread
_selectors: WeakKeyDictionary[
asyncio.AbstractEventLoop, "_zmq._asyncio_selector.SelectorThread"
] = WeakKeyDictionary()
_selectors: WeakKeyDictionary = WeakKeyDictionary()


def _get_selector_windows(
Expand Down Expand Up @@ -53,7 +51,18 @@ def _get_selector_windows(
# stacklevel 5 matches most likely zmq.asyncio.Context().socket()
stacklevel=5,
)

selector = _selectors[io_loop] = SelectorThread(io_loop)

# patch loop.close to also close the selector thread
loop_close = io_loop.close

def _close_selector_and_loop():
_selectors.pop(io_loop, None)
selector.close()
loop_close()

io_loop.close = _close_selector_and_loop
return selector
else:
return io_loop
Expand Down
6 changes: 5 additions & 1 deletion zmq/tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def setUp(self):

def tearDown(self):
self.loop.close()
# verify cleanup of references to selectors
assert zaio._selectors == {}
if 'zmq._asyncio_selector' in sys.modules:
assert zmq._asyncio_selector._selector_loops == set()
super().tearDown()

def test_socket_class(self):
Expand Down Expand Up @@ -432,7 +436,7 @@ def shortDescription(self):
return doc

def setUp(self):
self.loop = zaio.ZMQEventLoop()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
super().setUp()

Expand Down
2 changes: 1 addition & 1 deletion zmq/tests/test_retry_eintr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


class TestEINTRSysCall(BaseZMQTestCase):
""" Base class for EINTR tests. """
"""Base class for EINTR tests."""

# delay for initial signal delivery
signal_delay = 0.1
Expand Down

0 comments on commit b53e158

Please sign in to comment.