Skip to content

Commit

Permalink
refactor: add cleanup to FinishEvent handler to clean workers, list…
Browse files Browse the repository at this point in the history
…eners, subscriptions, autoruns, etc
  • Loading branch information
sassanh committed Mar 19, 2024
1 parent c892588 commit 4958ceb
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 157 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## Version 0.12.5

- refactor: add cleanup to `FinishEvent` handler to clean workers, listeners, subscriptions,
autoruns, etc
- refactor: `TaskCreator` add `TaskCreatorCallback` protocols
- refactor: `Store._create_task` now has a callback parameter to report the created
task
- refactor: move serialization methods and side_effect_runner class to separate
files

## Version 0.12.4

- fix: serialization class methods of `Store` use `cls` instead of `Store` for the
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 🚀 Python Redux
# 🎛️ Python Redux

[![image](https://img.shields.io/pypi/v/python-redux.svg)](https://pypi.python.org/pypi/python-redux)
[![image](https://img.shields.io/pypi/l/python-redux.svg)](https://github.com/sassanh/python-redux/LICENSE)
[![image](https://img.shields.io/pypi/pyversions/python-redux.svg)](https://pypi.python.org/pypi/python-redux)
[![Actions status](https://github.com/sassanh/python-redux/workflows/CI/CD/badge.svg)](https://github.com/sassanh/python-redux/actions)
[![codecov](https://codecov.io/gh/sassanh/python-redux/graph/badge.svg?token=4F3EWZRLCL)](https://codecov.io/gh/sassanh/python-redux)

## 🌟 Overview
Expand Down
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "python-redux"
version = "0.12.4"
version = "0.12.5"
description = "Redux implementation for Python"
authors = ["Sassan Haradji <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -11,7 +11,6 @@ packages = [{ include = "redux" }]
python = "^3.11"
python-immutable = "^1.0.5"
typing-extensions = "^4.9.0"
pytest-timeout = "^2.3.1"

[tool.poetry.group.dev]
optional = true
Expand All @@ -22,6 +21,7 @@ pyright = "^1.1.354"
ruff = "^0.3.3"
pytest = "^8.1.1"
pytest-cov = "^4.1.0"
pytest-timeout = "^2.3.1"

[build-system]
requires = ["poetry-core"]
Expand Down Expand Up @@ -63,7 +63,7 @@ exclude = ['typings']
[tool.pytest.ini_options]
log_cli = 1
log_cli_level = 'ERROR'
timeout = 4
timeout = 1

[tool.coverage.report]
exclude_also = ["if TYPE_CHECKING:"]
Expand Down
35 changes: 21 additions & 14 deletions redux/autorun.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import inspect
import weakref
from asyncio import iscoroutinefunction
from asyncio import Task, iscoroutine
from inspect import signature
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Generic, cast
from typing import TYPE_CHECKING, Any, Callable, Generic, cast

from redux.basic_types import (
Action,
Expand Down Expand Up @@ -121,6 +121,20 @@ def call_func(
func,
)(selector_result, previous_result)

def _task_callback(
self: Autorun[
State,
Action,
Event,
SelectorOutput,
ComparatorOutput,
AutorunOriginalReturnType,
],
task: Task,
) -> None:
task.add_done_callback(lambda _: self.inform_subscribers())
self._latest_value = cast(AutorunOriginalReturnType, task)

def _check_and_call(
self: Autorun[
State,
Expand Down Expand Up @@ -154,12 +168,11 @@ def _check_and_call(
previous_result,
func,
)
if iscoroutinefunction(func):
task = self._store._async_loop.create_task( # noqa: SLF001
cast(Coroutine, self._latest_value),
if iscoroutine(self._latest_value):
self._store._create_task( # noqa: SLF001
self._latest_value,
callback=self._task_callback,
)
task.add_done_callback(lambda _: self.inform_subscribers())
self._latest_value = cast(AutorunOriginalReturnType, task)
else:
self.inform_subscribers()

Expand Down Expand Up @@ -234,12 +247,6 @@ def subscribe(
callback(self.value)

def unsubscribe() -> None:
callback = (
callback_ref()
if isinstance(callback_ref, weakref.ref)
else callback_ref
)
if callback is not None:
self._subscriptions.discard(callback)
self._subscriptions.discard(callback_ref)

return unsubscribe
28 changes: 25 additions & 3 deletions redux/basic_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@
from __future__ import annotations

from types import NoneType
from typing import TYPE_CHECKING, Any, Callable, Generic, Protocol, TypeAlias, TypeGuard
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
Generic,
Protocol,
TypeAlias,
TypeGuard,
)

from immutable import Immutable
from typing_extensions import TypeVar

if TYPE_CHECKING:
import asyncio
from asyncio import Task


class BaseAction(Immutable): ...
Expand Down Expand Up @@ -77,13 +86,26 @@ class Scheduler(Protocol):
def __call__(self: Scheduler, callback: Callable, *, interval: bool) -> None: ...


class TaskCreatorCallback(Protocol):
def __call__(self: TaskCreatorCallback, task: Task) -> None: ...


class TaskCreator(Protocol):
def __call__(
self: TaskCreator,
coro: Coroutine,
*,
callback: TaskCreatorCallback | None = None,
) -> None: ...


class CreateStoreOptions(Immutable):
auto_init: bool = False
threads: int = 5
scheduler: Scheduler | None = None
action_middleware: Callable[[BaseAction], Any] | None = None
event_middleware: Callable[[BaseEvent], Any] | None = None
async_loop: asyncio.AbstractEventLoop | None = None
task_creator: TaskCreator | None = None


class AutorunOptions(Immutable, Generic[AutorunOriginalReturnType]):
Expand Down
123 changes: 42 additions & 81 deletions redux/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@

from __future__ import annotations

import dataclasses
import inspect
import queue
import threading
import weakref
from asyncio import AbstractEventLoop, get_event_loop, iscoroutinefunction
from asyncio import get_event_loop, iscoroutine
from collections import defaultdict
from inspect import signature
from threading import Lock
from types import NoneType
from typing import Any, Callable, Coroutine, Generic, cast

from immutable import Immutable, is_immutable

from redux.autorun import Autorun
from redux.basic_types import (
Action,
Expand All @@ -39,45 +34,25 @@
SelectorOutput,
SnapshotAtom,
State,
TaskCreator,
TaskCreatorCallback,
is_complete_reducer_result,
is_state_reducer_result,
)
from redux.serialization_mixin import SerializationMixin
from redux.side_effect_runner import SideEffectRunnerThread


class _SideEffectRunnerThread(threading.Thread, Generic[Event]):
def __init__(
self: _SideEffectRunnerThread[Event],
*,
task_queue: queue.Queue[tuple[EventHandler[Event], Event] | None],
async_loop: AbstractEventLoop,
) -> None:
super().__init__()
self.task_queue = task_queue
self.async_loop = async_loop

def create_task(self: _SideEffectRunnerThread[Event], coro: Coroutine) -> None:
self.async_loop.call_soon_threadsafe(lambda: self.async_loop.create_task(coro))

def run(self: _SideEffectRunnerThread[Event]) -> None:
while True:
task = self.task_queue.get()
if task is None:
self.task_queue.task_done()
break

try:
event_handler, event = task
if len(signature(event_handler).parameters) == 1:
result = cast(Callable[[Event], Any], event_handler)(event)
else:
result = cast(Callable[[], Any], event_handler)()
if iscoroutinefunction(event_handler):
self.create_task(result)
finally:
self.task_queue.task_done()
def _default_task_creator(
coro: Coroutine,
callback: TaskCreatorCallback | None = None,
) -> None:
result = get_event_loop().create_task(coro)
if callback:
callback(result)


class Store(Generic[State, Action, Event]):
class Store(Generic[State, Action, Event], SerializationMixin):
"""Redux store for managing state and side effects."""

def __init__(
Expand All @@ -88,7 +63,9 @@ def __init__(
"""Create a new store."""
self.store_options = options or CreateStoreOptions()
self.reducer = reducer
self._async_loop = self.store_options.async_loop or get_event_loop()
self._create_task: TaskCreator = (
self.store_options.task_creator or _default_task_creator
)

self._state: State | None = None
self._listeners: set[
Expand All @@ -110,14 +87,14 @@ def __init__(
self._event_handlers_queue = queue.Queue[
tuple[EventHandler[Event], Event] | None
]()
workers = [
_SideEffectRunnerThread(
self._workers = [
SideEffectRunnerThread(
task_queue=self._event_handlers_queue,
async_loop=self._async_loop,
task_creator=self._create_task,
)
for _ in range(self.store_options.threads)
]
for worker in workers:
for worker in self._workers:
worker.start()

self._is_running = Lock()
Expand Down Expand Up @@ -158,8 +135,8 @@ def _run_actions(self: Store[State, Action, Event]) -> None:
else:
listener = listener_
result = listener(self._state)
if iscoroutinefunction(listener):
self._async_loop.create_task(result)
if iscoroutine(result):
self._create_task(result)

def _run_event_handlers(self: Store[State, Action, Event]) -> None:
event = self._events.pop(0)
Expand All @@ -175,10 +152,13 @@ def _run_event_handlers(self: Store[State, Action, Event]) -> None:
event_handler = event_handler_
if not options.immediate_run:
self._event_handlers_queue.put((event_handler, event))
elif len(signature(event_handler).parameters) == 1:
cast(Callable[[Event], Any], event_handler)(event)
else:
cast(Callable[[], Any], event_handler)()
if len(signature(event_handler).parameters) == 1:
result = cast(Callable[[Event], Any], event_handler)(event)
else:
result = cast(Callable[[], Any], event_handler)()
if iscoroutine(result):
self._create_task(result)

def run(self: Store[State, Action, Event]) -> None:
"""Run the store."""
Expand All @@ -189,6 +169,12 @@ def run(self: Store[State, Action, Event]) -> None:

if len(self._events) > 0:
self._run_event_handlers()
if not any(i.is_alive() for i in self._workers):
for worker in self._workers:
worker.join()
self._workers.clear()
self._listeners.clear()
self._event_handlers.clear()

def dispatch(
self: Store[State, Action, Event],
Expand Down Expand Up @@ -258,15 +244,15 @@ def subscribe_event(
self._event_handlers[cast(type[Event], event_type)].add(
(handler_ref, subscription_options),
)
return lambda: self._event_handlers[cast(type[Event], event_type)].discard(
(handler_ref, subscription_options),
)

def _handle_finish_event(
self: Store[State, Action, Event],
finish_event: Event,
) -> None:
_ = finish_event
def unsubscribe() -> None:
self._event_handlers[cast(type[Event], event_type)].discard(
(handler_ref, subscription_options),
)

return unsubscribe

def _handle_finish_event(self: Store[State, Action, Event]) -> None:
for _ in range(self.store_options.threads):
self._event_handlers_queue.put(None)

Expand Down Expand Up @@ -301,28 +287,3 @@ def decorator(
def snapshot(self: Store[State, Action, Event]) -> SnapshotAtom:
"""Return a snapshot of the current state of the store."""
return self.serialize_value(self._state)

@classmethod
def serialize_value(cls: type[Store], obj: object | type) -> SnapshotAtom:
"""Serialize a value to a snapshot atom."""
if isinstance(obj, (int, float, str, bool, NoneType)):
return obj
if callable(obj):
return cls.serialize_value(obj())
if isinstance(obj, (list, tuple)):
return [cls.serialize_value(i) for i in obj]
if is_immutable(obj):
return cls._serialize_dataclass_to_dict(obj)
msg = f'Unable to serialize object with type `{type(obj)}`.'
raise TypeError(msg)

@classmethod
def _serialize_dataclass_to_dict(
cls: type[Store],
obj: Immutable,
) -> dict[str, Any]:
result = {}
for field in dataclasses.fields(obj):
value = cls.serialize_value(getattr(obj, field.name))
result[field.name] = value
return result
Loading

0 comments on commit 4958ceb

Please sign in to comment.