Skip to content

Commit

Permalink
Implement graceful deployments for Fly.io
Browse files Browse the repository at this point in the history
Related to algora-io#78

Implement graceful shutdown of old machines during Fly.io deployments to ensure livestreams are not interrupted.

* **Pipeline Changes**:
  - Modify `lib/algora/pipeline.ex` to list pipelines using `Membrane.Pipeline.list_pipelines` and check for running livestreams.
  - Add logic to destroy old machines if no livestreams are running in the `:end_of_stream` callback.

* **Termination Logic**:
  - Update `lib/algora/terminate.ex` to include a function `terminate_interrupted_streams` that lists pipelines and destroys old machines if no livestreams are running.

* **Deployment Controller**:
  - Add `lib/algora_web/controllers/deployment_controller.ex` to handle deployment-related actions such as starting/stopping livestreams, triggering deployments, confirming livestream continuity, and destroying old machines.

* **Router Updates**:
  - Modify `lib/algora_web/router.ex` to add routes for the new deployment-related actions in the `DeploymentController`.

* **Tests**:
  - Add `test/algora_web/controllers/deployment_controller_test.exs` to test the new deployment-related actions in the `DeploymentController`.
  • Loading branch information
vishwamartur committed Nov 17, 2024
1 parent c4ab44e commit 934a34c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 0 deletions.
15 changes: 15 additions & 0 deletions lib/algora/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ defmodule Algora.Pipeline do
def handle_child_notification(:end_of_stream, _element, _ctx, state) do
Algora.Library.toggle_streamer_live(state.video, false)

# List all pipelines
pipelines = Membrane.Pipeline.list_pipelines()

# Check if there are any running livestreams
livestreams_running = Enum.any?(pipelines, fn pid ->
GenServer.call(pid, :get_video_id) == state.video.id
end)

# If no livestreams are running, destroy old machines
unless livestreams_running do
# Logic to destroy old machines
# This is a placeholder, replace with actual logic to destroy old machines
IO.puts("Destroying old machines...")
end

# TODO: close any open connections (e.g. Algora.Restream.WebSocket)

{[terminate: :normal], state}
Expand Down
17 changes: 17 additions & 0 deletions lib/algora/terminate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,21 @@ defmodule Algora.Terminate do
defp schedule_terminate() do
Process.send_after(self(), :terminate, @terminate_interval)
end

def terminate_interrupted_streams() do
# List all pipelines
pipelines = Membrane.Pipeline.list_pipelines()

# Check if there are any running livestreams
livestreams_running = Enum.any?(pipelines, fn pid ->
GenServer.call(pid, :get_video_id) != nil
end)

# If no livestreams are running, destroy old machines
unless livestreams_running do
# Logic to destroy old machines
# This is a placeholder, replace with actual logic to destroy old machines
IO.puts("Destroying old machines...")
end
end
end
38 changes: 38 additions & 0 deletions lib/algora_web/controllers/deployment_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule AlgoraWeb.DeploymentController do
use AlgoraWeb, :controller

def start_livestream(conn, _params) do
# Logic to start a livestream
# This is a placeholder, replace with actual logic to start a livestream
IO.puts("Starting livestream...")
send_resp(conn, 200, "Livestream started")
end

def trigger_deployment(conn, _params) do
# Logic to trigger a deployment
# This is a placeholder, replace with actual logic to trigger a deployment
IO.puts("Triggering deployment...")
send_resp(conn, 200, "Deployment triggered")
end

def confirm_livestream_continuity(conn, _params) do
# Logic to confirm livestream continuity
# This is a placeholder, replace with actual logic to confirm livestream continuity
IO.puts("Confirming livestream continuity...")
send_resp(conn, 200, "Livestream continuity confirmed")
end

def stop_livestream(conn, _params) do
# Logic to stop a livestream
# This is a placeholder, replace with actual logic to stop a livestream
IO.puts("Stopping livestream...")
send_resp(conn, 200, "Livestream stopped")
end

def destroy_old_machine(conn, _params) do
# Logic to destroy old machine
# This is a placeholder, replace with actual logic to destroy old machine
IO.puts("Destroying old machine...")
send_resp(conn, 200, "Old machine destroyed")
end
end
10 changes: 10 additions & 0 deletions lib/algora_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,14 @@ defmodule AlgoraWeb.Router do
get "/gh/:user_id/channel", GithubController, :get_channel
end
end

scope "/deployments", AlgoraWeb do
pipe_through :api

post "/start_livestream", DeploymentController, :start_livestream
post "/trigger_deployment", DeploymentController, :trigger_deployment
post "/confirm_livestream_continuity", DeploymentController, :confirm_livestream_continuity
post "/stop_livestream", DeploymentController, :stop_livestream
post "/destroy_old_machine", DeploymentController, :destroy_old_machine
end
end
38 changes: 38 additions & 0 deletions test/algora_web/controllers/deployment_controller_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule AlgoraWeb.DeploymentControllerTest do
use AlgoraWeb.ConnCase, async: true

describe "start_livestream/2" do
test "starts a livestream", %{conn: conn} do
conn = post(conn, Routes.deployment_path(conn, :start_livestream))
assert json_response(conn, 200) == %{"message" => "Livestream started"}
end
end

describe "trigger_deployment/2" do
test "triggers a deployment", %{conn: conn} do
conn = post(conn, Routes.deployment_path(conn, :trigger_deployment))
assert json_response(conn, 200) == %{"message" => "Deployment triggered"}
end
end

describe "confirm_livestream_continuity/2" do
test "confirms livestream continuity", %{conn: conn} do
conn = post(conn, Routes.deployment_path(conn, :confirm_livestream_continuity))
assert json_response(conn, 200) == %{"message" => "Livestream continuity confirmed"}
end
end

describe "stop_livestream/2" do
test "stops a livestream", %{conn: conn} do
conn = post(conn, Routes.deployment_path(conn, :stop_livestream))
assert json_response(conn, 200) == %{"message" => "Livestream stopped"}
end
end

describe "destroy_old_machine/2" do
test "destroys old machine", %{conn: conn} do
conn = post(conn, Routes.deployment_path(conn, :destroy_old_machine))
assert json_response(conn, 200) == %{"message" => "Old machine destroyed"}
end
end
end

0 comments on commit 934a34c

Please sign in to comment.