-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathscheduler_demo.py
119 lines (94 loc) · 4.46 KB
/
scheduler_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright The PFDL Contributors
#
# Licensed under the MIT License.
# For details on the licensing terms, see the LICENSE file.
# SPDX-License-Identifier: MIT
"""This file contains an interface to demonstrate the use of the PFDL Scheduler."""
# standard libraries
import argparse
from typing import Optional, Sequence, Tuple

# local sources
from pfdl_scheduler.api.service_api import ServiceAPI
from pfdl_scheduler.api.task_api import TaskAPI
from pfdl_scheduler.model.struct import Struct
from pfdl_scheduler.scheduler import Scheduler, Event
class DemoInterface:
    """A dummy interface which demonstrates the use of the scheduler functions.

    At start the interface registers its callback functions and its variable access
    function with the scheduler. The callback functions print a simple debug message
    to show the functionality of the scheduler.

    Attributes:
        scheduler: A Scheduler instance
        wetness: A dummy variable which is used in the PFDL examples
        parts_count: A dummy variable which is used in the PFDL examples
    """

    def __init__(self, scheduler: Scheduler) -> None:
        """Initialize the object"""
        self.scheduler: Scheduler = scheduler
        self.wetness: int = 11
        self.parts_count: int = 3

    def cb_task_started(self, task_api: TaskAPI) -> None:
        """Print a debug message with the name and UUID of the started task."""
        print(f"Task {task_api.task.name} with UUID '{task_api.uuid}' started")

    def cb_service_started(self, service_api: ServiceAPI) -> None:
        """Print a debug message with the name and UUID of the started service."""
        print(f"Service {service_api.service.name} with UUID '{service_api.uuid}' started")

    def cb_service_finished(self, service_api: ServiceAPI) -> None:
        """Print a debug message with the name and UUID of the finished service."""
        print(f"Service {service_api.service.name} with UUID '{service_api.uuid}' finished")

    def cb_task_finished(self, task_api: TaskAPI) -> None:
        """Print a debug message with the name and UUID of the finished task."""
        print(f"Task {task_api.task.name} with UUID '{task_api.uuid}' finished")

    def variable_access_function(self, var_name: str, task_context: TaskAPI) -> Struct:
        """Simulate a variable access function which returns a Struct variable.

        This dummy method simulates an access to variables from the PFDL. The returned
        structs are used in the examples folder.

        Args:
            var_name: The name of the requested variable.
            task_context: The task from which the variable is requested.

        Returns:
            A struct variable corresponding to the given variable name in the given
            task context. For names other than "pr", "dr" and "cr" the attributes of
            the newly created Struct are left untouched.
        """
        print(f"Request variable '{var_name}' from task with UUID '{task_context.uuid}'")

        dummy_struct = Struct()
        if var_name in ("pr", "dr"):
            dummy_struct.attributes = {"wetness": self.wetness}
        elif var_name == "cr":
            dummy_struct.attributes = {"parts_count": self.parts_count}
        return dummy_struct

    @staticmethod
    def _parse_event_input(input_str: str) -> Optional[Tuple[str, str]]:
        """Split a console line of the form "<service_uuid>,<event_type>".

        Surrounding whitespace of both fields is removed and fields after the second
        one are ignored.

        Args:
            input_str: The raw line entered by the user.

        Returns:
            A tuple (service_uuid, event_type), or None if the line does not contain
            two non-empty, comma separated fields.
        """
        fields = [field.strip() for field in input_str.split(",")]
        if len(fields) < 2 or not fields[0] or not fields[1]:
            return None
        return fields[0], fields[1]

    def start(self) -> None:
        """Register all callbacks, start the scheduler and forward console input to it.

        While the scheduler is running, lines of the form
        "<service_uuid>,<event_type>" are read from the console and fired as events.
        A line which does not have this form is reported and skipped, so a typo does
        not abort the demo.
        """
        self.scheduler.register_callback_task_started(self.cb_task_started)
        self.scheduler.register_callback_service_started(self.cb_service_started)
        self.scheduler.register_callback_service_finished(self.cb_service_finished)
        self.scheduler.register_callback_task_finished(self.cb_task_finished)
        self.scheduler.register_variable_access_function(self.variable_access_function)

        self.scheduler.start()

        while self.scheduler.running:
            input_str = str(input("Wait for input:>"))
            parsed_input = self._parse_event_input(input_str)
            if parsed_input is None:
                # malformed line: ask again instead of raising an IndexError
                print("Invalid input, expected '<service_uuid>,<event_type>'")
                continue

            service_uuid, event_type = parsed_input
            event = Event(event_type=event_type, data={"service_uuid": service_uuid})
            self.scheduler.fire_event(event)
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Parse the command line, create a Scheduler and run the demo interface.

    Args:
        argv: The command line arguments to parse. If None, argparse reads the
            arguments of the running process (sys.argv[1:]).
    """
    parser = argparse.ArgumentParser(
        description="Demonstrate the PFDL Scheduler by executing a PFDL file."
    )
    parser.add_argument("file_path", type=str, help="the path for the PFDL file.")
    parser.add_argument(
        "--test_ids",
        action="store_true",
        help="services and tasks get test ids starting from 0.",
    )
    parser.add_argument(
        "-dh", "--dashboard_host", type=str, help="the host address of the PFDL dashboard."
    )
    args = parser.parse_args(argv)

    # the scheduler gets an empty string if no dashboard host was passed
    dashboard_host_address = args.dashboard_host or ""

    scheduler = Scheduler(
        args.file_path, args.test_ids, dashboard_host_address=dashboard_host_address
    )
    demo_interface = DemoInterface(scheduler)
    demo_interface.start()
# Run the demo only if this file is executed as a script, not if it is imported.
if __name__ == "__main__":
    main()