From 4ebec7ca62b08f7929b630cdfd4500d9ec06a916 Mon Sep 17 00:00:00 2001 From: Indrek Juhkam Date: Mon, 15 Jul 2019 14:01:14 +0300 Subject: [PATCH 1/2] Optimize Shard.list and Shard.get_by_key Previous list and get_by_key had to go through GenServer to acquire values ets table and replicas information. In case GenServer was processing an update (e.g. heartbeat, track, untrack) then list and get_by_key functions were blocked until it was completed. We saw this behaviour in our cluster where simple list/get_by_key calls were sometimes taking over few hundred milliseconds. Storing down replicas information in an ets table allows us to avoid going through genserver and allows us to process list/get_by_key immediately. I removed dirty_list function which was not public / exposed and which was trying to resolve the same issue. dirty_list was called dirty because it didn't check for down_replicas. This solution checks down_replicas and doesn't change the api interface. This should also resolve #124 --- lib/phoenix/tracker/shard.ex | 13 +++-- lib/phoenix/tracker/state.ex | 58 +++++++++++-------- .../tracker/shard_replication_test.exs | 15 ++--- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 27956caa1..4e20b4791 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -68,23 +68,24 @@ defmodule Phoenix.Tracker.Shard do end @spec list(pid | atom, topic) :: [presence] - def list(server_pid, topic) do + def list(server_pid, topic) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_topic(topic) end - - @doc false - def dirty_list(shard_name, topic) do - State.tracked_values(shard_name, topic, []) + def list(shard_name, topic) when is_atom(shard_name) do + State.get_by_topic(shard_name, topic) end @spec get_by_key(pid | atom, topic, term) :: [presence] - def get_by_key(server_pid, topic, key) do + def get_by_key(server_pid, topic, key) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_key(topic, key) end + def get_by_key(shard_name, topic, key) when is_atom(shard_name) do + State.get_by_key(shard_name, topic, key) + end @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 1d2d0291d..125380014 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -21,15 +21,15 @@ defmodule Phoenix.Tracker.State do @type pid_lookup :: {pid, topic, key} @type t :: %State{ - replica: name, - context: context, - clouds: clouds, - values: values, - pids: ets_id, - mode: :unset | :delta | :normal, - delta: :unset | delta, - replicas: %{name => :up | :down}, - range: {context, context} + replica: name, + context: context, + clouds: clouds, + values: values, + pids: ets_id, + mode: :unset | :delta | :normal, + delta: :unset | delta, + down_replicas: ets_id, + range: {context, context} } defstruct replica: nil, @@ -39,7 +39,7 @@ defmodule Phoenix.Tracker.State do pids: nil, mode: :unset, delta: :unset, - replicas: %{}, + down_replicas: nil, range: {%{}, %{}} @compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2} @@ -61,7 +61,7 @@ defmodule Phoenix.Tracker.State do mode: :normal, values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), - replicas: %{replica => :up}}) + down_replicas: :ets.new(down_replicas_table(shard_name), [:named_table, :protected, :bag])}) end @doc """ @@ -116,21 +116,30 @@ defmodule Phoenix.Tracker.State do @doc """ Returns a list of elements for the topic who belong to an online replica. """ - @spec get_by_topic(t, topic) :: [key_meta] + @spec get_by_topic(t | atom, topic) :: [key_meta] def get_by_topic(%State{values: values} = state, topic) do tracked_values(values, topic, down_replicas(state)) end + def get_by_topic(shard_name, topic) do + tracked_values(shard_name, topic, down_replicas(shard_name)) + end @doc """ Returns a list of elements for the topic who belong to an online replica. """ - @spec get_by_key(t, topic, key) :: [key_meta] + @spec get_by_key(t | atom, topic, key) :: [key_meta] def get_by_key(%State{values: values} = state, topic, key) do case tracked_key(values, topic, key, down_replicas(state)) do [] -> [] [_|_] = metas -> metas end end + def get_by_key(shard_name, topic, key) do + case tracked_key(shard_name, topic, key, down_replicas(shard_name)) do + [] -> [] + [_|_] = metas -> metas + end + end @doc """ Performs table lookup for tracked elements in the topic. @@ -393,18 +402,18 @@ defmodule Phoenix.Tracker.State do Marks a replica as up in the set and returns rejoined users. """ @spec replica_up(t, name) :: {t, joins :: [values], leaves :: []} - def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do - {%State{state | - context: Map.put_new(ctx, replica, 0), - replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []} + def replica_up(%State{down_replicas: down_replicas, context: ctx} = state, replica) do + :ets.delete_object(down_replicas, replica) + {%State{state | context: Map.put_new(ctx, replica, 0)}, replica_users(state, replica), []} end @doc """ Marks a replica as down in the set and returns left users. """ @spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]} - def replica_down(%State{replicas: replicas} = state, replica) do - {%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)} + def replica_down(%State{down_replicas: down_replicas} = state, replica) do + :ets.insert(down_replicas, replica) + {state, [], replica_users(state, replica)} end @doc """ @@ -555,10 +564,9 @@ defmodule Phoenix.Tracker.State do delta: %State{delta | range: {start_clock, new_end}}} end - @spec down_replicas(t) :: [name] - defp down_replicas(%State{replicas: replicas}) do - for {replica, :down} <- replicas, do: replica - end + @spec down_replicas(t | atom) :: [name] + defp down_replicas(%State{down_replicas: down_replicas}), do: :ets.tab2list(down_replicas) + defp down_replicas(shard_name), do: :ets.tab2list(down_replicas_table(shard_name)) @spec replica_users(t, name) :: [value] defp replica_users(%State{values: values}, replica) do @@ -575,4 +583,8 @@ defmodule Phoenix.Tracker.State do defp foldl({objects, cont}, acc, func) do foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func) end + + defp down_replicas_table(shard_name) do + :"#{shard_name}.down_replicas" + end end diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 88473717d..b9e2ddac4 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -42,6 +42,9 @@ defmodule Phoenix.Tracker.ShardReplicationTest do # node1 fulfills tranfer request and sends transfer_ack to primary assert_transfer_ack ref, from: @node1 assert_heartbeat to: @node1, from: @primary + + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}] = list(shard, topic) end @@ -87,6 +90,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_heartbeat from: @node1 assert_heartbeat from: @node2 + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic) end @@ -237,7 +242,6 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_join ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) # nodedown Process.unlink(node_pid) @@ -245,13 +249,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_leave ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) - - :timer.sleep(@permdown + 2*@heartbeat) - assert [{"local1", _}] = dirty_list(shard, topic) end - test "untrack with no tracked topic is a noop", %{shard: shard, topic: topic} do assert Shard.untrack(shard, self(), topic, "foo") == :ok @@ -382,8 +381,4 @@ defmodule Phoenix.Tracker.ShardReplicationTest do defp list(shard, topic) do Enum.sort(Shard.list(shard, topic)) end - - defp dirty_list(shard, topic) do - Enum.sort(Shard.dirty_list(shard, topic)) - end end From 59938a73e513f307e0ba4df286d3eb7bc67df8fb Mon Sep 17 00:00:00 2001 From: Indrek Juhkam Date: Thu, 23 Jan 2020 16:29:56 +0200 Subject: [PATCH 2/2] Enable read_concurrency for down_replicas https://erlang.org/doc/man/ets.html > Performance tuning. Defaults to false. When set to true, the table is optimized for concurrent read operations. When this option is enabled on a runtime system with SMP support, read operations become much cheaper; especially on systems with multiple physical processors. However, switching between read and write operations becomes more expensive. We have a lot of read operations. Each time get_by_key or get_by_topic is called then this table is read. We want it to be as fast as possible. > You typically want to enable this option when concurrent read operations are much more frequent than write operations, or when concurrent reads and writes comes in large read and write bursts (that is, many reads not interrupted by writes, and many writes not interrupted by reads). This table is rarely updated. It is only updated when a replica goes down or when that replica comes back up. --- lib/phoenix/tracker/state.ex | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 125380014..0c69d8f33 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -61,7 +61,10 @@ defmodule Phoenix.Tracker.State do mode: :normal, values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), - down_replicas: :ets.new(down_replicas_table(shard_name), [:named_table, :protected, :bag])}) + down_replicas: :ets.new( + down_replicas_table(shard_name), + [:named_table, :protected, :bag, {:read_concurrency, true}] + )}) end @doc """