Skip to content

Commit

Permalink
merge expiration jobs into a single one
Browse files Browse the repository at this point in the history
  • Loading branch information
vdegove committed Dec 17, 2024
1 parent d278d6c commit b092f4e
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 271 deletions.

This file was deleted.

102 changes: 87 additions & 15 deletions apps/transport/lib/jobs/expiration_notification_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
This job sends daily digests to reusers about the expiration of their favorited datasets.
The expiration delays is the same for all reusers and cannot be customized for now.
It has 2 `perform/1` methods:
It has 3 `perform/1` methods:
- a dispatcher one in charge of identifying contacts we should get in touch with today
- another in charge of building the daily digest for a specific contact (with only their favorited datasets)
It is similar to `Transport.Jobs.ExpirationAdminProducerNotificationJob`, dedicated to producers and admins.
- and one to send to admins and producers
"""
use Oban.Worker,
max_attempts: 3,
Expand All @@ -23,11 +22,23 @@ defmodule Transport.Jobs.ExpirationNotificationJob do

import Ecto.Query
@notification_reason Transport.NotificationReason.reason(:expiration)
@expiration_reason Transport.NotificationReason.reason(:expiration)
@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-30, -7, 0, 7, 14]
@default_outdated_data_delays_reuser [-30, -7, 0, 7, 14]
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@impl Oban.Worker
def perform(%Oban.Job{id: job_id, args: %{"contact_id" => contact_id, "digest_date" => digest_date}}) do

def perform(%Oban.Job{id: job_id, args: %{"role" => "admin_and_producer"}}) do
outdated_data(job_id)
:ok
end

def perform(%Oban.Job{
id: job_id,
args: %{"contact_id" => contact_id, "digest_date" => digest_date, "role" => "reuser"}
}) do
contact = DB.Repo.get!(DB.Contact, contact_id)
subscribed_dataset_ids = subscribed_dataset_ids_for_expiration(contact)

Expand Down Expand Up @@ -56,10 +67,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do

DB.Repo.transaction(
fn ->
new(%{"role" => "admin_and_producer"}) |> Oban.insert()

dataset_ids
|> contact_ids_subscribed_to_dataset_ids()
|> Stream.chunk_every(100)
|> Stream.each(fn contact_ids -> insert_jobs(contact_ids, target_date) end)
|> Stream.each(fn contact_ids -> insert_reuser_jobs(contact_ids, target_date) end)
|> Stream.run()
end,
timeout: :timer.seconds(60)
Expand Down Expand Up @@ -138,12 +151,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
def delay_str(-1), do: "périmé depuis hier"
def delay_str(d) when d <= -2, do: "périmés depuis #{-d} jours"

defp insert_jobs(contact_ids, %Date{} = target_date) do
defp insert_reuser_jobs(contact_ids, %Date{} = target_date) do
# Oban caveat: can't use [insert_all/2](https://hexdocs.pm/oban/Oban.html#insert_all/2):
# > Only the Smart Engine in Oban Pro supports bulk unique jobs and automatic batching.
# > With the basic engine, you must use insert/3 for unique support.
Enum.each(contact_ids, fn contact_id ->
%{"contact_id" => contact_id, "digest_date" => target_date}
%{"contact_id" => contact_id, "digest_date" => target_date, "role" => "reuser"}
|> new()
|> Oban.insert()
end)
Expand Down Expand Up @@ -172,9 +185,9 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
Transport.Cache.fetch(
to_string(__MODULE__) <> ":gtfs_expiring_on_target_dates:#{reference_date}",
fn ->
delays_and_dates = delays_and_dates(reference_date)
dates_and_delays = Map.new(delays_and_dates, fn {key, value} -> {value, key} end)
expiring_dates = Map.values(delays_and_dates)
delays_and_date_reuser = delays_and_date_reuser(reference_date)
dates_and_delays = Map.new(delays_and_date_reuser, fn {key, value} -> {value, key} end)
expiring_dates = Map.values(delays_and_date_reuser)

DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
Expand All @@ -201,7 +214,7 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
end

@doc """
iex> delays_and_dates(~D[2024-05-21])
iex> delays_and_date_reuser(~D[2024-05-21])
%{
-30 => ~D[2024-04-21],
-7 => ~D[2024-05-14],
Expand All @@ -210,8 +223,67 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
14 => ~D[2024-06-04]
}
"""
@spec delays_and_dates(Date.t()) :: %{delay() => Date.t()}
def delays_and_dates(%Date{} = date) do
Map.new(@default_outdated_data_delays, fn delay -> {delay, Date.add(date, delay)} end)
@spec delays_and_date_reuser(Date.t()) :: %{delay() => Date.t()}
def delays_and_date_reuser(%Date{} = date) do
Map.new(@default_outdated_data_delays_reuser, fn delay -> {delay, Date.add(date, delay)} end)
end

####  HERE CODE FROM ADMIN AND PRODUCER JOB ####

def outdated_data(job_id) do
for delay <- possible_delays(),
date = Date.add(Date.utc_today(), delay) do
{delay, gtfs_datasets_expiring_on(date)}
end
|> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
|> send_outdated_data_admin_mail()
|> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
end

@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

def possible_delays do
@default_outdated_data_delays
|> Enum.uniq()
|> Enum.sort()
end

# A different email is sent to producers for every delay, containing all datasets expiring on this given delay
@spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
def send_outdated_data_producer_notifications({delay, records}, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)
end

@spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_admin_mail([] = _records), do: []

defp send_outdated_data_admin_mail(records) do
Transport.AdminNotifier.expiration(records)
|> Transport.Mailer.deliver()

records
end
end
Loading

0 comments on commit b092f4e

Please sign in to comment.