diff --git a/apps/transport/lib/jobs/gtfs_diff_job.ex b/apps/transport/lib/jobs/gtfs_diff_job.ex
index 3588c60362..533b7ea340 100644
--- a/apps/transport/lib/jobs/gtfs_diff_job.ex
+++ b/apps/transport/lib/jobs/gtfs_diff_job.ex
@@ -5,27 +5,100 @@ defmodule Transport.Jobs.GTFSDiff do
   use Oban.Worker, max_attempts: 1, queue: :on_demand_validation
 
   @impl Oban.Worker
-  def perform(
-        %Oban.Job{
-          args: %{
-            "gtfs_file_name_1" => gtfs_file_name_1,
-            "gtfs_file_name_2" => gtfs_file_name_2,
-            "gtfs_original_file_name_1" => gtfs_original_file_name_1,
-            "gtfs_original_file_name_2" => gtfs_original_file_name_2,
-            "bucket" => bucket,
-            "locale" => locale,
-            "profile" => profile
-          }
-        } = job
-      ) do
+  def perform(%Oban.Job{args: args} = job) do
     Oban.Notifier.notify(Oban, :gossip, %{started: job.id})
 
-    {:ok, unzip_1} = Transport.Unzip.S3.get_unzip(gtfs_file_name_1, bucket)
-    {:ok, unzip_2} = Transport.Unzip.S3.get_unzip(gtfs_file_name_2, bucket)
+    # Dispatch on the shape of the args: S3 object names or remote URLs.
+    case args do
+      %{"gtfs_object_1" => gtfs_object_1, "gtfs_object_2" => gtfs_object_2} ->
+        process_s3_objects(job.id, gtfs_object_1, gtfs_object_2, args)
 
+      %{"gtfs_url_1" => gtfs_url_1, "gtfs_url_2" => gtfs_url_2} ->
+        process_urls(job.id, gtfs_url_1, gtfs_url_2, args)
+    end
+
+    :ok
+  end
+
+  @impl Oban.Worker
+  def timeout(_job), do: :timer.seconds(job_timeout_sec())
+
+  # 30 minutes, in seconds
+  def job_timeout_sec, do: 30 * 60
+
+  # Diffs two archives from the :gtfs_diff bucket; both objects are always deleted afterwards.
+  defp process_s3_objects(job_id, gtfs_object_1, gtfs_object_2, args) do
+    bucket_name = Transport.S3.bucket_name(:gtfs_diff)
+
+    {:ok, unzip_1} = Transport.Unzip.S3.get_unzip(gtfs_object_1, bucket_name)
+    {:ok, unzip_2} = Transport.Unzip.S3.get_unzip(gtfs_object_2, bucket_name)
+
+    process_diff(job_id, unzip_1, unzip_2, args)
+  after
+    Transport.S3.delete_object!(:gtfs_diff, gtfs_object_1)
+    Transport.S3.delete_object!(:gtfs_diff, gtfs_object_2)
+  end
+
+  # Downloads both archives to temporary files, diffs them, then removes the files.
+  defp process_urls(job_id, gtfs_url_1, gtfs_url_2, args) do
+    # Paths are chosen before downloading so that the `after` clause also removes
+    # files left behind by a failed or partial download.
+    file_1 = mk_tmp_file()
+    file_2 = mk_tmp_file()
+
+    try do
+      {:ok, ^file_1} = download_resource(gtfs_url_1, file_1)
+      {:ok, ^file_2} = download_resource(gtfs_url_2, file_2)
+
+      {:ok, unzip_1} = unzip_local(file_1)
+      {:ok, unzip_2} = unzip_local(file_2)
+
+      process_diff(job_id, unzip_1, unzip_2, args)
+    after
+      File.rm(file_1)
+      File.rm(file_2)
+    end
+  end
+
+  # Streams the body of `url` into `file_path`. Returns `{:ok, file_path}` on a 200 response,
+  # `{:error, message}` otherwise.
+  defp download_resource(url, file_path) do
+    file_stream = File.stream!(file_path)
+
+    req_options = [compressed: false, decode_body: false, receive_timeout: 180_000, into: file_stream]
+
+    case Transport.Req.impl().get(url, req_options) do
+      {:ok, %{status: 200}} ->
+        {:ok, file_path}
+
+      {:ok, %{status: status_code}} ->
+        # NOTE: the file may still be on disk at this point; process_urls/4 removes it
+        {:error, "Got a non 200 status: #{status_code}"}
+
+      {:error, error} ->
+        {:error, "Got an error: #{error |> inspect}"}
+    end
+  end
+
+  # Returns a unique path inside the system temporary directory (the file is not created).
+  defp mk_tmp_file do
+    System.tmp_dir!() |> Path.join(Ecto.UUID.generate())
+  end
+
+  # Opens a local zip archive through `Unzip.LocalFile` and `Unzip.new/1`.
+  defp unzip_local(file) do
+    file |> Unzip.LocalFile.open() |> Unzip.new()
+  end
+
+  defp process_diff(job_id, unzip_1, unzip_2, %{
+         "gtfs_original_file_name_1" => gtfs_original_file_name_1,
+         "gtfs_original_file_name_2" => gtfs_original_file_name_2,
+         "profile" => profile,
+         "locale" => locale
+       }) do
     notify = fn log_msg ->
       Oban.Notifier.notify(Oban, :gossip, %{
-        running: job.id,
+        running: job_id,
         log: log_msg
       })
     end
@@ -40,7 +113,7 @@ defmodule Transport.Jobs.GTFSDiff do
       Transport.S3.stream_to_s3!(:gtfs_diff, filepath, diff_file_name, acl: :public_read)
 
       Oban.Notifier.notify(Oban, :gossip, %{
-        complete: job.id,
+        complete: job_id,
         diff_file_url: Transport.S3.permanent_url(:gtfs_diff, diff_file_name),
         gtfs_original_file_name_1: gtfs_original_file_name_1,
         gtfs_original_file_name_2: gtfs_original_file_name_2
@@ -48,15 +121,5 @@ defmodule Transport.Jobs.GTFSDiff do
     after
       File.rm(filepath)
     end
-
-    Transport.S3.delete_object!(:gtfs_diff, gtfs_file_name_1)
-    Transport.S3.delete_object!(:gtfs_diff, gtfs_file_name_2)
-    :ok
   end
-
-  @impl Oban.Worker
-  def timeout(_job), do: :timer.seconds(job_timeout_sec())
-
-  # 30 minutes, in seconds
-  def job_timeout_sec, do: 30 * 60
 end
diff --git a/apps/transport/lib/transport_web/live/gtfs_diff_select_live.ex b/apps/transport/lib/transport_web/live/gtfs_diff_select_live.ex
index b953a455cc..59977e13fa 100644
--- a/apps/transport/lib/transport_web/live/gtfs_diff_select_live.ex
+++ b/apps/transport/lib/transport_web/live/gtfs_diff_select_live.ex
@@ -24,7 +24,27 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
   end
 
   def handle_params(params, _uri, socket) do
-    {:noreply, set_profile(socket, params)}
+    {:noreply, do_handle_params(socket, params)}
+  end
+
+  # Applies the query params: sets the profile, then handles any GTFS URLs.
+  defp do_handle_params(socket, params) do
+    socket |> set_profile(params) |> handle_urls(params)
+  end
+
+  # With both URLs present, moves to the analysis step and starts the diff job.
+  defp handle_urls(socket, %{"reference_url" => reference_url, "modified_url" => modified_url}) do
+    profile = socket.assigns[:profile]
+
+    socket
+    |> assign(current_step: :analysis)
+    |> trigger_job(reference_url, modified_url, profile)
+    |> scroll_to_steps()
+  end
+
+  # Without both URLs, falls back to setting up the uploads.
+  defp handle_urls(socket, _) do
+    setup_uploads(socket)
   end
 
   def set_profile(socket, %{"profile" => profile}) do
@@ -71,18 +91,9 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
   end
 
   def handle_info(:enqueue_job, socket) do
-    [gtfs_file_name_2, gtfs_file_name_1] = read_uploaded_files(socket)
-
-    :ok = listen_job_notifications()
-
-    job_id = schedule_job(gtfs_file_name_1, gtfs_file_name_2, socket.assigns[:profile])
-
-    socket =
-      socket
-      |> assign(job_id: job_id)
-      |> assign(diff_logs: [dgettext("gtfs-diff", "Job started")])
+    [gtfs_file_2, gtfs_file_1] = read_uploaded_files(socket)
 
-    {:noreply, socket}
+    {:noreply, socket |> trigger_job(gtfs_file_1, gtfs_file_2, socket.assigns[:profile])}
   end
 
   def handle_info({:generate_diff_summary, diff_file_url}, socket) do
@@ -113,6 +124,17 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
     {:noreply, socket}
   end
 
+  # Subscribes to job notifications, schedules the diff job and stores its id and first log line.
+  defp trigger_job(socket, gtfs_file_1, gtfs_file_2, profile) do
+    :ok = listen_job_notifications()
+
+    job_id = schedule_job(gtfs_file_1, gtfs_file_2, profile)
+
+    socket
+    |> assign(job_id: job_id)
+    |> assign(diff_logs: [dgettext("gtfs-diff", "Job started")])
+  end
+
   # job has started
   def handle_job_notification(%{"started" => job_id}, job_id, socket) do
     schedule_timeout()
@@ -143,6 +165,9 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
     |> scroll_to_steps()
   end
 
+  # Ignores any notification that none of the previous clauses handled.
+  def handle_job_notification(_, _, socket), do: socket
+
   defp read_uploaded_files(socket) do
     consume_uploaded_entries(socket, :gtfs, &consume_uploaded_entry/2)
   end
@@ -153,17 +178,47 @@ defmodule TransportWeb.Live.GTFSDiffSelectLive do
     {:ok, %{uploaded_file_name: file_name, original_file_name: original_file_name}}
   end
 
-  defp schedule_job(gtfs_file_name_1, gtfs_file_name_2, profile) do
-    %{id: job_id} =
+  # Schedules a diff job for two GTFS archives referenced by URL.
+  defp schedule_job(gtfs_url_1, gtfs_url_2, profile) when is_binary(gtfs_url_1) do
+    schedule_job(
       %{
-        gtfs_file_name_1: gtfs_file_name_1.uploaded_file_name,
-        gtfs_file_name_2: gtfs_file_name_2.uploaded_file_name,
-        gtfs_original_file_name_1: gtfs_file_name_1.original_file_name,
-        gtfs_original_file_name_2: gtfs_file_name_2.original_file_name,
-        bucket: Transport.S3.bucket_name(:gtfs_diff),
+        gtfs_url_1: gtfs_url_1,
+        gtfs_url_2: gtfs_url_2,
+        gtfs_original_file_name_1: "reference.zip",
+        gtfs_original_file_name_2: "modified.zip"
+      },
+      profile
+    )
+  end
+
+  # Schedules a diff job for two uploaded files, keeping their original names.
+  defp schedule_job(
+         %{uploaded_file_name: gtfs_object_1, original_file_name: gtfs_original_file_name_1},
+         %{
+           uploaded_file_name: gtfs_object_2,
+           original_file_name: gtfs_original_file_name_2
+         },
+         profile
+       ) do
+    schedule_job(
+      %{
+        gtfs_object_1: gtfs_object_1,
+        gtfs_object_2: gtfs_object_2,
+        gtfs_original_file_name_1: gtfs_original_file_name_1,
+        gtfs_original_file_name_2: gtfs_original_file_name_2
+      },
+      profile
+    )
+  end
+
+  # Adds the locale and profile to the args and inserts the Oban job.
+  defp schedule_job(args, profile) do
+    %{id: job_id} =
+      args
+      |> Map.merge(%{
         locale: Gettext.get_locale(),
         profile: profile
-      }
+      })
       |> Transport.Jobs.GTFSDiff.new()
       |> Oban.insert!()
 