Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi db projection #410

Merged
merged 62 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
75e7fa9
chore: initial projection query DSL implementation
Dec 11, 2024
c42cd46
feat: support complex queries
Dec 11, 2024
2ace782
feat: add support to multi arch images
Dec 15, 2024
18fcc35
chore: update deps
Dec 17, 2024
ce8ba9e
chore: update deps
Dec 17, 2024
206a23e
Merge branch 'main' into feat/projection-query
Dec 23, 2024
08d1684
fix: projection repo start and some tests
Dec 23, 2024
bcbda9b
fix: add child spec to native projection adapter
Dec 23, 2024
70d9cc6
feat: added more tests
Dec 23, 2024
3ded22a
fix: String.chars not implemented
Dec 23, 2024
83e00a0
fix: convert to chars
Dec 23, 2024
01ebe99
try fix
Dec 23, 2024
b463337
handle tuple args
Dec 23, 2024
e604f39
fix: assert tests
Dec 24, 2024
5f1ce25
fix: formatting
Dec 24, 2024
56262cd
translate comments
Dec 24, 2024
fd87283
Merge branch 'main' into feat/projection-query
Dec 25, 2024
f2f393a
feat: add support to group by
Dec 26, 2024
f23300b
feat: added support to having clause
Dec 26, 2024
221b220
refactor: fix some compilation warnings
Dec 26, 2024
a725630
trying nimble_parsec to parse queries
Dec 26, 2024
3964202
Merge branch 'main' into feat/projection-query
Dec 27, 2024
dcf60f4
some progress in query dsl parser
Dec 27, 2024
833641e
Merge branch 'main' into feat/projection-query
Dec 27, 2024
6b07953
feat: create projection tables from protobuf data
Dec 28, 2024
b9eb15f
refactor: minor adjust
Dec 28, 2024
313e4bc
feat: added dynamic table creator
Dec 31, 2024
de0b43f
refactor: rename module
Dec 31, 2024
6f96dfd
working projection table with upsert and query
Dec 31, 2024
968c90a
refact
Dec 31, 2024
d487682
Merge branch 'feat/projection-query' of github.com:eigr/spawn into fe…
Dec 31, 2024
d6f6d55
refact
Dec 31, 2024
ff4e6bc
merge
Dec 31, 2024
1be33d1
querying and table creation working on projection actor initialization
Jan 1, 2025
bc1c549
saving state after callback
Jan 1, 2025
ff323c0
pagination to queries
Jan 1, 2025
996cd6c
tests fix
Jan 1, 2025
d54087e
remove ununsed code
Jan 1, 2025
f661c2e
fix
Jan 1, 2025
17b928d
fix test
Jan 1, 2025
42899e4
fix some errors
Jan 1, 2025
be51d9e
working for mariaDB and native throwing exception
Jan 2, 2025
4b3b697
merge conflict
Jan 2, 2025
e98819a
mix format
Jan 2, 2025
fe39c59
ci with mariadb and postgres statestores test
Jan 2, 2025
1bb303c
ci changes
Jan 2, 2025
62f28ac
ci fix
Jan 2, 2025
bb2d938
ci fix again
Jan 2, 2025
3872d75
ci fix again
Jan 2, 2025
34f1314
password for postgres
Jan 2, 2025
531fb80
ci again
Jan 2, 2025
121bb44
run otp 25
Jan 2, 2025
ec0c9d9
longblob
Jan 2, 2025
6602418
ci fix
Jan 2, 2025
65ad436
ci fix
Jan 2, 2025
2262a9b
remove double mariadb
Jan 2, 2025
d4f0b97
ci fix
Jan 2, 2025
77e0cc4
ci
Jan 2, 2025
dbe8cf4
user admin
Jan 2, 2025
e4e6bca
ci fix
Jan 2, 2025
09832a6
ci fix
Jan 3, 2025
38fc367
comment mariadb
Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
otp: [25. 26]
otp: [25]
elixir: [1.15]

env:
Expand All @@ -25,6 +25,59 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Install and Configure Postgres on Port 5232
run: |
sudo apt-get update
sudo apt-get install -y postgresql postgresql-contrib
sudo service postgresql start

# Set the password for the 'postgres' user to 'postgres'
sudo -u postgres psql -c "ALTER USER postgres WITH PASSWORD 'postgres';"

# Allow password authentication (uncomment in pg_hba.conf)
sudo sed -i "s/^#host all all 127.0.0.1\/32 md5/host all all 127.0.0.1\/32 md5/" /etc/postgresql/*/main/pg_hba.conf
sudo sed -i "s/^#host all all ::1\/128 md5/host all all ::1\/128 md5/" /etc/postgresql/*/main/pg_hba.conf

# Increase max_connections in postgresql.conf
sudo sed -i "s/^#max_connections = [0-9]*/max_connections = 500/" /etc/postgresql/*/main/postgresql.conf

# Restart PostgreSQL to apply changes
sudo service postgresql restart

# Create the database
sudo -u postgres psql -c "CREATE DATABASE \"eigr-functions-db\";"

- name: Shutdown Ubuntu MySQL (SUDO)
run: sudo service mysql stop

# - name: Set up MariaDB
# uses: getong/[email protected]
# with:
# host port: 3307
# container port: 3307
# character set server: 'utf8'
# collation server: 'utf8_general_ci'
# mariadb version: '10.4.10'
# mysql database: 'eigr-functions-db'
#
# - name: Wait for MariaDB to be Ready
# run: |
# for i in {1..10}; do
# if mysqladmin ping -h127.0.0.1 -P3307 --silent; then
# echo "MariaDB is ready!"
# break
# fi
# echo "Waiting for MariaDB..."
# sleep 5
# done
#
# - name: Set up MariaDB User
# run: |
# # Create 'admin' user with password 'admin' and grant privileges
# mysql -h127.0.0.1 -P3307 -uroot -e "CREATE USER IF NOT EXISTS 'admin'@'%' IDENTIFIED BY 'admin';"
# mysql -h127.0.0.1 -P3307 -uroot -e "GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;"
# mysql -h127.0.0.1 -P3307 -uroot -e "FLUSH PRIVILEGES;"

- name: Install NATS with JetStream
run: |
wget https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz
Expand Down Expand Up @@ -66,13 +119,36 @@ jobs:
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
cd ../../

- name: Run tests spawn_statestores
- name: Run tests spawn_statestores_postgres
run: |
cd spawn_statestores/statestores
cd spawn_statestores/statestores_postgres
mix deps.get
MIX_ENV=test PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test \
PROXY_DATABASE_TYPE=postgres \
PROXY_DATABASE_PORT=5432 \
PROXY_DATABASE_USERNAME=postgres \
PROXY_DATABASE_SECRET=postgres \
PROXY_CLUSTER_STRATEGY=gossip \
PROXY_HTTP_PORT=9005 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
elixir --name [email protected] -S mix test
cd ../../

# - name: Run tests spawn_statestores_mariadb
# run: |
# cd spawn_statestores/statestores_mariadb
# mix deps.get
# MIX_ENV=test \
# PROXY_DATABASE_TYPE=mariadb \
# PROXY_DATABASE_PORT=3307 \
# PROXY_DATABASE_USERNAME=admin \
# PROXY_DATABASE_SECRET=admin \
# PROXY_CLUSTER_STRATEGY=gossip \
# PROXY_HTTP_PORT=9005 \
# SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
# elixir --name [email protected] -S mix test
# cd ../../

- name: Run tests statestores_native
run: |
cd spawn_statestores/statestores_native
Expand Down Expand Up @@ -100,3 +176,4 @@ jobs:
# mix deps.get
# MIX_ENV=test PROXY_DATABASE_TYPE=mysql PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
# cd ../../

7 changes: 3 additions & 4 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Actors.Actor.Entity.Invocation do
}

alias Spawn.Utils.Nats
alias Statestores.Manager.StateManager

import Spawn.Utils.AnySerializer,
only: [any_pack!: 1, any_unpack!: 2, normalize_package_name: 1]
Expand Down Expand Up @@ -474,8 +475,7 @@ defmodule Actors.Actor.Entity.Invocation do
|> normalize_package_name()

{:ok, results} =
Statestores.Projection.Query.DynamicTableDataHandler.query(
load_projection_adapter(),
StateManager.projection_query(
state_type,
view.query,
any_unpack!(request.payload |> elem(1), view.input_type),
Expand Down Expand Up @@ -637,8 +637,7 @@ defmodule Actors.Actor.Entity.Invocation do
Macro.underscore(id.parent)
end

Statestores.Projection.Query.DynamicTableDataHandler.upsert(
load_projection_adapter(),
StateManager.projection_upsert(
state_type,
table_name,
any_unpack!(response.updated_context.state, state_type)
Expand Down
7 changes: 1 addition & 6 deletions lib/actors/actor/entity/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
Macro.underscore(actor.id.parent)
end

:ok =
DynamicTableCreator.create_or_update_table(
load_projection_adapter(),
state_type,
table_name
)
:ok = StateManager.projection_create_or_update_table(state_type, table_name)

StreamInitiator.init_projection_stream(actor)
end
Expand Down
86 changes: 18 additions & 68 deletions lib/actors/actor/state_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
alias Statestores.Schemas.Snapshot
alias Statestores.Manager.StateManager, as: StateStoreManager

def projection_create_or_update_table(projection_type, table_name) do
StateStoreManager.projection_create_or_update_table(projection_type, table_name)
end

def projection_upsert(projection_type, table_name, data) do
StateStoreManager.projection_upsert(projection_type, table_name, data)
end

def projection_query(projection_type, query, params, opts) do
StateStoreManager.projection_query(projection_type, query, params, opts)
end

def is_new?(_old_hash, new_state) when is_nil(new_state), do: false

def is_new?(old_hash, new_state) do
with bytes_from_state <- Any.encode(new_state),
hash <- :crypto.hash(:sha256, bytes_from_state) do
old_hash != hash
else
_ ->
false
end
catch
_kind, error ->
Expand Down Expand Up @@ -78,69 +87,6 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
{:error, error}
end

@spec load_all(ActorId.t()) :: {:ok, term()} | :not_found | {:error, term()}
def load_all(%ActorId{} = actor_id) do
key = generate_key(actor_id)

snapshots = StateStoreManager.load_all(key)

results =
Enum.map(snapshots, fn %Snapshot{
status: status,
node: node,
revision: rev,
tags: tags,
data_type: type,
data: data
} = _event ->
revision = if is_nil(rev), do: 0, else: rev

{%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}},
revision, status, node}
end)

if Enum.empty?(results) do
:not_found
else
{:ok, results}
end
catch
_kind, error ->
{:error, error}
end

@spec load_by_interval(ActorId.t(), String.t(), String.t()) ::
{:ok, term()} | :not_found | {:error, term()}
def load_by_interval(%ActorId{} = actor_id, time_start, time_end) do
key = generate_key(actor_id)

snapshots = StateStoreManager.load_by_interval(key, time_start, time_end)

results =
Enum.map(snapshots, fn %Snapshot{
status: status,
node: node,
revision: rev,
tags: tags,
data_type: type,
data: data
} = _event ->
revision = if is_nil(rev), do: 0, else: rev

{%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}},
revision, status, node}
end)

if Enum.empty?(results) do
:not_found
else
{:ok, results}
end
catch
_kind, error ->
{:error, error}
end

@spec save(ActorId.t(), Spawn.Actors.ActorState.t(), Keyword.t()) ::
{:ok, Spawn.Actors.ActorState.t()}
| {:error, any(), Spawn.Actors.ActorState.t()}
Expand Down Expand Up @@ -285,10 +231,14 @@ else
def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message)
def load(_actor_id), do: raise(@not_loaded_message)
def load(_actor_id, _), do: raise(@not_loaded_message)
def load_all(_), do: raise(@not_loaded_message)
def load_by_interval(_, _, _), do: raise(@not_loaded_message)
def save(_actor_id, _state), do: raise(@not_loaded_message)
def save(_actor_id, _state, _opts), do: raise(@not_loaded_message)
def save_async(_actor_id, _state, _timeout), do: raise(@not_loaded_message)

def projection_create_or_update_table(_projection_type, _table_name),
do: raise(@not_loaded_message)

def projection_upsert(_projection_type, _table_name, _data), do: raise(@not_loaded_message)
def projection_query(_projection_type, _query, _params, _opts), do: raise(@not_loaded_message)
end
end
19 changes: 14 additions & 5 deletions lib/actors/actor/state_manager_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ defmodule Actors.Actor.StateManager.Behaviour do
to be saved to persistent storage using database drivers.
"""

@type projection_type :: module()
@type table_name :: String.t()
@type data :: struct()
@type query :: String.t()
@type params :: struct()
@type opts :: Keyword.t()

@callback is_new?(String.t(), any()) :: {:error, term()} | boolean()

@callback load(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load(String.t(), number()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load_all(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load_by_interval(String.t(), String.t(), String.t()) ::
{:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback save(String.t(), term(), Keyword.t()) ::
{:ok, term(), String.t()}
| {:error, term(), term(), term()}
Expand All @@ -24,4 +26,11 @@ defmodule Actors.Actor.StateManager.Behaviour do
{:ok, term(), String.t()}
| {:error, term(), term(), term()}
| {:error, term(), term()}

@callback projection_create_or_update_table(projection_type(), table_name()) :: :ok

@callback projection_upsert(projection_type(), table_name(), data()) :: :ok

@callback projection_query(projection_type(), query(), params(), opts()) ::
{:error, term()} | {:ok, data()}
end
2 changes: 1 addition & 1 deletion spawn_activators/activator/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
"protobuf_generate": {:hex, :protobuf_generate, "0.1.3", "57841bc60e2135e190748119d83f78669ee7820c0ad6555ada3cd3cd7df93143", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "dae4139b00ba77a279251a0ceb5593b1bae745e333b4ce1ab7e81e8e4906016b"},
"rabbit_common": {:hex, :rabbit_common, "3.12.13", "a163432b377411d6033344d5f6a8b12443d67c897c9374b9738cc609cab3161c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "26a400f76976e66efd9cdab29a36dd4b129466d431c4e014aae9d2e36fefef44"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
Expand Down
2 changes: 1 addition & 1 deletion spawn_operator/spawn_operator/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
"protobuf_generate": {:hex, :protobuf_generate, "0.1.2", "45b9a9ae8606333cdea993ceaaecd799d206cdfe23348d37c06207eac76cbee6", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "55b0ff8385703317ca90e1bd30a2ece99e80ae0c73e6ebcfb374e84e57870d61"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
Expand Down
Loading
Loading