Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement the Protocol #8

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
492c51a
New attributes
sleipnir Jul 11, 2022
3117930
Fix. Comment. Correct URI pattern on default ActorId
sleipnir Jul 11, 2022
f409c87
New types
sleipnir Jul 12, 2022
bfa3a2a
Fix. Attribute order
sleipnir Jul 12, 2022
db2bd0d
Change semantics
sleipnir Jul 12, 2022
0184ede
Added timeout on requests
sleipnir Jul 12, 2022
d49c877
Better comment
sleipnir Jul 12, 2022
3dfeb29
New persistence strategies and others adjusts
sleipnir Jul 14, 2022
712dc03
Fix. Use new attributes
sleipnir Jul 15, 2022
aca7dff
Change database schema
sleipnir Jul 16, 2022
fba2924
Grouped strategies into ActorConfiguration type
sleipnir Jul 18, 2022
ae27a2f
Add comment
sleipnir Jul 18, 2022
a2fa127
Fix. Use ActorConfiguration
sleipnir Jul 18, 2022
abfdc16
Remove unused alias
sleipnir Jul 18, 2022
fcb5eae
Feat. Added CronCommand
sleipnir Jul 18, 2022
0f86f1d
Feat. Add flag to indicate actor dispatcher type
sleipnir Jul 18, 2022
1d0cc34
Use invocation user defined timeout when necessary
sleipnir Jul 19, 2022
115919e
Only start Singleton Actors on register phase
sleipnir Jul 19, 2022
38405a3
Set initial strategies
sleipnir Jul 20, 2022
d6d1581
Refactor. Rename and better logs
sleipnir Jul 20, 2022
0f9b976
Refactor. Extract ActorEntity logic to another modules
sleipnir Jul 20, 2022
9b32e49
Refactor. Move functions
sleipnir Jul 20, 2022
af86bdc
Include other sdk and some minor changes
sleipnir Jul 23, 2022
a4bd7e3
Include snapshot strategies and minor changes
sleipnir Jul 26, 2022
fc516fc
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 23, 2022
b17542a
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
3c57cc6
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
260e447
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
96ff13f
merge
sleipnir Aug 24, 2022
c2fe1cf
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 24, 2022
ee75ec6
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
86dc2ec
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
88b56a4
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 25, 2022
deac66f
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 26, 2022
8845389
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 26, 2022
ad86c2f
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 27, 2022
ec43749
merge
sleipnir Aug 29, 2022
d3050c8
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 29, 2022
30a219c
Merge branch 'main' into feat/protocol-improvement
sleipnir Aug 29, 2022
08a1130
Merge branch 'main' into feat/protocol-improvement
sleipnir Sep 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions apps/actors/lib/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Actors do

alias Actors.Registry.ActorRegistry

alias Eigr.Functions.Protocol.Actors.{Actor, ActorSystem, Registry}
alias Eigr.Functions.Protocol.Actors.{Actor, ActorId, ActorSystem, Registry}

alias Eigr.Functions.Protocol.{
InvocationRequest,
Expand All @@ -27,12 +27,12 @@ defmodule Actors do
service_info: %ServiceInfo{} = _service_info,
actor_system:
%ActorSystem{name: _name, registry: %Registry{actors: actors} = _registry} =
actor_system
_actor_system
} = _registration
) do
ActorRegistry.register(actors)

with :ok <- create_actors(actor_system, actors) do
with :ok <- create_actors(actors) do
proxy_info =
ProxyInfo.new(
protocol_major_version: 1,
Expand Down Expand Up @@ -84,18 +84,16 @@ defmodule Actors do

def invoke(
%InvocationRequest{
actor: %Actor{} = actor,
system: %ActorSystem{} = system,
actor_id: %ActorId{} = actor,
async: type
} = request
) do
invoke(type, system, actor, request)
invoke(type, actor, request)
end

defp invoke(
false,
%ActorSystem{name: system_name} = system,
%Actor{name: actor_name} = actor,
%ActorId{name: actor_name, actor_system: system_name} = _actor_id,
request
) do
case Actors.Actor.Registry.lookup(actor_name) do
Expand All @@ -105,9 +103,10 @@ defmodule Actors do
ActorEntity.invoke(actor_name, request)

_ ->
with {:ok, %{node: node, actor: _registered_actor}} <-
with {:ok, %{node: node, actor: registered_actor}} <-
ActorRegistry.lookup(system_name, actor_name),
_pid <- Node.spawn(node, __MODULE__, :try_reactivate_actor, [system, actor]) do
_pid <-
Node.spawn(node, __MODULE__, :try_reactivate_actor, [registered_actor]) do
Process.sleep(1)
{:ok, response_body} = ActorEntity.invoke(actor_name, request)

Expand All @@ -133,8 +132,7 @@ defmodule Actors do

defp invoke(
true,
%ActorSystem{name: system_name} = system,
%Actor{name: actor_name} = actor,
%ActorId{name: actor_name, actor_system: system_name} = _actor_id,
request
) do
case Actors.Actor.Registry.lookup(actor_name) do
Expand All @@ -144,9 +142,10 @@ defmodule Actors do
ActorEntity.invoke(actor_name, request)

_ ->
with {:ok, %{node: node, actor: _registered_actor}} <-
with {:ok, %{node: node, actor: registered_actor}} <-
ActorRegistry.lookup(system_name, actor_name),
_pid <- Node.spawn(node, __MODULE__, :try_reactivate_actor, [system, actor]) do
_pid <-
Node.spawn(node, __MODULE__, :try_reactivate_actor, [registered_actor]) do
Process.sleep(1)
{:ok, response_body} = ActorEntity.invoke_async(actor_name, request)

Expand All @@ -170,8 +169,8 @@ defmodule Actors do
end
end

def try_reactivate_actor(%ActorSystem{} = system, %Actor{name: name} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(system, actor) do
def try_reactivate_actor(%Actor{actor_id: %ActorId{name: name}} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor) do
{:ok, pid} ->
Logger.debug("Actor #{name} reactivated. PID: #{inspect(pid)}")
{:ok, pid}
Expand All @@ -182,20 +181,7 @@ defmodule Actors do
end
end

# To lookup all actors
def try_reactivate_actor(nil, %Actor{name: name} = actor) do
case ActorEntitySupervisor.lookup_or_create_actor(nil, actor) do
{:ok, pid} ->
Logger.debug("Actor #{name} reactivated. PID: #{inspect(pid)}")
{:ok, pid}

reason ->
Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
{:error, reason}
end
end

defp create_actors(actor_system, actors) do
defp create_actors(actors) do
actors
|> Flow.from_enumerable(
min_demand: @activate_actors_min_demand,
Expand All @@ -204,7 +190,7 @@ defmodule Actors do
|> Flow.map(fn {actor_name, actor} ->
Logger.debug("Registering #{actor_name} #{inspect(actor)} on Node: #{inspect(Node.self())}")

{time, result} = :timer.tc(&lookup_actor/3, [actor_system, actor_name, actor])
{time, result} = :timer.tc(&lookup_actor/2, [actor_name, actor])

Logger.info(
"Registered and Activated the #{actor_name} on Node #{inspect(Node.self())} in #{inspect(time)}ms"
Expand All @@ -215,8 +201,8 @@ defmodule Actors do
|> Flow.run()
end

defp lookup_actor(actor_system, actor_name, actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor_system, actor) do
defp lookup_actor(actor_name, actor) do
case ActorEntitySupervisor.lookup_or_create_actor(actor) do
{:ok, pid} ->
{:ok, pid}

Expand Down
65 changes: 36 additions & 29 deletions apps/actors/lib/actors/actor/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Actors.Actor.Entity do

alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
ActorDeactivateStrategy,
ActorState,
ActorSnapshotStrategy,
Expand All @@ -17,7 +18,8 @@ defmodule Actors.Actor.Entity do
Context,
ActorInvocation,
ActorInvocationResponse,
InvocationRequest
InvocationRequest,
Value
}

@min_snapshot_threshold 500
Expand All @@ -26,15 +28,14 @@ defmodule Actors.Actor.Entity do
@timeout_factor_range [200, 300, 500, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000]

defmodule EntityState do
defstruct system: nil, actor: nil, state_hash: nil
defstruct actor: nil, state_hash: nil

@type t(system, actor, state_hash) :: %EntityState{
system: system,
@type t(actor, state_hash) :: %EntityState{
actor: actor,
state_hash: state_hash
}

@type t :: %EntityState{system: ActorSystem.t(), actor: Actor.t(), state_hash: binary()}
@type t :: %EntityState{actor: Actor.t(), state_hash: binary()}
end

@impl true
Expand All @@ -43,7 +44,7 @@ defmodule Actors.Actor.Entity do
def init(
%EntityState{
actor: %Actor{
name: name,
actor_id: %ActorId{name: name},
persistent: false,
deactivate_strategy: deactivate_strategy
}
Expand All @@ -64,7 +65,7 @@ defmodule Actors.Actor.Entity do
def init(
%EntityState{
actor: %Actor{
name: name,
actor_id: %ActorId{name: name},
persistent: false,
deactivate_strategy:
%ActorDeactivateStrategy{strategy: deactivate_strategy} = _dstrategy
Expand All @@ -84,7 +85,7 @@ defmodule Actors.Actor.Entity do
def init(
%EntityState{
actor: %Actor{
name: name,
actor_id: %ActorId{name: name},
persistent: true,
snapshot_strategy: %ActorSnapshotStrategy{} = _snapshot_strategy,
deactivate_strategy: deactivate_strategy
Expand All @@ -109,7 +110,7 @@ defmodule Actors.Actor.Entity do
def init(
%EntityState{
actor: %Actor{
name: name,
actor_id: %ActorId{name: name},
persistent: true,
snapshot_strategy: %ActorSnapshotStrategy{} = _snapshot_strategy,
deactivate_strategy: %ActorDeactivateStrategy{strategy: deactivate_strategy}
Expand All @@ -134,7 +135,7 @@ defmodule Actors.Actor.Entity do
def handle_continue(
:load_state,
%EntityState{
actor: %Actor{name: name, state: actor_state} = actor
actor: %Actor{actor_id: %ActorId{name: name}, state: actor_state} = actor
} = state
)
when is_nil(actor_state) do
Expand All @@ -153,7 +154,8 @@ defmodule Actors.Actor.Entity do
def handle_continue(
:load_state,
%EntityState{
actor: %Actor{name: name, state: %ActorState{} = _actor_state} = actor
actor:
%Actor{actor_id: %ActorId{name: name}, state: %ActorState{} = _actor_state} = actor
} = state
) do
Logger.debug(
Expand Down Expand Up @@ -195,21 +197,19 @@ defmodule Actors.Actor.Entity do
def handle_call(
{:invocation_request,
%InvocationRequest{
actor: %Actor{name: name} = _actor,
actor_id: %ActorId{name: _name} = actor,
command_name: command,
value: payload
} = _invocation},
_from,
%EntityState{
system: %ActorSystem{name: actor_system} = _system,
actor: %Actor{state: current_state = _actor_state} = _state_actor
} = state
)
when is_nil(current_state) do
payload =
ActorInvocation.new(
actor_name: name,
actor_system: actor_system,
actor_id: actor,
command_name: command,
value: payload,
current_context: Context.new()
Expand All @@ -227,7 +227,7 @@ defmodule Actors.Actor.Entity do

{:ok, %Tesla.Env{body: body}} ->
with %ActorInvocationResponse{
updated_context: %Context{} = user_ctx
value: %Value{context: %Context{} = user_ctx} = _value
} = resp <- ActorInvocationResponse.decode(body) do
{:reply, {:ok, resp}, update_state(state, user_ctx)}
else
Expand All @@ -254,20 +254,18 @@ defmodule Actors.Actor.Entity do
def handle_call(
{:invocation_request,
%InvocationRequest{
actor: %Actor{name: name} = _actor,
actor_id: %ActorId{} = actor,
command_name: command,
value: payload
} = _invocation},
_from,
%EntityState{
system: %ActorSystem{name: actor_system} = _system,
actor: %Actor{state: %ActorState{state: current_state} = _actor_state} = _state_actor
} = state
) do
payload =
ActorInvocation.new(
actor_name: name,
actor_system: actor_system,
actor_id: actor,
command_name: command,
value: payload,
current_context: Context.new(state: current_state)
Expand All @@ -285,7 +283,7 @@ defmodule Actors.Actor.Entity do

{:ok, %Tesla.Env{body: body}} ->
with %ActorInvocationResponse{
updated_context: %Context{} = user_ctx
value: %Value{context: %Context{} = user_ctx} = _value
} = resp <- ActorInvocationResponse.decode(body) do
{:reply, {:ok, resp}, update_state(state, user_ctx)}
else
Expand Down Expand Up @@ -332,7 +330,7 @@ defmodule Actors.Actor.Entity do
state_hash: old_hash,
actor:
%Actor{
name: name,
actor_id: %ActorId{name: name},
state: %ActorState{} = actor_state,
snapshot_strategy: %ActorSnapshotStrategy{
strategy: {:timeout, %TimeoutStrategy{timeout: timeout}} = snapshot_strategy
Expand Down Expand Up @@ -372,7 +370,7 @@ defmodule Actors.Actor.Entity do
%EntityState{
actor:
%Actor{
name: name,
actor_id: %ActorId{name: name},
deactivate_strategy:
%ActorDeactivateStrategy{strategy: deactivate_strategy} =
_actor_deactivate_strategy
Expand All @@ -393,7 +391,7 @@ defmodule Actors.Actor.Entity do
def handle_info(
message,
%EntityState{
actor: %Actor{name: name, state: actor_state}
actor: %Actor{actor_id: %ActorId{name: name}, state: actor_state}
} = state
)
when is_nil(actor_state) do
Expand All @@ -407,7 +405,7 @@ defmodule Actors.Actor.Entity do
def handle_info(
message,
%EntityState{
actor: %Actor{name: name, state: %ActorState{} = actor_state}
actor: %Actor{actor_id: %ActorId{name: name}, state: %ActorState{} = actor_state}
} = state
) do
Logger.warn(
Expand All @@ -421,8 +419,13 @@ defmodule Actors.Actor.Entity do
@impl true
def terminate(
reason,
%EntityState{actor: %Actor{name: name, persistent: persistent, state: actor_state}} =
_state
%EntityState{
actor: %Actor{
actor_id: %ActorId{name: name},
persistent: persistent,
state: actor_state
}
} = _state
)
when is_nil(actor_state) or persistent == false do
Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
Expand All @@ -431,14 +434,18 @@ defmodule Actors.Actor.Entity do
def terminate(
reason,
%EntityState{
actor: %Actor{name: name, persistent: true, state: %ActorState{} = actor_state}
actor: %Actor{
actor_id: %ActorId{name: name},
persistent: true,
state: %ActorState{} = actor_state
}
} = _state
) do
StateManager.save(name, actor_state)
Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
end

def start_link(%EntityState{actor: %Actor{name: name}} = state) do
def start_link(%EntityState{actor: %Actor{actor_id: %ActorId{name: name}}} = state) do
GenServer.start(__MODULE__, state, name: via(name))
end

Expand Down
8 changes: 4 additions & 4 deletions apps/actors/lib/actors/actor/entity_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Actors.Actor.Entity.Supervisor do
use DynamicSupervisor

alias Eigr.Functions.Protocol.Actors.{Actor, ActorSystem}
alias Eigr.Functions.Protocol.Actors.Actor
alias Actors.Actor.Entity.EntityState

def child_spec() do
Expand All @@ -28,9 +28,9 @@ defmodule Actors.Actor.Entity.Supervisor do
@doc """
Adds a Actor to the dynamic supervisor.
"""
@spec lookup_or_create_actor(ActorSystem.t(), Actor.t()) :: {:ok, any}
def lookup_or_create_actor(actor_system, %Actor{} = actor) do
entity_state = %EntityState{system: actor_system, actor: actor}
@spec lookup_or_create_actor(Actor.t()) :: {:ok, any}
def lookup_or_create_actor(%Actor{} = actor) do
entity_state = %EntityState{actor: actor}

child_spec = %{
id: Actors.Actor.Entity,
Expand Down
Loading