-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathscheduler.lisp
492 lines (436 loc) · 20.7 KB
/
scheduler.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
;;; Copyright (c) 2017, Emergent Network Defense <http://endsecurity.com/>
;;; Copyright (c) 2017-2022, Daniel Kochmański <[email protected]>
;; Public interface package.  It uses no other package (empty :USE list) and
;; only exports the names of the scheduler classes, the start/stop entry
;; points and the task CRUDL operations.
(defpackage #:scheduler
(:use)
(:export #:scheduler
#:in-memory-scheduler
#:start-scheduler
#:stop-scheduler
;; CRUDL
#:create-scheduler-task
#:read-scheduler-task
#:update-scheduler-task
#:delete-scheduler-task
#:list-scheduler-tasks))
;; Implementation package.  It imports the public symbols from #:SCHEDULER and
;; re-exports them together with the entry/task names declared below, which
;; are exported from this package only.
(defpackage #:scheduler-implementation
(:use #:cl #:alexandria)
(:import-from #:scheduler
#:scheduler
#:in-memory-scheduler
#:start-scheduler
#:stop-scheduler
#:create-scheduler-task
#:read-scheduler-task
#:update-scheduler-task
#:delete-scheduler-task
#:list-scheduler-tasks)
(:export #:scheduler #:in-memory-scheduler
#:start-scheduler #:stop-scheduler
#:create-scheduler-task
#:read-scheduler-task
#:update-scheduler-task
#:delete-scheduler-task
#:list-scheduler-tasks
;; entry
#:scheduler-entry
#:cron-entry
#:parse-entry
;; task
#:task
#:task-time-specs
#:task-command
#:task-last-execution
#:task-next-execution
#:task-source-entry
#:execute-task))
(in-package #:scheduler-implementation)
;; utils
;; The macros are wrapped in EVAL-WHEN so that they are defined at compile
;; time, load time and when evaluated; SS is used inside `#.' read-time
;; forms later in this file.
(eval-when (:compile-toplevel :load-toplevel :execute)
;; SS: split a sequence on the space character; ARGS are passed through to
;; SPLIT-SEQUENCE:SPLIT-SEQUENCE (the sequence itself and any keywords).
(defmacro ss (&rest args)
`(split-sequence:split-sequence #\space ,@args))
;; DB: shorthand for DESTRUCTURING-BIND.
(defmacro db (lambda-list expression &body body)
`(destructuring-bind ,lambda-list ,expression ,@body))
;; MVB: shorthand for MULTIPLE-VALUE-BIND.
(defmacro mvb (lambda-list expression &body body)
`(multiple-value-bind ,lambda-list ,expression ,@body))
;; ^IF: anaphoric IF.  The value of TEST is bound to the symbol ^IT, which is
;; visible in both branches (the binding is intentionally not hygienic).
(defmacro ^if (test if-true if-false)
`(let ((^it ,test)) (if ^it ,if-true ,if-false))))
(defun seed-random-state (seed)
"Returns a new random state seeded with `seed'.
On ECL and SBCL the state is derived from `seed'.  On other
implementations `seed' is ignored and a copy of a random state captured
at read time is returned instead."
;; SEED is only referenced in the ECL and SBCL branches below.
#-(or ecl sbcl) (declare (ignore seed))
#+ecl(make-random-state seed)
#+sbcl(sb-ext:seed-random-state seed)
;; Fallback: `#.' evaluates (make-random-state *random-state*) when this form
;; is read, and each call returns a fresh copy of that captured state.
#-(or ecl sbcl) (make-random-state #.(make-random-state *random-state*)))
(defun parse-cron-spec (line)
  "Split the cron LINE into its time specification and its command.

Returns a cons (SPECS . COMMAND).  SPECS is either the keyword :REBOOT or
:SHUTDOWN, or a list of five field strings (minute, hour, day of month,
month, day of week).  COMMAND is the remainder of LINE, or the string
\"nil\" when nothing follows the time fields.

Leading spaces are ignored and a run of spaces between fields counts as a
single separator.  An unknown @-nickname signals an error (ESWITCH), and a
line with fewer than five fields fails the assertion below."
  ;; Drop leading spaces so that the `@' test looks at the first real
  ;; character of the entry.
  (let ((line (string-left-trim " " line)))
    (flet ((split-at ()
             ;; Nickname form: the first word selects a canned specification.
             (multiple-value-bind (spec sp)
                 (ss line :count 1 :remove-empty-subseqs t)
               (values (alexandria:eswitch ((car spec) :test 'equal)
                         ("@reboot" :reboot)
                         ("@shutdown" :shutdown)
                         ("@yearly" '#.(ss "H H H H *"))
                         ("@annually" '#.(ss "H H H H *"))
                         ("@monthly" '#.(ss "H H H * *"))
                         ("@weekly" '#.(ss "H H * * H"))
                         ("@daily" '#.(ss "H H * * *"))
                         ("@midnight" '#.(ss "H H * * *"))
                         ("@hourly" '#.(ss "H * * * *")))
                       (subseq line sp))))
           (split ()
             ;; Classic form: five time fields followed by the command.
             ;; :REMOVE-EMPTY-SUBSEQS keeps repeated spaces (column-aligned
             ;; entries) from producing empty field strings.
             (multiple-value-bind (specs sp)
                 (ss line :count 5 :remove-empty-subseqs t)
               (values specs (subseq line sp)))))
      (multiple-value-bind (specs command)
          (if (char/= #\@ (elt line 0))
              (split)
              (split-at))
        (assert (or (member specs '(:reboot :shutdown))
                    (alexandria:length= 5 specs)))
        ;; An entry without a command gets the string "nil" as its command.
        (cons specs (if (emptyp command) "nil" command))))))
;; Parse a single cron field SPEC that carries no "/step" suffix.  RANGE is
;; the list of values the field may take.  Returns :EVERY for "*", otherwise
;; a list of integers.  The clauses are tried in order, so the comma-list
;; patterns must come before the plain range/number patterns.  EMATCH signals
;; an error when no clause matches.
(defun %parse-cron-time-1/no-step (spec range)
(optima:ematch spec
("*" :every)
;; H alone: a single element picked at random from RANGE.
("H" (list (alexandria:random-elt range)))
;; 1,2,4,8 | 1-4,5,6 etc
((or (optima.ppcre:ppcre "^[0-9]+,.*")
(optima.ppcre:ppcre "^[0-9]+-[0-9]*,.*"))
;; (mapcar #'parse-integer (ppcre:split "," spec))
;; Each comma-separated part is parsed recursively and the resulting lists
;; are concatenated in the order they were written.
(alexandria:mappend
(lambda (spec-1)
(%parse-cron-time-1/no-step spec-1 range))
(ppcre:split "," spec)))
;; H(1-12)
;; One value picked at random from the inclusive interval FROM..TO.
((optima.ppcre:ppcre "H\\(([0-9]+)-([0-9]+)\\)" from to)
(let* ((from (parse-integer from))
(to (parse-integer to))
(size (1+ (- to from))))
(list (alexandria:random-elt (alexandria:iota size :start from)))))
;; 1-12
;; Every integer of the inclusive interval, ascending; ABS/MIN make the
;; result the same when the bounds are written in reverse order.
((optima.ppcre:ppcre "([0-9]+)-([0-9]+)" from to)
(let* ((from (parse-integer from))
(to (parse-integer to))
(size (1+ (abs (- to from)))))
(alexandria:iota size :start (min to from))))
;; A plain number: a one-element list.
((optima.ppcre:ppcre "(^[0-9]+)" number)
(list (parse-integer number)))))
(defun %parse-cron-time-1/step (spec range step)
  "Parse the cron field SPEC that was written with a \"/STEP\" suffix.

SPEC is the part before the slash, RANGE is the list of values the field
may take and STEP is the already parsed step.  Returns a list of integers.
Signals an error when STEP is not a positive integer or when SPEC matches
none of the supported forms (EMATCH)."
  ;; A zero step would make EVERY-NTH spin forever, because (NTHCDR 0 L)
  ;; never advances the list; a negative step is rejected by NTHCDR/RANDOM
  ;; with an obscure error.  Refuse both up front.
  (assert (and (integerp step) (plusp step)) nil
          "%PARSE-CRON-TIME-1/STEP: the step must be a positive integer, got ~s."
          step)
  (flet ((every-nth (list step)
           ;; Keep the first element of LIST and every STEP-th one after it.
           (if (= 1 step)
               list
               (loop for elt in list by #'(lambda (l) (nthcdr step l))
                     collect elt)))
         (every-nth* (from to step)
           ;; FROM, FROM+STEP, ... not exceeding TO.
           (let ((set-size (1+ (floor (/ (- to from) step)))))
             (alexandria:iota set-size :start from :step step))))
    (optima:ematch spec
      ;; */2
      ("*" (every-nth range step))
      ;; H/2 -- start at a random offset below STEP.
      ("H" (every-nth (nthcdr (random step) range) step))
      ;; H(1-14)/3 -- random start within the first STEP values of the
      ;; interval, clamped to the upper bound.
      ((optima.ppcre:ppcre "H\\(([0-9]+)-([0-9]+)\\)" from to)
       (let* ((to (parse-integer to))
              (from (min to
                         (alexandria:random-elt
                          (alexandria:iota step
                                           :start (parse-integer from))))))
         (every-nth* from to step)))
      ;; 1-14/3
      ((optima.ppcre:ppcre "([0-9]+)-([0-9]+)" from to)
       (every-nth* (parse-integer from)
                   (parse-integer to)
                   step))
      #+invalid ; this is invalid, but we catch it with ematch anyway
      ((optima.ppcre:ppcre "[H]?.*,.*")
       (error "invalid clause")))))
(defun parse-cron-time-1 (spec range)
  "Parse one cron field SPEC whose admissible values are listed in RANGE.

The text before an optional \"/\" is the base specification and the text
after it is the step.  Returns :EVERY or a list of integers; every integer
must be a member of RANGE, otherwise the assertion fails."
  (db (base . step) (ppcre:split "/" spec)
    (let ((values-or-every
            (cond (step
                   (%parse-cron-time-1/step base
                                            range
                                            (parse-integer (first step))))
                  (t
                   (%parse-cron-time-1/no-step base range)))))
      (flet ((in-range-p (n) (member n range)))
        (assert (or (eql values-or-every :every)
                    (every #'in-range-p values-or-every))
                nil "PARSE-CRON-TIME-1: Each element of ~s must be a member of:~%~s."
                values-or-every (cons :every range)))
      values-or-every)))
;;; XXX: backward compatibility (mainly for tests)
;; Wrap the string SPEC in a CRON-ENTRY instance and parse it; returns
;; whatever PARSE-ENTRY returns for that entry.
(defun parse-cron-entry (spec)
(parse-entry (make-instance 'cron-entry :string spec)))
;; Base class of schedule entries; it has no slots of its own.
(defclass scheduler-entry () ())
(defgeneric parse-entry (entry)
(:documentation "Parses entry to return two values: the time spec and command."))
;; An entry backed by a crontab-style line kept in the STRING slot.
(defclass cron-entry (scheduler-entry)
((string :initarg :string :reader cron-entry-string)))
(defmethod parse-entry ((entry cron-entry))
  "Parse the cron line stored in ENTRY.

Returns two values.  For @reboot and @shutdown entries the first value is
the list (:EVENT <keyword>); otherwise it is a property list with the keys
:MINUTE, :HOUR, :DAY-OF-MONTH, :MONTH and :DAY-OF-WEEK.  The second value is
the command."
  (let ((spec (parse-cron-spec (cron-entry-string entry))))
    (if (member (car spec) '(:reboot :shutdown))
        (values `(:event ,(car spec)) (cdr spec))
        (db ((minute hour day-of-month month day-of-week) . command) spec
          ;; The random state is derived from the command, which keeps the
          ;; values picked for H fields stable between parses of an entry.
          (let ((*random-state* (seed-random-state (sxhash command))))
            (values
             (list :minute (parse-cron-time-1 minute '#.(alexandria:iota 60))
                   :hour (parse-cron-time-1 hour '#.(alexandria:iota 24))
                   ;; A hashed day of month is drawn from 1..28 only.
                   :day-of-month (parse-cron-time-1
                                  day-of-month
                                  (if (char= (elt day-of-month 0) #\H)
                                      '#.(alexandria:iota 28 :start 1)
                                      '#.(alexandria:iota 31 :start 1)))
                   :month (parse-cron-time-1 month '#.(alexandria:iota 12 :start 1))
                   ;; "7" is accepted as another spelling of "0".
                   :day-of-week (parse-cron-time-1
                                 (if (string= day-of-week "7") "0" day-of-week)
                                 '#.(alexandria:iota 7)))
             command))))))
;;; we assume here that all `:random' entries are already picked and
;;; if `:step' present already coerced to sets.
(defun compute-next-occurance (spec &optional time)
  "Compute when the schedule SPEC fires next, strictly after TIME.

SPEC is either (:EVENT <keyword>) or a property list with the keys :MINUTE,
:HOUR, :DAY-OF-MONTH, :MONTH and :DAY-OF-WEEK, each being :EVERY or a list
of integers (in any order).  TIME defaults to the current time.  When TIME
is a keyword it is returned as is, and for an event SPEC the event keyword
is returned.  Otherwise a fresh timestamp is returned; the timestamp passed
as TIME is not modified.  The seconds of the result are chosen at random."
  ;; algorithm is based on
  ;; https://stackoverflow.com/questions/321494/calculate-when-a-cron-job-will-be-executed-then-next-time#3453872
  (when (keywordp time) (return-from compute-next-occurance time))
  ;; ADJUST-TIMESTAMP returns a new object, so the destructive nudges below
  ;; operate on a private copy instead of the caller's timestamp.
  (setf time (local-time:adjust-timestamp (or time (local-time:now))
               (set :nsec 0) (set :sec (random 60)) (offset :minute 1)))
  (flet ((match-spec (obj spec)
           (or (eql :every spec)
               (member obj spec)))
         (next-fit (n set)
           ;; Smallest element of SET greater than N, or NIL.  Taking the
           ;; minimum keeps the result right for unsorted sets ("5,1,3").
           (let ((later (remove-if-not (lambda (s) (< n s)) set)))
             (and later (reduce #'min later))))
         (first* (set default)
           ;; Earliest admissible value of a field.
           (if (eql set :every)
               default
               (reduce #'min set)))
         (day-offset (now set default)
           ;; Days from NOW to the earliest admissible day of next month.
           (let ((day (local-time:timestamp-day now))
                 (mday (local-time:days-in-month
                        (local-time:timestamp-month now)
                        (local-time:timestamp-year now)))
                 (target (if (eql set :every)
                             default
                             (reduce #'min set))))
             (+ (- mday day) target))))
    (symbol-macrolet ((next.minute (local-time:timestamp-minute time))
                      (next.hour (local-time:timestamp-hour time))
                      (next.weekday (local-time:timestamp-day-of-week time))
                      (next.day (local-time:timestamp-day time))
                      (next.month (local-time:timestamp-month time))
                      (next.year (local-time:timestamp-year time)))
      (db (&key minute hour day-of-month month day-of-week event) spec
        (when event (return-from compute-next-occurance event))
        ;; Each pass fixes the first field that does not match and then
        ;; starts over; (RETURN) leaves only the inner BLOCK, not the LOOP.
        (loop
          do (block nil
               ;; nudge minute
               (unless (match-spec next.minute minute)
                 (^if (next-fit next.minute minute)
                      (local-time:adjust-timestamp! time
                        (set :minute to ^it))
                      (local-time:adjust-timestamp! time
                        (offset :hour by 1)
                        (set :minute to (first* minute 0))))
                 (return))
               ;; nudge hour
               (unless (match-spec next.hour hour)
                 (^if (next-fit next.hour hour)
                      (local-time:adjust-timestamp! time
                        (set :hour to ^it)
                        (set :minute to (first* minute 0)))
                      (local-time:adjust-timestamp! time
                        (offset :day by 1)
                        (set :hour to (first* hour 0))
                        (set :minute to (first* minute 0))))
                 (return))
               ;; nudge weekday
               (unless (match-spec next.weekday day-of-week)
                 ;; Move to the closest admissible weekday.  The current
                 ;; weekday is not in the set, so every distance is 1..6.
                 (let* ((today next.weekday)
                        (delta (reduce #'min day-of-week
                                       :key (lambda (d) (mod (- d today) 7)))))
                   (local-time:adjust-timestamp! time
                     (offset :day by delta)
                     (set :hour to (first* hour 0))
                     (set :minute to (first* minute 0)))
                   (return)))
               ;; nudge month day
               (unless (match-spec next.day day-of-month)
                 (^if (next-fit next.day day-of-month)
                      (local-time:adjust-timestamp! time
                        (offset :day by (- ^it next.day))
                        (set :hour to (first* hour 0))
                        (set :minute to (first* minute 0)))
                      (local-time:adjust-timestamp! time
                        (offset :day by (day-offset time day-of-month 1))
                        (set :hour to (first* hour 0))
                        (set :minute to (first* minute 0))))
                 (return))
               ;; nudge month
               (unless (match-spec next.month month)
                 (^if (next-fit next.month month)
                      (local-time:adjust-timestamp! time
                        (set :month to ^it)
                        ;; always get back to 1 to prevent month bump!
                        (offset :day by (1+ (- next.day)))
                        (set :hour to (first* hour 0))
                        (set :minute to (first* minute 0)))
                      (local-time:adjust-timestamp! time
                        ;; wrap around to the earliest month of next year
                        (offset :month by (+ (- 12 next.month)
                                             (first* month 1)))
                        ;; same as above
                        (offset :day by (1+ (- next.day)))
                        (set :hour to (first* hour 0))
                        (set :minute to (first* minute 0))))
                 (return))
               ;; every field matches
               (return-from compute-next-occurance time)))))))
;; The %STATE slot starts as :STOPPED; START-SCHEDULER sets it to :RUNNING
;; and back to :STOPPED, STOP-SCHEDULER sets it to :EXIT.
(defclass scheduler ()
((%state :initform :stopped :accessor scheduler-state))
(:documentation "Thread-safe abstract scheduler class."))
;; The scheduler protocol: task CRUDL operations followed by start/stop.
(defgeneric create-scheduler-task (scheduler entry &key &allow-other-keys)
(:documentation "Parse ENTRY and add the resulting task to SCHEDULER."))
(defgeneric update-scheduler-task (scheduler entry &key &allow-other-keys)
(:documentation "Update the task designated by ENTRY in SCHEDULER."))
(defgeneric read-scheduler-task (scheduler entry)
(:documentation "Find the task designated by ENTRY in SCHEDULER."))
(defgeneric delete-scheduler-task (scheduler entry)
(:documentation "Delete the task designated by ENTRY in SCHEDULER."))
(defgeneric list-scheduler-tasks (scheduler)
(:documentation "Lists all tasks registered with SCHEDULER."))
(defgeneric start-scheduler (scheduler)
(:documentation "Start the scheduler."))
(defgeneric stop-scheduler (scheduler)
(:documentation "Stop the scheduler."))
(defmethod start-scheduler ((scheduler scheduler))
  "Run the scheduling loop of SCHEDULER in the calling thread.

Tasks waiting for :REBOOT are run first, then the task list is polled once
per second for as long as the state is :RUNNING, and finally tasks waiting
for :SHUTDOWN are run.  Returns after the state was set to :STOPPED."
  (format *debug-io* "~&Starting a scheduler.~%")
  (setf (scheduler-state scheduler) :running)
  (flet ((timestamp-p (object)
           (typep object 'local-time:timestamp))
         (occurrence-from-now (task)
           (compute-next-occurance (task-time-specs task) (local-time:now))))
    (labels ((overdue-p (task)
               ;; The planned time already lies in the past.
               (and (timestamp-p (task-next-execution task))
                    (local-time:timestamp< (task-next-execution task)
                                           (local-time:now))))
             (due-p (task moment)
               ;; Timestamps are compared with nanoseconds dropped; anything
               ;; else (event keywords) is compared with EQL.
               (if (and (timestamp-p moment)
                        (timestamp-p (task-next-execution task)))
                   (local-time:timestamp=
                    (task-next-execution task)
                    (local-time:adjust-timestamp moment (set :nsec 0)))
                   (eql (task-next-execution task) moment)))
             (tick (moment)
               (dolist (task (list-scheduler-tasks scheduler))
                 ;; A task without a planned execution gets one first.
                 (unless (task-next-execution task)
                   (update-scheduler-task scheduler task
                                          :start-at (occurrence-from-now task)))
                 (when (or (overdue-p task)
                           (due-p task moment))
                   (execute-task task)
                   (update-scheduler-task scheduler task
                                          :last-run moment
                                          :start-at (occurrence-from-now task))))))
      (tick :reboot)
      (loop
        (unless (eql (scheduler-state scheduler) :running)
          (return))
        (tick (local-time:now))
        (sleep 1))
      (tick :shutdown)))
  (format *debug-io* "~&Exiting a scheduler.~%")
  (setf (scheduler-state scheduler) :stopped))
;; Request termination: the loop in START-SCHEDULER runs only while the state
;; is :RUNNING, so any other value (here :EXIT) makes it finish.
(defmethod stop-scheduler ((scheduler scheduler))
(setf (scheduler-state scheduler) :exit))
;; A scheduled unit of work.
;;  TIME-SPECS      schedule as returned by PARSE-ENTRY (first value)
;;  COMMAND         what to run; the default EXECUTE-TASK method reads and
;;                  evaluates it as a string
;;  LAST-EXECUTION  value passed as :LAST-RUN to UPDATE-SCHEDULER-TASK, or NIL
;;  NEXT-EXECUTION  timestamp or event keyword of the next run, or NIL
;;  SOURCE-ENTRY    the entry the task was created from, or NIL
(defclass task ()
((time-specs :initarg :time-specs :accessor task-time-specs)
(command :initarg :command :accessor task-command)
(last-execution :initform nil :initarg :last-execution :accessor task-last-execution)
(next-execution :initform nil :initarg :next-execution :accessor task-next-execution)
(source-entry :initform nil :initarg :source-entry :accessor task-source-entry)))
;; Plan the first execution from TIME-SPECS, counting from START-AFTER (which
;; COMPUTE-NEXT-OCCURANCE defaults to the current time when it is NIL).
;; NOTE(review): this runs after the slots were filled from the initargs and
;; sets NEXT-EXECUTION unconditionally, so a value passed as :NEXT-EXECUTION
;; is overwritten -- confirm that this is intended.
(defmethod initialize-instance :after ((task task) &key time-specs start-after)
(setf (task-next-execution task) (compute-next-occurance time-specs start-after)))
;; Run the task by reading its command string as one Lisp form and evaluating
;; it; only the first form in the string is read.
;; SECURITY NOTE(review): this READs and EVALs the command text, so the text
;; of an entry is executed as code.  Entries must come from a trusted source;
;; the reader is used with whatever *READ-EVAL* value is current.
(defmethod execute-task ((task task))
(eval (read-from-string (task-command task))))
;;; Default parser for strings.
(defmethod create-scheduler-task ((scheduler scheduler) (cron-entry string)
                                  &key start-after &allow-other-keys)
  "Parse the cron line CRON-ENTRY, build a TASK from it and register the task
by calling CREATE-SCHEDULER-TASK again with the task object.  START-AFTER is
handed to the new task as the :START-AFTER initarg."
  (multiple-value-bind (time-specs command) (parse-cron-entry cron-entry)
    (let ((new-task (make-instance 'task :time-specs time-specs
                                         :command command
                                         :start-after start-after
                                         :source-entry cron-entry)))
      (create-scheduler-task scheduler new-task))))
;;; Default methods
;; Return TASK when it is among the tasks listed by SCHEDULER, otherwise NIL.
(defmethod read-scheduler-task ((scheduler scheduler) (task task))
(find task (list-scheduler-tasks scheduler)))
(defmethod update-scheduler-task
    ((scheduler scheduler) (task task)
     &key (cron-entry nil ce-p) (last-run nil lr-p) (start-at nil at-p))
  "Update TASK, which must be registered with SCHEDULER, and return it.

Either :CRON-ENTRY is given, in which case the entry is parsed again and the
next occurrence is recomputed, or :LAST-RUN and/or :START-AT are given to
record an execution.  Mixing the two kinds, or passing none, fails the
assertion."
  (assert (find task (list-scheduler-tasks scheduler)))
  ;; Exactly one kind of update is allowed per call.
  (assert (alexandria:xor ce-p (or at-p lr-p)))
  (when ce-p
    (multiple-value-bind (time-specs command) (parse-cron-entry cron-entry)
      (setf (task-source-entry task) cron-entry)
      (setf (task-time-specs task) time-specs)
      (setf (task-command task) command)
      (setf (task-next-execution task) (compute-next-occurance time-specs))))
  (when at-p
    (setf (task-next-execution task) start-at))
  (when lr-p
    (setf (task-last-execution task) last-run))
  task)
;;; This type of a task may be harder to serialize. The command is a function
;;; without arguments.
;; A task whose COMMAND slot holds a function object instead of a string.
(defclass lambda-task (task) ())
;; Reject commands that are not functions as soon as the task is created.
(defmethod initialize-instance :after ((task lambda-task) &key)
(check-type (task-command task) function))
;; Run the task by calling its command with no arguments.
(defmethod execute-task ((task lambda-task))
(funcall (task-command task)))
;;; Similar to the above, but the command is a function.
(defmethod create-scheduler-task ((scheduler scheduler) (cron-entry-and-lambda cons)
                                  &key start-after &allow-other-keys)
  "Create a LAMBDA-TASK from the pair (CRON-ENTRY . FUNCTION) and register it
by calling CREATE-SCHEDULER-TASK again with the task object.  Only the time
specification of the cron entry is used; the function is the command."
  (let ((cron-entry (car cron-entry-and-lambda))
        (command (cdr cron-entry-and-lambda)))
    (assert (functionp command))
    ;; Only the primary value (the time specs) of the parse is kept.
    (let* ((time-specs (parse-cron-entry cron-entry))
           (new-task (make-instance 'lambda-task
                                    :time-specs time-specs
                                    :command command
                                    :start-after start-after
                                    :source-entry cron-entry-and-lambda)))
      (create-scheduler-task scheduler new-task))))
;;; Scheduler classes
;; Scheduler that keeps its tasks in a list held in the TASKS slot.  The slot
;; accessor doubles as the LIST-SCHEDULER-TASKS method for this class.
(defclass in-memory-scheduler (scheduler)
((tasks :initform nil :accessor list-scheduler-tasks)))
(defmethod create-scheduler-task
    ((scheduler in-memory-scheduler) (task task)
     &key &allow-other-keys)
  "Put TASK at the front of the task list of SCHEDULER and return TASK."
  (push task (list-scheduler-tasks scheduler))
  task)
;; Remove TASK from the task list of SCHEDULER; REMOVEF stores the list
;; without TASK back into the slot.
(defmethod delete-scheduler-task
((scheduler in-memory-scheduler) (task task))
(alexandria:removef (list-scheduler-tasks scheduler) task))
;; (list
;; :every
;; (:every :step 2)
;; (:from 1 :to 4 :step 2)
;; (1 2 3 4)
;; ((1 2 3 4 5 6 7 9 12 23) :step 3)
;; ;; :random is equivalent to (:random :every)
;; :random
;; ;; (:random :step 2) is equivalent to (:random :every :step 2)
;; (:random :step 2)
;; ;; note that step denotes set, from which we start, for instance we
;; ;; pick one of (1 2 3 4) here and move by 4 beginning at picked
;; ;; element. basically we start from (nth (random 4) full-set)
;; (:random (1 2 3 4 5 7 8 10 12 43 55) :step 4)
;; ;; likewise, we pick number from 0 to 14
;; (:random :from 0 :to 59 :step 15))
;; (defun add-scheduler-task (&key
;; (minute :every)
;; (hour :every)
;; (day-of-month :every)
;; (month :every)
;; (day-of-week :every)))
;; (defun add-scheduler-task (schedule-specs function))
;; ;;; query database, plan whole day based on scheduled and not executed
;; ;;; yet tasks. Remove (or set as `retired' schedule tasks which has
;; ;;; ended.
;; (defun update-task-list ())
;; ;; Repeats: weekly
;; ;; Repeat every: x weeks
;; ;; Repeat on: [s] [w]
;; ;; Starts on: [date]
;; ;; Ends: [never], after [n] occurances, on [date]
;; ;; Summary: Weekly on Sunday