Skip to content

Commit

Permalink
New shard position and name replacement strings, to avoid conflicting…
Browse files Browse the repository at this point in the history
… `$1` with PostgreSQL's param resolution
  • Loading branch information
alfredbaudisch committed May 11, 2023
1 parent 566d0ea commit 9c449a7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 28 deletions.
25 changes: 16 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Add `ecto_instashard` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[{:ecto_instashard, "~> 0.5"}]
[{:ecto_instashard, "~> 0.6"}]
end
```

Expand Down Expand Up @@ -140,33 +140,40 @@ end
```

### <a name="scripts"></a>Database Scripts and Migrations
InstaShard can create your sharded tables and dynamic PostgreSQL schemas if you provide the related SQL scripts. Each DDL command must be separated by two blank lines. Whenever you want to have the logical shard position defined, add `$1` to the DDL. Save the scripts in the folder `scripts` in your application and fill the list of SQL scripts as atoms in the Shard configuration, in the `scripts` key (as per previous example).
InstaShard can create your sharded tables and dynamic PostgreSQL schemas if you provide the related SQL scripts. Each DDL command must be separated by two blank lines.

Replacement strings:
- `$shard_pos$`: logical shard position.
- `$shard_name$`: logical shard name, example: `shard0`.
- `$shard$`: logical shard name followed by dot `.`, example: `shard0.`. Usage: `$shard$some_table_name`.

Save the scripts in the folder `scripts` in your application and fill the list of SQL scripts as atoms in the Shard configuration, in the `scripts` key (as per previous example).

Examples of scripts (these scripts are also inside `test/scripts`):

```sql
CREATE SCHEMA shard$1;
CREATE SCHEMA $shard_name$;

CREATE SEQUENCE shard$1.message_seq;
CREATE SEQUENCE $shard_name$.message_seq;

CREATE OR REPLACE FUNCTION shard$1.next_id(OUT result bigint) AS $$
CREATE OR REPLACE FUNCTION $shard_name$.next_id(OUT result bigint) AS $$
DECLARE
our_epoch bigint := 1314220021721;
seq_id bigint;
now_millis bigint;
shard_id int := $1;
shard_id int := $shard_pos$;
max_shard_id bigint := 2048;
BEGIN
SELECT nextval('shard$1.message_seq') % max_shard_id INTO seq_id;
SELECT nextval('$shard_name$.message_seq') % max_shard_id INTO seq_id;
SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
result := (now_millis - our_epoch) << 23;
result := result | (shard_id << 10);
result := result | (seq_id);
END;
$$ LANGUAGE PLPGSQL;

CREATE TABLE shard$1.messages (
id bigint not null default shard$1.next_id(),
CREATE TABLE $shard_name$.messages (
id bigint not null default $shard_name$.next_id(),
user_id int not null,
message text NOT NULL,
inserted_at timestamp with time zone default now() not null,
Expand Down
13 changes: 8 additions & 5 deletions lib/ecto_instashard/sharding.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ defmodule Ecto.InstaShard.Sharding do
end, Macro.Env.location(__ENV__))
end

def replace_and_run_script_sql(mod, script, param, directory \\ "scripts") do
def replace_and_run_script_sql(mod, script, pos, directory \\ "scripts") do
sql_file_to_string(script, directory)
|> Enum.each(&replace_and_run_sql(&1, mod, param))
|> Enum.each(&replace_and_run_sql(&1, mod, pos))
end

def replace_and_run_sql("", _mod, _param), do: :ok
def replace_and_run_sql("", _mod, _pos), do: :ok

def replace_and_run_sql(sql, mod, param) do
String.replace(sql, "$1", "#{param}")
def replace_and_run_sql(sql, mod, pos) do
sql
|> String.replace("$shard_pos$", "#{pos}")
|> String.replace("$shard_name$", "shard#{pos}")
|> String.replace("$shard$", "shard#{pos}.")
|> mod.run()
end

Expand Down
17 changes: 11 additions & 6 deletions lib/ecto_instashard/sharding/setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,21 @@ defmodule Ecto.InstaShard.Sharding.Setup do
end

def sql_script_all_shards(script, directory \\ "scripts") do
run_all_shards(fn(n, mod) ->
run_all_shards(fn(pos, mod) ->
mod.transaction(fn ->
replace_and_run_script_sql(mod, script, n, directory)
replace_and_run_script_sql(mod, script, pos, directory)
end)
end)
end

def sql_all_shards(sql) do
run_all_shards(fn(n, mod) ->
Ecto.Adapters.SQL.query!(mod, sql |> String.replace("{shard}", "shard#{n}."))
run_all_shards(fn(pos, mod) ->
Ecto.Adapters.SQL.query!(mod,
sql
|> String.replace("$shard_pos$", "#{pos}")
|> String.replace("$shard_name$", "shard#{pos}")
|> String.replace("$shard$", "shard#{pos}.")
)
end)
end

Expand Down Expand Up @@ -130,10 +135,10 @@ defmodule Ecto.InstaShard.Sharding.Setup do
repository_module(physical)
end

def create_tables(mod, n, directory) do
def create_tables(mod, pos, directory) do
mod.transaction(fn ->
Enum.map(unquote(config[:scripts]), fn(script) ->
replace_and_run_script_sql(mod, script, n, directory)
replace_and_run_script_sql(mod, script, pos, directory)
end)
end)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Ecto.InstaShard.Mixfile do

def project do
[app: :ecto_instashard,
version: "0.5.0",
version: "0.6.0",
elixir: "~> 1.3",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
Expand Down
14 changes: 7 additions & 7 deletions test/scripts/messages.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
CREATE SCHEMA shard$1;
CREATE SCHEMA $shard_name$;

CREATE SEQUENCE shard$1.message_seq;
CREATE SEQUENCE $shard_name$.message_seq;

CREATE OR REPLACE FUNCTION shard$1.next_id(OUT result bigint) AS $$
CREATE OR REPLACE FUNCTION $shard_name$.next_id(OUT result bigint) AS $$
DECLARE
our_epoch bigint := 1314220021721;
seq_id bigint;
now_millis bigint;
shard_id int := $1;
shard_id int := $shard_pos$;
max_shard_id bigint := 1024;
BEGIN
SELECT nextval('shard$1.message_seq') % max_shard_id INTO seq_id;
SELECT nextval('$shard_name$.message_seq') % max_shard_id INTO seq_id;
SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
result := (now_millis - our_epoch) << 23;
result := result | (shard_id << 10);
result := result | (seq_id);
END;
$$ LANGUAGE PLPGSQL;

CREATE TABLE shard$1.messages (
id bigint not null default shard$1.next_id(),
CREATE TABLE $shard_name$.messages (
id bigint not null default $shard_name$.next_id(),
user_id int not null,
message text NOT NULL,
inserted_at timestamp with time zone default now() not null,
Expand Down

0 comments on commit 9c449a7

Please sign in to comment.