GTFSDiff job also accepts URLs
ptitfred committed Mar 5, 2025
1 parent 71ba87a · commit f296355
Showing 2 changed files with 154 additions and 52 deletions.
106 changes: 79 additions & 27 deletions apps/transport/lib/jobs/gtfs_diff_job.ex
@@ -5,27 +5,89 @@ defmodule Transport.Jobs.GTFSDiff do
use Oban.Worker, max_attempts: 1, queue: :on_demand_validation

@impl Oban.Worker
def perform(
%Oban.Job{
args: %{
"gtfs_file_name_1" => gtfs_file_name_1,
"gtfs_file_name_2" => gtfs_file_name_2,
"gtfs_original_file_name_1" => gtfs_original_file_name_1,
"gtfs_original_file_name_2" => gtfs_original_file_name_2,
"bucket" => bucket,
"locale" => locale,
"profile" => profile
}
} = job
) do
def perform(%Oban.Job{args: args} = job) do
Oban.Notifier.notify(Oban, :gossip, %{started: job.id})

{:ok, unzip_1} = Transport.Unzip.S3.get_unzip(gtfs_file_name_1, bucket)
{:ok, unzip_2} = Transport.Unzip.S3.get_unzip(gtfs_file_name_2, bucket)
case args do
%{"gtfs_object_1" => gtfs_object_1, "gtfs_object_2" => gtfs_object_2} ->
process_s3_objects(job.id, gtfs_object_1, gtfs_object_2, args)

%{"gtfs_url_1" => gtfs_url_1, "gtfs_url_2" => gtfs_url_2} ->
process_urls(job.id, gtfs_url_1, gtfs_url_2, args)
end

:ok
end

@impl Oban.Worker
def timeout(_job), do: :timer.seconds(job_timeout_sec())

# 30 minutes, in seconds
def job_timeout_sec, do: 30 * 60

defp process_s3_objects(job_id, gtfs_object_1, gtfs_object_2, args) do
bucket_name = Transport.S3.bucket_name(:gtfs_diff)

{:ok, unzip_1} = Transport.Unzip.S3.get_unzip(gtfs_object_1, bucket_name)
{:ok, unzip_2} = Transport.Unzip.S3.get_unzip(gtfs_object_2, bucket_name)

process_diff(job_id, unzip_1, unzip_2, args)
after
Transport.S3.delete_object!(:gtfs_diff, gtfs_object_1)
Transport.S3.delete_object!(:gtfs_diff, gtfs_object_2)
end

defp process_urls(job_id, gtfs_url_1, gtfs_url_2, args) do
{:ok, file_1} = download_resource(gtfs_url_1)
{:ok, file_2} = download_resource(gtfs_url_2)

try do
{:ok, unzip_1} = unzip_local(file_1)
{:ok, unzip_2} = unzip_local(file_2)

process_diff(job_id, unzip_1, unzip_2, args)
after
File.rm(file_1)
File.rm(file_2)
end
end

defp download_resource(url) do
file_path = mk_tmp_file()
file_stream = File.stream!(file_path)

req_options = [compressed: false, decode_body: false, receive_timeout: 180_000, into: file_stream]

case Transport.Req.impl().get(url, req_options) do
{:ok, %{status: 200}} ->
{:ok, file_path}

{:ok, %{status: status_code}} ->
# NOTE: the file is still on disk at this point
{:error, "Got a non 200 status: #{status_code}"}

{:error, error} ->
{:error, "Got an error: #{error |> inspect}"}
end
end

defp mk_tmp_file do
System.tmp_dir!() |> Path.join(Ecto.UUID.generate())
end

defp unzip_local(file) do
file |> Unzip.LocalFile.open() |> Unzip.new()
end

defp process_diff(job_id, unzip_1, unzip_2, %{
"gtfs_original_file_name_1" => gtfs_original_file_name_1,
"gtfs_original_file_name_2" => gtfs_original_file_name_2,
"profile" => profile,
"locale" => locale
}) do
notify = fn log_msg ->
Oban.Notifier.notify(Oban, :gossip, %{
running: job.id,
running: job_id,
log: log_msg
})
end
@@ -40,23 +102,13 @@ defmodule Transport.Jobs.GTFSDiff do
Transport.S3.stream_to_s3!(:gtfs_diff, filepath, diff_file_name, acl: :public_read)

Oban.Notifier.notify(Oban, :gossip, %{
complete: job.id,
complete: job_id,
diff_file_url: Transport.S3.permanent_url(:gtfs_diff, diff_file_name),
gtfs_original_file_name_1: gtfs_original_file_name_1,
gtfs_original_file_name_2: gtfs_original_file_name_2
})
after
File.rm(filepath)
end

Transport.S3.delete_object!(:gtfs_diff, gtfs_file_name_1)
Transport.S3.delete_object!(:gtfs_diff, gtfs_file_name_2)
:ok
end

@impl Oban.Worker
def timeout(_job), do: :timer.seconds(job_timeout_sec())

# 30 minutes, in seconds
def job_timeout_sec, do: 30 * 60
end
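
For illustration, a minimal sketch of enqueueing the reworked job with each of the two argument shapes matched by perform/1 above. The URLs, object keys and the "core" profile value are placeholders, not taken from this commit; Oban serializes the args, so the atom keys below arrive as the string keys matched in perform/1.

# Sketch only: remote GTFS files, fetched over HTTP by process_urls/4.
%{
  gtfs_url_1: "https://example.com/reference.zip",
  gtfs_url_2: "https://example.com/modified.zip",
  gtfs_original_file_name_1: "reference.zip",
  gtfs_original_file_name_2: "modified.zip",
  locale: "fr",
  profile: "core"
}
|> Transport.Jobs.GTFSDiff.new()
|> Oban.insert!()

# Sketch only: files already uploaded to the :gtfs_diff bucket, handled by process_s3_objects/4.
%{
  gtfs_object_1: "object-key-1.zip",
  gtfs_object_2: "object-key-2.zip",
  gtfs_original_file_name_1: "reference.zip",
  gtfs_original_file_name_2: "modified.zip",
  locale: "fr",
  profile: "core"
}
|> Transport.Jobs.GTFSDiff.new()
|> Oban.insert!()
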
100 changes: 75 additions & 25 deletions apps/transport/lib/transport_web/live/gtfs_diff_select_live.ex
@@ -14,17 +14,37 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
import TransportWeb.Live.GTFSDiffSelectLive.Shared
import TransportWeb.Live.GTFSDiffSelectLive.Steps

def mount(_params, %{"locale" => locale} = _session, socket) do
def mount(params, %{"locale" => locale} = _session, socket) do
Gettext.put_locale(locale)

{:ok,
socket
|> clean_slate()
|> setup_uploads()}
socket = clean_slate(socket)

case params do
:not_mounted_at_router -> {:ok, setup_uploads(socket)}
%{} -> {:ok, setup_uploads(socket)}
_ -> {:ok, do_handle_params(socket, params)}
end
end

def handle_params(params, _uri, socket) do
{:noreply, set_profile(socket, params)}
{:noreply, do_handle_params(socket, params)}
end

defp do_handle_params(socket, params) do
socket |> set_profile(params) |> handle_urls(params)
end

defp handle_urls(socket, %{"reference_url" => reference_url, "modified_url" => modified_url}) do
profile = socket.assigns[:profile]

socket
|> assign(current_step: :analysis)
|> trigger_job(reference_url, modified_url, profile)
|> scroll_to_steps()
end

defp handle_urls(socket, _) do
setup_uploads(socket)
end

def set_profile(socket, %{"profile" => profile}) do
@@ -71,18 +91,9 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
end

def handle_info(:enqueue_job, socket) do
[gtfs_file_name_2, gtfs_file_name_1] = read_uploaded_files(socket)

:ok = listen_job_notifications()

job_id = schedule_job(gtfs_file_name_1, gtfs_file_name_2, socket.assigns[:profile])

socket =
socket
|> assign(job_id: job_id)
|> assign(diff_logs: [dgettext("gtfs-diff", "Job started")])
[gtfs_file_2, gtfs_file_1] = read_uploaded_files(socket)

{:noreply, socket}
{:noreply, socket |> trigger_job(gtfs_file_1, gtfs_file_2, socket.assigns[:profile])}
end

def handle_info({:generate_diff_summary, diff_file_url}, socket) do
@@ -113,6 +124,16 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
{:noreply, socket}
end

defp trigger_job(socket, gtfs_file_1, gtfs_file_2, profile) do
:ok = listen_job_notifications()

job_id = schedule_job(gtfs_file_1, gtfs_file_2, profile)

socket
|> assign(job_id: job_id)
|> assign(diff_logs: [dgettext("gtfs-diff", "Job started")])
end

# job has started
def handle_job_notification(%{"started" => job_id}, job_id, socket) do
schedule_timeout()
@@ -143,6 +164,8 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
|> scroll_to_steps()
end

def handle_job_notification(_, _, socket), do: socket

defp read_uploaded_files(socket) do
consume_uploaded_entries(socket, :gtfs, &consume_uploaded_entry/2)
end
@@ -153,17 +176,44 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
{:ok, %{uploaded_file_name: file_name, original_file_name: original_file_name}}
end

defp schedule_job(gtfs_file_name_1, gtfs_file_name_2, profile) do
%{id: job_id} =
defp schedule_job(gtfs_url_1, gtfs_url_2, profile) when is_binary(gtfs_url_1) do
schedule_job(
%{
gtfs_file_name_1: gtfs_file_name_1.uploaded_file_name,
gtfs_file_name_2: gtfs_file_name_2.uploaded_file_name,
gtfs_original_file_name_1: gtfs_file_name_1.original_file_name,
gtfs_original_file_name_2: gtfs_file_name_2.original_file_name,
bucket: Transport.S3.bucket_name(:gtfs_diff),
gtfs_url_1: gtfs_url_1,
gtfs_url_2: gtfs_url_2,
gtfs_original_file_name_1: "reference.zip",
gtfs_original_file_name_2: "modified.zip"
},
profile
)
end

defp schedule_job(
%{uploaded_file_name: gtfs_object_1, original_file_name: gtfs_original_file_name_1},
%{
uploaded_file_name: gtfs_object_2,
original_file_name: gtfs_original_file_name_2
},
profile
) do
schedule_job(
%{
gtfs_object_1: gtfs_object_1,
gtfs_object_2: gtfs_object_2,
gtfs_original_file_name_1: gtfs_original_file_name_1,
gtfs_original_file_name_2: gtfs_original_file_name_2
},
profile
)
end

defp schedule_job(args, profile) do
%{id: job_id} =
args
|> Map.merge(%{
locale: Gettext.get_locale(),
profile: profile
}
})
|> Transport.Jobs.GTFSDiff.new()
|> Oban.insert!()

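Usage note: when the live view is mounted or patched with both GTFS URLs in its query parameters, handle_urls/2 jumps straight to the :analysis step and triggers the job without any upload. A sketch of the params map it matches on (the profile value is a placeholder, not from this commit):

%{
  "reference_url" => "https://example.com/reference.zip",
  "modified_url" => "https://example.com/modified.zip",
  "profile" => "core"
}

schedule_job/3 then labels the two downloads "reference.zip" and "modified.zip" in the resulting diff.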
