diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index a544ed79bb..10ebd1e7e8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -33,6 +33,10 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () +let stopping = Atomic.make false + +let (loop_thread : Thread.t option ref) = ref None + module Clock = struct let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9)) @@ -50,23 +54,18 @@ module Clock = struct Mtime.min_stamp end -let add_to_queue name ty start newfunc = - let ( ++ ) = Clock.add_span in - let item = - {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} - in - with_lock lock (fun () -> Ipq.add queue item) ; - Delay.signal delay - -let remove_from_queue name = +let loop_stop () = with_lock lock @@ fun () -> - match !pending_event with - | Some ev when ev.name = name -> - pending_event := None - | Some _ | None -> - let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in - if index > -1 then - Ipq.remove queue index + match !loop_thread with + | Some thread -> + Atomic.set stopping true ; + Delay.signal delay ; + Delay.signal delay ; + Thread.join thread ; + Atomic.set stopping false ; + loop_thread := None + | None -> + () let add_periodic_pending () = with_lock lock @@ fun () -> @@ -84,7 +83,7 @@ let add_periodic_pending () = let loop () = debug "%s started" __MODULE__ ; try - while true do + while not (Atomic.get stopping) do let now = Mtime_clock.now () in let deadline, item = with_lock lock @@ fun () -> @@ -133,3 +132,41 @@ let loop () = error "Scheduler thread died! This daemon will no longer function well and \ should be restarted." + +let loop_start wrapper = + loop_stop () ; + with_lock lock @@ fun () -> + match !loop_thread with + | Some _thread -> + () + | None -> + let loop = + match wrapper with + | Some wrapper -> + fun () -> wrapper loop + | None -> + loop + in + loop_thread := Some (Thread.create loop ()) ; + () + +let add_to_queue name ty start newfunc = + let ( ++ ) = Clock.add_span in + let item = + {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} + in + with_lock lock (fun () -> + Ipq.add queue item ; + if !loop_thread = None then loop_thread := Some (Thread.create loop ()) + ) ; + Delay.signal delay + +let remove_from_queue name = + with_lock lock @@ fun () -> + match !pending_event with + | Some ev when ev.name = name -> + pending_event := None + | Some _ | None -> + let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in + if index > -1 then + Ipq.remove queue index diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli index d4d19b1f79..5e9b6a37b6 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli @@ -24,5 +24,5 @@ val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit val remove_from_queue : string -> unit (** Remove a scheduled item by name *) -val loop : unit -> unit -(** The scheduler's main loop, started by {!Xapi} on start-up. *) +val loop_start : ((unit -> unit) -> unit) option -> unit +(** Start the scheduler's main loop. *) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml index 0a4a847403..f1405bba04 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -14,12 +14,6 @@ module Scheduler = Xapi_stdext_threads_scheduler.Scheduler -let started = Atomic.make false - -let start_schedule () = - if not (Atomic.exchange started true) then - Thread.create Scheduler.loop () |> ignore - let send event data = Event.(send event data |> sync) let receive event = Event.(receive event |> sync) @@ -35,7 +29,6 @@ let test_single () = Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> send finished true ) ; - start_schedule () ; Alcotest.(check bool) "result" true (receive finished) let test_remove_self () = @@ -49,7 +42,6 @@ let test_remove_self () = ) ; send which "self" ) ; - start_schedule () ; let cnt = Mtime_clock.counter () in Alcotest.(check string) "same event name" "self" (receive which) ; Alcotest.(check string) "same event name" "stop" (receive which) ; @@ -61,7 +53,6 @@ let test_empty () = Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> send finished true ) ; - start_schedule () ; Alcotest.(check bool) "finished" true (receive finished) ; (* wait loop to go to wait with no work to do *) Thread.delay 0.1 ; @@ -79,7 +70,6 @@ let test_wakeup () = Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () -> send which "long" ) ; - start_schedule () ; (* wait loop to go to wait with no work to do *) Thread.delay 0.1 ; let cnt = Mtime_clock.counter () in @@ -92,12 +82,28 @@ let test_wakeup () = let elapsed_ms = elapsed_ms cnt in Alcotest.check is_less "small time" 150 elapsed_ms +let test_start () = + let which = Event.new_channel () in + Scheduler.add_to_queue "done" Scheduler.OneShot 0.05 (fun () -> + send which "done" + ) ; + let wrapper f = send which "wrapper" ; f () in + Scheduler.loop_start (Some wrapper) ; + Alcotest.(check string) "same event name" "wrapper" (receive which) ; + Scheduler.loop_start (Some wrapper) ; + Alcotest.(check string) "same event name" "wrapper" (receive which) ; + let cnt = Mtime_clock.counter () in + Alcotest.(check string) "same event name" "done" (receive which) ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 100 elapsed_ms + let tests = [ ("test_single", `Quick, test_single) ; ("test_remove_self", `Quick, test_remove_self) ; ("test_empty", `Quick, test_empty) ; ("test_wakeup", `Quick, test_wakeup) + ; ("test_start", `Quick, test_start) ] let () = Alcotest.run "Scheduler" [("generic", tests)] diff --git a/ocaml/tests/common/test_event_common.ml b/ocaml/tests/common/test_event_common.ml index 9d37c038ab..ff4a10c4dc 100644 --- a/ocaml/tests/common/test_event_common.ml +++ b/ocaml/tests/common/test_event_common.ml @@ -11,7 +11,6 @@ let start_periodic_scheduler () = else ( Scheduler.add_to_queue "dummy" (Scheduler.Periodic 60.0) 0.0 (fun () -> ()) ; Xapi_event.register_hooks () ; - ignore (Thread.create Scheduler.loop ()) ; ps_start := true ) ; Mutex.unlock scheduler_mutex diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index fd5c065026..4d951c9169 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -1041,7 +1041,13 @@ let server_init () = ) in try + let scheduler_name = "Periodic scheduler" in Server_helpers.exec_with_new_task "server_init" (fun __context -> + let scheduler_wrapper func = + Server_helpers.exec_with_subtask ~__context scheduler_name + (fun ~__context -> Startup.thread_exn_wrapper scheduler_name func + ) + in Startup.run ~__context [ ("XAPI SERVER STARTING", [], print_server_starting_message) @@ -1113,8 +1119,10 @@ let server_init () = , bring_up_management_if ~__context ) ; ( "Starting periodic scheduler" - , [Startup.OnThread] - , Xapi_stdext_threads_scheduler.Scheduler.loop + , [] + , fun () -> + Xapi_stdext_threads_scheduler.Scheduler.loop_start + (Some scheduler_wrapper) ) ; ( "Synchronising host configuration files" , []