From 714dc2167dacbb00be71c8e0eb7a34c9febd02fe Mon Sep 17 00:00:00 2001 From: Sassan Haradji Date: Thu, 28 Mar 2024 14:57:03 +0400 Subject: [PATCH] refactor: `Store` no longer aggregates changes, it now calls listeners with every change refactor: `SideEffectRunnerThread` now runs async side effects in the event loop of the thread in which it was instantiated in (it used to create its own event loop) refactor(test): `event_loop` fixture now sets the global event loop on setup and restores it on teardown --- .github/workflows/integration_delivery.yml | 4 +- CHANGELOG.md | 10 +++++ poetry.lock | 6 +-- pyproject.toml | 3 +- redux/autorun.py | 3 +- redux/main.py | 41 +++++++++++-------- redux/side_effect_runner.py | 28 +++---------- redux_pytest/fixtures/event_loop.py | 4 +- redux_pytest/fixtures/snapshot.py | 7 +++- .../general/store-subscription-002.jsonc | 6 +-- .../general/store-subscription-003.jsonc | 3 ++ .../general/store-subscription-004.jsonc | 4 +- .../general/store-subscription-005.jsonc | 4 +- .../general/store-subscription-007.jsonc | 10 +++++ tests/test_async.py | 36 +++------------- tests/test_scheduler.py | 26 ++++++++---- 16 files changed, 102 insertions(+), 93 deletions(-) create mode 100644 tests/results/test_features/general/store-subscription-007.jsonc diff --git a/.github/workflows/integration_delivery.yml b/.github/workflows/integration_delivery.yml index dec345f..26fe5fa 100644 --- a/.github/workflows/integration_delivery.yml +++ b/.github/workflows/integration_delivery.yml @@ -6,7 +6,7 @@ on: workflow_dispatch: env: - PYTHON_VERSION: "3.11" + PYTHON_VERSION: '3.11' jobs: dependencies: @@ -267,7 +267,7 @@ jobs: path: artifacts - name: Release - uses: softprops/action-gh-release@v1 + uses: softprops/action-gh-release@v2 with: files: artifacts/* tag_name: v${{ needs.build.outputs.version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 6641d45..8bf2973 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## Version 0.14.0 + +- refactor: `Store` no longer aggregates changes, it now calls listeners with every + change +- refactor: `SideEffectRunnerThread` now runs async side effects in the event loop + of the thread in which it was instantiated in (it used to create its own event + loop) +- refactor(test): `event_loop` fixture now sets the global event loop on setup and + restores it on teardown + ## Version 0.13.2 - fix: initial snapshot cleanup which used to mistakenly remove files with store:... diff --git a/poetry.lock b/poetry.lock index a5291eb..7f1d087 100644 --- a/poetry.lock +++ b/poetry.lock @@ -157,13 +157,13 @@ poetry-plugin = ["poetry (>=1.0,<2.0)"] [[package]] name = "pyright" -version = "1.1.355" +version = "1.1.356" description = "Command line wrapper for pyright" optional = false python-versions = ">=3.7" files = [ - {file = "pyright-1.1.355-py3-none-any.whl", hash = "sha256:bf30b6728fd68ae7d09c98292b67152858dd89738569836896df786e52b5fe48"}, - {file = "pyright-1.1.355.tar.gz", hash = "sha256:dca4104cd53d6484e6b1b50b7a239ad2d16d2ffd20030bcf3111b56f44c263bf"}, + {file = "pyright-1.1.356-py3-none-any.whl", hash = "sha256:a101b0f375f93d7082f9046cfaa7ba15b7cf8e1939ace45e984c351f6e8feb99"}, + {file = "pyright-1.1.356.tar.gz", hash = "sha256:f05b8b29d06b96ed4a0885dad5a31d9dff691ca12b2f658249f583d5f2754021"}, ] [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index 2f9602f..ecbd21b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "python-redux" -version = "0.13.2" +version = "0.14.0" description = "Redux implementation for Python" authors = ["Sassan Haradji "] license = "Apache-2.0" @@ -60,6 +60,7 @@ profile = "black" [tool.pyright] exclude = ['typings'] +filterwarnings = 'error' [tool.pytest.ini_options] log_cli = 1 diff --git a/redux/autorun.py b/redux/autorun.py index d657982..ef27a7a 100644 --- a/redux/autorun.py +++ b/redux/autorun.py @@ -171,8 +171,7 @@ def _check_and_call( create_task = self._store._create_task # noqa: SLF001 if iscoroutine(self._latest_value) and create_task: create_task(self._latest_value, callback=self._task_callback) - else: - self.inform_subscribers() + self.inform_subscribers() else: self.unsubscribe() diff --git a/redux/main.py b/redux/main.py index d66ad0a..f3b3c4f 100644 --- a/redux/main.py +++ b/redux/main.py @@ -50,6 +50,7 @@ def __init__( options: CreateStoreOptions[Action, Event] | None = None, ) -> None: """Create a new store.""" + self.finished = False self.store_options = options or CreateStoreOptions() self.reducer = reducer self._create_task = self.store_options.task_creator @@ -95,35 +96,37 @@ def __init__( if self.store_options.scheduler: self.store_options.scheduler(self.run, interval=True) + def _call_listeners(self: Store[State, Action, Event], state: State) -> None: + for listener_ in self._listeners.copy(): + if isinstance(listener_, weakref.ref): + listener = listener_() + if listener is None: + self._listeners.discard(listener_) + continue + else: + listener = listener_ + result = listener(state) + if asyncio.iscoroutine(result) and self._create_task: + self._create_task(result) + def _run_actions(self: Store[State, Action, Event]) -> None: action = self._actions.pop(0) result = self.reducer(self._state, action) if is_complete_reducer_result(result): self._state = result.state + self._call_listeners(self._state) self.dispatch([*(result.actions or []), *(result.events or [])]) elif is_state_reducer_result(result): self._state = result + self._call_listeners(self._state) if isinstance(action, FinishAction): self.dispatch(cast(Event, FinishEvent())) - if len(self._actions) == 0 and self._state: - for listener_ in self._listeners.copy(): - if isinstance(listener_, weakref.ref): - listener = listener_() - if listener is None: - self._listeners.discard(listener_) - continue - else: - listener = listener_ - result = listener(self._state) - if asyncio.iscoroutine(result) and self._create_task: - self._create_task(result) - def _run_event_handlers(self: Store[State, Action, Event]) -> None: event = self._events.pop(0) for event_handler_ in self._event_handlers[type(event)].copy(): - self._event_handlers_queue.put((event_handler_, event)) + self._event_handlers_queue.put_nowait((event_handler_, event)) def run(self: Store[State, Action, Event]) -> None: """Run the store.""" @@ -134,7 +137,12 @@ def run(self: Store[State, Action, Event]) -> None: if len(self._events) > 0: self._run_event_handlers() - if not any(worker.is_alive() for worker in self._workers): + if ( + self.finished + and self._actions == [] + and self._events == [] + and not any(worker.is_alive() for worker in self._workers) + ): self.clean_up() def clean_up(self: Store[State, Action, Event]) -> None: @@ -219,7 +227,8 @@ def unsubscribe() -> None: def _handle_finish_event(self: Store[State, Action, Event]) -> None: for _ in range(self.store_options.threads): - self._event_handlers_queue.put(None) + self._event_handlers_queue.put_nowait(None) + self.finished = True def autorun( self: Store[State, Action, Event], diff --git a/redux/side_effect_runner.py b/redux/side_effect_runner.py index 2a0634a..003226c 100644 --- a/redux/side_effect_runner.py +++ b/redux/side_effect_runner.py @@ -6,7 +6,7 @@ import contextlib import threading import weakref -from asyncio import Task, iscoroutine +from asyncio import Handle, iscoroutine from inspect import signature from typing import TYPE_CHECKING, Any, Callable, Generic, cast @@ -27,16 +27,14 @@ def __init__( """Initialize the side effect runner thread.""" super().__init__() self.task_queue = task_queue - self._tasks: set[Task] = set() + self.loop = asyncio.get_event_loop() + self._handles: set[Handle] = set() + self.create_task = lambda coro: self._handles.add( + self.loop.call_soon_threadsafe(self.loop.create_task, coro), + ) def run(self: SideEffectRunnerThread[Event]) -> None: """Run the side effect runner thread.""" - self.loop = asyncio.new_event_loop() - self.create_task = lambda coro: self._tasks.add(self.loop.create_task(coro)) - self.loop.run_until_complete(self.work()) - - async def work(self: SideEffectRunnerThread[Event]) -> None: - """Run the side effects.""" while True: task = self.task_queue.get() if task is None: @@ -61,17 +59,3 @@ async def work(self: SideEffectRunnerThread[Event]) -> None: self.create_task(result) finally: self.task_queue.task_done() - await self.clean_up() - - async def clean_up(self: SideEffectRunnerThread[Event]) -> None: - """Clean up the side effect runner thread.""" - while True: - tasks = [ - task - for task in asyncio.all_tasks(self.loop) - if task is not asyncio.current_task(self.loop) - ] - if not tasks: - break - for task in tasks: - await task diff --git a/redux_pytest/fixtures/event_loop.py b/redux_pytest/fixtures/event_loop.py index 2f96a19..0abf266 100644 --- a/redux_pytest/fixtures/event_loop.py +++ b/redux_pytest/fixtures/event_loop.py @@ -12,15 +12,17 @@ class LoopThread(threading.Thread): def __init__(self: LoopThread) -> None: super().__init__() self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) def run(self: LoopThread) -> None: self.loop.run_forever() def stop(self: LoopThread) -> None: + asyncio.set_event_loop(None) self.loop.call_soon_threadsafe(self.loop.stop) def create_task(self: LoopThread, coro: Coroutine) -> None: - self.loop.call_soon_threadsafe(lambda: self.loop.create_task(coro)) + self.loop.call_soon_threadsafe(self.loop.create_task, coro) @pytest.fixture() diff --git a/redux_pytest/fixtures/snapshot.py b/redux_pytest/fixtures/snapshot.py index f33d7ed..e3388eb 100644 --- a/redux_pytest/fixtures/snapshot.py +++ b/redux_pytest/fixtures/snapshot.py @@ -51,7 +51,12 @@ def __init__( def json_snapshot(self: StoreSnapshot) -> str: """Return the snapshot of the current state of the store.""" return ( - json.dumps(self.store.snapshot, indent=2, sort_keys=True) + json.dumps( + self.store.snapshot, + indent=2, + sort_keys=True, + ensure_ascii=False, + ) if self.store._state # noqa: SLF001 else '' ) diff --git a/tests/results/test_features/general/store-subscription-002.jsonc b/tests/results/test_features/general/store-subscription-002.jsonc index 34bb840..68158e1 100644 --- a/tests/results/test_features/general/store-subscription-002.jsonc +++ b/tests/results/test_features/general/store-subscription-002.jsonc @@ -2,12 +2,12 @@ { "_id": "e3e70682c2094cac629f6fbed82c07cd", "base10": { - "count": 12 + "count": 11 }, "inverse": { - "count": -1 + "count": 0 }, "straight": { - "count": 2 + "count": 1 } } diff --git a/tests/results/test_features/general/store-subscription-003.jsonc b/tests/results/test_features/general/store-subscription-003.jsonc index c593cbc..5dc6abf 100644 --- a/tests/results/test_features/general/store-subscription-003.jsonc +++ b/tests/results/test_features/general/store-subscription-003.jsonc @@ -6,5 +6,8 @@ }, "inverse": { "count": -1 + }, + "straight": { + "count": 2 } } diff --git a/tests/results/test_features/general/store-subscription-004.jsonc b/tests/results/test_features/general/store-subscription-004.jsonc index 5e4379b..a6b589c 100644 --- a/tests/results/test_features/general/store-subscription-004.jsonc +++ b/tests/results/test_features/general/store-subscription-004.jsonc @@ -2,9 +2,9 @@ { "_id": "e3e70682c2094cac629f6fbed82c07cd", "base10": { - "count": 10 + "count": 12 }, "inverse": { - "count": 1 + "count": -1 } } diff --git a/tests/results/test_features/general/store-subscription-005.jsonc b/tests/results/test_features/general/store-subscription-005.jsonc index 23a026b..3aa1053 100644 --- a/tests/results/test_features/general/store-subscription-005.jsonc +++ b/tests/results/test_features/general/store-subscription-005.jsonc @@ -2,9 +2,9 @@ { "_id": "e3e70682c2094cac629f6fbed82c07cd", "base10": { - "count": 8 + "count": 10 }, "inverse": { - "count": 3 + "count": 1 } } diff --git a/tests/results/test_features/general/store-subscription-007.jsonc b/tests/results/test_features/general/store-subscription-007.jsonc new file mode 100644 index 0000000..20b1154 --- /dev/null +++ b/tests/results/test_features/general/store-subscription-007.jsonc @@ -0,0 +1,10 @@ +// store-subscription-007 +{ + "_id": "e3e70682c2094cac629f6fbed82c07cd", + "base10": { + "count": 8 + }, + "inverse": { + "count": 3 + } +} diff --git a/tests/test_async.py b/tests/test_async.py index 2802fe6..b991c28 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -2,9 +2,8 @@ from __future__ import annotations import asyncio -import threading from dataclasses import replace -from typing import Callable, Coroutine, Generator +from typing import TYPE_CHECKING, Callable, Coroutine, Generator import pytest from immutable import Immutable @@ -20,6 +19,9 @@ ) from redux.main import Store +if TYPE_CHECKING: + from redux_pytest.fixtures.event_loop import LoopThread + INCREMENTS = 2 @@ -51,26 +53,6 @@ def reducer( return state -class LoopThread(threading.Thread): - def __init__(self: LoopThread) -> None: - super().__init__() - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - def run(self: LoopThread) -> None: - self.loop.run_forever() - - def stop(self: LoopThread) -> None: - self.loop.call_soon_threadsafe(self.loop.stop) - - -@pytest.fixture() -def loop() -> LoopThread: - loop_thread = LoopThread() - loop_thread.start() - return loop_thread - - Action = IncrementAction | SetMirroredValueAction | InitAction | FinishAction StoreType = Store[StateType, Action, FinishEvent] @@ -118,13 +100,8 @@ async def _(value: int) -> int: async def _(mirrored_value: int) -> None: if mirrored_value < INCREMENTS: return - store.dispatch(FinishAction()) - - async def finish() -> None: event_loop.stop() - - store.subscribe_event(FinishEvent, finish) - store.subscribe_event(FinishEvent, finish) + store.dispatch(FinishAction()) def test_subscription( @@ -152,8 +129,7 @@ async def finish() -> None: store.dispatch(FinishAction()) -def test_event_subscription_with_default_task_creator(event_loop: LoopThread) -> None: - asyncio.set_event_loop(event_loop.loop) +def test_event_subscription_with_no_task_creator(event_loop: LoopThread) -> None: store = Store( reducer, options=CreateStoreOptions(auto_init=True), diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index bc30f57..76c8243 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -4,7 +4,7 @@ import asyncio import threading from dataclasses import replace -from typing import TYPE_CHECKING, Callable, TypeAlias +from typing import TYPE_CHECKING, Callable, Coroutine, TypeAlias from unittest.mock import call from immutable import Immutable @@ -50,6 +50,7 @@ def __init__(self: Scheduler) -> None: self.stopped = False self._callbacks: list[tuple[Callable[[], None], float]] = [] self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) self.tasks: set[asyncio.Task] = set() def run(self: Scheduler) -> None: @@ -85,17 +86,27 @@ def schedule_stop(self: Scheduler) -> None: self.loop.call_soon_threadsafe(self.loop.create_task, self.graceful_stop()) -def test_scheduler( - mocker: MockerFixture, -) -> None: +def test_scheduler(mocker: MockerFixture) -> None: scheduler = Scheduler() scheduler.start() + def _create_task_with_callback( + coro: Coroutine, + callback: Callable[[asyncio.Task], None] | None = None, + ) -> None: + def create_task_with_callback() -> None: + task = scheduler.loop.create_task(coro) + if callback: + callback(task) + + scheduler.loop.call_soon_threadsafe(create_task_with_callback) + store = Store( reducer, options=CreateStoreOptions( auto_init=True, scheduler=scheduler.set, + task_creator=_create_task_with_callback, on_finish=scheduler.schedule_stop, ), ) @@ -114,8 +125,7 @@ def test_scheduler( scheduler.join() render.assert_has_calls( - [ - call(StateType(value=0)), - call(StateType(value=10)), - ], + [call(StateType(value=i)) for i in range(11)] + [call(StateType(value=10))], ) + + print(3)