Skip to content

Commit

Permalink
Passage de HTTPoison à Req pour ResourceHistoryJob (#3585)
Browse files Browse the repository at this point in the history
* Fix broken config

* Save useful script

* Create a secondary path

* Implement req downloader

* Extract headers & implement req version

* Switch to req

* Stop passing the full HTTP body around

Optimize for memory consumption, since this is what we want, and the file is already stored on disk.

* Run mix format

* Run mix format

* Fix broken code

* Backport S3 streaming from #3560

* Enable streaming

* Update go.exs

* Rollback a part of the changes

* Start adding a Req wrapper (save my research)

* Add req to shared (where the wrapper will reside)

* Rework behaviour

* Setup Mox for testing

* Mix format

* Call the wrapper to allow testing

* Adapt one test

* Remove outdated TODO

* Extract method before reuse

* Extract method before reuse

* Adapt other test from http poison to req

* Adapt another test

* Support non-200 return codes & adapt test

* Add moduledoc (credo)

* Fix credo warning

* Remove TODO (I verified the merge was OK)

* Add moduledoc

* Remove TODO - there is no problem here

According to the documentation (https://hexdocs.pm/elixir/File.html#stream!/3), the :line_or_bytes option is only relevant when reading a file. We're only doing a write here.

* Change method visibility to help me test without S3

* Start iterating on script to detect url regressions in more detail

* You had one job

* Add note

* Use a state file to properly analyse status codes

* Update go.exs

* Load & display

* Avoid code eval

* Parallelize the downloads a bit

* Save WIP

* Validate that all "OK on both sides" are actually equivalent

* Split not all ok and all ok

* Finalise analysis

* Update go.exs

* Add fix for wojtekmach/req#270

* Update req_httpoison_testing.exs

* Apply feedback

* Add note

* Improve Req headers extraction

- support multiple values, by concatenating them
- filter them first (reduce unnecessary processing)
- add new tests
- adapt broken existing tests

* Remove dead code

* Add space after comma

---------

Co-authored-by: Antoine Augusti <[email protected]>
  • Loading branch information
thbar and AntoineAugusti authored Nov 20, 2023
1 parent 35bd484 commit 6b6690e
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 82 deletions.
27 changes: 27 additions & 0 deletions apps/shared/lib/wrapper/wrapper_req.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Transport.Req.Behaviour do
  @moduledoc """
  A minimal behaviour describing the subset of `Req` that we rely on.

  At time of writing, Req does not ship a behaviour of its own, which prevents mocking it with Mox:
  - https://github.com/wojtekmach/req/issues/143
  - https://github.com/wojtekmach/req/issues/246

  This "above-level" contract only exposes the specific bits we are interested in, so that a
  Mox mock can be substituted during tests.
  """

  # Ref: https://github.com/wojtekmach/req/blob/b40de7b7a0e7cc97a2c398ffcc42aa14962f3963/lib/req.ex#L545
  @typedoc "The target of a request: either a parsed `URI` or a plain string."
  @type url() :: URI.t() | String.t()

  # Simplified version for our needs
  @doc """
  Issues a GET request on `url` with the given `options`.
  """
  @callback get(url(), options :: keyword()) :: {:ok, Req.Response.t()} | {:error, Exception.t()}
end

defmodule Transport.Req do
  @moduledoc """
  The wrapper for the behaviour, which acts as a central access point for `Req` operations.

  By default the implementation is this module itself, which delegates to `Req` directly.
  During tests, a Mox mock is configured instead (through the `:req_impl` key of the
  `:transport` application environment).
  """

  @behaviour Transport.Req.Behaviour

  @doc """
  Returns the module currently implementing `Transport.Req.Behaviour`.

  The module is read from the `:req_impl` key of the `:transport` application environment at
  call time, and falls back to this module when the key is not set.
  """
  @spec impl() :: module()
  def impl, do: Application.get_env(:transport, :req_impl, __MODULE__)

  # Default implementation of the behaviour: hand the call over to `Req.get/2` unchanged.
  @impl Transport.Req.Behaviour
  defdelegate get(url, options), to: Req
end
1 change: 1 addition & 0 deletions apps/shared/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule Shared.MixProject do
[
{:timex, ">= 0.0.0"},
{:httpoison, ">= 0.0.0"},
{:req, "~> 0.4.0"},
{:bypass, "~> 2.1", only: :test},
{:mox, "~> 1.0.0", only: :test},
# Mint is used by our HttpStream shared component, so we add an explicit dependency
Expand Down
1 change: 1 addition & 0 deletions apps/shared/test/support/mocks.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Mox.defmock(Transport.HTTPoison.Mock, for: HTTPoison.Base)
Mox.defmock(Transport.Req.Mock, for: Transport.Req.Behaviour)
Mox.defmock(Transport.Shared.Schemas.Mock, for: Transport.Shared.Schemas.Wrapper)
Mox.defmock(Shared.Validation.GBFSValidator.Mock, for: Shared.Validation.GBFSValidator.Wrapper)
Mox.defmock(Transport.DataVisualization.Mock, for: Transport.DataVisualization)
73 changes: 57 additions & 16 deletions apps/transport/lib/jobs/resource_history_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ defmodule Transport.Jobs.ResourceHistoryJob do
alias DB.{Resource, ResourceHistory}
import Transport.Jobs.Workflow.Notifier, only: [notify_workflow: 2]

# Lowercase names of the only HTTP response headers retained by `relevant_http_headers/1`.
@headers_to_keep ~w(
  content-disposition
  content-encoding
  content-length
  content-type
  etag
  expires
  if-modified-since
  last-modified
)

@impl Oban.Worker
def perform(%Oban.Job{args: %{"resource_id" => resource_id}} = job) do
Logger.info("Running ResourceHistoryJob for resource##{resource_id}")
Expand All @@ -72,7 +83,9 @@ defmodule Transport.Jobs.ResourceHistoryJob do

notification =
try do
%{resource_history_id: resource_history_id} = resource |> download_resource(path) |> process_download(resource)
%{resource_history_id: resource_history_id} =
:req |> download_resource(resource, path) |> process_download(resource)

%{"success" => true, "job_id" => job.id, "output" => %{resource_history_id: resource_history_id}}
rescue
e -> %{"success" => false, "job_id" => job.id, "reason" => inspect(e)}
Expand All @@ -94,7 +107,7 @@ defmodule Transport.Jobs.ResourceHistoryJob do
Logger.debug("Got an error while downloading resource##{resource_id}: #{message}")
end

defp process_download({:ok, resource_path, headers, body}, %Resource{} = resource) do
defp process_download({:ok, resource_path, headers}, %Resource{} = resource) do
download_datetime = DateTime.utc_now()

hash = resource_hash(resource, resource_path)
Expand Down Expand Up @@ -137,7 +150,7 @@ defmodule Transport.Jobs.ResourceHistoryJob do
Map.merge(base, %{content_hash: hash, filesize: size})
end

Transport.S3.upload_to_s3!(:history, body, filename, acl: :public_read)
Transport.S3.stream_to_s3!(:history, resource_path, filename, acl: :public_read)
%{id: resource_history_id} = store_resource_history!(resource, data)

%{resource_history_id: resource_history_id}
Expand Down Expand Up @@ -242,12 +255,34 @@ defmodule Transport.Jobs.ResourceHistoryJob do
System.tmp_dir!() |> Path.join("resource_#{resource_id}_download")
end

defp download_resource(%Resource{id: resource_id, url: url}, file_path) do
@doc """
Downloads the resource found at `url` with Req, writing the response body into `file_path`.

The body is streamed into the file through the `:into` option rather than being returned, and
Req is asked neither to request a compressed response nor to decode the body.

Returns `{:ok, file_path, headers}` for a 200 response, where `headers` is the output of
`relevant_http_headers/1`, and `{:error, message}` for any other status or a request error.
"""
def download_resource(:req, %Resource{id: resource_id, url: url}, file_path) do
  request_options = [
    compressed: false,
    decode_body: false,
    receive_timeout: 180_000,
    into: File.stream!(file_path)
  ]

  # temporary fix until we figure out what must be done (https://github.com/wojtekmach/req/issues/270)
  escaped_url = String.replace(url, "|", URI.encode("|"))

  outcome = Transport.Req.impl().get(escaped_url, request_options)

  case outcome do
    {:error, error} ->
      {:error, "Got an error: #{inspect(error)}"}

    {:ok, %{status: 200} = response} ->
      Logger.debug("Saved resource##{resource_id} to #{file_path}")
      {:ok, file_path, relevant_http_headers(response)}

    {:ok, %{status: status_code}} ->
      # NOTE: the file is still on disk at this point
      {:error, "Got a non 200 status: #{status_code}"}
  end
end

# NOTE: dead code, kept to allow comparison in the coming weeks if needed
def download_resource(:legacy, %Resource{id: resource_id, url: url}, file_path) do
case http_client().get(url, [], follow_redirect: true, recv_timeout: 180_000) do
{:ok, %HTTPoison.Response{status_code: 200, body: body} = r} ->
Logger.debug("Saving resource##{resource_id} to #{file_path}")
File.write!(file_path, body)
{:ok, file_path, relevant_http_headers(r), body}
{:ok, file_path, relevant_http_headers(r)}

{:ok, %HTTPoison.Response{status_code: status}} ->
{:error, "Got a non 200 status: #{status}"}
Expand Down Expand Up @@ -293,18 +328,24 @@ defmodule Transport.Jobs.ResourceHistoryJob do
end

def relevant_http_headers(%HTTPoison.Response{headers: headers}) do
headers_to_keep = [
"content-disposition",
"content-encoding",
"content-length",
"content-type",
"etag",
"expires",
"if-modified-since",
"last-modified"
]
headers |> Enum.into(%{}, fn {h, v} -> {String.downcase(h), v} end) |> Map.take(@headers_to_keep)
end

headers |> Enum.into(%{}, fn {h, v} -> {String.downcase(h), v} end) |> Map.take(headers_to_keep)
@doc """
Extract only the HTTP headers we need, as a map of downcased header name to a single string value.

Header names are downcased *before* being checked against the list of headers to keep, so a
response carrying e.g. `"Content-Type"` is handled like one carrying `"content-type"`.
Multiple values for a header are concatenated with `", "` (rare but can occur if we allow
more headers).

    iex> relevant_http_headers(%Req.Response{headers: %{"foo" => ["bar"]}})
    %{}
    iex> relevant_http_headers(%Req.Response{headers: %{"content-type" => ["application/json"]}})
    %{"content-type" => "application/json"}
    iex> relevant_http_headers(%Req.Response{headers: %{"content-type" => ["application/json", "but-also/csv"]}})
    %{"content-type" => "application/json, but-also/csv"}
    iex> relevant_http_headers(%Req.Response{headers: %{"Content-Type" => ["text/csv"]}})
    %{"content-type" => "text/csv"}
"""
def relevant_http_headers(%Req.Response{headers: headers}) do
  # Normalise the name first: filtering on the raw name would silently drop any header whose
  # name is not already lowercase, and make the downcasing a no-op.
  for {name, values} <- headers,
      downcased_name = String.downcase(name),
      downcased_name in @headers_to_keep,
      into: %{} do
    {downcased_name, Enum.join(values, ", ")}
  end
end

defp latest_schema_version_to_date(%Resource{schema_name: nil}), do: nil
Expand Down
111 changes: 45 additions & 66 deletions apps/transport/test/transport/jobs/resource_history_job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,45 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do
@gtfs_path "#{__DIR__}/../../../../shared/test/validation/gtfs.zip"
@gtfs_content File.read!(@gtfs_path)

# Expects one `get` call on the Req mock for `resource_url`.
#
# The mock checks that compression and body decoding are disabled, writes `csv_content` to the
# path of the file stream received through the `:into` option (standing in for a real streamed
# download), and replies with a response carrying the configured `:status` (200 by default).
def setup_req_mock(resource_url, csv_content, opts \\ []) do
  validated_opts = Keyword.validate!(opts, status: 200)
  status = Keyword.fetch!(validated_opts, :status)

  expect(Transport.Req.Mock, :get, fn ^resource_url, req_options ->
    assert req_options[:compressed] == false
    assert req_options[:decode_body] == false

    # fake write
    target = Keyword.fetch!(req_options, :into)
    File.write!(target.path, csv_content)

    bare_response = %Req.Response{status: status, headers: %{}}
    typed_response = Req.Response.put_header(bare_response, "Content-Type", "application/octet-stream")

    {:ok, Req.Response.put_header(typed_response, "x-foo", "bar")}
  end)
end

# Expects one `request!` call on the ExAWS mock, corresponding to the resource upload.
#
# The request must carry no `:body` key, have a `File.Stream` as its `:src`, target the
# `:history` bucket with a public-read ACL, and use a path starting with
# "<resource_id>/<resource_id>.".
def setup_aws_mock(resource_id) do
  path_prefix = "#{resource_id}/#{resource_id}."

  # Resource upload
  expect(Transport.ExAWS.Mock, :request!, fn s3_request ->
    history_bucket = Transport.S3.bucket_name(:history)
    refute Map.has_key?(s3_request, :body)

    assert %{
             src: %File.Stream{},
             service: :s3,
             path: uploaded_path,
             bucket: ^history_bucket,
             opts: [acl: :public_read]
           } = s3_request

    assert String.starts_with?(uploaded_path, path_prefix)
  end)
end

describe "ResourceHistoryAndValidationDispatcherJob" do
test "resources_to_historise" do
ids = create_resources_for_history()
Expand Down Expand Up @@ -242,34 +281,8 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do
is_community_resource: false
)

Transport.HTTPoison.Mock
|> expect(:get, fn ^resource_url, _headers, options ->
assert options |> Keyword.fetch!(:follow_redirect) == true

{:ok,
%HTTPoison.Response{
status_code: 200,
body: @gtfs_content,
headers: [{"Content-Type", "application/octet-stream"}, {"x-foo", "bar"}]
}}
end)

Transport.ExAWS.Mock
# Resource upload
|> expect(:request!, fn request ->
bucket_name = Transport.S3.bucket_name(:history)

assert %{
service: :s3,
http_method: :put,
path: path,
bucket: ^bucket_name,
body: @gtfs_content,
headers: %{"x-amz-acl" => "public-read"}
} = request

assert String.starts_with?(path, "#{resource_id}/#{resource_id}.")
end)
setup_req_mock(resource_url, @gtfs_content)
setup_aws_mock(resource_id)

assert 0 == count_resource_history()
assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id})
Expand Down Expand Up @@ -330,34 +343,8 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do
schema_version: schema_version = "0.4.1"
)

Transport.HTTPoison.Mock
|> expect(:get, fn ^resource_url, _headers, options ->
assert options |> Keyword.fetch!(:follow_redirect) == true

{:ok,
%HTTPoison.Response{
status_code: 200,
body: csv_content,
headers: [{"Content-Type", "application/octet-stream"}, {"x-foo", "bar"}]
}}
end)

Transport.ExAWS.Mock
# Resource upload
|> expect(:request!, fn request ->
bucket_name = Transport.S3.bucket_name(:history)

assert %{
service: :s3,
http_method: :put,
path: path,
bucket: ^bucket_name,
body: ^csv_content,
headers: %{"x-amz-acl" => "public-read"}
} = request

assert String.starts_with?(path, "#{resource_id}/#{resource_id}.")
end)
setup_req_mock(resource_url, csv_content)
setup_aws_mock(resource_id)

Transport.Shared.Schemas.Mock
|> expect(:transport_schemas, 1, fn ->
Expand Down Expand Up @@ -435,11 +422,7 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do
payload: %{"zip_metadata" => zip_metadata()}
)

Transport.HTTPoison.Mock
|> expect(:get, fn ^resource_url, _headers, options ->
assert options |> Keyword.fetch!(:follow_redirect) == true
{:ok, %HTTPoison.Response{status_code: 200, body: @gtfs_content, headers: []}}
end)
setup_req_mock(resource_url, @gtfs_content)

assert 1 == count_resource_history()
assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id})
Expand All @@ -466,11 +449,7 @@ defmodule Transport.Test.Transport.Jobs.ResourceHistoryJobTest do
is_community_resource: false
)

Transport.HTTPoison.Mock
|> expect(:get, fn ^resource_url, _headers, options ->
assert options == [follow_redirect: true]
{:ok, %HTTPoison.Response{status_code: 500, body: "", headers: []}}
end)
setup_req_mock(resource_url, "", status: 500)

assert 0 == count_resource_history()
assert :ok == perform_job(ResourceHistoryJob, %{resource_id: resource_id})
Expand Down
1 change: 1 addition & 0 deletions config/dev.secret.template.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ config :transport, TransportWeb.Endpoint,
config :ex_aws,
access_key_id: System.fetch_env!("MINIO_ROOT_USER"),
secret_access_key: System.fetch_env!("MINIO_ROOT_PASSWORD"),
cellar_url: "http://127.0.0.1:9000/~s",
s3: [
scheme: "http://",
host: "127.0.0.1",
Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ config :transport,
cache_impl: Transport.Cache.Null,
ex_aws_impl: Transport.ExAWS.Mock,
httpoison_impl: Transport.HTTPoison.Mock,
req_impl: Transport.Req.Mock,
history_impl: Transport.History.Fetcher.Mock,
gtfs_validator: Shared.Validation.Validator.Mock,
gbfs_validator_impl: Shared.Validation.GBFSValidator.Mock,
Expand Down
3 changes: 3 additions & 0 deletions scripts/compare-json.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
# Compare two JSON files while ignoring key ordering and the top-level "timeStamp" field.
#
# Usage: compare-json.sh <first.json> <second.json>
#
# Prints the diff (if any) on stdout; the script's exit status is the one returned by `diff`.
set -e

# Without this check, a missing argument would make `jq` wait on stdin.
if [ "$#" -ne 2 ]; then
  echo "Usage: $0 <first.json> <second.json>" >&2
  exit 2
fi

# Arguments are quoted so that paths containing spaces or glob characters are passed intact.
diff <(jq --sort-keys 'del(.timeStamp)' "$1") <(jq --sort-keys 'del(.timeStamp)' "$2") 2>&1
Loading

0 comments on commit 6b6690e

Please sign in to comment.