Skip to content

Commit

Permalink
Import des stats mensuelles data.gouv.fr pour les JDDs (#3663)
Browse files Browse the repository at this point in the history
* Add migration and model

* Improve migration

* Work on ImportDatasetMonthlyMetricsJob

* Require Logger

* Add perform

* Schedule in crontab

* sort

* PR comments
  • Loading branch information
AntoineAugusti authored Dec 18, 2023
1 parent 008e6c6 commit 33aa142
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 1 deletion.
28 changes: 28 additions & 0 deletions apps/transport/lib/db/dataset_monthly_metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule DB.DatasetMonthlyMetric do
  @moduledoc """
  Ecto schema storing the monthly metrics of a dataset, as given by the
  data.gouv.fr API.

  A row holds a single counter (`:views` or `:downloads`) for one dataset
  and one month.

  Example: https://metric-api.data.gouv.fr/api/datasets/data/?metric_month__sort=asc&dataset_id__exact=5b3cc551c751df4822526c1c
  """
  use Ecto.Schema
  use TypedEctoSchema
  import Ecto.Changeset

  # Every attribute set through `changeset/2` is both castable and mandatory.
  @fields [:dataset_datagouv_id, :year_month, :metric_name, :count]

  typed_schema "dataset_monthly_metrics" do
    # Foreign key constraint is not enforced on the dataset table
    # See: https://github.com/etalab/transport-site/pull/3663/files#r1429890393
    belongs_to(:dataset, DB.Dataset, foreign_key: :dataset_datagouv_id, references: :datagouv_id, type: :string)
    field(:year_month, :string)
    field(:metric_name, Ecto.Enum, values: [:views, :downloads])
    field(:count, :integer)
    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for a monthly metric.

  All of `dataset_datagouv_id`, `year_month`, `metric_name` and `count` are
  required. `year_month` must look like `YYYY-MM` (year starting with `2`,
  month between `01` and `12`) and `count` cannot be negative.
  """
  def changeset(struct, attrs \\ %{}) do
    struct
    |> cast(attrs, @fields)
    |> validate_required(@fields)
    |> validate_format(:year_month, ~r/\A2\d{3}-(0[1-9]|1[012])\z/)
    |> validate_number(:count, greater_than_or_equal_to: 0)
  end
end
87 changes: 87 additions & 0 deletions apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do
  @moduledoc """
  Import monthly metrics related to datasets coming from the data.gouv.fr's API.

  This job is executed daily and imports metrics for all datasets for the last 2 years.
  Records are not supposed to change in the past, except for the current month.
  """
  use Oban.Worker, max_attempts: 3
  import Ecto.Query
  require Logger

  # Number of months to fetch for each dataset
  # 12*2 = 24 months
  @nb_records 12 * 2
  # The number of workers to run in parallel when importing metrics
  @task_concurrency 5
  # Maximum duration, in milliseconds, allowed to import the metrics of a single dataset
  @task_timeout 10_000
  @api_base_url URI.new!("https://metric-api.data.gouv.fr/api/datasets/data/")

  @doc """
  Imports the metrics of every dataset returned by `dataset_datagouv_ids/0`,
  running up to #{@task_concurrency} imports concurrently.

  An import exceeding the timeout is killed and logged; the other datasets
  are still processed.
  """
  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    datagouv_ids = dataset_datagouv_ids()

    datagouv_ids
    |> Task.async_stream(
      &import_metrics/1,
      max_concurrency: @task_concurrency,
      on_timeout: :kill_task,
      timeout: @task_timeout
    )
    # Results are emitted in input order, which lets us pair
    # each result with the identifier it relates to.
    |> Stream.zip(datagouv_ids)
    |> Enum.each(fn
      {{:ok, _result}, _datagouv_id} ->
        :ok

      {{:exit, reason}, datagouv_id} ->
        # Do not silently drop killed tasks: say which dataset was skipped.
        Logger.error("Import of monthly metrics did not complete for Dataset##{datagouv_id}: #{inspect(reason)}")
    end)
  end

  @doc """
  Lists the data.gouv.fr identifiers of the datasets selected by `DB.Dataset.base_query/0`.
  """
  def dataset_datagouv_ids do
    DB.Dataset.base_query() |> select([dataset: d], d.datagouv_id) |> DB.Repo.all()
  end

  @doc """
  Fetches the monthly metrics of a dataset from the API and upserts them.

  Any response other than a successful HTTP 200 is logged as an error and
  nothing is written for this dataset.
  """
  @spec import_metrics(String.t()) :: :ok
  def import_metrics(dataset_datagouv_id) do
    url = api_url(dataset_datagouv_id)

    case http_client().get(url, []) do
      {:ok, %Req.Response{status: 200, body: body}} ->
        body
        |> Map.fetch!("data")
        |> Enum.each(fn data -> insert_or_update(data, dataset_datagouv_id) end)

      other ->
        Logger.error(
          "metric-api.data.gouv.fr unexpected HTTP response for Dataset##{dataset_datagouv_id}: #{inspect(other)}"
        )
    end
  end

  @doc """
  Builds the API URL returning the last #{@nb_records} months of metrics
  for a dataset, most recent month first.

      iex> api_url("datagouv_id")
      "https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc"
  """
  @spec api_url(String.t()) :: String.t()
  def api_url(dataset_datagouv_id) do
    @api_base_url
    |> URI.append_query(
      URI.encode_query(dataset_id__exact: dataset_datagouv_id, page_size: @nb_records, metric_month__sort: "desc")
    )
    |> URI.to_string()
  end

  # Upserts the `:views` and `:downloads` rows for a single month of the API payload.
  # A missing (`nil`) counter is stored as 0.
  defp insert_or_update(
         %{
           "metric_month" => metric_month,
           "monthly_visit" => monthly_visit,
           "monthly_download_resource" => monthly_download_resource
         },
         dataset_datagouv_id
       ) do
    Enum.each([{:views, monthly_visit}, {:downloads, monthly_download_resource}], fn {metric_name, count} ->
      %DB.DatasetMonthlyMetric{}
      |> DB.DatasetMonthlyMetric.changeset(%{
        dataset_datagouv_id: dataset_datagouv_id,
        year_month: metric_month,
        metric_name: metric_name,
        count: count || 0
      })
      |> DB.Repo.insert!(
        conflict_target: [:dataset_datagouv_id, :year_month, :metric_name],
        # Also refresh `updated_at`, otherwise an existing row keeps the
        # timestamp of its first insertion even though its count changed.
        on_conflict: {:replace, [:count, :updated_at]}
      )
    end)
  end

  defp http_client, do: Transport.Req.impl()
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule DB.Repo.Migrations.DatasetMonthlyMetrics do
  use Ecto.Migration

  @table :dataset_monthly_metrics
  # A dataset has at most one row per month and per metric
  @unique_columns [:dataset_datagouv_id, :year_month, :metric_name]

  # Creates the table storing monthly metrics of datasets, along with
  # the unique index over the columns identifying a record.
  def change do
    create table(@table) do
      # Not adding a foreign key: https://github.com/etalab/transport-site/pull/3663/files#r1429890393
      add(:dataset_datagouv_id, :string, null: false, size: 50)
      # Example: 2023-12
      add(:year_month, :string, null: false, size: 7)
      add(:metric_name, :string, null: false, size: 50)
      add(:count, :integer, null: false)
      timestamps(type: :utc_datetime_usec)
    end

    create unique_index(@table, @unique_columns)
  end
end
42 changes: 42 additions & 0 deletions apps/transport/test/db/dataset_monthly_metric_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule DB.DatasetMonthlyMetricTest do
  use ExUnit.Case, async: true
  import DB.Factory

  alias DB.DatasetMonthlyMetric

  setup do
    Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
  end

  describe "changeset" do
    test "can insert a record" do
      dataset = insert(:dataset)

      attrs = %{
        dataset_datagouv_id: dataset.datagouv_id,
        metric_name: :views,
        count: 42,
        year_month: "2023-12"
      }

      changeset = DatasetMonthlyMetric.changeset(%DatasetMonthlyMetric{}, attrs)

      assert changeset.valid?
      # A valid changeset can be persisted
      DB.Repo.insert!(changeset)
    end

    test "identifies errors" do
      invalid_attrs = %{
        dataset_datagouv_id: Ecto.UUID.generate(),
        metric_name: :foo,
        count: -1,
        year_month: "bar"
      }

      changeset = DatasetMonthlyMetric.changeset(%DatasetMonthlyMetric{}, invalid_attrs)

      refute changeset.valid?
      # Exactly one error per invalid attribute, in this order
      assert [:count, :year_month, :metric_name] == Keyword.keys(changeset.errors)
    end
  end
end
4 changes: 4 additions & 0 deletions apps/transport/test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ defmodule DB.Factory do
}
end

# Bare record: callers are expected to provide every attribute they need.
def dataset_monthly_metric_factory, do: %DB.DatasetMonthlyMetric{}

def resource_factory do
%DB.Resource{
last_import: DateTime.utc_now(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
  use ExUnit.Case, async: true
  import DB.Factory
  import Mox
  use Oban.Testing, repo: DB.Repo
  alias Transport.Jobs.ImportDatasetMonthlyMetricsJob

  doctest ImportDatasetMonthlyMetricsJob, import: true

  setup :verify_on_exit!

  setup do
    Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
  end

  describe "import_metrics" do
    test "base case" do
      %DB.Dataset{datagouv_id: datagouv_id} = insert(:dataset)

      setup_http_response(datagouv_id, [
        %{
          "dataset_id" => datagouv_id,
          "metric_month" => "2022-08",
          "monthly_visit" => 2633,
          "monthly_download_resource" => 557_626
        },
        %{
          "dataset_id" => datagouv_id,
          "metric_month" => "2022-07",
          "monthly_visit" => 1475,
          "monthly_download_resource" => 343_617
        }
      ])

      assert [] == sorted_metrics()

      ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id)

      # Sorted by month, then by metric name
      assert [
               %DB.DatasetMonthlyMetric{
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2022-07",
                 metric_name: :downloads,
                 count: 343_617
               },
               %DB.DatasetMonthlyMetric{
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2022-07",
                 metric_name: :views,
                 count: 1475
               },
               %DB.DatasetMonthlyMetric{
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2022-08",
                 metric_name: :downloads,
                 count: 557_626
               },
               %DB.DatasetMonthlyMetric{
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2022-08",
                 metric_name: :views,
                 count: 2633
               }
             ] = sorted_metrics()
    end

    test "replaces existing records" do
      %DB.Dataset{datagouv_id: datagouv_id} = insert(:dataset)

      insert(:dataset_monthly_metric,
        dataset_datagouv_id: datagouv_id,
        year_month: "2023-12",
        metric_name: :views,
        count: 42
      )

      setup_http_response(datagouv_id, [
        %{
          "dataset_id" => datagouv_id,
          "metric_month" => "2023-12",
          "monthly_visit" => 1337,
          "monthly_download_resource" => 43
        }
      ])

      assert [
               %DB.DatasetMonthlyMetric{
                 id: metric_id,
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2023-12",
                 metric_name: :views,
                 count: 42
               }
             ] = sorted_metrics()

      ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id)

      assert [
               # Has been inserted
               %DB.DatasetMonthlyMetric{
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2023-12",
                 metric_name: :downloads,
                 count: 43
               },
               # Count has been updated, primary key is still the same
               %DB.DatasetMonthlyMetric{
                 id: ^metric_id,
                 dataset_datagouv_id: ^datagouv_id,
                 year_month: "2023-12",
                 metric_name: :views,
                 count: 1337
               }
             ] = sorted_metrics()
    end
  end

  test "perform" do
    %DB.Dataset{datagouv_id: d1_datagouv_id} = insert(:dataset)
    %DB.Dataset{datagouv_id: d2_datagouv_id} = insert(:dataset)
    insert(:dataset, is_active: false)

    # The query does not specify an order: compare sorted lists
    assert Enum.sort([d1_datagouv_id, d2_datagouv_id]) ==
             Enum.sort(ImportDatasetMonthlyMetricsJob.dataset_datagouv_ids())

    setup_http_response(d1_datagouv_id, [
      %{
        "dataset_id" => d1_datagouv_id,
        "metric_month" => "2023-12",
        "monthly_visit" => 1337,
        "monthly_download_resource" => 43
      }
    ])

    setup_http_response(d2_datagouv_id, [
      %{
        "dataset_id" => d2_datagouv_id,
        "metric_month" => "2023-12",
        # A missing counter is expected to be stored as 0
        "monthly_visit" => nil,
        "monthly_download_resource" => 5
      }
    ])

    assert :ok == perform_job(ImportDatasetMonthlyMetricsJob, %{})

    assert [
             %DB.DatasetMonthlyMetric{
               dataset_datagouv_id: ^d1_datagouv_id,
               year_month: "2023-12",
               metric_name: :downloads,
               count: 43
             },
             %DB.DatasetMonthlyMetric{
               dataset_datagouv_id: ^d1_datagouv_id,
               year_month: "2023-12",
               metric_name: :views,
               count: 1337
             },
             %DB.DatasetMonthlyMetric{
               dataset_datagouv_id: ^d2_datagouv_id,
               year_month: "2023-12",
               metric_name: :downloads,
               count: 5
             },
             %DB.DatasetMonthlyMetric{
               dataset_datagouv_id: ^d2_datagouv_id,
               year_month: "2023-12",
               metric_name: :views,
               count: 0
             }
           ] = sorted_metrics()
  end

  # Loads every stored metric. The query has no `order_by`, so rows are sorted
  # here (dataset, month, metric name) to keep assertions deterministic.
  defp sorted_metrics do
    DB.DatasetMonthlyMetric
    |> DB.Repo.all()
    |> Enum.sort_by(&{&1.dataset_datagouv_id, &1.year_month, &1.metric_name})
  end

  # Expects exactly one call to the metrics API for `datagouv_id`,
  # answering with an HTTP 200 wrapping `data`.
  defp setup_http_response(datagouv_id, data) do
    metrics_api_url = ImportDatasetMonthlyMetricsJob.api_url(datagouv_id)

    expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] ->
      {:ok, %Req.Response{status: 200, body: %{"data" => data}}}
    end)
  end
end
3 changes: 2 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ oban_prod_crontab = [
{"10 5 * * *", Transport.Jobs.NotificationSubscriptionProducerJob},
# "At 08:15 on Monday in March, June, and November."
# The job will make sure that it's executed only on the first Monday of these months
{"15 8 * 3,6,11 1", Transport.Jobs.PeriodicReminderProducersNotificationJob}
{"15 8 * 3,6,11 1", Transport.Jobs.PeriodicReminderProducersNotificationJob},
{"30 5 * * *", Transport.Jobs.ImportDatasetMonthlyMetricsJob}
]

# Make sure that all modules exist
Expand Down

0 comments on commit 33aa142

Please sign in to comment.