Skip to content

Commit

Permalink
Code dans le bon module
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitfred committed Dec 19, 2024
1 parent c3344a0 commit d1ad80d
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 26 deletions.
12 changes: 6 additions & 6 deletions apps/transport/lib/registry/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ defmodule Transport.Registry.Engine do
Stream eligible resources and run extractors to produce a raw registry at the end.
"""

alias Transport.Registry.Extractor
alias Transport.Registry.GTFS
alias Transport.Registry.Model.Stop
alias Transport.Registry.Result

import Ecto.Query

Expand All @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do
create_empty_csv_with_headers(output_file)

enumerate_gtfs_resources(limit, formats)
|> Extractor.map_result(&prepare_extractor/1)
|> Result.map_result(&prepare_extractor/1)
|> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000)
# one for Task.async_stream
|> Extractor.cat_results()
|> Result.cat_results()
# one for download/1
|> Extractor.cat_results()
|> Extractor.map_result(&extract_from_archive/1)
|> Result.cat_results()
|> Result.map_result(&extract_from_archive/1)
|> dump_to_csv(output_file)
end

Expand Down Expand Up @@ -84,7 +84,7 @@ defmodule Transport.Registry.Engine do
end
end

@spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()])
@spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()])
def extract_from_archive({extractor, file}) do
Logger.debug("extract_from_archive #{extractor} #{file}")
extractor.extract_from_archive(file)
Expand Down
18 changes: 2 additions & 16 deletions apps/transport/lib/registry/extractor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,7 @@ defmodule Transport.Registry.Extractor do
require Logger

alias Transport.Registry.Model.Stop
alias Transport.Registry.Result

@type result(positive) :: {:ok, positive} | {:error, binary()}

@callback extract_from_archive(path :: Path.t()) :: result([Stop.t()])

@spec cat_results(Stream.t(result(term()))) :: Stream.t(term())
def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1)

defp keep_ok({:ok, result}), do: [result]
defp keep_ok(_), do: []

@spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term())
def map_result(enumerable, mapper) do
enumerable
|> Stream.map(mapper)
|> cat_results()
end
@callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()])
end
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Transport.Registry.ExtractorTest do
defmodule Transport.Registry.ResultTest do
use ExUnit.Case, async: false

require Integer
alias Transport.Registry.Extractor
alias Transport.Registry.Result

test "cat_results" do
assert [] == cat_results([])
Expand All @@ -16,11 +16,11 @@ defmodule Transport.Registry.ExtractorTest do
end

defp cat_results(enumerable) do
enumerable |> Extractor.cat_results() |> Enum.to_list()
enumerable |> Result.cat_results() |> Enum.to_list()
end

defp map_result(enumerable, mapper) do
enumerable |> Extractor.map_result(mapper) |> Enum.to_list()
enumerable |> Result.map_result(mapper) |> Enum.to_list()
end

defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i}
Expand Down

0 comments on commit d1ad80d

Please sign in to comment.