Skip to content

Commit

Permalink
refactor: Store no longer aggregates changes, it now calls listener…
Browse files Browse the repository at this point in the history
…s with every change

refactor: `SideEffectRunnerThread` now runs async side effects in the event loop of the thread in which it was instantiated in (it used to create its own event loop)
refactor(test): `event_loop` fixture now sets the global event loop on setup and restores it on teardown
  • Loading branch information
sassanh committed Mar 28, 2024
1 parent 855ab60 commit 714dc21
Show file tree
Hide file tree
Showing 16 changed files with 102 additions and 93 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration_delivery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
workflow_dispatch:

env:
PYTHON_VERSION: "3.11"
PYTHON_VERSION: '3.11'

jobs:
dependencies:
Expand Down Expand Up @@ -267,7 +267,7 @@ jobs:
path: artifacts

- name: Release
uses: softprops/action-gh-release@v1
uses: softprops/action-gh-release@v2
with:
files: artifacts/*
tag_name: v${{ needs.build.outputs.version }}
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## Version 0.14.0

- refactor: `Store` no longer aggregates changes, it now calls listeners with every
change
- refactor: `SideEffectRunnerThread` now runs async side effects in the event loop
of the thread in which it was instantiated in (it used to create its own event
loop)
- refactor(test): `event_loop` fixture now sets the global event loop on setup and
restores it on teardown

## Version 0.13.2

- fix: initial snapshot cleanup which used to mistakenly remove files with store:...
Expand Down
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "python-redux"
version = "0.13.2"
version = "0.14.0"
description = "Redux implementation for Python"
authors = ["Sassan Haradji <[email protected]>"]
license = "Apache-2.0"
Expand Down Expand Up @@ -60,6 +60,7 @@ profile = "black"

[tool.pyright]
exclude = ['typings']
filterwarnings = 'error'

[tool.pytest.ini_options]
log_cli = 1
Expand Down
3 changes: 1 addition & 2 deletions redux/autorun.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ def _check_and_call(
create_task = self._store._create_task # noqa: SLF001
if iscoroutine(self._latest_value) and create_task:
create_task(self._latest_value, callback=self._task_callback)
else:
self.inform_subscribers()
self.inform_subscribers()
else:
self.unsubscribe()

Expand Down
41 changes: 25 additions & 16 deletions redux/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
options: CreateStoreOptions[Action, Event] | None = None,
) -> None:
"""Create a new store."""
self.finished = False
self.store_options = options or CreateStoreOptions()
self.reducer = reducer
self._create_task = self.store_options.task_creator
Expand Down Expand Up @@ -95,35 +96,37 @@ def __init__(
if self.store_options.scheduler:
self.store_options.scheduler(self.run, interval=True)

def _call_listeners(self: Store[State, Action, Event], state: State) -> None:
for listener_ in self._listeners.copy():
if isinstance(listener_, weakref.ref):
listener = listener_()
if listener is None:
self._listeners.discard(listener_)
continue
else:
listener = listener_
result = listener(state)
if asyncio.iscoroutine(result) and self._create_task:
self._create_task(result)

def _run_actions(self: Store[State, Action, Event]) -> None:
action = self._actions.pop(0)
result = self.reducer(self._state, action)
if is_complete_reducer_result(result):
self._state = result.state
self._call_listeners(self._state)
self.dispatch([*(result.actions or []), *(result.events or [])])
elif is_state_reducer_result(result):
self._state = result
self._call_listeners(self._state)

if isinstance(action, FinishAction):
self.dispatch(cast(Event, FinishEvent()))

if len(self._actions) == 0 and self._state:
for listener_ in self._listeners.copy():
if isinstance(listener_, weakref.ref):
listener = listener_()
if listener is None:
self._listeners.discard(listener_)
continue
else:
listener = listener_
result = listener(self._state)
if asyncio.iscoroutine(result) and self._create_task:
self._create_task(result)

def _run_event_handlers(self: Store[State, Action, Event]) -> None:
event = self._events.pop(0)
for event_handler_ in self._event_handlers[type(event)].copy():
self._event_handlers_queue.put((event_handler_, event))
self._event_handlers_queue.put_nowait((event_handler_, event))

def run(self: Store[State, Action, Event]) -> None:
"""Run the store."""
Expand All @@ -134,7 +137,12 @@ def run(self: Store[State, Action, Event]) -> None:

if len(self._events) > 0:
self._run_event_handlers()
if not any(worker.is_alive() for worker in self._workers):
if (
self.finished
and self._actions == []
and self._events == []
and not any(worker.is_alive() for worker in self._workers)
):
self.clean_up()

def clean_up(self: Store[State, Action, Event]) -> None:
Expand Down Expand Up @@ -219,7 +227,8 @@ def unsubscribe() -> None:

def _handle_finish_event(self: Store[State, Action, Event]) -> None:
for _ in range(self.store_options.threads):
self._event_handlers_queue.put(None)
self._event_handlers_queue.put_nowait(None)
self.finished = True

def autorun(
self: Store[State, Action, Event],
Expand Down
28 changes: 6 additions & 22 deletions redux/side_effect_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import contextlib
import threading
import weakref
from asyncio import Task, iscoroutine
from asyncio import Handle, iscoroutine
from inspect import signature
from typing import TYPE_CHECKING, Any, Callable, Generic, cast

Expand All @@ -27,16 +27,14 @@ def __init__(
"""Initialize the side effect runner thread."""
super().__init__()
self.task_queue = task_queue
self._tasks: set[Task] = set()
self.loop = asyncio.get_event_loop()
self._handles: set[Handle] = set()
self.create_task = lambda coro: self._handles.add(
self.loop.call_soon_threadsafe(self.loop.create_task, coro),
)

def run(self: SideEffectRunnerThread[Event]) -> None:
"""Run the side effect runner thread."""
self.loop = asyncio.new_event_loop()
self.create_task = lambda coro: self._tasks.add(self.loop.create_task(coro))
self.loop.run_until_complete(self.work())

async def work(self: SideEffectRunnerThread[Event]) -> None:
"""Run the side effects."""
while True:
task = self.task_queue.get()
if task is None:
Expand All @@ -61,17 +59,3 @@ async def work(self: SideEffectRunnerThread[Event]) -> None:
self.create_task(result)
finally:
self.task_queue.task_done()
await self.clean_up()

async def clean_up(self: SideEffectRunnerThread[Event]) -> None:
"""Clean up the side effect runner thread."""
while True:
tasks = [
task
for task in asyncio.all_tasks(self.loop)
if task is not asyncio.current_task(self.loop)
]
if not tasks:
break
for task in tasks:
await task
4 changes: 3 additions & 1 deletion redux_pytest/fixtures/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ class LoopThread(threading.Thread):
def __init__(self: LoopThread) -> None:
super().__init__()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def run(self: LoopThread) -> None:
self.loop.run_forever()

def stop(self: LoopThread) -> None:
asyncio.set_event_loop(None)
self.loop.call_soon_threadsafe(self.loop.stop)

def create_task(self: LoopThread, coro: Coroutine) -> None:
self.loop.call_soon_threadsafe(lambda: self.loop.create_task(coro))
self.loop.call_soon_threadsafe(self.loop.create_task, coro)


@pytest.fixture()
Expand Down
7 changes: 6 additions & 1 deletion redux_pytest/fixtures/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ def __init__(
def json_snapshot(self: StoreSnapshot) -> str:
"""Return the snapshot of the current state of the store."""
return (
json.dumps(self.store.snapshot, indent=2, sort_keys=True)
json.dumps(
self.store.snapshot,
indent=2,
sort_keys=True,
ensure_ascii=False,
)
if self.store._state # noqa: SLF001
else ''
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
{
"_id": "e3e70682c2094cac629f6fbed82c07cd",
"base10": {
"count": 12
"count": 11
},
"inverse": {
"count": -1
"count": 0
},
"straight": {
"count": 2
"count": 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@
},
"inverse": {
"count": -1
},
"straight": {
"count": 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
{
"_id": "e3e70682c2094cac629f6fbed82c07cd",
"base10": {
"count": 10
"count": 12
},
"inverse": {
"count": 1
"count": -1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
{
"_id": "e3e70682c2094cac629f6fbed82c07cd",
"base10": {
"count": 8
"count": 10
},
"inverse": {
"count": 3
"count": 1
}
}
10 changes: 10 additions & 0 deletions tests/results/test_features/general/store-subscription-007.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// store-subscription-007
{
"_id": "e3e70682c2094cac629f6fbed82c07cd",
"base10": {
"count": 8
},
"inverse": {
"count": 3
}
}
36 changes: 6 additions & 30 deletions tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from __future__ import annotations

import asyncio
import threading
from dataclasses import replace
from typing import Callable, Coroutine, Generator
from typing import TYPE_CHECKING, Callable, Coroutine, Generator

import pytest
from immutable import Immutable
Expand All @@ -20,6 +19,9 @@
)
from redux.main import Store

if TYPE_CHECKING:
from redux_pytest.fixtures.event_loop import LoopThread

INCREMENTS = 2


Expand Down Expand Up @@ -51,26 +53,6 @@ def reducer(
return state


class LoopThread(threading.Thread):
def __init__(self: LoopThread) -> None:
super().__init__()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def run(self: LoopThread) -> None:
self.loop.run_forever()

def stop(self: LoopThread) -> None:
self.loop.call_soon_threadsafe(self.loop.stop)


@pytest.fixture()
def loop() -> LoopThread:
loop_thread = LoopThread()
loop_thread.start()
return loop_thread


Action = IncrementAction | SetMirroredValueAction | InitAction | FinishAction
StoreType = Store[StateType, Action, FinishEvent]

Expand Down Expand Up @@ -118,13 +100,8 @@ async def _(value: int) -> int:
async def _(mirrored_value: int) -> None:
if mirrored_value < INCREMENTS:
return
store.dispatch(FinishAction())

async def finish() -> None:
event_loop.stop()

store.subscribe_event(FinishEvent, finish)
store.subscribe_event(FinishEvent, finish)
store.dispatch(FinishAction())


def test_subscription(
Expand Down Expand Up @@ -152,8 +129,7 @@ async def finish() -> None:
store.dispatch(FinishAction())


def test_event_subscription_with_default_task_creator(event_loop: LoopThread) -> None:
asyncio.set_event_loop(event_loop.loop)
def test_event_subscription_with_no_task_creator(event_loop: LoopThread) -> None:
store = Store(
reducer,
options=CreateStoreOptions(auto_init=True),
Expand Down
Loading

0 comments on commit 714dc21

Please sign in to comment.