defmodule Transport.Req.Behaviour do
  @moduledoc """
  At time of writing, Req does not introduce a behaviour allowing us to "Mox", as described here:
  - https://github.com/wojtekmach/req/issues/143
  - https://github.com/wojtekmach/req/issues/246

  We introduce an "above-level" wrapper with only the specific bits we are interested in,
  in order to allow the use of Mox during tests.
  """

  # Ref: https://github.com/wojtekmach/req/blob/b40de7b7a0e7cc97a2c398ffcc42aa14962f3963/lib/req.ex#L545
  @type url() :: URI.t() | String.t()

  # Simplified version for our needs
  @callback get(url(), options :: keyword()) :: {:ok, Req.Response.t()} | {:error, Exception.t()}
end

defmodule Transport.Req do
  @moduledoc """
  The wrapper for the behaviour, which acts as a central access point for `Req` operations.

  By default the implementation is itself & delegates to `Req` directly. During tests, a Mox
  is configured instead.
  """

  @behaviour Transport.Req.Behaviour

  @doc """
  Returns the module to call for `Req` operations.

  This is the value stored under `:req_impl` in the `:transport` application environment,
  or this module itself when nothing is configured.
  """
  @spec impl() :: module()
  def impl, do: Application.get_env(:transport, :req_impl, __MODULE__)

  # Default implementation of the behaviour: a plain pass-through to `Req.get/2`.
  @impl Transport.Req.Behaviour
  defdelegate get(url, options), to: Req
end
@doc """
Downloads the resource found at `url` through the configured `Transport.Req` implementation.

A `File.stream!/1` on `file_path` is handed to the client as its `:into` option, so the
payload is expected to land in that file rather than in the response struct.

Returns `{:ok, file_path, headers}` for a 200 response (`headers` being filtered by
`relevant_http_headers/1`), and `{:error, message}` for any other status or client error.
"""
def download_resource(:req, %Resource{id: resource_id, url: url}, file_path) do
  options = [
    compressed: false,
    decode_body: false,
    receive_timeout: 180_000,
    into: File.stream!(file_path)
  ]

  # temporary fix until we figure out what must be done (https://github.com/wojtekmach/req/issues/270)
  escaped_url = String.replace(url, "|", URI.encode("|"))
  outcome = Transport.Req.impl().get(escaped_url, options)

  case outcome do
    {:error, reason} ->
      {:error, "Got an error: #{inspect(reason)}"}

    {:ok, %{status: 200} = response} ->
      Logger.debug("Saved resource##{resource_id} to #{file_path}")
      {:ok, file_path, relevant_http_headers(response)}

    {:ok, %{status: other_status}} ->
      # NOTE: the file is still on disk at this point
      {:error, "Got a non 200 status: #{other_status}"}
  end
end
@doc """
Extract only the HTTP headers we need. Concatenate them if multiple values are found (rare but can occur if we
allow more headers).

Header names are lowercased *before* being filtered, so a wanted header is kept whatever
the casing of its name in the response.

    iex> relevant_http_headers(%Req.Response{headers: %{"foo" => ["bar"]}})
    %{}
    iex> relevant_http_headers(%Req.Response{headers: %{"content-type" => ["application/json"]}})
    %{"content-type" => "application/json"}
    iex> relevant_http_headers(%Req.Response{headers: %{"content-type" => ["application/json", "but-also/csv"]}})
    %{"content-type" => "application/json, but-also/csv"}
    iex> relevant_http_headers(%Req.Response{headers: %{"Content-Type" => ["application/json"]}})
    %{"content-type" => "application/json"}
"""
def relevant_http_headers(%Req.Response{headers: headers}) do
  # Filtering on the raw names (as done previously) made the downcase a no-op and
  # silently dropped e.g. "Content-Type"; normalise first, then keep what we want.
  # Only the kept headers have their values joined.
  for {name, values} <- headers,
      key = String.downcase(name),
      key in @headers_to_keep,
      into: %{} do
    {key, Enum.join(values, ", ")}
  end
end
= _, + service: :s3, + path: path, + bucket: ^bucket_name, + opts: [acl: :public_read] + } = request + + assert String.starts_with?(path, "#{resource_id}/#{resource_id}.") + end) + end + describe "ResourceHistoryAndValidationDispatcherJob" do test "resources_to_historise" do ids = create_resources_for_history() @@ -242,34 +281,8 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do is_community_resource: false ) - Transport.HTTPoison.Mock - |> expect(:get, fn ^resource_url, _headers, options -> - assert options |> Keyword.fetch!(:follow_redirect) == true - - {:ok, - %HTTPoison.Response{ - status_code: 200, - body: @gtfs_content, - headers: [{"Content-Type", "application/octet-stream"}, {"x-foo", "bar"}] - }} - end) - - Transport.ExAWS.Mock - # Resource upload - |> expect(:request!, fn request -> - bucket_name = Transport.S3.bucket_name(:history) - - assert %{ - service: :s3, - http_method: :put, - path: path, - bucket: ^bucket_name, - body: @gtfs_content, - headers: %{"x-amz-acl" => "public-read"} - } = request - - assert String.starts_with?(path, "#{resource_id}/#{resource_id}.") - end) + setup_req_mock(resource_url, @gtfs_content) + setup_aws_mock(resource_id) assert 0 == count_resource_history() assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id}) @@ -330,34 +343,8 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do schema_version: schema_version = "0.4.1" ) - Transport.HTTPoison.Mock - |> expect(:get, fn ^resource_url, _headers, options -> - assert options |> Keyword.fetch!(:follow_redirect) == true - - {:ok, - %HTTPoison.Response{ - status_code: 200, - body: csv_content, - headers: [{"Content-Type", "application/octet-stream"}, {"x-foo", "bar"}] - }} - end) - - Transport.ExAWS.Mock - # Resource upload - |> expect(:request!, fn request -> - bucket_name = Transport.S3.bucket_name(:history) - - assert %{ - service: :s3, - http_method: :put, - path: path, - bucket: ^bucket_name, - body: ^csv_content, - headers: 
%{"x-amz-acl" => "public-read"} - } = request - - assert String.starts_with?(path, "#{resource_id}/#{resource_id}.") - end) + setup_req_mock(resource_url, csv_content) + setup_aws_mock(resource_id) Transport.Shared.Schemas.Mock |> expect(:transport_schemas, 1, fn -> @@ -435,11 +422,7 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do payload: %{"zip_metadata" => zip_metadata()} ) - Transport.HTTPoison.Mock - |> expect(:get, fn ^resource_url, _headers, options -> - assert options |> Keyword.fetch!(:follow_redirect) == true - {:ok, %HTTPoison.Response{status_code: 200, body: @gtfs_content, headers: []}} - end) + setup_req_mock(resource_url, @gtfs_content) assert 1 == count_resource_history() assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id}) @@ -466,11 +449,7 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do is_community_resource: false ) - Transport.HTTPoison.Mock - |> expect(:get, fn ^resource_url, _headers, options -> - assert options == [follow_redirect: true] - {:ok, %HTTPoison.Response{status_code: 500, body: "", headers: []}} - end) + setup_req_mock(resource_url, "", status: 500) assert 0 == count_resource_history() assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id}) diff --git a/config/dev.secret.template.exs b/config/dev.secret.template.exs index ecc3181411..f19e533673 100644 --- a/config/dev.secret.template.exs +++ b/config/dev.secret.template.exs @@ -32,6 +32,7 @@ config :transport, TransportWeb.Endpoint, config :ex_aws, access_key_id: System.fetch_env!("MINIO_ROOT_USER"), secret_access_key: System.fetch_env!("MINIO_ROOT_PASSWORD"), + cellar_url: "http://127.0.0.1:9000/~s", s3: [ scheme: "http://", host: "127.0.0.1", diff --git a/config/test.exs b/config/test.exs index 689daeae47..dddcf8f596 100644 --- a/config/test.exs +++ b/config/test.exs @@ -29,6 +29,7 @@ config :transport, cache_impl: Transport.Cache.Null, ex_aws_impl: Transport.ExAWS.Mock, httpoison_impl: 
# Compares, for every resource to historise, what the `:req` and `:legacy` (HTTPoison)
# download paths produce. Downloads, outcomes and checksums are cached on disk so the
# script can be re-run cheaply.

# Logger.configure(level: :debug)

resources = Transport.Jobs.ResourceHistoryAndValidationDispatcherJob.resources_to_historise()

defmodule Downloader do
  @moduledoc """
  Downloads a resource with both HTTP clients and caches each outcome next to the payload.
  """

  @doc """
  Returns `%{req: outcome, legacy: outcome, resource: resource}`.

  Each outcome is whatever `Transport.Jobs.ResourceHistoryJob.download_resource/3` returned
  for that mode, read back from its `.state` file (written on first run only).
  """
  def handle(folder, resource) do
    [:req, :legacy]
    |> Map.new(fn mode -> {mode, cached_outcome(mode, folder, resource)} end)
    |> Map.put(:resource, resource)
  end

  # Runs the download for `mode` unless a `.state` file already records its outcome.
  defp cached_outcome(mode, folder, resource) do
    file_path = Path.join(folder, "#{mode}-#{resource.id}.dat")
    state_file_path = file_path <> ".state"

    if not File.exists?(state_file_path) do
      outcome = Transport.Jobs.ResourceHistoryJob.download_resource(mode, resource, file_path)

      # a failed download may still have left a file behind: do not keep it
      if elem(outcome, 0) != :ok, do: File.rm(file_path)

      File.write!(state_file_path, :erlang.term_to_binary(outcome))
    end

    # the state files are produced by this very script, hence trusted input
    state_file_path |> File.read!() |> :erlang.binary_to_term()
  end
end

folder = Path.join(__ENV__.file, "../cache-dir") |> Path.expand()
# payloads and state files are written straight into this folder, so it must exist
File.mkdir_p!(folder)

stream =
  resources
  |> Task.async_stream(
    fn resource -> Downloader.handle(folder, resource) end,
    max_concurrency: 10,
    timeout: :infinity
  )
  |> Stream.map(fn {:ok, result} -> result end)

defmodule Comparer do
  @moduledoc """
  Helpers telling whether two downloaded files are equivalent, per resource format.
  """

  @doc """
  Returns the uppercase hex MD5 of `file`.

  The value is persisted in a sibling `.checksum` file and read back from it, since
  hashing is a costly operation.
  """
  def checksum(file, :cached) do
    checksum_file = file <> ".checksum"

    if not File.exists?(checksum_file) do
      digest = :md5 |> :crypto.hash(File.read!(file)) |> Base.encode16()
      File.write!(checksum_file, digest)
    end

    File.read!(checksum_file)
  end

  @doc """
  Returns the sorted entry names reported by `unzip -Z1` for `filename`.

  Returns `:corrupt` when `unzip` fails with one of its "not a zip archive" messages,
  and raises on any other failure.
  """
  def get_zip_metadata(filename) do
    {output, exit_code} = System.cmd("unzip", ["-Z1", filename], stderr_to_stdout: true)

    cond do
      exit_code == 0 ->
        output |> String.split("\n") |> Enum.sort() |> Enum.reject(fn line -> line == "" end)

      output =~ ~r/cannot find zipfile directory|signature not found/ ->
        :corrupt

      true ->
        raise "should not happen"
    end
  end

  @doc """
  Returns `true` when `scripts/compare-json.sh` finds no difference between both files.

  The script ends on a `diff`, which exits with 1 when its inputs differ: that is a
  regular `false` answer here, not a failure. Any other non-zero status raises.
  """
  def compare_json(file_1, file_2) do
    # `System.cmd/3` only resolves bare names through PATH, hence the absolute path.
    # Passing the files as arguments (instead of interpolating them into a shell
    # command line) keeps odd characters in paths from being interpreted by a shell.
    script = Path.expand("scripts/compare-json.sh")

    case System.cmd(script, [file_1, file_2], stderr_to_stdout: true) do
      {output, 0} -> output == ""
      {_differences, 1} -> false
      {output, status} -> raise "compare-json.sh failed (exit status #{status}): #{output}"
    end
  end

  @doc """
  Returns `true` when both files start with the same line (i.e. same CSV header).
  """
  def compare_csv(file_1, file_2), do: first_line(file_1) == first_line(file_2)

  defp first_line(file), do: file |> File.read!() |> String.split("\n") |> List.first()
end

IO.puts("Total considered resources: count=#{Enum.count(resources)}")

{all_ok, not_all_ok} =
  Enum.split_with(stream, fn result ->
    match?(%{legacy: {:ok, _, _}, req: {:ok, _, _}}, result)
  end)

IO.puts("Download OK for both req & httpoison: count=#{Enum.count(all_ok)}")
IO.write("How many OK files are not equivalent in a way or another? ")

same_zip_listing = fn file_1, file_2 ->
  Comparer.get_zip_metadata(file_1) == Comparer.get_zip_metadata(file_2)
end

# Looser, format-specific equivalence, only consulted when the checksums differ.
# A format without an entry here counts as "not equivalent".
equivalence_checks = %{
  "GTFS" => same_zip_listing,
  "geojson" => &Comparer.compare_json/2,
  "shp" => same_zip_listing,
  # at this point, the non-equal NeTEx are all zip containing zips
  "NeTEx" => same_zip_listing,
  "csv" => &Comparer.compare_csv/2
}

all_ok
|> Enum.count(fn result ->
  {:ok, file_1, _} = result[:legacy]
  {:ok, file_2, _} = result[:req]

  if Comparer.checksum(file_1, :cached) == Comparer.checksum(file_2, :cached) do
    false
  else
    case Map.fetch(equivalence_checks, result.resource.format) do
      {:ok, check} -> not check.(file_1, file_2)
      :error -> true
    end
  end
end)
|> IO.puts()

IO.puts("Download not OK for at least one: count=#{Enum.count(not_all_ok)}")

{same_error, not_same_error} =
  Enum.split_with(not_all_ok, fn
    # same message on both sides, and that message is an HTTP status error
    %{req: {:error, message}, legacy: {:error, message}} ->
      String.starts_with?(message, "Got a non 200 status")

    _ ->
      false
  end)

IO.puts(
  "Download not OK but req & httpoison lead to same http status error: count=" <>
    (same_error |> Enum.count() |> inspect())
)

IO.puts("Other: " <> (not_same_error |> Enum.count() |> inspect()))

not_same_error
|> Enum.frequencies_by(fn result -> %{req: elem(result[:req], 0), legacy: elem(result[:legacy], 0)} end)
|> IO.inspect(IEx.inspect_opts())

not_same_error
|> Enum.map(fn result -> result[:resource].url end)
|> Enum.each(&IO.puts/1)

same_error
|> Enum.frequencies_by(fn result -> elem(result[:legacy], 1) end)
|> IO.inspect()

same_error
|> Enum.group_by(
  fn result -> elem(result[:legacy], 1) end,
  fn result -> "https://transport.data.gouv.fr/resources/#{result.resource.id}" end
)
|> Enum.each(fn {message, urls} ->
  IO.puts("\n" <> message)
  Enum.each(urls, fn url -> IO.puts("* #{url}") end)
end)

IO.puts("============ done =============")