Skip to content

Commit

Permalink
Merge pull request #19 from cheerfulstoic/dont-require-update-type-fo…
Browse files Browse the repository at this point in the history
…r-labeled-events

change: Don't require specifying update type for watchers with labels
  • Loading branch information
cheerfulstoic authored Aug 7, 2024
2 parents e9824a0 + d607d39 commit 968ae51
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 170 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2023-08-01

### Changed

- BREAKING: Don't require specifying update type for watchers with labels (#19)

### Added

- Allow watchers without an ecto schema (thanks @frerich / #18)

## [0.7.0] - 2023-07-31

### Changed
Expand Down
66 changes: 34 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ By getting updates directly from PostgreSQL, EctoWatch ensures that messages are
To use EctoWatch, you need to add it to your supervision tree and specify watchers for Ecto schemas and update types. It would look something like this in your `application.ex` file (after `MyApp.Repo` and `MyApp.PubSub`):

```elixir
alias MyApp.Accounts.User
alias MyApp.Accounts.Package

{EctoWatch,
repo: MyApp.Repo,
pub_sub: MyApp.PubSub,
watchers: [
{MyApp.Accounts.User, :inserted},
{MyApp.Accounts.User, :updated},
{MyApp.Accounts.User, :deleted},
{MyApp.Shipping.Package, :inserted},
{MyApp.Shipping.Package, :updated}
{User, :inserted},
{User, :updated},
{User, :deleted},
{Package, :inserted},
{Package, :updated}
]}
```

Expand All @@ -45,50 +48,49 @@ This will setup:
Then any process (e.g. a GenServer, a LiveView, a Phoenix channel, etc...) can subscribe to messages like so:

```elixir
EctoWatch.subscribe(MyApp.Accounts.User, :inserted)
EctoWatch.subscribe(MyApp.Accounts.User, :updated)
EctoWatch.subscribe(MyApp.Accounts.User, :deleted)
EctoWatch.subscribe(User, :inserted)
EctoWatch.subscribe(User, :updated)
EctoWatch.subscribe(User, :deleted)

EctoWatch.subscribe(MyApp.Shipping.Package, :inserted)
EctoWatch.subscribe(MyApp.Shipping.Package, :updated)
EctoWatch.subscribe(Package, :inserted)
EctoWatch.subscribe(Package, :updated)
```

(note that if you are subscribing in a LiveView `mount` callback you should subscribe inside of a `if connected?(socket) do` to avoid subscribing twice).

You can also subscribe to individual records:

```elixir
EctoWatch.subscribe(MyApp.Accounts.User, :updated, user.id)
EctoWatch.subscribe(MyApp.Accounts.User, :deleted, user.id)
EctoWatch.subscribe(User, :updated, user.id)
EctoWatch.subscribe(User, :deleted, user.id)
```

... OR you can subscribe to records by an association column (but the given column must be in the `extra_columns` list for the watcher! See below for more info on the `extra_columns` option):

```elixir
EctoWatch.subscribe(MyApp.Accounts.User, :updated, {:role_id, role.id})
EctoWatch.subscribe(MyApp.Accounts.User, :deleted, {:role_id, role.id})
EctoWatch.subscribe(User, :updated, {:role_id, role.id})
EctoWatch.subscribe(User, :deleted, {:role_id, role.id})
```

Once subscribed, messages can be handled like so (LiveView example are given here but `handle_info` callbacks can be used elsewhere as well):

```elixir
def handle_info({:inserted, MyApp.Accounts.User, %{id: id}}, socket) do
def handle_info({{User, :inserted}, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)

{:noreply, socket}
end

def handle_info({:updated, MyApp.Accounts.User, %{id: id}}, socket) do
def handle_info({{User, :updated}, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)

{:noreply, socket}
end

def handle_info({:deleted, MyApp.Accounts.User, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_delete(socket, :users, user)
def handle_info({{User, :deleted}, %{id: id}}, socket) do
socket = stream_delete_by_dom_id(socket, :songs, "users-#{id}")

{:noreply, socket}
end
Expand All @@ -105,17 +107,17 @@ You can also setup the database to trigger only on specific column changes on `:
pub_sub: MyApp.PubSub,
watchers: [
# ...
{MyApp.Accounts.User, :updated, trigger_columns: [:email, :phone], label: :user_contact_info_updated},
{User, :updated, trigger_columns: [:email, :phone], label: :user_contact_info_updated},
# ...
]}

# subscribing
EctoWatch.subscribe(:user_contact_info_updated, :updated)
EctoWatch.subscribe(:user_contact_info_updated)
# or...
EctoWatch.subscribe(:user_contact_info_updated, :updated, package.id)
EctoWatch.subscribe(:user_contact_info_updated, package.id)

# handling messages
def handle_info({:updated, :user_contact_info_updated, %{id: id}}, socket) do
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
```

A label is required for two reasons:
Expand All @@ -132,17 +134,17 @@ You can also use labels in general without tracking specific columns:
pub_sub: MyApp.PubSub,
watchers: [
# ...
{MyApp.Accounts.User, :updated, label: :user_update},
{User, :updated, label: :user_update},
# ...
]}

# subscribing
EctoWatch.subscribe(:user_update, :updated)
EctoWatch.subscribe(:user_update)
# or...
EctoWatch.subscribe(:user_update, :updated, package.id)
EctoWatch.subscribe(:user_update, package.id)

# handling messages
def handle_info({:updated, :user_update, %{id: id}}, socket) do
def handle_info({:user_update, %{id: id}}, socket) do
```

## Getting additional values
Expand All @@ -166,16 +168,16 @@ If you would like to get more than just the `id` from the record, you can use th
pub_sub: MyApp.PubSub,
watchers: [
# ...
{MyApp.Posts.Comment, :deleted, extra_columns: [:post_id]},
{Comment, :deleted, extra_columns: [:post_id]},
# ...
]}

# subscribing
EctoWatch.subscribe(MyApp.Posts.Comment, :deleted)
EctoWatch.subscribe(Comment, :deleted)

# handling messages
def handle_info({:deleted, MyApp.Posts.Comment, %{id: id, post_id: post_id}}, socket) do
MyApp.Posts.refresh_cache(post_id)
def handle_info({{Comment, :deleted}, %{id: id, post_id: post_id}}, socket) do
Posts.refresh_cache(post_id)
```

## Watching without a schema
Expand Down Expand Up @@ -257,7 +259,7 @@ by adding `ecto_watch` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:ecto_watch, "~> 0.7.0"}
{:ecto_watch, "~> 0.8.0"}
]
end
```
Expand Down
153 changes: 130 additions & 23 deletions lib/ecto_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,153 @@ defmodule EctoWatch do
@moduledoc false

alias EctoWatch.WatcherServer
alias EctoWatch.Helpers

use Supervisor

def subscribe(schema_mod_or_label, update_type, id \\ nil) do
if !Process.whereis(__MODULE__) do
raise "EctoWatch is not running. Please start it by adding it to your supervision tree or using EctoWatch.start_link/1"
def subscribe(schema_mod_or_label, update_type, id) when is_atom(schema_mod_or_label) do
if Helpers.ecto_schema_mod?(schema_mod_or_label) do
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe({#{inspect(schema_mod_or_label)}, #{inspect(update_type)}}, #{id})
IMPORTANT NOTE: The messages that you receive have also changed!
Before:
# labels:
def handle_info({:updated, User, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, :updated, %{id: id}}, socket) do
Now:
# labels:
def handle_info({{User, :updated}, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
See the updated documentation for subscribing."
"""
else
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe(#{inspect(schema_mod_or_label)}, #{id})
Before:
# labels:
def handle_info({:updated, User, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, :updated, %{id: id}}, socket) do
Now:
# labels:
def handle_info({{User, :updated}, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
See the updated documentation for subscribing."
"""
end
end

def subscribe(identifier, id \\ nil) do
if is_atom(identifier) && id in ~w[inserted updated deleted]a do
if Helpers.ecto_schema_mod?(identifier) do
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe({#{inspect(identifier)}, #{inspect(id)}})
Before:
# labels:
def handle_info({:updated, User, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, :updated, %{id: id}}, socket) do
Now:
# labels:
def handle_info({{User, :updated}, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
See the updated documentation for subscribing."
"""
else
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe(#{inspect(identifier)})
Before:
# labels:
def handle_info({:updated, User, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, :updated, %{id: id}}, socket) do
Now:
# labels:
def handle_info({{User, :updated}, %{id: id}}, socket) do
# schemas:
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
See the updated documentation for subscribing."
"""
end
end

with :ok <- check_update_args(update_type, id),
validate_ecto_watch_running!()

with :ok <- validate_identifier(identifier),
{:ok, {pub_sub_mod, channel_name}} <-
WatcherServer.pub_sub_subscription_details(schema_mod_or_label, update_type, id) do
WatcherServer.pub_sub_subscription_details(identifier, id) do
Phoenix.PubSub.subscribe(pub_sub_mod, channel_name)
else
{:error, error} ->
raise ArgumentError, error
end
end

def check_update_args(update_type, id) do
case {update_type, id} do
{:inserted, _} ->
:ok
defp validate_identifier({schema_mod, update_type})
when is_atom(schema_mod) and is_atom(update_type) do
cond do
!EctoWatch.Helpers.ecto_schema_mod?(schema_mod) ->
raise ArgumentError,
"Expected atom to be an Ecto schema module. Got: #{inspect(schema_mod)}"

{:updated, _} ->
:ok
update_type not in ~w[inserted updated deleted]a ->
raise ArgumentError,
"Unexpected update_type: #{inspect(update_type)}. Expected :inserted, :updated, or :deleted"

{:deleted, _} ->
true ->
:ok
end
end

{other, _} ->
raise ArgumentError,
"Unexpected update_type: #{inspect(other)}. Expected :inserted, :updated, or :deleted"
defp validate_identifier(label) when is_atom(label) do
:ok
end

defp validate_identifier(other) do
raise ArgumentError,
"Invalid subscription (expected either `{schema_module, :inserted | :updated | :deleted}` or a label): #{inspect(other)}"
end

defp validate_ecto_watch_running! do
if !Process.whereis(__MODULE__) do
raise "EctoWatch is not running. Please start it by adding it to your supervision tree or using EctoWatch.start_link/1"
end
end

Expand Down Expand Up @@ -63,14 +178,6 @@ defmodule EctoWatch do
{EctoWatch.WatcherSupervisor, options}
]

# children = children ++
# Enum.map(options.watchers, fn watcher_options ->
# %{
# id: WatcherServer.name(watcher_options),
# start: {WatcherServer, :start_link, [{options.repo_mod, options.pub_sub_mod, watcher_options}]}
# }
# end)

Supervisor.init(children, strategy: :rest_for_one)
end
end
Loading

0 comments on commit 968ae51

Please sign in to comment.