Skip to content

Commit

Permalink
Add and configure Oban and setup email job
Browse files Browse the repository at this point in the history
- Install Oban and configure according to docs: https://hexdocs.pm/oban/Oban.html
- Setup error handling according to docs: https://hexdocs.pm/oban/expected-failures.html#content
- Add reporting to Sentry via: https://hexdocs.pm/oban/Oban.html#module-reporting-errors
  • Loading branch information
lleger committed Nov 11, 2020
1 parent 87b6d88 commit 2756719
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 45 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The following technologies are included and configured with our defaults:
- [AssertIdentity] for easy Ecto identity assertions in tests
- [phx_gen_auth] for a user authentication system
- [Bamboo] for sending emails
- [Oban] for background jobs
- [asdf] for managing runtime versions
- [Docker] for building release containers
- [Mix Releases] for compiling release binaries
Expand Down Expand Up @@ -57,6 +58,7 @@ Elixir and Phoenix][services] development as part of our portfolio of services.
[assertidentity]: https://github.com/newaperio/assert_identity/
[phx_gen_auth]: https://github.com/aaronrenner/phx_gen_auth
[bamboo]: https://github.com/thoughtbot/bamboo
[oban]: https://github.com/sorentwo/oban/
[asdf]: https://asdf-vm.com/
[docker]: https://docs.docker.com/
[mix releases]: https://hexdocs.pm/mix/Mix.Tasks.Release.html
Expand Down
6 changes: 6 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ config :phoenix, :json_library, Jason
config :phoenix_starter, PhoenixStarter.Email,
default_from: {"PhoenixStarter", "[email protected]"}

# Configures Oban
config :phoenix_starter, Oban,
repo: PhoenixStarter.Repo,
plugins: [Oban.Plugins.Pruner],
queues: [default: 10, emails: 10]

# Configures Sentry
config :sentry,
enable_source_code_context: true,
Expand Down
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ config :logger, level: :warn
# Configures Bamboo
config :phoenix_starter, PhoenixStarter.Mailer, adapter: Bamboo.TestAdapter

# Configures Oban
config :phoenix_starter, Oban, crontab: false, queues: false, plugins: false

# Configures Sentry
config :sentry, environment_name: "test"
18 changes: 17 additions & 1 deletion lib/phoenix_starter/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule PhoenixStarter.Application do

@impl true
def start(_type, _args) do
attach_telemetry_handlers()

children = [
# Start the Ecto repository
PhoenixStarter.Repo,
Expand All @@ -15,9 +17,10 @@ defmodule PhoenixStarter.Application do
# Start the PubSub system
{Phoenix.PubSub, name: PhoenixStarter.PubSub},
# Start the Endpoint (http/https)
PhoenixStarterWeb.Endpoint
PhoenixStarterWeb.Endpoint,
# Start a worker by calling: PhoenixStarter.Worker.start_link(arg)
# {PhoenixStarter.Worker, arg}
{Oban, oban_config()}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand All @@ -33,4 +36,17 @@ defmodule PhoenixStarter.Application do
PhoenixStarterWeb.Endpoint.config_change(changed, removed)
:ok
end

defp attach_telemetry_handlers do
:telemetry.attach_many(
"oban-errors",
[[:oban, :job, :exception], [:oban, :circuit, :trip]],
&PhoenixStarter.Workers.ErrorReporter.handle_event/4,
%{}
)
end

defp oban_config do
Application.get_env(:phoenix_starter, Oban)
end
end
55 changes: 37 additions & 18 deletions lib/phoenix_starter/users.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,28 @@ defmodule PhoenixStarter.Users do
{:ok, %Bamboo.Email{}}
"""
@spec deliver_update_email_instructions(User.t(), String.t(), (String.t() -> String.t())) ::
{:ok, Bamboo.Email.t()}
def deliver_update_email_instructions(%User{} = user, current_email, update_email_url_fun)
@spec deliver_update_email_instructions(
User.t(),
String.t(),
(String.t() -> String.t()),
boolean
) :: UserNotifier.notifier_result()
def deliver_update_email_instructions(
%User{} = user,
current_email,
update_email_url_fun,
async \\ true
)
when is_function(update_email_url_fun, 1) do
{encoded_token, user_token} = UserToken.build_email_token(user, "change:#{current_email}")

Repo.insert!(user_token)

{:ok,
UserNotifier.deliver_update_email_instructions(user, update_email_url_fun.(encoded_token))}
UserNotifier.deliver_update_email_instructions(
user,
update_email_url_fun.(encoded_token),
async
)
end

@doc """
Expand Down Expand Up @@ -273,18 +285,21 @@ defmodule PhoenixStarter.Users do
{:error, :already_confirmed}
"""
@spec deliver_user_confirmation_instructions(User.t(), (String.t() -> String.t())) ::
{:ok, Bamboo.Email.t()} | {:error, atom()}
def deliver_user_confirmation_instructions(%User{} = user, confirmation_url_fun)
@spec deliver_user_confirmation_instructions(User.t(), (String.t() -> String.t()), boolean) ::
UserNotifier.notifier_result() | {:error, atom()}
def deliver_user_confirmation_instructions(%User{} = user, confirmation_url_fun, async \\ true)
when is_function(confirmation_url_fun, 1) do
if user.confirmed_at do
{:error, :already_confirmed}
else
{encoded_token, user_token} = UserToken.build_email_token(user, "confirm")
Repo.insert!(user_token)

{:ok,
UserNotifier.deliver_confirmation_instructions(user, confirmation_url_fun.(encoded_token))}
UserNotifier.deliver_confirmation_instructions(
user,
confirmation_url_fun.(encoded_token),
async
)
end
end

Expand Down Expand Up @@ -322,18 +337,22 @@ defmodule PhoenixStarter.Users do
{:ok, %Bamboo.Email{}}
"""
@spec deliver_user_reset_password_instructions(User.t(), (String.t() -> String.t())) ::
{:ok, Bamboo.Email.t()}
def deliver_user_reset_password_instructions(%User{} = user, reset_password_url_fun)
@spec deliver_user_reset_password_instructions(User.t(), (String.t() -> String.t()), boolean) ::
UserNotifier.notifier_result()
def deliver_user_reset_password_instructions(
%User{} = user,
reset_password_url_fun,
async \\ true
)
when is_function(reset_password_url_fun, 1) do
{encoded_token, user_token} = UserToken.build_email_token(user, "reset_password")
Repo.insert!(user_token)

{:ok,
UserNotifier.deliver_reset_password_instructions(
user,
reset_password_url_fun.(encoded_token)
)}
UserNotifier.deliver_reset_password_instructions(
user,
reset_password_url_fun.(encoded_token),
async
)
end

@doc """
Expand Down
64 changes: 49 additions & 15 deletions lib/phoenix_starter/users/user_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,68 @@ defmodule PhoenixStarter.Users.UserNotifier do
alias PhoenixStarter.Users.User
alias PhoenixStarter.Users.UserEmail
alias PhoenixStarter.Mailer
alias PhoenixStarter.Workers.UserEmailWorker

@type notifier_result ::
{:ok, Bamboo.Email}
| {:ok, Oban.Job.t()}
| {:error, Oban.job_changeset()}
| {:error, term()}

@doc """
Deliver instructions to confirm `PhoenixStarter.Users.User`.
"""
@spec deliver_confirmation_instructions(User.t(), String.t()) :: Bamboo.Email.t()
def deliver_confirmation_instructions(user, url) do
user
|> UserEmail.confirmation_instructions(url)
|> Mailer.deliver_now()
@spec deliver_confirmation_instructions(User.t(), String.t(), boolean()) :: notifier_result
def deliver_confirmation_instructions(user, url, async \\ true)

def deliver_confirmation_instructions(user, url, true) do
%{email: "confirmation_instructions", user_id: user.id, url: url}
|> UserEmailWorker.new()
|> Oban.insert()
end

def deliver_confirmation_instructions(user, url, false) do
{:ok,
user
|> UserEmail.confirmation_instructions(url)
|> Mailer.deliver_now()}
end

@doc """
Deliver instructions to reset a `PhoenixStarter.Users.User` password.
"""
@spec deliver_reset_password_instructions(User.t(), String.t()) :: Bamboo.Email.t()
def deliver_reset_password_instructions(user, url) do
user
|> UserEmail.reset_password_instructions(url)
|> Mailer.deliver_now()
@spec deliver_reset_password_instructions(User.t(), String.t(), boolean) :: notifier_result
def deliver_reset_password_instructions(user, url, async \\ true)

def deliver_reset_password_instructions(user, url, true) do
%{email: "reset_password_instructions", user_id: user.id, url: url}
|> UserEmailWorker.new()
|> Oban.insert()
end

def deliver_reset_password_instructions(user, url, false) do
{:ok,
user
|> UserEmail.reset_password_instructions(url)
|> Mailer.deliver_now()}
end

@doc """
Deliver instructions to update a `PhoenixStarter.Users.User` email.
"""
@spec deliver_update_email_instructions(User.t(), String.t()) :: Bamboo.Email.t()
def deliver_update_email_instructions(user, url) do
user
|> UserEmail.update_email_instructions(url)
|> Mailer.deliver_now()
@spec deliver_update_email_instructions(User.t(), String.t(), boolean) :: notifier_result
def deliver_update_email_instructions(user, url, async \\ true)

def deliver_update_email_instructions(user, url, true) do
%{email: "update_email_instructions", user_id: user.id, url: url}
|> UserEmailWorker.new()
|> Oban.insert()
end

def deliver_update_email_instructions(user, url, false) do
{:ok,
user
|> UserEmail.update_email_instructions(url)
|> Mailer.deliver_now()}
end
end
32 changes: 32 additions & 0 deletions lib/phoenix_starter/workers/error_reporter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule PhoenixStarter.Workers.ErrorReporter do
@moduledoc """
Receives exception events in `Oban.Worker` for error reporting.
"""
alias PhoenixStarter.Workers.Reportable

@spec handle_event(
:telemetry.event_name(),
:telemetry.event_measurements(),
:telemetry.event_metadata(),
:telemetry.handler_config()
) :: any()
def handle_event(
[:oban, :job, :exception],
measure,
%{attempt: attempt, worker: worker} = meta,
_
) do
if Reportable.reportable?(worker, attempt) do
extra =
meta
|> Map.take([:id, :args, :queue, :worker])
|> Map.merge(measure)

Sentry.capture_exception(meta.error, stacktrace: meta.stacktrace, extra: extra)
end
end

def handle_event([:oban, :circuit, :trip], _measure, meta, _) do
Sentry.capture_exception(meta.error, stacktrace: meta.stacktrace, extra: meta)
end
end
36 changes: 36 additions & 0 deletions lib/phoenix_starter/workers/reportable.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defprotocol PhoenixStarter.Workers.Reportable do
@moduledoc """
Determines if errors encountered by `Oban.Worker` should be reported.
By default all errors are reported. However some workers have an
expectation of transient errors, such as those that send email, because of
flaky third-party providers. By implementing this protocol for a given
worker, you can customize error reporting.
## Example
defmodule PhoenixStarter.Workers.EmailWorker do
use Oban.Worker
defimpl PhoenixStarter.Workers.Reportable do
@threshold 3
# Will only report the error after 3 attempts
def reportable?(_worker, attempt), do: attempt > @threshold
end
@impl true
def perform(%{args: %{"email" => email}}) do
PhoenixStarter.Email.deliver(email)
end
end
"""
@fallback_to_any true
@spec reportable?(Oban.Worker.t(), integer) :: boolean
def reportable?(worker, attempt)
end

defimpl PhoenixStarter.Workers.Reportable, for: Any do
def reportable?(_worker, _attempt), do: true
end
37 changes: 37 additions & 0 deletions lib/phoenix_starter/workers/user_email_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule PhoenixStarter.Workers.UserEmailWorker do
@moduledoc """
Schedules an email from `PhoenixStarter.Users.UserEmail` to be sent.
"""
use Oban.Worker, queue: :emails, tags: ["email", "user-email"]

alias PhoenixStarter.Users
alias PhoenixStarter.Users.UserEmail
alias PhoenixStarter.Mailer

@emails :functions
|> UserEmail.__info__()
|> Keyword.keys()
|> Enum.reject(fn f -> f == :render end)
|> Enum.map(&Atom.to_string/1)

@impl true
@spec perform(Oban.Job.t()) :: Oban.Worker.result()
def perform(%Oban.Job{args: %{"email" => email} = args}) when email in @emails do
{email, args} = Map.pop(args, "email")

%{"user_id" => user_id, "url" => url} = args
user = Users.get_user!(user_id)

email = apply(UserEmail, String.to_existing_atom(email), [user, url])
{:ok, Mailer.deliver_now(email)}
end

def perform(_job), do: {:discard, :invalid_email}

defimpl PhoenixStarter.Workers.Reportable do
@threshold 3

@spec reportable?(Oban.Worker.t(), integer) :: boolean
def reportable?(_worker, attempt), do: attempt > @threshold
end
end
7 changes: 6 additions & 1 deletion lib/phoenix_starter_web/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ defmodule PhoenixStarterWeb.Telemetry do
summary("vm.memory.total", unit: {:byte, :kilobyte}),
summary("vm.total_run_queue_lengths.total"),
summary("vm.total_run_queue_lengths.cpu"),
summary("vm.total_run_queue_lengths.io")
summary("vm.total_run_queue_lengths.io"),

# Oban Metrics
summary("oban.job.stop.duration", unit: {:native, :millisecond}),
counter("oban.job.stop"),
counter("oban.job.exception")
]
end

Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule PhoenixStarter.MixProject do
{:gettext, "~> 0.11"},
{:hackney, "~> 1.8"},
{:jason, "~> 1.0"},
{:oban, "~> 2.1"},
{:phoenix, "~> 1.5.6"},
{:phoenix_ecto, "~> 4.1"},
{:phoenix_html, "~> 2.11"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"mime": {:hex, :mime, "1.4.0", "5066f14944b470286146047d2f73518cf5cca82f8e4815cf35d196b58cf07c47", [:mix], [], "hexpm", "75fa42c4228ea9a23f70f123c74ba7cece6a03b1fd474fe13f6a7a85c6ea4ff6"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"oban": {:hex, :oban, "2.3.2", "f478d3785a0dd5c75604817a6555f46f37e15687266dcdf1b20483097554a10f", [:mix], [{:ecto_sql, ">= 3.4.3", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "466152063d5520c36e13f1798f8a9dbc411b1764a4ba472ba79923ef4569408d"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"phoenix": {:hex, :phoenix, "1.5.6", "8298cdb4e0f943242ba8410780a6a69cbbe972fef199b341a36898dd751bdd66", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0dc4d39af1306b6aa5122729b0a95ca779e42c708c6fe7abbb3d336d5379e956"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.2.1", "13f124cf0a3ce0f1948cf24654c7b9f2347169ff75c1123f44674afee6af3b03", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 2.15", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "478a1bae899cac0a6e02be1deec7e2944b7754c04e7d4107fc5a517f877743c0"},
Expand Down
Loading

0 comments on commit 2756719

Please sign in to comment.