From 572cd168eea46f431b23eaf66a6062fd91078370 Mon Sep 17 00:00:00 2001 From: Midigo Frank <39288959+midigofrank@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:39:19 +0300 Subject: [PATCH] Delete project data in batches to avoid timeouts (#2658) * delete project data in batches to avoid timeouts * make a much needed UI fix --------- Co-authored-by: Stuart Corbishley --- CHANGELOG.md | 2 + lib/lightning/extensions/project_hook.ex | 10 +- lib/lightning/projects.ex | 136 ++++++++++++------ lib/lightning_web/components/new_inputs.ex | 5 +- .../live/components/project_deletion_modal.ex | 29 ++-- .../live/project_live/settings.html.heex | 6 +- ...0241112040853_add_indexes_to_run_steps.exs | 7 + test/lightning/projects_test.exs | 17 ++- test/lightning_web/live/project_live_test.exs | 3 +- 9 files changed, 126 insertions(+), 89 deletions(-) create mode 100644 priv/repo/migrations/20241112040853_add_indexes_to_run_steps.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index eba797d899..be5229cbac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ and this project adheres to - Fix LiveView crash when pressing "esc" on inspector [#2622](https://github.com/OpenFn/lightning/issues/2622) +- Delete project data in batches to avoid timeouts in the db connection + [#2632](https://github.com/OpenFn/lightning/issues/2632) - Fix MetadataService crashing when errors are encountered [#2659](https://github.com/OpenFn/lightning/issues/2659) diff --git a/lib/lightning/extensions/project_hook.ex b/lib/lightning/extensions/project_hook.ex index de767367b4..508bce35fa 100644 --- a/lib/lightning/extensions/project_hook.ex +++ b/lib/lightning/extensions/project_hook.ex @@ -20,13 +20,7 @@ defmodule Lightning.Extensions.ProjectHook do @spec handle_delete_project(Project.t()) :: {:ok, Project.t()} | {:error, Changeset.t()} def handle_delete_project(project) do - Projects.project_runs_query(project) |> Repo.delete_all() - - Projects.project_run_step_query(project) |> Repo.delete_all() - - Projects.project_workorders_query(project) |> Repo.delete_all() - - Projects.project_steps_query(project) |> Repo.delete_all() + Projects.delete_project_workorders(project) Projects.project_jobs_query(project) |> Repo.delete_all() @@ -38,7 +32,7 @@ defmodule Lightning.Extensions.ProjectHook do Projects.project_credentials_query(project) |> Repo.delete_all() - Projects.project_dataclips_query(project) |> Repo.delete_all() + Projects.delete_project_dataclips(project) Repo.delete(project) end diff --git a/lib/lightning/projects.ex b/lib/lightning/projects.ex index ee5951c0f3..7be582f528 100644 --- a/lib/lightning/projects.ex +++ b/lib/lightning/projects.ex @@ -84,17 +84,27 @@ defmodule Lightning.Projects do will find projects that are ready for permanent deletion and purge them. """ @impl Oban.Worker + def perform(%Oban.Job{ + args: %{"project_id" => project_id, "type" => "purge_deleted"} + }) do + project_id |> get_project!() |> delete_project() + + :ok + end + def perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) do - projects_to_delete = + jobs = from(p in Project, where: p.scheduled_deletion <= ago(0, "second") ) |> Repo.all() + |> Enum.map(fn project -> + new(%{project_id: project.id, type: "purge_deleted"}, max_attempts: 3) + end) - :ok = - Enum.each(projects_to_delete, fn project -> delete_project(project) end) + Oban.insert_all(Lightning.Oban, jobs) - {:ok, %{projects_deleted: projects_to_delete}} + :ok end def perform(%Oban.Job{ @@ -439,20 +449,24 @@ defmodule Lightning.Projects do # coveralls-ignore-stop end) - Repo.transact(fn -> - with {:ok, project} <- ProjectHook.handle_delete_project(project) do - Logger.debug(fn -> - # coveralls-ignore-start - "Project ##{project.id} deleted." - # coveralls-ignore-stop - end) + with {:ok, project} <- ProjectHook.handle_delete_project(project) do + Logger.debug(fn -> + # coveralls-ignore-start + "Project ##{project.id} deleted." + # coveralls-ignore-stop + end) - {:ok, project} - end - end) - |> tap(fn result -> - with {:ok, _project} <- result, do: Events.project_deleted(project) - end) + Events.project_deleted(project) + + {:ok, project} + end + end + + @spec delete_project_async(Project.t()) :: {:ok, Oban.Job.t()} + def delete_project_async(project) do + job = new(%{project_id: project.id, type: "purge_deleted"}, max_attempts: 3) + + {:ok, _} = Oban.insert(Lightning.Oban, job) end def project_runs_query(project) do @@ -712,6 +726,30 @@ defmodule Lightning.Projects do end) end + @doc """ + Deletes project work orders in batches + """ + @spec delete_project_workorders(Project.t(), non_neg_integer()) :: :ok + def delete_project_workorders(project, batch_size \\ 1000) do + workorders_query = + from wo in WorkOrder, + join: wf in assoc(wo, :workflow), + on: wf.project_id == ^project.id, + select: wo.id + + delete_workorders_history(workorders_query, batch_size) + end + + @doc """ + Deletes project dataclips in batches + """ + @spec delete_project_dataclips(Project.t(), non_neg_integer()) :: :ok + def delete_project_dataclips(project, batch_size \\ 1000) do + project + |> project_dataclips_query() + |> delete_dataclips(batch_size) + end + @doc """ Returns an `%Ecto.Changeset{}` for changing the project scheduled_deletion. @@ -756,8 +794,33 @@ defmodule Lightning.Projects do where: wo.last_activity < ago(^period, "day"), select: wo.id + delete_workorders_history(workorders_query, 1000) + + dataclips_query = + from d in Dataclip, + as: :dataclip, + where: d.project_id == ^project.id, + where: d.inserted_at < ago(^period, "day"), + left_join: wo in WorkOrder, + on: d.id == wo.dataclip_id, + left_join: r in Run, + on: d.id == r.dataclip_id, + left_join: s in Step, + on: d.id == s.input_dataclip_id or d.id == s.output_dataclip_id, + where: is_nil(wo.id) and is_nil(r.id) and is_nil(s.id), + select: d.id + + delete_dataclips(dataclips_query, 1000) + + :ok + end + + defp delete_history_for(_project) do + {:error, :missing_history_retention_period} + end + + defp delete_workorders_history(workorders_query, batch_size) do workorders_count = Repo.aggregate(workorders_query, :count) - batch_size = 1000 workorders_delete_query = WorkOrder @@ -788,33 +851,20 @@ defmodule Lightning.Projects do ) end - dataclips_query = - from d in Dataclip, - as: :dataclip, - where: d.project_id == ^project.id, - where: d.inserted_at < ago(^period, "day"), - left_join: wo in WorkOrder, - on: d.id == wo.dataclip_id, - left_join: r in Run, - on: d.id == r.dataclip_id, - left_join: s in Step, - on: d.id == s.input_dataclip_id or d.id == s.output_dataclip_id, - where: is_nil(wo.id) and is_nil(r.id) and is_nil(s.id), - select: d.id + :ok + end + defp delete_dataclips(dataclips_query, batch_size) do dataclips_count = Repo.aggregate(dataclips_query, :count) - dataclips_batch_size = 500 - for i <- 1..ceil(dataclips_count / dataclips_batch_size) do - count_to_delete = dataclips_batch_size * i - - dataclips_delete_query = - Dataclip - |> with_cte("dataclips_to_delete", - as: ^limit(dataclips_query, ^count_to_delete) - ) - |> join(:inner, [d], dtd in "dataclips_to_delete", on: d.id == dtd.id) + dataclips_delete_query = + Dataclip + |> with_cte("dataclips_to_delete", + as: ^limit(dataclips_query, ^batch_size) + ) + |> join(:inner, [d], dtd in "dataclips_to_delete", on: d.id == dtd.id) + for _i <- 1..ceil(dataclips_count / batch_size) do {_count, _dataclips} = Repo.delete_all(dataclips_delete_query, returning: false, @@ -825,10 +875,6 @@ defmodule Lightning.Projects do :ok end - defp delete_history_for(_project) do - {:error, :missing_history_retention_period} - end - def invite_collaborators(project, collaborators, inviter) do Multi.new() |> Multi.put(:collaborators, collaborators) diff --git a/lib/lightning_web/components/new_inputs.ex b/lib/lightning_web/components/new_inputs.ex index 698e2f9acf..82d3b79d69 100644 --- a/lib/lightning_web/components/new_inputs.ex +++ b/lib/lightning_web/components/new_inputs.ex @@ -225,8 +225,9 @@ defmodule LightningWeb.Components.NewInputs do "block w-full rounded-lg border border-secondary-300 bg-white", "sm:text-sm shadow-sm", "focus:border-primary-300 focus:ring focus:ring-primary-200 focus:ring-opacity-50", - "disabled:cursor-not-allowed #{@button_placement == "right" && "rounded-r-none"}", - "#{@button_placement == "left" && "rounded-l-none"}", + "disabled:cursor-not-allowed", + @button_placement == "right" && "rounded-r-none", + @button_placement == "left" && "rounded-l-none", @class ]} multiple={@multiple} diff --git a/lib/lightning_web/live/components/project_deletion_modal.ex b/lib/lightning_web/live/components/project_deletion_modal.ex index 9a22e19490..eb4da39556 100644 --- a/lib/lightning_web/live/components/project_deletion_modal.ex +++ b/lib/lightning_web/live/components/project_deletion_modal.ex @@ -36,15 +36,17 @@ defmodule LightningWeb.Components.ProjectDeletionModal do |> Map.put(:action, :validate) if changeset.valid? do - delete_project(socket.assigns.project) - |> case do - {:deleted, _project} -> + case delete_project(socket.assigns.project) do + {:ok, %Oban.Job{}} -> {:noreply, socket - |> put_flash(:info, "Project deleted") + |> put_flash( + :info, + "Project deletion started. This may take a while to complete." + ) |> push_navigate(to: socket.assigns.save_return_to)} - {:scheduled, _project} -> + {:ok, %Projects.Project{}} -> {:noreply, socket |> put_flash(:info, "Project scheduled for deletion") @@ -63,26 +65,11 @@ defmodule LightningWeb.Components.ProjectDeletionModal do {:noreply, push_navigate(socket, to: socket.assigns.cancel_return_to)} end - # TODO: This should be moved into the Projects module defp delete_project(project) do if project.scheduled_deletion do - Projects.delete_project(project) - |> case do - {:ok, project} -> - {:deleted, project} - - any -> - any - end + Projects.delete_project_async(project) else Projects.schedule_project_deletion(project) - |> case do - {:ok, project} -> - {:scheduled, project} - - any -> - any - end end end diff --git a/lib/lightning_web/live/project_live/settings.html.heex b/lib/lightning_web/live/project_live/settings.html.heex index 2a3ba1299b..595eeccfce 100644 --- a/lib/lightning_web/live/project_live/settings.html.heex +++ b/lib/lightning_web/live/project_live/settings.html.heex @@ -915,7 +915,7 @@ Enum.count(@data_retention_periods) == 1 } field={f[:history_retention_period]} - class="h-4 w-4 border-gray-300 text-indigo-600 focus:ring-indigo-600" + class="border-gray-300 focus:ring-indigo-600" />
<%= case assigns[:data_retention_limit_message] do %> @@ -1009,7 +1009,7 @@
-
+
<.input type="select" prompt="Select Period" @@ -1024,7 +1024,7 @@ checked?(@project_changeset, :erase_all) } field={f[:dataclip_retention_period]} - class="h-4 w-4 border-gray-300 text-indigo-600 focus:ring-indigo-600" + class="border-gray-300 ocus:ring-indigo-600" />
diff --git a/priv/repo/migrations/20241112040853_add_indexes_to_run_steps.exs b/priv/repo/migrations/20241112040853_add_indexes_to_run_steps.exs new file mode 100644 index 0000000000..b2e9cd9647 --- /dev/null +++ b/priv/repo/migrations/20241112040853_add_indexes_to_run_steps.exs @@ -0,0 +1,7 @@ +defmodule Lightning.Repo.Migrations.AddIndexesToRunSteps do + use Ecto.Migration + + def change do + create_if_not_exists index("run_steps", [:step_id]) + end +end diff --git a/test/lightning/projects_test.exs b/test/lightning/projects_test.exs index a22d6e45dc..455b1a1133 100644 --- a/test/lightning/projects_test.exs +++ b/test/lightning/projects_test.exs @@ -770,20 +770,19 @@ defmodule Lightning.ProjectsTest do scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: -10) ) - project_fixture( - scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: 10) - ) + not_to_delete = + project_fixture( + scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: 10) + ) count_before = Repo.all(Project) |> Enum.count() - {:ok, %{projects_deleted: projects_deleted}} = - Projects.perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) + :ok = Projects.perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) - assert count_before - 1 == Repo.all(Project) |> Enum.count() - assert 1 == projects_deleted |> Enum.count() + assert Repo.aggregate(Project, :count) == count_before - 1 - assert project_to_delete.id == - projects_deleted |> Enum.at(0) |> Map.get(:id) + refute Repo.get(Project, project_to_delete.id) + assert Repo.get(Project, not_to_delete.id) end end diff --git a/test/lightning_web/live/project_live_test.exs b/test/lightning_web/live/project_live_test.exs index 557d7a1b5d..3e1539ba08 100644 --- a/test/lightning_web/live/project_live_test.exs +++ b/test/lightning_web/live/project_live_test.exs @@ -348,7 +348,8 @@ defmodule LightningWeb.ProjectLiveTest do |> render_submit() |> follow_redirect(conn, Routes.project_index_path(conn, :index)) - assert html =~ "Project deleted" + assert html =~ + "Project deletion started. This may take a while to complete." refute index_live |> element("project-#{project.id}") |> has_element?() end