-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtodo_demo.py
111 lines (87 loc) · 2.49 KB
/
todo_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# ruff: noqa: A003, D100, D101, D102, D103, D104, D105, D107, T201
from __future__ import annotations
import time
import uuid
from dataclasses import replace
from typing import TYPE_CHECKING
from immutable import Immutable
from redux import BaseAction, Store
from redux.basic_types import (
BaseEvent,
CompleteReducerResult,
FinishAction,
ReducerResult,
)
if TYPE_CHECKING:
from collections.abc import Sequence
# state:
class ToDoItem(Immutable):
id: str
content: str
timestamp: float
class ToDoState(Immutable):
items: Sequence[ToDoItem]
# actions:
class AddTodoItemAction(BaseAction):
content: str
timestamp: float
class RemoveTodoItemAction(BaseAction):
id: str
# events:
class CallApi(BaseEvent):
parameters: object
# reducer:
def reducer(
state: ToDoState | None,
action: BaseAction,
) -> ReducerResult[ToDoState, BaseAction, BaseEvent]:
if state is None:
return ToDoState(
items=[
ToDoItem(
id=uuid.uuid4().hex,
content='Initial Item',
timestamp=time.time(),
),
],
)
if isinstance(action, AddTodoItemAction):
return replace(
state,
items=[
*state.items,
ToDoItem(
id=uuid.uuid4().hex,
content=action.content,
timestamp=action.timestamp,
),
],
)
if isinstance(action, RemoveTodoItemAction):
return CompleteReducerResult(
state=replace(
state,
actions=[item for item in state.items if item.id != action.id],
),
events=[CallApi(parameters={})],
)
return state
def main() -> None:
store = Store(reducer)
# subscription:
dummy_render = print
store.subscribe(dummy_render)
# autorun:
@store.autorun(
lambda state: state.items[0].content if len(state.items) > 0 else None,
)
def reaction(content: str | None) -> None:
print(content)
_ = reaction
# event listener, note that this will run async in a separate thread, so it can
# include async operations like API calls, etc:
dummy_api_call = print
store.subscribe_event(CallApi, lambda event: dummy_api_call(event.parameters))
# dispatch:
store.dispatch(AddTodoItemAction(content='New Item', timestamp=time.time()))
store.dispatch(FinishAction())