Skip to content

Commit

Permalink
Delete project data in batches to avoid timeouts (#2658)
Browse files Browse the repository at this point in the history
* delete project data in batches to avoid timeouts

* make a much needed UI fix

---------

Co-authored-by: Stuart Corbishley <[email protected]>
  • Loading branch information
midigofrank and stuartc authored Nov 12, 2024
1 parent 56e0ec6 commit 572cd16
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 89 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ and this project adheres to

- Fix LiveView crash when pressing "esc" on inspector
[#2622](https://github.com/OpenFn/lightning/issues/2622)
- Delete project data in batches to avoid timeouts in the db connection
[#2632](https://github.com/OpenFn/lightning/issues/2632)
- Fix MetadataService crashing when errors are encountered
[#2659](https://github.com/OpenFn/lightning/issues/2659)

Expand Down
10 changes: 2 additions & 8 deletions lib/lightning/extensions/project_hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@ defmodule Lightning.Extensions.ProjectHook do
@spec handle_delete_project(Project.t()) ::
{:ok, Project.t()} | {:error, Changeset.t()}
def handle_delete_project(project) do
Projects.project_runs_query(project) |> Repo.delete_all()

Projects.project_run_step_query(project) |> Repo.delete_all()

Projects.project_workorders_query(project) |> Repo.delete_all()

Projects.project_steps_query(project) |> Repo.delete_all()
Projects.delete_project_workorders(project)

Projects.project_jobs_query(project) |> Repo.delete_all()

Expand All @@ -38,7 +32,7 @@ defmodule Lightning.Extensions.ProjectHook do

Projects.project_credentials_query(project) |> Repo.delete_all()

Projects.project_dataclips_query(project) |> Repo.delete_all()
Projects.delete_project_dataclips(project)

Repo.delete(project)
end
Expand Down
136 changes: 91 additions & 45 deletions lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,27 @@ defmodule Lightning.Projects do
will find projects that are ready for permanent deletion and purge them.
"""
@impl Oban.Worker
# Purges a single project identified by the "project_id" job argument.
#
# This clause has to stay above the generic "purge_deleted" clause: a map
# pattern matches any map containing the listed keys, so the generic clause
# would also match jobs that carry a "project_id".
def perform(%Oban.Job{
      args: %{"project_id" => project_id, "type" => "purge_deleted"}
    }) do
  # Propagate a failed deletion instead of returning :ok unconditionally, so
  # Oban records the failure and retries the job (these jobs are enqueued
  # with max_attempts: 3). get_project!/1 still raises if the project is gone.
  with {:ok, _project} <- project_id |> get_project!() |> delete_project() do
    :ok
  end
end

def perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) do
projects_to_delete =
jobs =
from(p in Project,
where: p.scheduled_deletion <= ago(0, "second")
)
|> Repo.all()
|> Enum.map(fn project ->
new(%{project_id: project.id, type: "purge_deleted"}, max_attempts: 3)
end)

:ok =
Enum.each(projects_to_delete, fn project -> delete_project(project) end)
Oban.insert_all(Lightning.Oban, jobs)

{:ok, %{projects_deleted: projects_to_delete}}
:ok
end

def perform(%Oban.Job{
Expand Down Expand Up @@ -439,20 +449,24 @@ defmodule Lightning.Projects do
# coveralls-ignore-stop
end)

Repo.transact(fn ->
with {:ok, project} <- ProjectHook.handle_delete_project(project) do
Logger.debug(fn ->
# coveralls-ignore-start
"Project ##{project.id} deleted."
# coveralls-ignore-stop
end)
with {:ok, project} <- ProjectHook.handle_delete_project(project) do
Logger.debug(fn ->
# coveralls-ignore-start
"Project ##{project.id} deleted."
# coveralls-ignore-stop
end)

{:ok, project}
end
end)
|> tap(fn result ->
with {:ok, _project} <- result, do: Events.project_deleted(project)
end)
Events.project_deleted(project)

{:ok, project}
end
end

@doc """
Enqueues a background job that deletes the given project.

A `"purge_deleted"` job carrying the project's id is built (at most 3
attempts) and inserted through `Lightning.Oban`. Returns `{:ok, job}`;
raises a `MatchError` if the insert does not return an `:ok` tuple.
"""
@spec delete_project_async(Project.t()) :: {:ok, Oban.Job.t()}
def delete_project_async(project) do
  args = %{project_id: project.id, type: "purge_deleted"}
  purge_job = new(args, max_attempts: 3)

  {:ok, _inserted} = Oban.insert(Lightning.Oban, purge_job)
end

def project_runs_query(project) do
Expand Down Expand Up @@ -712,6 +726,30 @@ defmodule Lightning.Projects do
end)
end

@doc """
Deletes the work orders of a project in batches.

Selects the ids of every work order whose workflow belongs to `project` and
hands that query to `delete_workorders_history/2`.

`batch_size` is the number of work orders handled per batch (default `1000`).
It must be a positive integer: a batch size of zero could never make
progress.

Returns `:ok`.
"""
@spec delete_project_workorders(Project.t(), pos_integer()) :: :ok
def delete_project_workorders(project, batch_size \\ 1000)
    when is_integer(batch_size) and batch_size > 0 do
  workorders_query =
    from wo in WorkOrder,
      join: wf in assoc(wo, :workflow),
      on: wf.project_id == ^project.id,
      select: wo.id

  delete_workorders_history(workorders_query, batch_size)
end

@doc """
Deletes the dataclips of a project in batches.

Builds the project's dataclips query with `project_dataclips_query/1` and
hands it to `delete_dataclips/2`.

`batch_size` is the number of dataclips deleted per batch (default `1000`).
It must be a positive integer: the number of batches is computed by dividing
the dataclip count by `batch_size`, so zero would raise an `ArithmeticError`.

Returns `:ok`.
"""
@spec delete_project_dataclips(Project.t(), pos_integer()) :: :ok
def delete_project_dataclips(project, batch_size \\ 1000)
    when is_integer(batch_size) and batch_size > 0 do
  project
  |> project_dataclips_query()
  |> delete_dataclips(batch_size)
end

@doc """
Returns an `%Ecto.Changeset{}` for changing the project scheduled_deletion.
Expand Down Expand Up @@ -756,8 +794,33 @@ defmodule Lightning.Projects do
where: wo.last_activity < ago(^period, "day"),
select: wo.id

delete_workorders_history(workorders_query, 1000)

dataclips_query =
from d in Dataclip,
as: :dataclip,
where: d.project_id == ^project.id,
where: d.inserted_at < ago(^period, "day"),
left_join: wo in WorkOrder,
on: d.id == wo.dataclip_id,
left_join: r in Run,
on: d.id == r.dataclip_id,
left_join: s in Step,
on: d.id == s.input_dataclip_id or d.id == s.output_dataclip_id,
where: is_nil(wo.id) and is_nil(r.id) and is_nil(s.id),
select: d.id

delete_dataclips(dataclips_query, 1000)

:ok
end

# Fallback clause: reached when no earlier clause matched the project, in
# which case no history is deleted and an error is reported to the caller.
defp delete_history_for(_project),
  do: {:error, :missing_history_retention_period}

defp delete_workorders_history(workorders_query, batch_size) do
workorders_count = Repo.aggregate(workorders_query, :count)
batch_size = 1000

workorders_delete_query =
WorkOrder
Expand Down Expand Up @@ -788,33 +851,20 @@ defmodule Lightning.Projects do
)
end

dataclips_query =
from d in Dataclip,
as: :dataclip,
where: d.project_id == ^project.id,
where: d.inserted_at < ago(^period, "day"),
left_join: wo in WorkOrder,
on: d.id == wo.dataclip_id,
left_join: r in Run,
on: d.id == r.dataclip_id,
left_join: s in Step,
on: d.id == s.input_dataclip_id or d.id == s.output_dataclip_id,
where: is_nil(wo.id) and is_nil(r.id) and is_nil(s.id),
select: d.id
:ok
end

defp delete_dataclips(dataclips_query, batch_size) do
dataclips_count = Repo.aggregate(dataclips_query, :count)
dataclips_batch_size = 500

for i <- 1..ceil(dataclips_count / dataclips_batch_size) do
count_to_delete = dataclips_batch_size * i

dataclips_delete_query =
Dataclip
|> with_cte("dataclips_to_delete",
as: ^limit(dataclips_query, ^count_to_delete)
)
|> join(:inner, [d], dtd in "dataclips_to_delete", on: d.id == dtd.id)
dataclips_delete_query =
Dataclip
|> with_cte("dataclips_to_delete",
as: ^limit(dataclips_query, ^batch_size)
)
|> join(:inner, [d], dtd in "dataclips_to_delete", on: d.id == dtd.id)

for _i <- 1..ceil(dataclips_count / batch_size) do
{_count, _dataclips} =
Repo.delete_all(dataclips_delete_query,
returning: false,
Expand All @@ -825,10 +875,6 @@ defmodule Lightning.Projects do
:ok
end

defp delete_history_for(_project) do
{:error, :missing_history_retention_period}
end

def invite_collaborators(project, collaborators, inviter) do
Multi.new()
|> Multi.put(:collaborators, collaborators)
Expand Down
5 changes: 3 additions & 2 deletions lib/lightning_web/components/new_inputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ defmodule LightningWeb.Components.NewInputs do
"block w-full rounded-lg border border-secondary-300 bg-white",
"sm:text-sm shadow-sm",
"focus:border-primary-300 focus:ring focus:ring-primary-200 focus:ring-opacity-50",
"disabled:cursor-not-allowed #{@button_placement == "right" && "rounded-r-none"}",
"#{@button_placement == "left" && "rounded-l-none"}",
"disabled:cursor-not-allowed",
@button_placement == "right" && "rounded-r-none",
@button_placement == "left" && "rounded-l-none",
@class
]}
multiple={@multiple}
Expand Down
29 changes: 8 additions & 21 deletions lib/lightning_web/live/components/project_deletion_modal.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ defmodule LightningWeb.Components.ProjectDeletionModal do
|> Map.put(:action, :validate)

if changeset.valid? do
delete_project(socket.assigns.project)
|> case do
{:deleted, _project} ->
case delete_project(socket.assigns.project) do
{:ok, %Oban.Job{}} ->
{:noreply,
socket
|> put_flash(:info, "Project deleted")
|> put_flash(
:info,
"Project deletion started. This may take a while to complete."
)
|> push_navigate(to: socket.assigns.save_return_to)}

{:scheduled, _project} ->
{:ok, %Projects.Project{}} ->
{:noreply,
socket
|> put_flash(:info, "Project scheduled for deletion")
Expand All @@ -63,26 +65,11 @@ defmodule LightningWeb.Components.ProjectDeletionModal do
{:noreply, push_navigate(socket, to: socket.assigns.cancel_return_to)}
end

# TODO: This should be moved into the Projects module
defp delete_project(project) do
if project.scheduled_deletion do
Projects.delete_project(project)
|> case do
{:ok, project} ->
{:deleted, project}

any ->
any
end
Projects.delete_project_async(project)
else
Projects.schedule_project_deletion(project)
|> case do
{:ok, project} ->
{:scheduled, project}

any ->
any
end
end
end

Expand Down
6 changes: 3 additions & 3 deletions lib/lightning_web/live/project_live/settings.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@
Enum.count(@data_retention_periods) == 1
}
field={f[:history_retention_period]}
class="h-4 w-4 border-gray-300 text-indigo-600 focus:ring-indigo-600"
class="border-gray-300 focus:ring-indigo-600"
/>
<div class="text-xs">
<%= case assigns[:data_retention_limit_message] do %>
Expand Down Expand Up @@ -1009,7 +1009,7 @@
</div>
</div>

<div class="space-y-2">
<div class="flex space-y-2">
<.input
type="select"
prompt="Select Period"
Expand All @@ -1024,7 +1024,7 @@
checked?(@project_changeset, :erase_all)
}
field={f[:dataclip_retention_period]}
class="h-4 w-4 border-gray-300 text-indigo-600 focus:ring-indigo-600"
class="border-gray-300 focus:ring-indigo-600"
/>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Lightning.Repo.Migrations.AddIndexesToRunSteps do
  use Ecto.Migration

  # Adds an index on run_steps.step_id. The "if not exists" form makes the
  # migration a no-op when the index is already present.
  def change do
    create_if_not_exists(index("run_steps", [:step_id]))
  end
end
17 changes: 8 additions & 9 deletions test/lightning/projects_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -770,20 +770,19 @@ defmodule Lightning.ProjectsTest do
scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: -10)
)

project_fixture(
scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: 10)
)
not_to_delete =
project_fixture(
scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: 10)
)

count_before = Repo.all(Project) |> Enum.count()

{:ok, %{projects_deleted: projects_deleted}} =
Projects.perform(%Oban.Job{args: %{"type" => "purge_deleted"}})
:ok = Projects.perform(%Oban.Job{args: %{"type" => "purge_deleted"}})

assert count_before - 1 == Repo.all(Project) |> Enum.count()
assert 1 == projects_deleted |> Enum.count()
assert Repo.aggregate(Project, :count) == count_before - 1

assert project_to_delete.id ==
projects_deleted |> Enum.at(0) |> Map.get(:id)
refute Repo.get(Project, project_to_delete.id)
assert Repo.get(Project, not_to_delete.id)
end
end

Expand Down
3 changes: 2 additions & 1 deletion test/lightning_web/live/project_live_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ defmodule LightningWeb.ProjectLiveTest do
|> render_submit()
|> follow_redirect(conn, Routes.project_index_path(conn, :index))

assert html =~ "Project deleted"
assert html =~
"Project deletion started. This may take a while to complete."

refute index_live |> element("project-#{project.id}") |> has_element?()
end
Expand Down

0 comments on commit 572cd16

Please sign in to comment.