Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement the Protocol #8

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
492c51a
New attributes
sleipnir Jul 11, 2022
3117930
Fix. Comment. Correct URI pattern on default ActorId
sleipnir Jul 11, 2022
f409c87
New types
sleipnir Jul 12, 2022
bfa3a2a
Fix. Attribute order
sleipnir Jul 12, 2022
db2bd0d
Change semantics
sleipnir Jul 12, 2022
0184ede
Added timeout on requests
sleipnir Jul 12, 2022
d49c877
Better comment
sleipnir Jul 12, 2022
3dfeb29
New persistence strategies and others adjusts
sleipnir Jul 14, 2022
712dc03
Fix. Use new attributes
sleipnir Jul 15, 2022
aca7dff
Change database schema
sleipnir Jul 16, 2022
fba2924
Grouped strategies into ActorConfiguration type
sleipnir Jul 18, 2022
ae27a2f
Add comment
sleipnir Jul 18, 2022
a2fa127
Fix. Use ActorConfiguration
sleipnir Jul 18, 2022
abfdc16
Remove unused alias
sleipnir Jul 18, 2022
fcb5eae
Feat. Added CronCommand
sleipnir Jul 18, 2022
0f86f1d
Feat. Add flag to indicate actor dispatcher type
sleipnir Jul 18, 2022
1d0cc34
Use invocation user defined timeout when necessary
sleipnir Jul 19, 2022
115919e
Only start Singleton Actors on register phase
sleipnir Jul 19, 2022
38405a3
Set initial strategies
sleipnir Jul 20, 2022
d6d1581
Refactor. Rename and better logs
sleipnir Jul 20, 2022
0f9b976
Refactor. Extract ActorEntity logic to another modules
sleipnir Jul 20, 2022
9b32e49
Refactor. Move functions
sleipnir Jul 20, 2022
af86bdc
Include other sdk and some minor changes
sleipnir Jul 23, 2022
a4bd7e3
Include snapshot strategies and minor changes
sleipnir Jul 26, 2022
fc516fc
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 23, 2022
b17542a
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
3c57cc6
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
260e447
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
96ff13f
merge
sleipnir Aug 24, 2022
c2fe1cf
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
ee75ec6
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
86dc2ec
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
88b56a4
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
deac66f
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 26, 2022
8845389
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 26, 2022
ad86c2f
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 27, 2022
ec43749
merge
sleipnir Aug 29, 2022
d3050c8
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 29, 2022
30a219c
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 29, 2022
08a1130
Merge branch 'main' into feat/protocol-improvement
sleipnir Sep 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions apps/actors/lib/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ defmodule Actors do

alias Actors.Registry.ActorRegistry

alias Eigr.Functions.Protocol.Actors.{Actor, ActorSystem, Registry}
alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
ActorConfiguration,
ActorSystem,
Registry
}

alias Eigr.Functions.Protocol.{
InvocationRequest,
Expand All @@ -27,12 +33,12 @@ defmodule Actors do
service_info: %ServiceInfo{} = _service_info,
actor_system:
%ActorSystem{name: _name, registry: %Registry{actors: actors} = _registry} =
actor_system
_actor_system
} = _registration
) do
ActorRegistry.register(actors)

with :ok <- create_actors(actor_system, actors) do
with :ok <- create_actors(actors) do
proxy_info =
ProxyInfo.new(
protocol_major_version: 1,
Expand Down Expand Up @@ -84,18 +90,16 @@ defmodule Actors do

def invoke(
%InvocationRequest{
actor: %Actor{} = actor,
system: %ActorSystem{} = system,
actor_id: %ActorId{} = actor,
async: type
} = request
) do
invoke(type, system, actor, request)
invoke(type, actor, request)
end

defp invoke(
false,
%ActorSystem{name: system_name} = system,
%Actor{name: actor_name} = actor,
%ActorId{name: actor_name, actor_system: system_name} = _actor_id,
request
) do
case Spawn.Cluster.Node.Registry.lookup(Actors.Actor.Entity, actor_name) do
Expand All @@ -105,9 +109,10 @@ defmodule Actors do
ActorEntity.invoke(actor_name, request)

_ ->
with {:ok, %{node: node, actor: _registered_actor}} <-
with {:ok, %{node: node, actor: registered_actor}} <-
ActorRegistry.lookup(system_name, actor_name),
_pid <- Node.spawn(node, __MODULE__, :try_reactivate_actor, [system, actor]) do
_pid <-
Node.spawn(node, __MODULE__, :try_reactivate_actor, [registered_actor]) do
Process.sleep(1)
{:ok, response_body} = ActorEntity.invoke(actor_name, request)

Expand All @@ -133,8 +138,7 @@ defmodule Actors do

defp invoke(
true,
%ActorSystem{name: system_name} = system,
%Actor{name: actor_name} = actor,
%ActorId{name: actor_name, actor_system: system_name} = _actor_id,
request
) do
case Spawn.Cluster.Node.Registry.lookup(Actors.Actor.Entity, actor_name) do
Expand All @@ -144,9 +148,10 @@ defmodule Actors do
ActorEntity.invoke(actor_name, request)

_ ->
with {:ok, %{node: node, actor: _registered_actor}} <-
with {:ok, %{node: node, actor: registered_actor}} <-
ActorRegistry.lookup(system_name, actor_name),
_pid <- Node.spawn(node, __MODULE__, :try_reactivate_actor, [system, actor]) do
_pid <-
Node.spawn(node, __MODULE__, :try_reactivate_actor, [registered_actor]) do
Process.sleep(1)
{:ok, response_body} = ActorEntity.invoke_async(actor_name, request)

Expand All @@ -170,8 +175,8 @@ defmodule Actors do
end
end

def try_reactivate_actor(%ActorSystem{} = system, %Actor{name: name} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(system, actor) do
def try_reactivate_actor(%Actor{actor_id: %ActorId{name: name}} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor) do
{:ok, pid} ->
Logger.debug("Actor #{name} reactivated. PID: #{inspect(pid)}")
{:ok, pid}
Expand All @@ -182,29 +187,17 @@ defmodule Actors do
end
end

# To lookup all actors
def try_reactivate_actor(nil, %Actor{name: name} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(nil, actor) do
{:ok, pid} ->
Logger.debug("Actor #{name} reactivated. PID: #{inspect(pid)}")
{:ok, pid}

reason ->
Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
{:error, reason}
end
end

defp create_actors(actor_system, actors) do
defp create_actors(actors) do
actors
|> Flow.from_enumerable(
min_demand: @activate_actors_min_demand,
max_demand: @activate_actors_max_demand
)
|> Flow.filter(&is_singleton?/1)
|> Flow.map(fn {actor_name, actor} ->
Logger.debug("Registering #{actor_name} #{inspect(actor)} on Node: #{inspect(Node.self())}")

{time, result} = :timer.tc(&lookup_actor/3, [actor_system, actor_name, actor])
{time, result} = :timer.tc(&lookup_actor/2, [actor_name, actor])

Logger.info(
"Registered and Activated the #{actor_name} on Node #{inspect(Node.self())} in #{inspect(time)}ms"
Expand All @@ -215,8 +208,17 @@ defmodule Actors do
|> Flow.run()
end

defp lookup_actor(actor_system, actor_name, actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor_system, actor) do
defp is_singleton?(
{_actor_name,
%Actor{configuration: %ActorConfiguration{dispatcher: SINGLETON_DISPATCHER}}}
),
do: true

defp is_singleton?({_actor_name, %Actor{configuration: %ActorConfiguration{dispatcher: _}}}),
do: false

defp lookup_actor(actor_name, actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor) do
{:ok, pid} ->
{:ok, pid}

Expand Down
Loading