-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathviews.py
71 lines (55 loc) · 2.02 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from rest_framework.generics import CreateAPIView, ListAPIView, RetrieveAPIView
from django_dramatiq.models import Task
from .permissions import TaskExecutePermission, TaskSchedulePermission
from .serializers import (
ExecuteTaskSerializer, ScheduleJobCronSerializer, ScheduleJobDateSerializer, ScheduleJobIntervalSerializer,
TaskDetailSerializer, TaskListSerializer, ScheduleListSerializer)
from .scheduler import Scheduler
class ExecuteTaskView(CreateAPIView):
    """
    Execute an actor with the parameters supplied in the request.
    """

    # The request payload is handled by ExecuteTaskSerializer; access is
    # gated by TaskExecutePermission.
    serializer_class = ExecuteTaskSerializer
    permission_classes = (TaskExecutePermission,)
class ExecutedListTaskView(ListAPIView):
    """
    List the tasks that have been executed.
    """

    # Rows are read through the Task model's `tasks` manager.
    queryset = Task.tasks.all()
    serializer_class = TaskListSerializer
    permission_classes = (TaskExecutePermission,)
class ExecutedDetailTaskView(RetrieveAPIView):
    """
    Show the details of a single executed task.
    """

    # Rows are read through the Task model's `tasks` manager.
    queryset = Task.tasks.all()
    serializer_class = TaskDetailSerializer
    permission_classes = (TaskExecutePermission,)

    # The task is looked up by its `id` field, taken from the `id` URL kwarg.
    lookup_field = 'id'
    lookup_url_kwarg = 'id'
class ScheduleJobByDateView(CreateAPIView):
    """
    Schedule a task to run on a certain date.
    """

    # The request payload is handled by ScheduleJobDateSerializer; access is
    # gated by TaskSchedulePermission.
    serializer_class = ScheduleJobDateSerializer
    permission_classes = (TaskSchedulePermission,)
class ScheduleJobByIntervalView(CreateAPIView):
    """
    Schedule a task to run repeatedly at a fixed interval of time,
    e.g. every 10 minutes.
    """

    # The request payload is handled by ScheduleJobIntervalSerializer; access
    # is gated by TaskSchedulePermission.
    serializer_class = ScheduleJobIntervalSerializer
    permission_classes = (TaskSchedulePermission,)
class ScheduleJobByCronView(CreateAPIView):
    """
    Advanced scheduling method: uses the power of cron to customize the
    execution time and period of a task.
    """

    # The request payload is handled by ScheduleJobCronSerializer; access is
    # gated by TaskSchedulePermission.
    serializer_class = ScheduleJobCronSerializer
    permission_classes = (TaskSchedulePermission,)
class ScheduledListView(ListAPIView):
    """
    List the scheduled tasks.
    """

    serializer_class = ScheduleListSerializer
    permission_classes = (TaskSchedulePermission,)

    def get_queryset(self):
        """Return the jobs reported by a freshly created Scheduler."""
        # A new Scheduler is built on every call; its get_jobs() result is
        # handed back unchanged for the serializer to render.
        scheduler = Scheduler()
        return scheduler.get_jobs()