Skip to content

Commit

Permalink
Merge pull request #419 from eigr/flow-meta-and-backoff-on-fail-option
Browse files Browse the repository at this point in the history
Fail backoff and fail metadata
  • Loading branch information
sleipnir authored Jan 24, 2025
2 parents 98ab747 + 1cf415f commit 70100a1
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 84 deletions.
4 changes: 4 additions & 0 deletions lib/_generated/spawn/actors/healthcheck.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckReply do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
@moduledoc false

use GRPC.Service,
name: "spawn.actors.healthcheck.HealthCheckActor",
protoc_gen_elixir_version: "0.14.0"
Expand Down Expand Up @@ -717,6 +719,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Stub do
@moduledoc false

use GRPC.Stub, service: Spawn.Actors.Healthcheck.HealthCheckActor.Service
end

Expand Down
30 changes: 30 additions & 0 deletions lib/_generated/spawn/actors/protocol.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,20 @@ defmodule Spawn.Pipe do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1005,6 +1019,7 @@ defmodule Spawn.Pipe do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Forward do
Expand Down Expand Up @@ -1043,6 +1058,20 @@ defmodule Spawn.Forward do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1059,6 +1088,7 @@ defmodule Spawn.Forward do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Fact.MetadataEntry do
Expand Down
15 changes: 13 additions & 2 deletions lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ defmodule Actors.Actor.CallerConsumer do
ActorSettings,
ActorSystem,
Registry,
ActorOpts,
TimeoutStrategy,
ProjectionSettings,
ActorDeactivationStrategy,
Expand Down Expand Up @@ -565,6 +564,16 @@ defmodule Actors.Actor.CallerConsumer do
timeout =
case metadata["request-timeout"] do
nil -> 60_000
value -> String.to_integer(value)
end

# when a invoke errors or throws an exception
# we can backoff or not
fail_backoff =
case metadata["fail_backoff"] do
nil -> false
"false" -> false
"true" -> true
value -> value
end

Expand Down Expand Up @@ -635,7 +644,9 @@ defmodule Actors.Actor.CallerConsumer do
{:halt, result}

{:error, :actor_invoke, error} ->
{:halt, {:error, error}}
keep_retrying_action = if fail_backoff, do: :cont, else: :halt

{keep_retrying_action, {:error, error}}

{:error, _msg} = result ->
{:cont, result}
Expand Down
46 changes: 31 additions & 15 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ defmodule Actors.Actor.Entity.Invocation do
name = Map.get(message.metadata, "actor-name")
source_action = Map.get(message.metadata, "actor-action")

action_metadata =
case Map.get(message.metadata, "action-metadata") do
nil -> %{}
metadata -> Jason.decode!(metadata)
end

action =
actor.settings.projection_settings.subjects
|> Enum.find(fn subject -> subject.source_action == source_action end)
Expand All @@ -82,7 +88,7 @@ defmodule Actors.Actor.Entity.Invocation do
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
metadata: action_metadata,
action_name: action,
payload: {:value, Google.Protobuf.Any.decode(message.state)},
caller: %ActorId{name: name, system: system_name, parent: parent}
Expand Down Expand Up @@ -404,7 +410,7 @@ defmodule Actors.Actor.Entity.Invocation do
response_checkpoint(response, checkpoint, revision, state)
end

defp is_authorized?(invocation, actions, timers) do
defp is_authorized?(invocation, _actions, _timers) do
acl_manager = get_acl_manager()

acl_manager.get_policies!()
Expand Down Expand Up @@ -580,7 +586,7 @@ defmodule Actors.Actor.Entity.Invocation do
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

response
end
Expand All @@ -595,12 +601,14 @@ defmodule Actors.Actor.Entity.Invocation do
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, _state, response) do
defp do_handle_projection(id, request, %{sourceable: true} = _settings, _state, response) do
action = request.action_name

stream_name = StreamInitiator.stream_name(id)
id_name = String.replace(id.name, ".", "-")

Expand All @@ -615,19 +623,20 @@ defmodule Actors.Actor.Entity.Invocation do
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
{"Actor-Action", "#{action}"},
{"Action-Metadata", Jason.encode!(request.current_context.metadata)}
]
)
end

defp do_handle_projection(
id,
action,
request,
_settings,
%EntityState{actor: %Actor{settings: %ActorSettings{kind: :PROJECTION}}} = state,
response
) do
if :persistent_term.get("view-#{id.name}-#{action}", false) do
if :persistent_term.get("view-#{id.name}-#{request.action_name}", false) do
# no need to persist any state since this is a view only action
:ok
else
Expand All @@ -650,7 +659,7 @@ defmodule Actors.Actor.Entity.Invocation do
end
end

defp do_handle_projection(_id, _action, _settings, _state, _response), do: :ok
defp do_handle_projection(_id, _request, _settings, _state, _response), do: :ok

defp do_run_workflow(
_request,
Expand All @@ -671,7 +680,7 @@ defmodule Actors.Actor.Entity.Invocation do
opts
) do
Tracer.with_span "run-workflow" do
do_side_effects(effects, opts)
do_side_effects(request, effects, opts)
do_broadcast(request, broadcast, opts)
do_handle_routing(request, response, opts)
end
Expand All @@ -689,7 +698,8 @@ defmodule Actors.Actor.Entity.Invocation do

defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name}
actor: %ActorId{name: caller_actor_name, system: system_name},
current_context: %Context{metadata: metadata}
},
%ActorInvocationResponse{
payload: payload,
Expand All @@ -708,6 +718,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
Expand Down Expand Up @@ -737,7 +748,8 @@ defmodule Actors.Actor.Entity.Invocation do
defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name},
payload: payload
payload: payload,
current_context: %Context{metadata: metadata}
} = _request,
%ActorInvocationResponse{
workflow:
Expand All @@ -756,6 +768,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
Expand Down Expand Up @@ -810,13 +823,13 @@ defmodule Actors.Actor.Entity.Invocation do
:noreply
end

def do_side_effects(effects, opts \\ [])
def do_side_effects(request, effects, opts \\ [])

def do_side_effects(effects, _opts) when effects == [] do
def do_side_effects(_request, effects, _opts) when effects == [] do
:ok
end

def do_side_effects(effects, _opts) when is_list(effects) do
def do_side_effects(request, effects, _opts) when is_list(effects) do
Tracer.with_span "handle-side-effects" do
try do
spawn(fn ->
Expand All @@ -830,6 +843,9 @@ defmodule Actors.Actor.Entity.Invocation do
} = invocation
} ->
try do
metadata = Map.merge(request.current_context.metadata, invocation.metadata)
invocation = %InvocationRequest{invocation | metadata: metadata}

Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx())
catch
error ->
Expand Down
9 changes: 0 additions & 9 deletions lib/spawn/cluster/cluster_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,12 @@ defmodule Spawn.Cluster.ClusterResolver do
service = Keyword.fetch!(config, :service)
resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a))

IO.inspect(app_name, label: "Using application name ---------------------")
IO.inspect(service, label: "Using service ---------------------")
IO.inspect(resolver, label: "Using resolver ---------------------")
IO.inspect(Node.get_cookie(), label: "Using node cookie ---------------------")

cond do
app_name != nil and service != nil ->
headless_service = to_charlist(service)

IO.inspect(headless_service, label: "Using headless service ---------------------")

case resolver.(headless_service) do
{:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} ->
IO.inspect(addresses, label: "Using addresses ---------------------")
parse_response(addresses, app_name)

{:error, reason} ->
Expand Down Expand Up @@ -135,6 +127,5 @@ defmodule Spawn.Cluster.ClusterResolver do
|> Enum.map(&:inet_parse.ntoa(&1))
|> Enum.map(&"#{app_name}@#{&1}")
|> Enum.map(&String.to_atom(&1))
|> IO.inspect(label: "Parsed addresses ---------------------")
end
end
6 changes: 6 additions & 0 deletions priv/protos/spawn/actors/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ message Pipe {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Sends the input of a action of an Actor to the input of another action of an
Expand All @@ -291,6 +294,9 @@ message Forward {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Facts are emitted by actions and represent the internal state of the moment
Expand Down
52 changes: 21 additions & 31 deletions spawn_sdk/spawn_sdk/lib/defact.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,27 @@ defmodule SpawnSdk.Defact do
)

def handle_action({unquote(action_name), payload}, context) do
try do
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
rescue
e ->
reraise SpawnSdk.Actor.MalformedActor,
[
message: "Error in action=#{unquote(action_name)} error=#{inspect(e)}",
exception: e
],
__STACKTRACE__
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
end
end
Expand Down
Loading

0 comments on commit 70100a1

Please sign in to comment.