Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partition work_orders #1392

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ bearing with us as we move towards our first stable Lightning release.)
[#1327](https://github.com/OpenFn/Lightning/issues/1327)
- Have user create workflow name before moving to the canvas
[#1103](https://github.com/OpenFn/Lightning/issues/1103)
- Partition `work_orders` by week and year and add functionality to maintain
the partitions. [#1254](https://github.com/OpenFn/Lightning/issues/1254)

### Changed

Expand Down
6 changes: 5 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ base_oban_cron = [
{"0 10 * * 1", Lightning.DigestEmailWorker,
args: %{"type" => "weekly_project_digest"}},
{"0 10 1 * *", Lightning.DigestEmailWorker,
args: %{"type" => "monthly_project_digest"}}
args: %{"type" => "monthly_project_digest"}},
{"0 1 * * *", Lightning.PartitionTableService,
args: %{"add_headroom" => %{"weeks" => 2}}},
{"0 0 * * *", Lightning.PartitionTableService,
args: %{"drop_older_than" => %{"weeks" => -6}}}
]

conditional_cron =
Expand Down
10 changes: 10 additions & 0 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,14 @@ defmodule Lightning.Application do
# Reads the Oban options stored under the `Oban` key of the `:lightning`
# application environment (`nil` when nothing is configured).
def oban_opts, do: Application.get_env(:lightning, Oban)

@impl true
@doc """
Runs idempotent database setup that has to happen after the repo is started.

For the `:ensure_db_config` phase this asks
`Lightning.PartitionTableService.add_headroom/2` for partitions covering
2 weeks and then -5 weeks relative to today, and returns `:ok`.
"""
def start_phase(:ensure_db_config, :normal, _opts) do
  for week_offset <- [2, -5] do
    Lightning.PartitionTableService.add_headroom(:all, week_offset)
  end

  :ok
end
end
17 changes: 17 additions & 0 deletions lib/lightning/maintenance/admin_tools.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Lightning.AdminTools do
# Builds one entry per week touched by the range `start_date..end_date`.
#
# Every date in the range is mapped to the beginning of its week, duplicates
# are removed, and each remaining week start becomes a tuple of strings:
# `{year, zero-padded week number, week start, week start + 7 days}`.
# The year and week number are taken from `Timex.iso_week/1`.
def generate_iso_weeks(start_date, end_date) do
Date.range(start_date, end_date)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was setting up tests for this class, I noticed that there was a small edge case that emerged when dates that are not mondays are passed in as arguments. I can't imagine that it was a big deal before, but the tweak to fix it was small, so I made an executive decision :).

# Collapse every date onto the first day of its week so that each week is
# emitted exactly once, whatever weekday the range starts or ends on.
|> Enum.map(&Date.beginning_of_week(&1))
|> Enum.uniq()
|> Enum.map(fn date ->
{year, week} = Timex.iso_week(date)

{
year |> Integer.to_string(),
# Pad the week number to two digits ("05" rather than "5").
week |> Integer.to_string() |> String.pad_leading(2, "0"),
date |> Date.to_string(),
# Seven days after the week start, i.e. the start of the following week.
date |> Date.add(7) |> Date.to_string()
}
end)
end
end
182 changes: 182 additions & 0 deletions lib/lightning/maintenance/partition_table_service.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
defmodule Lightning.PartitionTableService do
@moduledoc """
Service to keep the partition tables up to date.
"""

use Oban.Worker,
queue: :background,
max_attempts: 1

import Ecto.Query

alias Lightning.Repo

require Logger

# Oban entry point for jobs whose args are
# `%{"add_headroom" => %{"weeks" => weeks}}`: delegates to `add_headroom/2`
# for all partitioned tables.
@impl Oban.Worker
def perform(%Oban.Job{args: %{"add_headroom" => %{"weeks" => week_count}}})
    when is_integer(week_count) do
  add_headroom(:all, week_count)
end

# Oban entry point for jobs whose args are
# `%{"drop_older_than" => %{"weeks" => weeks}}`.
# Shifts the current UTC time by `weeks * 7` days and hands the result to
# `remove_empty/2` as the upper bound for "work_orders" partitions.
@impl Oban.Worker
def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not get this method to work as originally implemented in platform so I modelled the arguments on the other perform variant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah interesting, this looks good to me though :)

when is_integer(weeks) do
# A negative `weeks` value moves the bound into the past.
upper_bound = DateTime.add(DateTime.utc_now(), weeks * 7, :day)

remove_empty("work_orders", upper_bound)
end

@doc """
Adds partition headroom for `weeks` weeks.

With `:all`, the work is done for the `work_orders` table and the resulting
list of partitions is passed to the partition-creation logger.
"""
def add_headroom(:all, weeks) when is_integer(weeks) do
  created_partitions = add_headroom(:work_orders, weeks)
  log_partition_creation(created_partitions)
end

# Creates every "work_orders" partition proposed by `tables_to_add/2` for the
# given number of weeks and returns the list of proposed
# `{partition_name, from, to}` tuples.
#
# Creation stays best-effort: a failing `CREATE TABLE` does not raise, but the
# failure is logged instead of being silently discarded.
def add_headroom(:work_orders, weeks) when is_integer(weeks) do
  proposed_tables = tables_to_add("work_orders", weeks)

  Enum.each(proposed_tables, fn {partition_name, from, to} ->
    statement = create_query(partition_name, "work_orders", from, to)

    case Repo.query(statement) do
      {:ok, _result} ->
        :ok

      {:error, error} ->
        Logger.error(
          "Failed to create partition #{partition_name}: #{inspect(error)}"
        )
    end
  end)

  proposed_tables
end

# Returns `{partition_name, from, to}` tuples for every week between today
# (UTC) and today plus `weeks * 7` days whose partition name is not already
# reported by `get_partitions/1` for `table`.
def tables_to_add(table, weeks) do
  first_day = Date.utc_today()
  already_present = get_partitions(table)
  last_day = Date.add(first_day, weeks * 7)

  first_day
  |> Lightning.AdminTools.generate_iso_weeks(last_day)
  |> Enum.map(fn week -> to_partition_details(table, week) end)
  |> Enum.reject(fn {name, _from, _to} ->
    Enum.any?(already_present, &String.equivalent?(name, &1))
  end)
end

# Returns the names of all tables that `pg_inherits` lists as children of
# `parent`, as a flat list of strings.
def get_partitions(parent) do
  sql = ~S[
SELECT CAST(inhrelid::regclass AS text) AS child
FROM pg_catalog.pg_inherits
WHERE inhparent = $1::text::regclass;
]

  %Postgrex.Result{rows: child_rows} = Repo.query!(sql, [parent])

  List.flatten(child_rows)
end

@doc """
Drops every empty range partition of `parent` whose upper partition bound is
earlier than `upper_bound`.

The upper bound of a partition is the `TO` value of its range:

```
FOR VALUES FROM ('2020-12-28 00:00:00') TO ('2021-01-04 00:00:00')
```
"""
def remove_empty(parent, upper_bound) do
  stale_partitions =
    partitions_older_than(find_range_partitions(parent), upper_bound)

  Enum.each(stale_partitions, fn partition ->
    drop_empty_partition(parent, partition)
  end)
end

# Lists the non-DEFAULT partitions of `parent` as rows of
# `[partition_name, partition_expression]`, where the expression is the
# partition bound rendered by `pg_get_expr`.
def find_range_partitions(parent) do
  sql = ~S[
SELECT
pt.relname AS partition_name,
pg_get_expr(pt.relpartbound,
pt.oid,
TRUE) AS partition_expression
FROM
pg_class base_tb
JOIN pg_inherits i ON
i.inhparent = base_tb.oid
JOIN pg_class pt ON
pt.oid = i.inhrelid
WHERE
base_tb.oid = $1::text::regclass
AND pg_get_expr(
pt.relpartbound,
pt.oid,
TRUE
) != 'DEFAULT'
]

  result = Repo.query!(sql, [parent])
  result.rows
end

# Given rows of `[table, range_expression]`, returns the names of the tables
# whose `TO ('...')` bound, read as a UTC timestamp, is strictly earlier than
# `bound`.
def partitions_older_than(partitions, bound) do
  with_upper_bounds =
    Enum.map(partitions, fn [table, range_expression] ->
      [_whole_match, upper_text] = Regex.run(~r/TO \('(.+)'\)/, range_expression)

      # The captured bound has no zone suffix; "Z" is appended so it parses as UTC.
      {:ok, upper_dt, _offset} = DateTime.from_iso8601(upper_text <> "Z")

      {table, upper_dt}
    end)

  for {table, upper_dt} <- with_upper_bounds,
      DateTime.compare(upper_dt, bound) == :lt,
      do: table
end

# Drops `partition` (a child of `parent`) when it contains no rows.
#
# Both names end up interpolated into DDL statements by `handle_drop/3`, so
# they are checked with `valid_chars?/1` first. Raises `ArgumentError` when
# either name contains anything other than word characters.
def drop_empty_partition(parent, partition) do
unless valid_chars?(parent) && valid_chars?(partition) do
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String interpolation in queries makes me nervous :)- so I added some simple validation against the more obvious forms of injection (as I know that we can't bind params in DDL statements) just in case this method ends up being used someplace else.

raise ArgumentError, message: "Table name contains invalid characters"
end

# The emptiness check result (true/false) selects the `handle_drop/3` clause.
partition
|> partition_empty?
|> handle_drop(parent, partition)
end

# True when `table_name` consists solely of one or more word characters.
defp valid_chars?(table_name) do
  Regex.match?(~r/\A\w+\z/, table_name)
end

# True when a `count()` query against the `partition` table returns 0.
defp partition_empty?(partition) do
  row_count = Repo.one!(from(row in partition, select: count()))
  row_count == 0
end

# Nothing to do for a partition that still holds rows.
defp handle_drop(false, _parent, _partition), do: nil

# Detaches an empty partition from its parent and then drops it, logging each
# step at info level.
defp handle_drop(true, parent, partition) do
  detach_statement = "ALTER TABLE #{parent} DETACH PARTITION #{partition};"
  drop_statement = "DROP TABLE #{partition};"

  Logger.info("Detaching #{partition} from #{parent}")
  Repo.query!(detach_statement)

  Logger.info("Dropping #{partition}")
  Repo.query!(drop_statement)
end

# Builds the `CREATE TABLE ... PARTITION OF ...` statement for one range
# partition covering `from` up to `to`.
defp create_query(partition, parent, from, to) do
  "CREATE TABLE #{partition}\n" <>
    "PARTITION OF #{parent}\n" <>
    "FOR VALUES FROM ('#{from}') TO ('#{to}');\n"
end

# Turns a `{year, week, from, to}` week tuple into
# `{"<table>_<year>_<week>", from, to}`.
defp to_partition_details(table, {year, week, from, to}) do
  partition_name = Enum.join([table, year, week], "_")
  {partition_name, from, to}
end

# Logs, at info level, one "Created ..." line per `{partition_name, from, to}`
# tuple, joined with newlines. This clause only matches non-empty lists.
defp log_partition_creation(partitions) when length(partitions) > 0 do
partitions
|> Enum.map_join("\n", fn {partition_name, from, to} ->
"Created #{partition_name} for #{from} -> #{to}"

Check warning on line 174 in lib/lightning/maintenance/partition_table_service.ex

View check run for this annotation

Codecov / codecov/patch

lib/lightning/maintenance/partition_table_service.ex#L174

Added line #L174 was not covered by tests
end)
|> Logger.info()
end

# Logs that nothing had to be created when the list of partitions is empty.
defp log_partition_creation([]) do
  Logger.info("No extra partitions were needed.")
end
end
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ defmodule Lightning.MixProject do
def application do
[
mod: {Lightning.Application, [:timex]},
extra_applications: [:logger, :runtime_tools, :os_mon]
extra_applications: [:logger, :runtime_tools, :os_mon],
start_phases: [
ensure_db_config: []
]
]
end

Expand Down
Loading