Skip to content

Commit

Permalink
Partition work_orders
Browse files Browse the repository at this point in the history
  • Loading branch information
rorymckinley committed Nov 20, 2023
1 parent c4e0e54 commit 3411599
Show file tree
Hide file tree
Showing 9 changed files with 979 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ bearing with us as we move towards our first stable Lightning release.)
[#1327](https://github.com/OpenFn/Lightning/issues/1327)
- Have user create workflow name before moving to the canvas
[#1103](https://github.com/OpenFn/Lightning/issues/1103)
- Partition `work_orders` by week and year and add functionality to maintain
the partitions. [#1254](https://github.com/OpenFn/Lightning/issues/1254)

### Changed

Expand Down
6 changes: 5 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ base_oban_cron = [
{"0 10 * * 1", Lightning.DigestEmailWorker,
args: %{"type" => "weekly_project_digest"}},
{"0 10 1 * *", Lightning.DigestEmailWorker,
args: %{"type" => "monthly_project_digest"}}
args: %{"type" => "monthly_project_digest"}},
{"0 1 * * *", Lightning.PartitionTableService,
args: %{"add_headroom" => %{"weeks" => 2}}},
{"0 0 * * *", Lightning.PartitionTableService,
args: %{"drop_older_than" => %{"weeks" => -6}}}
]

conditional_cron =
Expand Down
10 changes: 10 additions & 0 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,14 @@ defmodule Lightning.Application do
def oban_opts() do
Application.get_env(:lightning, Oban)
end

@impl true
@doc """
Perform any idempotent database setup that must be done after the repo is started.
"""
def start_phase(:ensure_db_config, :normal, _opts) do
Lightning.PartitionTableService.add_headroom(:all, 2)
Lightning.PartitionTableService.add_headroom(:all, -5)
:ok
end
end
17 changes: 17 additions & 0 deletions lib/lightning/maintenance/admin_tools.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Lightning.AdminTools do
def generate_iso_weeks(start_date, end_date) do
Date.range(start_date, end_date)
|> Enum.map(&Timex.beginning_of_week(&1, :mon))
|> Enum.uniq()
|> Enum.map(fn date ->
{year, week} = Timex.iso_week(date)

{
year |> Integer.to_string(),
week |> Integer.to_string() |> String.pad_leading(2, "0"),
date |> Date.to_string(),
date |> Timex.shift(weeks: 1) |> Date.to_string()
}
end)
end
end
182 changes: 182 additions & 0 deletions lib/lightning/maintenance/partition_table_service.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
defmodule Lightning.PartitionTableService do
@moduledoc """
Service to keep the partition tables up to date.
"""

use Oban.Worker,
queue: :background,
max_attempts: 1

import Ecto.Query

alias Lightning.Repo

require Logger

@impl Oban.Worker
def perform(%Oban.Job{args: %{"add_headroom" => %{"weeks" => weeks}}})
when is_integer(weeks) do
add_headroom(:all, weeks)
end

@impl Oban.Worker
def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}})
when is_integer(weeks) do
upper_bound = Timex.shift(DateTime.utc_now(), weeks: weeks)

remove_empty("work_orders", upper_bound)
end

def add_headroom(:all, weeks) when is_integer(weeks) do
add_headroom(:work_orders, weeks) |> log_partition_creation()
end

def add_headroom(:work_orders, weeks) when is_integer(weeks) do
proposed_tables = tables_to_add("work_orders", weeks)

:ok =
Enum.each(proposed_tables, fn {partition_name, from, to} ->
{
Repo.query(create_query(partition_name, "work_orders", from, to))
}
end)

proposed_tables
end

def tables_to_add(table, weeks) do
today = Date.utc_today()

existing_tables = get_partitions(table)

Lightning.AdminTools.generate_iso_weeks(today, today |> Date.add(weeks * 7))
|> Enum.map(&to_partition_details(table, &1))
|> Enum.reject(fn {name, _from, _to} ->
Enum.find(existing_tables, &String.equivalent?(name, &1))
end)
end

def get_partitions(parent) do
%Postgrex.Result{rows: rows} =
Repo.query!(
~S[
SELECT CAST(inhrelid::regclass AS text) AS child
FROM pg_catalog.pg_inherits
WHERE inhparent = $1::text::regclass;
],
[parent]
)

rows |> List.flatten()
end

@doc """
Drops empty partition tables that have an upper partition bound less than the
date given.
This bound is the `TO` part of the partition:
```
FOR VALUES FROM ('2020-12-28 00:00:00') TO ('2021-01-04 00:00:00')
```
"""
def remove_empty(parent, upper_bound) do
parent
|> find_range_partitions
|> partitions_older_than(upper_bound)
|> Enum.each(&drop_empty_partition(parent, &1))
end

def find_range_partitions(parent) do
Repo.query!(
~S[
SELECT
pt.relname AS partition_name,
pg_get_expr(pt.relpartbound,
pt.oid,
TRUE) AS partition_expression
FROM
pg_class base_tb
JOIN pg_inherits i ON
i.inhparent = base_tb.oid
JOIN pg_class pt ON
pt.oid = i.inhrelid
WHERE
base_tb.oid = $1::text::regclass
AND pg_get_expr(
pt.relpartbound,
pt.oid,
TRUE
) != 'DEFAULT'
],
[parent]
).rows
end

def partitions_older_than(partitions, bound) do
partitions
|> Enum.map(fn [table, range_expression] ->
[_, to_as_string] =
~r/TO \('(.+)'\)/
|> Regex.run(range_expression)

{:ok, to_as_dt, _} = DateTime.from_iso8601(to_as_string <> "Z")

[table, to_as_dt]
end)
|> Enum.filter(fn [_table, to] -> DateTime.compare(to, bound) == :lt end)
|> Enum.map(fn [table, _to] -> table end)
end

def drop_empty_partition(parent, partition) do
unless valid_chars?(parent) && valid_chars?(partition) do
raise ArgumentError, message: "Table name contains invalid characters"
end

partition
|> partition_empty?
|> handle_drop(parent, partition)
end

defp valid_chars?(table_name) do
table_name =~ ~r/\A\w+\z/
end

defp partition_empty?(partition) do
from(r in partition, select: count()) |> Repo.one!() == 0
end

defp handle_drop(true, parent, partition) do
Logger.info("Detaching #{partition} from #{parent}")
Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};")
Logger.info("Dropping #{partition}")
Repo.query!("DROP TABLE #{partition};")
end

defp handle_drop(false, _parent, _partition) do
end

defp create_query(partition, parent, from, to) do
"""
CREATE TABLE #{partition}
PARTITION OF #{parent}
FOR VALUES FROM ('#{from}') TO ('#{to}');
"""
end

defp to_partition_details(table, {year, week, from, to}) do
{"#{table}_#{year}_#{week}", from, to}
end

defp log_partition_creation(partitions) when length(partitions) > 0 do
partitions
|> Enum.map_join("\n", fn {partition_name, from, to} ->
"Created #{partition_name} for #{from} -> #{to}"

Check warning on line 174 in lib/lightning/maintenance/partition_table_service.ex

View check run for this annotation

Codecov / codecov/patch

lib/lightning/maintenance/partition_table_service.ex#L174

Added line #L174 was not covered by tests
end)
|> Logger.info()
end

defp log_partition_creation(partitions) when partitions == [] do
Logger.info("No extra partitions were needed.")
end
end
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ defmodule Lightning.MixProject do
def application do
[
mod: {Lightning.Application, [:timex]},
extra_applications: [:logger, :runtime_tools, :os_mon]
extra_applications: [:logger, :runtime_tools, :os_mon],
start_phases: [
ensure_db_config: []
]
]
end

Expand Down
Loading

0 comments on commit 3411599

Please sign in to comment.