-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
537556f
commit 2ef2755
Showing
30 changed files
with
1,076 additions
and
947 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,103 +1,103 @@ | ||
import asyncio | ||
from datetime import datetime | ||
from typing import List, Union, Type, Dict, Any | ||
from .entities.delegate import EventDelegate | ||
from .entities.event import TemplateEvent | ||
from .entities.publisher import Publisher | ||
from .entities.decorator import TemplateDecorator | ||
from .entities.subscriber import Subscriber | ||
from .utils import Condition_T, search_event, event_class_generator | ||
|
||
|
||
class EventSystem: | ||
loop: asyncio.AbstractEventLoop | ||
publisher_list: List[Publisher] | ||
safety_interval: float | ||
last_run: datetime | ||
current_event: Union[TemplateEvent, Dict[str, Any]] | ||
|
||
def __init__( | ||
self, | ||
*, | ||
loop: asyncio.AbstractEventLoop = None, | ||
interval: float = 0.00 | ||
): | ||
self.publisher_list = [] | ||
self.loop = loop or asyncio.get_event_loop() | ||
self.safety_interval = interval | ||
self.last_run = datetime.now() | ||
|
||
def event_spread(self, target: Union[TemplateEvent, Dict[str, Any]]): | ||
if (datetime.now() - self.last_run).total_seconds() >= self.safety_interval: | ||
self.current_event = target | ||
try: | ||
for pub in self.publisher_generator(): | ||
pub.on_event(self) | ||
except asyncio.CancelledError: | ||
return | ||
self.last_run = datetime.now() | ||
|
||
def publisher_generator(self): | ||
able_pubs = list( | ||
filter( | ||
lambda x: all([condition.judge(self.current_event) for condition in x.external_conditions]), | ||
self.publisher_list | ||
) | ||
) | ||
able_pubs.sort(key=lambda x: x.priority) | ||
return able_pubs | ||
|
||
def get_publisher(self, target: Union[Type[TemplateEvent], Condition_T]): | ||
p_list = [] | ||
for publisher in self.publisher_list: | ||
if target in publisher: | ||
p_list.append(publisher) | ||
if len(p_list) > 0: | ||
return p_list | ||
return False | ||
|
||
def remove_publisher(self, target: Publisher): | ||
self.publisher_list.remove(target) | ||
|
||
def register( | ||
self, | ||
event: Union[str, Type[TemplateEvent]], | ||
*, | ||
priority: int = 16, | ||
conditions: List[Condition_T] = None, | ||
decorators: List[TemplateDecorator] = None, | ||
): | ||
if isinstance(event, str): | ||
name = event | ||
event = search_event(event) | ||
if not event: | ||
raise Exception(name + " cannot found!") | ||
|
||
events = [event] | ||
events.extend(event_class_generator(event)) | ||
conditions = conditions or [] | ||
decorators = decorators or [] | ||
|
||
def register_wrapper(exec_target): | ||
if not isinstance(exec_target, Subscriber): | ||
exec_target = Subscriber( | ||
callable_target=exec_target, | ||
decorators=decorators | ||
) | ||
for e in events: | ||
may_publishers = self.get_publisher(e) | ||
_event_handler = EventDelegate(event=e) | ||
_event_handler += exec_target | ||
if not may_publishers: | ||
self.publisher_list.append(Publisher(priority, conditions, _event_handler)) | ||
else: | ||
for m_publisher in may_publishers: | ||
if m_publisher.equal_conditions(conditions): | ||
m_publisher += _event_handler | ||
break | ||
else: | ||
self.publisher_list.append(Publisher(priority, conditions, _event_handler)) | ||
|
||
return exec_target | ||
|
||
return register_wrapper | ||
import asyncio | ||
from datetime import datetime | ||
from typing import List, Union, Dict, Any, Type | ||
|
||
from .builtin.publisher import TemplatePublisher | ||
from .entities.auxiliary import BaseAuxiliary | ||
from .entities.delegate import EventDelegate | ||
from .entities.event import TemplateEvent | ||
from .entities.publisher import Publisher | ||
from .entities.subscriber import Subscriber | ||
from .exceptions import PropagationCancelled | ||
from .handler import await_exec_target, event_ctx | ||
from .utils import search_event, event_class_generator, group_dict, gather_inserts | ||
|
||
|
||
class EventSystem: | ||
loop: asyncio.AbstractEventLoop | ||
publishers: List[Publisher] | ||
__publisher: Publisher | ||
safety_interval: float | ||
last_run: datetime | ||
|
||
def __init__( | ||
self, | ||
*, | ||
loop: asyncio.AbstractEventLoop = None, | ||
interval: float = 0.00 | ||
): | ||
self.loop = loop or asyncio.new_event_loop() | ||
self.safety_interval = interval | ||
self.last_run = datetime.now() | ||
self.publishers = [] | ||
self.__publisher = TemplatePublisher() | ||
self.publishers.append(self.__publisher) | ||
|
||
def event_publish( | ||
self, | ||
event: Union[TemplateEvent, Dict[str, Any]], | ||
publisher: Publisher = None | ||
): | ||
publishers = [publisher] if publisher else self.publishers | ||
delegates = [] | ||
for publisher in publishers: | ||
delegates.extend(publisher.require(event.__class__)) | ||
if (datetime.now() - self.last_run).total_seconds() >= self.safety_interval: | ||
self.loop.create_task( | ||
self.delegate_exec(delegates, event) | ||
) | ||
self.last_run = datetime.now() | ||
|
||
@staticmethod | ||
async def delegate_exec(delegates: List[EventDelegate], event: TemplateEvent): | ||
event_chains = gather_inserts(event) | ||
grouped: Dict[int, EventDelegate] = group_dict(delegates, lambda x: x.priority) | ||
with event_ctx.use(event): | ||
for _, current_delegate in sorted(grouped.items(), key=lambda x: x[0]): | ||
coroutine = [ | ||
await_exec_target(target, event_chains) | ||
for target in current_delegate.subscribers | ||
] | ||
results = await asyncio.gather(*coroutine, return_exceptions=True) | ||
for result in results: | ||
if result is PropagationCancelled: | ||
return | ||
|
||
def register( | ||
self, | ||
event: Union[str, Type[TemplateEvent]], | ||
*, | ||
priority: int = 16, | ||
auxiliaries: List[BaseAuxiliary] = None, | ||
publisher: Publisher = None, | ||
inline_arguments: Dict[str, Any] = None | ||
): | ||
if isinstance(event, str): | ||
name = event | ||
event = search_event(event) | ||
if not event: | ||
raise Exception(name + " cannot found!") | ||
|
||
events = [event] | ||
events.extend(event_class_generator(event)) | ||
auxiliaries = auxiliaries or [] | ||
inline_arguments = inline_arguments or {} | ||
publisher = publisher or self.__publisher | ||
|
||
def register_wrapper(exec_target): | ||
if not isinstance(exec_target, Subscriber): | ||
exec_target = Subscriber( | ||
callable_target=exec_target, | ||
auxiliaries=auxiliaries, | ||
**inline_arguments | ||
) | ||
for e in events: | ||
may_delegate = publisher.require(e, priority) | ||
if may_delegate: | ||
may_delegate += exec_target | ||
else: | ||
_event_handler = EventDelegate(e, priority) | ||
_event_handler += exec_target | ||
publisher.add_delegate(_event_handler) | ||
return exec_target | ||
return register_wrapper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,58 +1,58 @@ | ||
from typing import Type, Dict, List | ||
from ..entities.subscriber import Subscriber | ||
from .. import EventSystem, TemplateEvent | ||
from .stepout import StepOut | ||
from ..entities.publisher import Publisher | ||
from ..entities.delegate import EventDelegate | ||
|
||
|
||
class Breakpoint: | ||
event_system: EventSystem | ||
step_publishers: List[Publisher] | ||
|
||
def __init__(self, event_system: EventSystem, priority: int = 15): | ||
self.event_system = event_system | ||
self._step_outs: Dict[Type[TemplateEvent], List[StepOut]] = {} | ||
self.priority = priority | ||
self.step_publishers = [] | ||
|
||
async def wait( | ||
self, | ||
step_out_condition: StepOut, | ||
timeout: float = 0., | ||
): | ||
event_type = step_out_condition.event_type | ||
self.new_subscriber(event_type, step_out_condition) | ||
if event_type not in self._step_outs: | ||
self._step_outs[event_type] = [step_out_condition] | ||
else: | ||
self._step_outs[event_type].append(step_out_condition) | ||
try: | ||
return await step_out_condition.wait(timeout) | ||
finally: | ||
for pub in self.event_system.get_publisher(step_out_condition): | ||
self.event_system.remove_publisher(pub) | ||
if step_out_condition.done(): | ||
for step in self._step_outs[event_type]: | ||
step.future.cancel() | ||
self._step_outs[event_type].clear() | ||
else: | ||
for pub in self.step_publishers: | ||
self.event_system.remove_publisher(pub) | ||
|
||
def new_subscriber(self, event_type, step_out_condition): | ||
async def _(): | ||
for step in self._step_outs[event_type]: | ||
if await step.make_done(): | ||
break | ||
|
||
subscriber = Subscriber.set()(_) | ||
delegate = EventDelegate(event_type) | ||
delegate += subscriber | ||
publisher = Publisher(15, [step_out_condition], delegate) | ||
self.event_system.publisher_list.append(publisher) | ||
self.step_publishers.append(publisher) | ||
|
||
def __call__(self, step_out_condition: StepOut, timeout: float = 0.): | ||
return self.wait(step_out_condition, timeout) | ||
from typing import Type, Dict, List | ||
from ..builtin.publisher import TemplatePublisher | ||
from ..entities.subscriber import Subscriber | ||
from .. import EventSystem, TemplateEvent | ||
from .stepout import StepOut | ||
from ..entities.delegate import EventDelegate | ||
|
||
|
||
class Breakpoint: | ||
event_system: EventSystem | ||
step_out_publisher: TemplatePublisher | ||
step_delegates: List[EventDelegate] | ||
|
||
def __init__(self, event_system: EventSystem, priority: int = 15): | ||
self.event_system = event_system | ||
self.step_out_publisher = TemplatePublisher() | ||
self._step_outs: Dict[Type[TemplateEvent], List[StepOut]] = {} | ||
self.priority = priority | ||
self.step_delegates = [] | ||
|
||
async def wait( | ||
self, | ||
step_out_condition: StepOut, | ||
timeout: float = 0., | ||
): | ||
event_type = step_out_condition.event_type | ||
self.new_subscriber(event_type, step_out_condition) | ||
if event_type not in self._step_outs: | ||
self._step_outs[event_type] = [step_out_condition] | ||
else: | ||
self._step_outs[event_type].append(step_out_condition) | ||
try: | ||
return await step_out_condition.wait(timeout) | ||
finally: | ||
self.event_system.publishers.remove(self.step_out_publisher) | ||
if step_out_condition.done(): | ||
for step in self._step_outs[event_type]: | ||
step.future.cancel() | ||
self._step_outs[event_type].clear() | ||
else: | ||
for delegate in self.step_delegates: | ||
self.step_out_publisher.remove_delegate(delegate) | ||
|
||
def new_subscriber(self, event_type, step_out_condition): | ||
async def _(): | ||
for step in self._step_outs[event_type]: | ||
if await step.make_done(): | ||
break | ||
|
||
subscriber = Subscriber(_, auxiliaries=[step_out_condition]) | ||
delegate = EventDelegate(event_type, priority=15) | ||
delegate += subscriber | ||
self.step_out_publisher.add_delegate(delegate) | ||
self.step_delegates.append(delegate) | ||
self.event_system.publishers.append(self.step_out_publisher) | ||
|
||
def __call__(self, step_out_condition: StepOut, timeout: float = 0.): | ||
return self.wait(step_out_condition, timeout) |
Oops, something went wrong.