From 934a34cb72ecbb77760b29f8e05b16f569344a09 Mon Sep 17 00:00:00 2001 From: Vishwanath Martur <64204611+vishwamartur@users.noreply.github.com> Date: Sun, 17 Nov 2024 22:52:03 +0530 Subject: [PATCH] Implement graceful deployments for Fly.io Related to #78 Implement graceful shutdown of old machines during Fly.io deployments to ensure livestreams are not interrupted. * **Pipeline Changes**: - Modify `lib/algora/pipeline.ex` to list pipelines using `Membrane.Pipeline.list_pipelines` and check for running livestreams. - Add logic to destroy old machines if no livestreams are running in the `:end_of_stream` callback. * **Termination Logic**: - Update `lib/algora/terminate.ex` to include a function `terminate_interrupted_streams` that lists pipelines and destroys old machines if no livestreams are running. * **Deployment Controller**: - Add `lib/algora_web/controllers/deployment_controller.ex` to handle deployment-related actions such as starting/stopping livestreams, triggering deployments, confirming livestream continuity, and destroying old machines. * **Router Updates**: - Modify `lib/algora_web/router.ex` to add routes for the new deployment-related actions in the `DeploymentController`. * **Tests**: - Add `test/algora_web/controllers/deployment_controller_test.exs` to test the new deployment-related actions in the `DeploymentController`. --- lib/algora/pipeline.ex | 15 ++++++++ lib/algora/terminate.ex | 17 +++++++++ .../controllers/deployment_controller.ex | 38 +++++++++++++++++++ lib/algora_web/router.ex | 10 +++++ .../deployment_controller_test.exs | 38 +++++++++++++++++++ 5 files changed, 118 insertions(+) create mode 100644 lib/algora_web/controllers/deployment_controller.ex create mode 100644 test/algora_web/controllers/deployment_controller_test.exs diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index a7821e07..58ebe79f 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -50,6 +50,21 @@ defmodule Algora.Pipeline do def handle_child_notification(:end_of_stream, _element, _ctx, state) do Algora.Library.toggle_streamer_live(state.video, false) + # List all pipelines + pipelines = Membrane.Pipeline.list_pipelines() + + # Check if there are any running livestreams + livestreams_running = Enum.any?(pipelines, fn pid -> + GenServer.call(pid, :get_video_id) == state.video.id + end) + + # If no livestreams are running, destroy old machines + unless livestreams_running do + # Logic to destroy old machines + # This is a placeholder, replace with actual logic to destroy old machines + IO.puts("Destroying old machines...") + end + # TODO: close any open connections (e.g. Algora.Restream.WebSocket) {[terminate: :normal], state} diff --git a/lib/algora/terminate.ex b/lib/algora/terminate.ex index 3a9d0dd0..067eacb1 100644 --- a/lib/algora/terminate.ex +++ b/lib/algora/terminate.ex @@ -23,4 +23,21 @@ defmodule Algora.Terminate do defp schedule_terminate() do Process.send_after(self(), :terminate, @terminate_interval) end + + def terminate_interrupted_streams() do + # List all pipelines + pipelines = Membrane.Pipeline.list_pipelines() + + # Check if there are any running livestreams + livestreams_running = Enum.any?(pipelines, fn pid -> + GenServer.call(pid, :get_video_id) != nil + end) + + # If no livestreams are running, destroy old machines + unless livestreams_running do + # Logic to destroy old machines + # This is a placeholder, replace with actual logic to destroy old machines + IO.puts("Destroying old machines...") + end + end end diff --git a/lib/algora_web/controllers/deployment_controller.ex b/lib/algora_web/controllers/deployment_controller.ex new file mode 100644 index 00000000..450fc738 --- /dev/null +++ b/lib/algora_web/controllers/deployment_controller.ex @@ -0,0 +1,38 @@ +defmodule AlgoraWeb.DeploymentController do + use AlgoraWeb, :controller + + def start_livestream(conn, _params) do + # Logic to start a livestream + # This is a placeholder, replace with actual logic to start a livestream + IO.puts("Starting livestream...") + send_resp(conn, 200, "Livestream started") + end + + def trigger_deployment(conn, _params) do + # Logic to trigger a deployment + # This is a placeholder, replace with actual logic to trigger a deployment + IO.puts("Triggering deployment...") + send_resp(conn, 200, "Deployment triggered") + end + + def confirm_livestream_continuity(conn, _params) do + # Logic to confirm livestream continuity + # This is a placeholder, replace with actual logic to confirm livestream continuity + IO.puts("Confirming livestream continuity...") + send_resp(conn, 200, "Livestream continuity confirmed") + end + + def stop_livestream(conn, _params) do + # Logic to stop a livestream + # This is a placeholder, replace with actual logic to stop a livestream + IO.puts("Stopping livestream...") + send_resp(conn, 200, "Livestream stopped") + end + + def destroy_old_machine(conn, _params) do + # Logic to destroy old machine + # This is a placeholder, replace with actual logic to destroy old machine + IO.puts("Destroying old machine...") + send_resp(conn, 200, "Old machine destroyed") + end +end diff --git a/lib/algora_web/router.ex b/lib/algora_web/router.ex index 22c76d0f..0749bbaa 100644 --- a/lib/algora_web/router.ex +++ b/lib/algora_web/router.ex @@ -167,4 +167,14 @@ defmodule AlgoraWeb.Router do get "/gh/:user_id/channel", GithubController, :get_channel end end + + scope "/deployments", AlgoraWeb do + pipe_through :api + + post "/start_livestream", DeploymentController, :start_livestream + post "/trigger_deployment", DeploymentController, :trigger_deployment + post "/confirm_livestream_continuity", DeploymentController, :confirm_livestream_continuity + post "/stop_livestream", DeploymentController, :stop_livestream + post "/destroy_old_machine", DeploymentController, :destroy_old_machine + end end diff --git a/test/algora_web/controllers/deployment_controller_test.exs b/test/algora_web/controllers/deployment_controller_test.exs new file mode 100644 index 00000000..088a2282 --- /dev/null +++ b/test/algora_web/controllers/deployment_controller_test.exs @@ -0,0 +1,38 @@ +defmodule AlgoraWeb.DeploymentControllerTest do + use AlgoraWeb.ConnCase, async: true + + describe "start_livestream/2" do + test "starts a livestream", %{conn: conn} do + conn = post(conn, Routes.deployment_path(conn, :start_livestream)) + assert json_response(conn, 200) == %{"message" => "Livestream started"} + end + end + + describe "trigger_deployment/2" do + test "triggers a deployment", %{conn: conn} do + conn = post(conn, Routes.deployment_path(conn, :trigger_deployment)) + assert json_response(conn, 200) == %{"message" => "Deployment triggered"} + end + end + + describe "confirm_livestream_continuity/2" do + test "confirms livestream continuity", %{conn: conn} do + conn = post(conn, Routes.deployment_path(conn, :confirm_livestream_continuity)) + assert json_response(conn, 200) == %{"message" => "Livestream continuity confirmed"} + end + end + + describe "stop_livestream/2" do + test "stops a livestream", %{conn: conn} do + conn = post(conn, Routes.deployment_path(conn, :stop_livestream)) + assert json_response(conn, 200) == %{"message" => "Livestream stopped"} + end + end + + describe "destroy_old_machine/2" do + test "destroys old machine", %{conn: conn} do + conn = post(conn, Routes.deployment_path(conn, :destroy_old_machine)) + assert json_response(conn, 200) == %{"message" => "Old machine destroyed"} + end + end +end