Skip to content

Commit

Permalink
Starts periodic schedule automatically
Browse files Browse the repository at this point in the history
Remove dependency from Xapi.
The function "loop_start" allows to set a wrapper for the thread
loop if needed.

Signed-off-by: Frediano Ziglio <[email protected]>
  • Loading branch information
freddy77 committed Jan 7, 2025
1 parent dc565fd commit 95792db
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 32 deletions.
71 changes: 54 additions & 17 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()

let stopping = Atomic.make false

let (loop_thread : Thread.t option ref) = ref None

module Clock = struct
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))

Expand All @@ -50,23 +54,18 @@ module Clock = struct
Mtime.min_stamp
end

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let remove_from_queue name =
let loop_stop () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index
match !loop_thread with
| Some thread ->
Atomic.set stopping true ;
Delay.signal delay ;
Delay.signal delay ;
Thread.join thread ;
Atomic.set stopping false ;
loop_thread := None
| None ->
()

let add_periodic_pending () =
with_lock lock @@ fun () ->
Expand All @@ -84,7 +83,7 @@ let add_periodic_pending () =
let loop () =
debug "%s started" __MODULE__ ;
try
while true do
while not (Atomic.get stopping) do
let now = Mtime_clock.now () in
let deadline, item =
with_lock lock @@ fun () ->
Expand Down Expand Up @@ -133,3 +132,41 @@ let loop () =
error
"Scheduler thread died! This daemon will no longer function well and \
should be restarted."

let loop_start wrapper =
loop_stop () ;
with_lock lock @@ fun () ->
match !loop_thread with
| Some _thread ->
()
| None ->
let loop =
match wrapper with
| Some wrapper ->
fun () -> wrapper loop
| None ->
loop
in
loop_thread := Some (Thread.create loop ()) ;
()

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () ->
Ipq.add queue item ;
if !loop_thread = None then loop_thread := Some (Thread.create loop ())
) ;
Delay.signal delay

let remove_from_queue name =
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index
4 changes: 2 additions & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
val remove_from_queue : string -> unit
(** Remove a scheduled item by name *)

val loop : unit -> unit
(** The scheduler's main loop, started by {!Xapi} on start-up. *)
val loop_start : ((unit -> unit) -> unit) option -> unit
(** Start the scheduler's main loop. *)
26 changes: 16 additions & 10 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let started = Atomic.make false

let start_schedule () =
if not (Atomic.exchange started true) then
Thread.create Scheduler.loop () |> ignore

let send event data = Event.(send event data |> sync)

let receive event = Event.(receive event |> sync)
Expand All @@ -35,7 +29,6 @@ let test_single () =
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "result" true (receive finished)

let test_remove_self () =
Expand All @@ -49,7 +42,6 @@ let test_remove_self () =
) ;
send which "self"
) ;
start_schedule () ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "self" (receive which) ;
Alcotest.(check string) "same event name" "stop" (receive which) ;
Expand All @@ -61,7 +53,6 @@ let test_empty () =
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "finished" true (receive finished) ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
Expand All @@ -79,7 +70,6 @@ let test_wakeup () =
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
send which "long"
) ;
start_schedule () ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
let cnt = Mtime_clock.counter () in
Expand All @@ -92,12 +82,28 @@ let test_wakeup () =
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 150 elapsed_ms

let test_start () =
let which = Event.new_channel () in
Scheduler.add_to_queue "done" Scheduler.OneShot 0.05 (fun () ->
send which "done"
) ;
let wrapper f = send which "wrapper" ; f () in
Scheduler.loop_start (Some wrapper) ;
Alcotest.(check string) "same event name" "wrapper" (receive which) ;
Scheduler.loop_start (Some wrapper) ;
Alcotest.(check string) "same event name" "wrapper" (receive which) ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "done" (receive which) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 100 elapsed_ms

let tests =
[
("test_single", `Quick, test_single)
; ("test_remove_self", `Quick, test_remove_self)
; ("test_empty", `Quick, test_empty)
; ("test_wakeup", `Quick, test_wakeup)
; ("test_start", `Quick, test_start)
]

let () = Alcotest.run "Scheduler" [("generic", tests)]
1 change: 0 additions & 1 deletion ocaml/tests/common/test_event_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ let start_periodic_scheduler () =
else (
Scheduler.add_to_queue "dummy" (Scheduler.Periodic 60.0) 0.0 (fun () -> ()) ;
Xapi_event.register_hooks () ;
ignore (Thread.create Scheduler.loop ()) ;
ps_start := true
) ;
Mutex.unlock scheduler_mutex
Expand Down
12 changes: 10 additions & 2 deletions ocaml/xapi/xapi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,13 @@ let server_init () =
)
in
try
let scheduler_name = "Periodic scheduler" in
Server_helpers.exec_with_new_task "server_init" (fun __context ->
let scheduler_wrapper func =
Server_helpers.exec_with_subtask ~__context scheduler_name
(fun ~__context -> Startup.thread_exn_wrapper scheduler_name func
)
in
Startup.run ~__context
[
("XAPI SERVER STARTING", [], print_server_starting_message)
Expand Down Expand Up @@ -1113,8 +1119,10 @@ let server_init () =
, bring_up_management_if ~__context
)
; ( "Starting periodic scheduler"
, [Startup.OnThread]
, Xapi_stdext_threads_scheduler.Scheduler.loop
, []
, fun () ->
Xapi_stdext_threads_scheduler.Scheduler.loop_start
(Some scheduler_wrapper)
)
; ( "Synchronising host configuration files"
, []
Expand Down

0 comments on commit 95792db

Please sign in to comment.