Skip to content

Commit

Permalink
Import des stats mensuelles data.gouv.fr pour les ressources
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineAugusti committed Jan 17, 2024
1 parent c471d43 commit 3e46684
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 39 deletions.
26 changes: 26 additions & 0 deletions apps/transport/lib/db/resource_monthly_metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule DB.ResourceMonthlyMetric do
  @moduledoc """
  Monthly metrics for a resource, as given by the data.gouv.fr metrics API.

  Each row stores one `count` for one `metric_name` of one resource
  (referenced through `resource_datagouv_id`) for one `year_month`.

  Example: https://metric-api.data.gouv.fr/api/resources/data/?metric_month__sort=desc&resource_id__exact=e0dbd217-15cd-4e28-9459-211a27511a34&page_size=50
  """
  use Ecto.Schema
  use TypedEctoSchema
  import Ecto.Changeset

  # Every attribute below is both castable and required.
  @permitted_attrs [:resource_datagouv_id, :year_month, :metric_name, :count]

  typed_schema "resource_monthly_metrics" do
    # The association is keyed on the data.gouv.fr identifier (a string),
    # not on the primary key of `DB.Resource`.
    belongs_to(:resource, DB.Resource,
      foreign_key: :resource_datagouv_id,
      references: :datagouv_id,
      type: :string
    )

    field(:year_month, :string)
    field(:metric_name, Ecto.Enum, values: [:downloads])
    field(:count, :integer)
    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for a monthly metric.

  All of `resource_datagouv_id`, `year_month`, `metric_name` and `count` are
  required. `year_month` must look like `2023-12` (a year starting with `2`
  and a month between `01` and `12`) and `count` must be zero or more.
  """
  def changeset(struct, attrs \\ %{}) do
    casted = cast(struct, attrs, @permitted_attrs)

    casted
    |> validate_required(@permitted_attrs)
    |> validate_format(:year_month, ~r/\A2\d{3}-(0[1-9]|1[012])\z/)
    |> validate_number(:count, greater_than_or_equal_to: 0)
  end
end
104 changes: 70 additions & 34 deletions apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,15 @@ defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do
"""
use Oban.Worker, max_attempts: 3
import Ecto.Query
require Logger

# Number of months to fetch for each dataset
# 12*2 = 24 months
@nb_records 12 * 2
# The number of workers to run in parallel when importing metrics
@task_concurrency 5
@api_base_url URI.new!("https://metric-api.data.gouv.fr/api/datasets/data/")

@impl Oban.Worker
def perform(%Oban.Job{}) do
dataset_datagouv_ids()
|> Task.async_stream(
&import_metrics/1,
fn datagouv_id -> Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id) end,
max_concurrency: @task_concurrency,
on_timeout: :kill_task,
timeout: 10_000
Expand All @@ -31,59 +26,100 @@ defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do
def dataset_datagouv_ids do
DB.Dataset.base_query() |> select([dataset: d], d.datagouv_id) |> DB.Repo.all()
end
end

defmodule Transport.Jobs.ImportMonthlyMetrics do
@moduledoc """
Shared methods to import monthly metrics from the data.gouv.fr's API.
"""
require Logger

def import_metrics(dataset_datagouv_id) do
url = api_url(dataset_datagouv_id)
# Maximum number of months to fetch for each model
# 12*2 = 24 months
@nb_records 12 * 2

@doc """
Builds the metrics API URL for the given model (`:dataset` or `:resource`)
and `datagouv_id`, asking for `page_size` records per page.

iex> api_url(:dataset, "datagouv_id")
"https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc"
"""
def api_url(model_name, datagouv_id, page_size \\ @nb_records) when model_name in [:dataset, :resource] do
  base_url = api_base_url(model_name)
  query = api_args(model_name, datagouv_id, page_size)

  URI.to_string(URI.append_query(base_url, query))
end

def import_metrics(model_name, datagouv_id) when model_name in [:dataset, :resource] do
url = api_url(model_name, datagouv_id)

case http_client().get(url, []) do
{:ok, %Req.Response{status: 200, body: body}} ->
body
|> Map.fetch!("data")
|> Enum.each(fn data -> insert_or_update(data, dataset_datagouv_id) end)
|> Enum.each(fn data -> insert_or_update(model_name, data, datagouv_id) end)

other ->
Logger.error(
"metric-api.data.gouv.fr unexpected HTTP response for Dataset##{dataset_datagouv_id}: #{inspect(other)}"
"metric-api.data.gouv.fr unexpected HTTP response for #{model_name}##{datagouv_id}: #{inspect(other)}"
)
end
end

@doc """
iex> api_url("datagouv_id")
"https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc"
"""
def api_url(dataset_datagouv_id) do
@api_base_url
|> URI.append_query(
URI.encode_query(dataset_id__exact: dataset_datagouv_id, page_size: @nb_records, metric_month__sort: "desc")
)
|> URI.to_string()
end

defp insert_or_update(
%{
"metric_month" => metric_month,
"monthly_visit" => monthly_visit,
"monthly_download_resource" => monthly_download_resource
},
dataset_datagouv_id
) do
Enum.each([{:views, monthly_visit}, {:downloads, monthly_download_resource}], fn {metric_name, count} ->
model_name,
%{"metric_month" => metric_month} = data,
datagouv_id
)
when model_name in [:dataset, :resource] do
Enum.each(metrics(model_name, data), fn {metric_name, count} ->
count = count || 0

%DB.DatasetMonthlyMetric{}
|> DB.DatasetMonthlyMetric.changeset(%{
dataset_datagouv_id: dataset_datagouv_id,
model_name
|> changeset(%{
datagouv_id: datagouv_id,
year_month: metric_month,
metric_name: metric_name,
count: count
})
|> DB.Repo.insert!(
conflict_target: [:dataset_datagouv_id, :year_month, :metric_name],
conflict_target: [String.to_existing_atom("#{model_name}_datagouv_id"), :year_month, :metric_name],
on_conflict: [set: [count: count, updated_at: DateTime.utc_now()]]
)
end)
end

# Extracts the `{metric_name, count}` pairs to store from a single API record.
# Datasets expose both views and downloads; resources only expose downloads.
defp metrics(:dataset, %{"monthly_visit" => visits, "monthly_download_resource" => downloads}),
  do: [views: visits, downloads: downloads]

defp metrics(:resource, %{"monthly_download_resource" => downloads}),
  do: [downloads: downloads]

# Builds the changeset of the schema matching `model_name`, copying the generic
# `datagouv_id` param under the model-specific foreign key expected by the schema.
defp changeset(:dataset, %{datagouv_id: datagouv_id} = params) do
  DB.DatasetMonthlyMetric.changeset(
    %DB.DatasetMonthlyMetric{},
    Map.put(params, :dataset_datagouv_id, datagouv_id)
  )
end

defp changeset(:resource, %{datagouv_id: datagouv_id} = params) do
  DB.ResourceMonthlyMetric.changeset(
    %DB.ResourceMonthlyMetric{},
    Map.put(params, :resource_datagouv_id, datagouv_id)
  )
end

# Encoded query string for the metrics API: exact match on the model's
# identifier, `page_size` records per page, sorted by month in descending order.
defp api_args(:dataset, datagouv_id, page_size) do
  URI.encode_query(dataset_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc")
end

defp api_args(:resource, datagouv_id, page_size) do
  URI.encode_query(resource_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc")
end

# Base endpoint of the metrics API for the model, as a parsed `URI`.
# The model name is pluralised by appending an "s" (`datasets`, `resources`).
defp api_base_url(model_name) when model_name in [:dataset, :resource] do
  endpoint = "https://metric-api.data.gouv.fr/api/#{model_name}s/data/"
  URI.new!(endpoint)
end

# HTTP client module used for API calls, resolved through `Transport.Req.impl/0`.
defp http_client do
  Transport.Req.impl()
end
end
29 changes: 29 additions & 0 deletions apps/transport/lib/jobs/import_resource_monthly_metrics_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Transport.Jobs.ImportResourceMonthlyMetricsJob do
  @moduledoc """
  Import monthly metrics related to resources coming from the data.gouv.fr's API.

  This job is executed daily and imports metrics for all resources for the last 2 years.
  Records are not supposed to change in the past, except for the current month.
  """
  use Oban.Worker, max_attempts: 3
  import Ecto.Query

  # The number of workers to run in parallel when importing metrics
  @task_concurrency 5

  @doc """
  Imports the metrics of every resource, processing up to `@task_concurrency`
  resources at the same time.

  An import taking more than 10 seconds is killed and the stream moves on
  to the remaining resources.
  """
  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    stream_opts = [max_concurrency: @task_concurrency, on_timeout: :kill_task, timeout: 10_000]

    resource_datagouv_ids()
    |> Task.async_stream(&import_resource_metrics/1, stream_opts)
    |> Stream.run()
  end

  @doc """
  Returns the data.gouv.fr identifiers of the resources selected by
  `DB.Resource.base_query/0`.
  """
  def resource_datagouv_ids do
    query = select(DB.Resource.base_query(), [resource: r], r.datagouv_id)
    DB.Repo.all(query)
  end

  # Delegates to the import logic shared with the dataset job.
  defp import_resource_metrics(datagouv_id) do
    Transport.Jobs.ImportMonthlyMetrics.import_metrics(:resource, datagouv_id)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule DB.Repo.Migrations.ResourceMonthlyMetrics do
  use Ecto.Migration

  @table :resource_monthly_metrics

  # Creates the table storing monthly metrics per resource, along with a
  # unique index guaranteeing a single row per (resource, month, metric).
  def change do
    create table(@table) do
      add(:resource_datagouv_id, :string, size: 50, null: false)
      # Example: 2023-12
      add(:year_month, :string, size: 7, null: false)
      add(:metric_name, :string, size: 50, null: false)
      add(:count, :integer, null: false)
      timestamps(type: :utc_datetime_usec)
    end

    @table
    |> unique_index([:resource_datagouv_id, :year_month, :metric_name])
    |> create()
  end
end
42 changes: 42 additions & 0 deletions apps/transport/test/db/resource_monthly_metric_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule DB.ResourceMonthlyMetricTest do
  use ExUnit.Case, async: true
  import DB.Factory

  setup do
    Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
  end

  describe "changeset" do
    test "can insert a record" do
      %{datagouv_id: resource_datagouv_id} = insert(:resource)

      attrs = %{
        resource_datagouv_id: resource_datagouv_id,
        metric_name: :downloads,
        count: 42,
        year_month: "2023-12"
      }

      changeset = DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, attrs)

      assert %Ecto.Changeset{valid?: true} = changeset
      # A valid changeset must also be accepted by the database
      DB.Repo.insert!(changeset)
    end

    test "identifies errors" do
      # Unknown metric name, negative count and malformed month
      invalid_attrs = %{
        resource_datagouv_id: Ecto.UUID.generate(),
        metric_name: :foo,
        count: -1,
        year_month: "bar"
      }

      changeset = DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, invalid_attrs)

      assert %Ecto.Changeset{
               valid?: false,
               errors: [count: _, year_month: _, metric_name: _]
             } = changeset
    end
  end
end
4 changes: 4 additions & 0 deletions apps/transport/test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ defmodule DB.Factory do
%DB.DatasetMonthlyMetric{}
end

# Factory for `DB.ResourceMonthlyMetric`: an empty struct, no attribute is pre-filled.
def resource_monthly_metric_factory, do: %DB.ResourceMonthlyMetric{}

def resource_factory do
%DB.Resource{
last_import: DateTime.utc_now(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
use Oban.Testing, repo: DB.Repo
alias Transport.Jobs.ImportDatasetMonthlyMetricsJob

doctest ImportDatasetMonthlyMetricsJob, import: true
doctest Transport.Jobs.ImportMonthlyMetrics, import: true

setup :verify_on_exit!

Expand Down Expand Up @@ -35,7 +35,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do

assert DB.DatasetMonthlyMetric |> DB.Repo.all() |> Enum.empty?()

ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id)
Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id)

assert [
%DB.DatasetMonthlyMetric{
Expand Down Expand Up @@ -94,7 +94,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
}
] = DB.Repo.all(DB.DatasetMonthlyMetric)

ImportDatasetMonthlyMetricsJob.import_metrics(datagouv_id)
Transport.Jobs.ImportMonthlyMetrics.import_metrics(:dataset, datagouv_id)

assert [
# Count has been updated, primary key is still the same
Expand Down Expand Up @@ -126,7 +126,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
%DB.Dataset{datagouv_id: d2_datagouv_id} = insert(:dataset)
insert(:dataset, is_active: false)

assert [d1_datagouv_id, d2_datagouv_id] == ImportDatasetMonthlyMetricsJob.dataset_datagouv_ids()
assert MapSet.new([d1_datagouv_id, d2_datagouv_id]) == ImportDatasetMonthlyMetricsJob.dataset_datagouv_ids() |> MapSet.new()

setup_http_response(d1_datagouv_id, [
%{
Expand Down Expand Up @@ -190,7 +190,7 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
end

defp setup_http_response(datagouv_id, data) do
metrics_api_url = ImportDatasetMonthlyMetricsJob.api_url(datagouv_id)
metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:dataset, datagouv_id)

expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] ->
{:ok, %Req.Response{status: 200, body: %{"data" => data}}}
Expand Down
Loading

0 comments on commit 3e46684

Please sign in to comment.