From 3e466847cf1e2d022e386cffe9435caa5c4d839e Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Wed, 17 Jan 2024 09:55:20 +0100 Subject: [PATCH] Import des stats mensuelles data.gouv.fr pour les ressources --- .../lib/db/resource_monthly_metric.ex | 26 +++ .../import_dataset_monthly_metrics_job.ex | 104 +++++++---- .../import_resource_monthly_metrics_job.ex | 29 +++ ...0240117075117_resource_monthly_metrics.exs | 16 ++ .../test/db/resource_monthly_metric_test.exs | 42 +++++ apps/transport/test/support/factory.ex | 4 + ...mport_dataset_monthly_metrics_job_test.exs | 10 +- ...port_resource_monthly_metrics_job_test.exs | 170 ++++++++++++++++++ 8 files changed, 362 insertions(+), 39 deletions(-) create mode 100644 apps/transport/lib/db/resource_monthly_metric.ex create mode 100644 apps/transport/lib/jobs/import_resource_monthly_metrics_job.ex create mode 100644 apps/transport/priv/repo/migrations/20240117075117_resource_monthly_metrics.exs create mode 100644 apps/transport/test/db/resource_monthly_metric_test.exs create mode 100644 apps/transport/test/transport/jobs/import_resource_monthly_metrics_job_test.exs diff --git a/apps/transport/lib/db/resource_monthly_metric.ex b/apps/transport/lib/db/resource_monthly_metric.ex new file mode 100644 index 0000000000..fdc25ac72d --- /dev/null +++ b/apps/transport/lib/db/resource_monthly_metric.ex @@ -0,0 +1,26 @@ +defmodule DB.ResourceMonthlyMetric do + @moduledoc """ + Monthly metrics related to resources as given by the data.gouv.fr + API. + Example: https://metric-api.data.gouv.fr/api/resources/data/?metric_month__sort=desc&resource_id__exact=e0dbd217-15cd-4e28-9459-211a27511a34&page_size=50 + """ + use Ecto.Schema + use TypedEctoSchema + import Ecto.Changeset + + typed_schema "resource_monthly_metrics" do + belongs_to(:resource, DB.Resource, foreign_key: :resource_datagouv_id, references: :datagouv_id, type: :string) + field(:year_month, :string) + field(:metric_name, Ecto.Enum, values: [:downloads]) + field(:count, :integer) + timestamps(type: :utc_datetime_usec) + end + + def changeset(struct, attrs \\ %{}) do + struct + |> cast(attrs, [:resource_datagouv_id, :year_month, :metric_name, :count]) + |> validate_required([:resource_datagouv_id, :year_month, :metric_name, :count]) + |> validate_format(:year_month, ~r/\A2\d{3}-(0[1-9]|1[012])\z/) + |> validate_number(:count, greater_than_or_equal_to: 0) + end +end diff --git a/apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex b/apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex index ab8b100787..17957b3772 100644 --- a/apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex +++ b/apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex @@ -7,20 +7,15 @@ defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do """ use Oban.Worker, max_attempts: 3 import Ecto.Query - require Logger - # Number of months to fetch for each dataset - # 12*2 = 24 months - @nb_records 12 * 2 # The number of workers to run in parallel when importing metrics @task_concurrency 5 - @api_base_url URI.new!("https://metric-api.data.gouv.fr/api/datasets/data/") @impl Oban.Worker def perform(%Oban.Job{}) do dataset_datagouv_ids() |> Task.async_stream( - &import_metrics/1, + fn datagouv_id -> Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id) end, max_concurrency: @task_concurrency, on_timeout: :kill_task, timeout: 10_000 @@ -31,59 +26,100 @@ defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do def dataset_datagouv_ids do DB.Dataset.base_query() |> select([dataset: d], d.datagouv_id) |> DB.Repo.all() end +end + +defmodule Transport.Jobs.ImportMonthlyMetrics do + @moduledoc """ + Shared methods to import monthly metrics from the data.gouv.fr's API. + """ + require Logger - def import_metrics(dataset_datagouv_id) do - url = api_url(dataset_datagouv_id) + # Maximum number of months to fetch for each model + # 12*2 = 24 months + @nb_records 12 * 2 + + @doc """ + iex> api_url(:dataset, "datagouv_id") + "https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc" + """ + def api_url(model_name, datagouv_id, page_size \\ @nb_records) when model_name in [:dataset, :resource] do + model_name + |> api_base_url() + |> URI.append_query(api_args(model_name, datagouv_id, page_size)) + |> URI.to_string() + end + + def import_metrics(model_name, datagouv_id) when model_name in [:dataset, :resource] do + url = api_url(model_name, datagouv_id) case http_client().get(url, []) do {:ok, %Req.Response{status: 200, body: body}} -> body |> Map.fetch!("data") - |> Enum.each(fn data -> insert_or_update(data, dataset_datagouv_id) end) + |> Enum.each(fn data -> insert_or_update(model_name, data, datagouv_id) end) other -> Logger.error( - "metric-api.data.gouv.fr unexpected HTTP response for Dataset##{dataset_datagouv_id}: #{inspect(other)}" + "metric-api.data.gouv.fr unexpected HTTP response for #{model_name}##{datagouv_id}: #{inspect(other)}" ) end end - @doc """ - iex> api_url("datagouv_id") - "https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc" - """ - def api_url(dataset_datagouv_id) do - @api_base_url - |> URI.append_query( - URI.encode_query(dataset_id__exact: dataset_datagouv_id, page_size: @nb_records, metric_month__sort: "desc") - ) - |> URI.to_string() - end - defp insert_or_update( - %{ - "metric_month" => metric_month, - "monthly_visit" => monthly_visit, - "monthly_download_resource" => monthly_download_resource - }, - dataset_datagouv_id - ) do - Enum.each([{:views, monthly_visit}, {:downloads, monthly_download_resource}], fn {metric_name, count} -> + model_name, + %{"metric_month" => metric_month} = data, + datagouv_id + ) + when model_name in [:dataset, :resource] do + Enum.each(metrics(model_name, data), fn {metric_name, count} -> count = count || 0 - %DB.DatasetMonthlyMetric{} - |> DB.DatasetMonthlyMetric.changeset(%{ - dataset_datagouv_id: dataset_datagouv_id, + model_name + |> changeset(%{ + datagouv_id: datagouv_id, year_month: metric_month, metric_name: metric_name, count: count }) |> DB.Repo.insert!( - conflict_target: [:dataset_datagouv_id, :year_month, :metric_name], + conflict_target: [String.to_existing_atom("#{model_name}_datagouv_id"), :year_month, :metric_name], on_conflict: [set: [count: count, updated_at: DateTime.utc_now()]] ) end) end + defp metrics(:dataset, %{ + "monthly_visit" => monthly_visit, + "monthly_download_resource" => monthly_download_resource + }) do + [{:views, monthly_visit}, {:downloads, monthly_download_resource}] + end + + defp metrics(:resource, %{"monthly_download_resource" => monthly_download_resource}) do + [{:downloads, monthly_download_resource}] + end + + defp changeset(:dataset, %{datagouv_id: datagouv_id} = params) do + params = Map.merge(params, %{dataset_datagouv_id: datagouv_id}) + DB.DatasetMonthlyMetric.changeset(%DB.DatasetMonthlyMetric{}, params) + end + + defp changeset(:resource, %{datagouv_id: datagouv_id} = params) do + params = Map.merge(params, %{resource_datagouv_id: datagouv_id}) + DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, params) + end + + defp api_args(:dataset, datagouv_id, page_size) do + [dataset_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc"] |> URI.encode_query() + end + + defp api_args(:resource, datagouv_id, page_size) do + [resource_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc"] |> URI.encode_query() + end + + defp api_base_url(model_name) when model_name in [:dataset, :resource] do + URI.new!("https://metric-api.data.gouv.fr/api/#{model_name}s/data/") + end + defp http_client, do: Transport.Req.impl() end diff --git a/apps/transport/lib/jobs/import_resource_monthly_metrics_job.ex b/apps/transport/lib/jobs/import_resource_monthly_metrics_job.ex new file mode 100644 index 0000000000..637357f2da --- /dev/null +++ b/apps/transport/lib/jobs/import_resource_monthly_metrics_job.ex @@ -0,0 +1,29 @@ +defmodule Transport.Jobs.ImportResourceMonthlyMetricsJob do + @moduledoc """ + Import monthly metrics related to resources coming from the data.gouv.fr's API. + + This job is executed daily and imports metrics for all resources for the last 2 years. + Records are not supposed to change in the past, except for the current month. + """ + use Oban.Worker, max_attempts: 3 + import Ecto.Query + + # The number of workers to run in parallel when importing metrics + @task_concurrency 5 + + @impl Oban.Worker + def perform(%Oban.Job{}) do + resource_datagouv_ids() + |> Task.async_stream( + fn datagouv_id -> Transport.Jobs.ImportMonthlyMetrics.import_metrics(:resource, datagouv_id) end, + max_concurrency: @task_concurrency, + on_timeout: :kill_task, + timeout: 10_000 + ) + |> Stream.run() + end + + def resource_datagouv_ids do + DB.Resource.base_query() |> select([resource: r], r.datagouv_id) |> DB.Repo.all() + end +end diff --git a/apps/transport/priv/repo/migrations/20240117075117_resource_monthly_metrics.exs b/apps/transport/priv/repo/migrations/20240117075117_resource_monthly_metrics.exs new file mode 100644 index 0000000000..b261e2474d --- /dev/null +++ b/apps/transport/priv/repo/migrations/20240117075117_resource_monthly_metrics.exs @@ -0,0 +1,16 @@ +defmodule DB.Repo.Migrations.ResourceMonthlyMetrics do + use Ecto.Migration + + def change do + create table(:resource_monthly_metrics) do + add(:resource_datagouv_id, :string, null: false, size: 50) + # Example: 2023-12 + add(:year_month, :string, null: false, size: 7) + add(:metric_name, :string, null: false, size: 50) + add(:count, :integer, null: false) + timestamps(type: :utc_datetime_usec) + end + + create(unique_index(:resource_monthly_metrics, [:resource_datagouv_id, :year_month, :metric_name])) + end +end diff --git a/apps/transport/test/db/resource_monthly_metric_test.exs b/apps/transport/test/db/resource_monthly_metric_test.exs new file mode 100644 index 0000000000..e2970be99b --- /dev/null +++ b/apps/transport/test/db/resource_monthly_metric_test.exs @@ -0,0 +1,42 @@ +defmodule DB.ResourceMonthlyMetricTest do + use ExUnit.Case, async: true + import DB.Factory + + setup do + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + describe "changeset" do + test "can insert a record" do + resource = insert(:resource) + + assert %Ecto.Changeset{valid?: true} = + changeset = + DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, %{ + resource_datagouv_id: resource.datagouv_id, + metric_name: :downloads, + count: 42, + year_month: "2023-12" + }) + + DB.Repo.insert!(changeset) + end + + test "identifies errors" do + assert %Ecto.Changeset{ + valid?: false, + errors: [ + {:count, _}, + {:year_month, _}, + {:metric_name, _} + ] + } = + DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, %{ + resource_datagouv_id: Ecto.UUID.generate(), + metric_name: :foo, + count: -1, + year_month: "bar" + }) + end + end +end diff --git a/apps/transport/test/support/factory.ex b/apps/transport/test/support/factory.ex index 791cf3a198..2680367dcc 100644 --- a/apps/transport/test/support/factory.ex +++ b/apps/transport/test/support/factory.ex @@ -73,6 +73,10 @@ defmodule DB.Factory do %DB.DatasetMonthlyMetric{} end + def resource_monthly_metric_factory do + %DB.ResourceMonthlyMetric{} + end + def resource_factory do %DB.Resource{ last_import: DateTime.utc_now(), diff --git a/apps/transport/test/transport/jobs/import_dataset_monthly_metrics_job_test.exs b/apps/transport/test/transport/jobs/import_dataset_monthly_metrics_job_test.exs index 44e7fc5c34..cbb80d87e0 100644 --- a/apps/transport/test/transport/jobs/import_dataset_monthly_metrics_job_test.exs +++ b/apps/transport/test/transport/jobs/import_dataset_monthly_metrics_job_test.exs @@ -6,7 +6,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do use Oban.Testing, repo: DB.Repo alias Transport.Jobs.ImportDatasetMonthlyMetricsJob - doctest ImportDatasetMonthlyMetricsJob, import: true + doctest Transport.Jobs.ImportMonthlyMetrics, import: true setup :verify_on_exit! @@ -35,7 +35,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do assert DB.DatasetMonthlyMetric |> DB.Repo.all() |> Enum.empty?() - ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id) + Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id) assert [ %DB.DatasetMonthlyMetric{ @@ -94,7 +94,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do } ] = DB.Repo.all(DB.DatasetMonthlyMetric) - ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id) + Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id) assert [ # Count has been updated, primary key is still the same @@ -126,7 +126,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do %DB.Dataset{datagouv_id: d2_datagouv_id} = insert(:dataset) insert(:dataset, is_active: false) - assert [d1_datagouv_id, d2_datagouv_id] == ImportDatasetMonthlyMetricsJob.dataset_datagouv_ids() + assert MapSet.new([d1_datagouv_id, d2_datagouv_id]) == ImportDatasetMonthlyMetricsJob.dataset_datagouv_ids() |> MapSet.new() setup_http_response(d1_datagouv_id, [ %{ @@ -190,7 +190,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do end defp setup_http_response(datagouv_id, data) do - metrics_api_url = ImportDatasetMonthlyMetricsJob.api_url(datagouv_id) + metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:dataset, datagouv_id) expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] -> {:ok, %Req.Response{status: 200, body: %{"data" => data}}} diff --git a/apps/transport/test/transport/jobs/import_resource_monthly_metrics_job_test.exs b/apps/transport/test/transport/jobs/import_resource_monthly_metrics_job_test.exs new file mode 100644 index 0000000000..99b3d0aeee --- /dev/null +++ b/apps/transport/test/transport/jobs/import_resource_monthly_metrics_job_test.exs @@ -0,0 +1,170 @@ +defmodule Transport.Test.Transport.Jobs.ImportResourceMonthlyMetricsTestJob do + use ExUnit.Case, async: true + import DB.Factory + import Ecto.Query + import Mox + use Oban.Testing, repo: DB.Repo + alias Transport.Jobs.ImportResourceMonthlyMetricsJob + + setup :verify_on_exit! + + setup do + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + describe "import_metrics" do + test "base case" do + %DB.Resource{datagouv_id: datagouv_id} = insert(:resource) + + setup_http_response(datagouv_id, [ + %{ + "resource_id" => datagouv_id, + "metric_month" => "2022-08", + "monthly_download_resource" => 557_626 + }, + %{ + "resource_id" => datagouv_id, + "metric_month" => "2022-07", + "monthly_download_resource" => 343_617 + } + ]) + + assert DB.ResourceMonthlyMetric |> DB.Repo.all() |> Enum.empty?() + + Transport.Jobs.ImportMonthlyMetrics.import_metrics(:resource, datagouv_id) + + assert [ + %DB.ResourceMonthlyMetric{ + resource_datagouv_id: ^datagouv_id, + year_month: "2022-08", + metric_name: :downloads, + count: 557_626 + }, + %DB.ResourceMonthlyMetric{ + resource_datagouv_id: ^datagouv_id, + year_month: "2022-07", + metric_name: :downloads, + count: 343_617 + } + ] = DB.Repo.all(DB.ResourceMonthlyMetric) + end + + test "replaces existing records" do + %DB.Resource{datagouv_id: datagouv_id} = insert(:resource) + + insert(:resource_monthly_metric, + resource_datagouv_id: datagouv_id, + year_month: "2023-12", + metric_name: :downloads, + count: 42 + ) + + setup_http_response(datagouv_id, [ + %{ + "resource_id" => datagouv_id, + "metric_month" => "2023-12", + "monthly_download_resource" => 43 + }, + %{ + "resource_id" => datagouv_id, + "metric_month" => "2023-11", + "monthly_download_resource" => 1337 + } + ]) + + assert [ + %DB.ResourceMonthlyMetric{ + id: metric_id, + resource_datagouv_id: ^datagouv_id, + year_month: "2023-12", + metric_name: :downloads, + count: 42 + } + ] = DB.Repo.all(DB.ResourceMonthlyMetric) + + Transport.Jobs.ImportMonthlyMetrics.import_metrics(:resource, datagouv_id) + + assert [ + # Count has been updated, primary key is still the same + %DB.ResourceMonthlyMetric{ + id: ^metric_id, + resource_datagouv_id: ^datagouv_id, + year_month: "2023-12", + metric_name: :downloads, + count: 43, + inserted_at: inserted_at, + updated_at: updated_at + }, + # Has been inserted + %DB.ResourceMonthlyMetric{ + resource_datagouv_id: ^datagouv_id, + year_month: "2023-11", + metric_name: :downloads, + count: 1337 + } + ] = DB.Repo.all(DB.ResourceMonthlyMetric) + + # `updated_at` has been updated to reflect that this row has changed + assert DateTime.after?(updated_at, inserted_at) + end + end + + test "perform" do + %DB.Resource{datagouv_id: r1_datagouv_id} = insert(:resource) + %DB.Resource{datagouv_id: r2_datagouv_id} = insert(:resource) + + assert MapSet.new([r1_datagouv_id, r2_datagouv_id]) == ImportResourceMonthlyMetricsJob.resource_datagouv_ids() |> MapSet.new() + + setup_http_response(r1_datagouv_id, [ + %{ + "resource_id" => r1_datagouv_id, + "metric_month" => "2023-12", + "monthly_download_resource" => 43 + } + ]) + + setup_http_response(r2_datagouv_id, [ + %{ + "resource_id" => r2_datagouv_id, + "metric_month" => "2023-12", + "monthly_download_resource" => 5 + } + ]) + + assert :ok == perform_job(ImportResourceMonthlyMetricsJob, %{}) + + assert 2 == DB.Repo.aggregate(DB.ResourceMonthlyMetric, :count, :id) + + assert [ + %DB.ResourceMonthlyMetric{ + resource_datagouv_id: ^r1_datagouv_id, + year_month: "2023-12", + metric_name: :downloads, + count: 43 + } + ] = + DB.ResourceMonthlyMetric + |> where([rmm], rmm.resource_datagouv_id == ^r1_datagouv_id) + |> DB.Repo.all() + + assert [ + %DB.ResourceMonthlyMetric{ + resource_datagouv_id: ^r2_datagouv_id, + year_month: "2023-12", + metric_name: :downloads, + count: 5 + } + ] = + DB.ResourceMonthlyMetric + |> where([rmm], rmm.resource_datagouv_id == ^r2_datagouv_id) + |> DB.Repo.all() + end + + defp setup_http_response(datagouv_id, data) do + metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:resource, datagouv_id) + + expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] -> + {:ok, %Req.Response{status: 200, body: %{"data" => data}}} + end) + end +end