
Commit

2636 reduce usage tracker memory usage (#2667)
* Stream runs when generating usage tracking reports

* Clean up redundant tests

* Clean up obsolete endpoint

* Update CHANGELOG

* Replace env setting with Mox


---------

Co-authored-by: Stuart Corbishley <[email protected]>
rorymckinley and stuartc authored Nov 15, 2024
1 parent 7fe5798 commit 119bb2e
Showing 14 changed files with 412 additions and 317 deletions.
4 changes: 4 additions & 0 deletions .env.example
@@ -219,6 +219,10 @@
# of reports resubmitted on each run.
# USAGE_TRACKING_RESUBMISSION_BATCH_SIZE=10

# Generating usage tracking reports can result in increased memory usage.
# Decreasing the run chunk size should reduce memory consumption.
# USAGE_TRACKING_RUN_CHUNK_SIZE=100

# OpenFn.org hosts a public sandbox that gets reset every night. If you'd like to
# make your instance "resettable" (a highly destructive action—this destroys all
# data in your instance) you can set the following environment variable to "yes"
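For operators, the new setting is opt-in: uncommenting it and lowering the value should shrink the working set at the cost of more database round trips. For example, in the instance's environment (the value below is illustrative, not a recommendation from this commit):

USAGE_TRACKING_RUN_CHUNK_SIZE=25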
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,8 @@ and this project adheres to

- Error when the logger receives a boolean
[#2666](https://github.com/OpenFn/lightning/issues/2666)
- Attempt to reduce memory consumption when generating UsageTracking reports.
[#2636](https://github.com/OpenFn/lightning/issues/2636)

## [v2.10.1] - 2024-11-13

1 change: 1 addition & 0 deletions DEPLOYMENT.md
@@ -152,6 +152,7 @@ For Google Cloud Storage, the following environment variables are required:
| `USAGE_TRACKING_DAILY_BATCH_SIZE` | The number of days that will be reported on with each run of `UsageTracking.DayWorker`. This will only have a noticeable effect in cases where there is a backlog or where reports are being generated retroactively (defaults to 10). |
| `USAGE_TRACKING_ENABLED` | Enables the submission of anonymized usage data to OpenFn (defaults to `true`) |
| `USAGE_TRACKING_RESUBMISSION_BATCH_SIZE` | The number of failed reports that will be submitted on each resubmission run (defaults to 10) |
| `USAGE_TRACKING_RUN_CHUNK_SIZE` | The size of each batch of runs streamed from the database when generating UsageTracking reports (defaults to 100). Decreasing this may reduce memory consumption when generating reports. |
| `USAGE_TRACKING_UUIDS` | Indicates whether submissions should include cleartext UUIDs or not. Options are `cleartext` or `hashed_only`, with the default being `hashed_only`. |

### AI Chat
58 changes: 49 additions & 9 deletions lib/lightning/config.ex
@@ -120,11 +120,6 @@ defmodule Lightning.Config do
|> Keyword.get(:sender_name)
end

@impl true
def usage_tracking do
Application.get_env(:lightning, :usage_tracking)
end

@impl true
def reset_password_token_validity_in_days do
1
@@ -141,6 +136,31 @@
|> Keyword.get(key)
end

@impl true
def usage_tracking do
Application.get_env(:lightning, :usage_tracking)
end

@impl true
def usage_tracking_cleartext_uuids_enabled? do
usage_tracking() |> Keyword.get(:cleartext_uuids_enabled)
end

@impl true
def usage_tracking_enabled? do
usage_tracking() |> Keyword.get(:enabled)
end

@impl true
def usage_tracking_host do
usage_tracking() |> Keyword.get(:host)
end

@impl true
def usage_tracking_run_chunk_size do
usage_tracking() |> Keyword.get(:run_chunk_size)
end

@impl true
def usage_tracking_cron_opts do
opts = usage_tracking()
@@ -235,7 +255,11 @@
@callback storage(key :: atom()) :: term()
@callback token_signer() :: Joken.Signer.t()
@callback usage_tracking() :: Keyword.t()
@callback usage_tracking_cleartext_uuids_enabled?() :: boolean()
@callback usage_tracking_cron_opts() :: [Oban.Plugins.Cron.cron_input()]
@callback usage_tracking_enabled?() :: boolean()
@callback usage_tracking_host() :: String.t()
@callback usage_tracking_run_chunk_size() :: integer()
@callback worker_secret() :: binary() | nil
@callback worker_token_signer() :: Joken.Signer.t()

@@ -321,10 +345,6 @@
impl().email_sender_name()
end

def usage_tracking do
impl().usage_tracking()
end

def reset_password_token_validity_in_days do
impl().reset_password_token_validity_in_days()
end
@@ -337,10 +357,30 @@
impl().storage(key)
end

def usage_tracking do
impl().usage_tracking()
end

def usage_tracking_cleartext_uuids_enabled? do
impl().usage_tracking_cleartext_uuids_enabled?()
end

def usage_tracking_cron_opts do
impl().usage_tracking_cron_opts()
end

def usage_tracking_enabled? do
impl().usage_tracking_enabled?()
end

def usage_tracking_host do
impl().usage_tracking_host()
end

def usage_tracking_run_chunk_size do
impl().usage_tracking_run_chunk_size()
end

def kafka_triggers_enabled? do
impl().kafka_triggers_enabled?()
end
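The per-setting callbacks above also support the "Replace env setting with Mox" change in this commit: tests can stub usage-tracking configuration through a Mox mock instead of mutating application env. A minimal sketch, assuming a Mox mock named Lightning.MockConfig defined against the config behaviour (the mock and behaviour module names here are assumptions, not shown in this diff):

# Hypothetical test support (module names assumed, not part of this diff)
Mox.defmock(Lightning.MockConfig, for: Lightning.Config.API)

# In a test, stub only the callbacks the code under test reads
Mox.stub(Lightning.MockConfig, :usage_tracking_enabled?, fn -> true end)
Mox.stub(Lightning.MockConfig, :usage_tracking_cleartext_uuids_enabled?, fn -> false end)
Mox.stub(Lightning.MockConfig, :usage_tracking_run_chunk_size, fn -> 50 end)
Mox.stub(Lightning.MockConfig, :usage_tracking_host, fn -> "https://impact.example.org" end)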
3 changes: 2 additions & 1 deletion lib/lightning/config/bootstrap.ex
@@ -484,7 +484,8 @@ defmodule Lightning.Config.Bootstrap do
host: env!("USAGE_TRACKER_HOST", :string, "https://impact.openfn.org"),
resubmission_batch_size:
env!("USAGE_TRACKING_RESUBMISSION_BATCH_SIZE", :integer, 10),
daily_batch_size: env!("USAGE_TRACKING_DAILY_BATCH_SIZE", :integer, 10)
daily_batch_size: env!("USAGE_TRACKING_DAILY_BATCH_SIZE", :integer, 10),
run_chunk_size: env!("USAGE_TRACKING_RUN_CHUNK_SIZE", :integer, 100)

config :lightning, :kafka_triggers,
alternate_storage_enabled:
2 changes: 1 addition & 1 deletion lib/lightning/usage_tracking/project_metrics_service.ex
@@ -17,7 +17,7 @@ defmodule Lightning.UsageTracking.ProjectMetricsService do
query =
from p in Project,
where: p.inserted_at <= ^report_time,
preload: [:users, [workflows: [:jobs, runs: [steps: [:job]]]]]
preload: [:users, [workflows: [:jobs]]]

query |> Repo.all()
end
12 changes: 7 additions & 5 deletions lib/lightning/usage_tracking/report_worker.ex
@@ -14,16 +14,18 @@ defmodule Lightning.UsageTracking.ReportWorker do
def perform(%{args: %{"date" => date_string}}) do
date = Date.from_iso8601!(date_string)

env = Application.get_env(:lightning, :usage_tracking)

config = UsageTracking.find_enabled_daily_report_config()

if env[:enabled] && config do
cleartext_uuids_enabled = env[:cleartext_uuids_enabled]
if Lightning.Config.usage_tracking_enabled?() && config do
cleartext_uuids_enabled =
Lightning.Config.usage_tracking_cleartext_uuids_enabled?()

case UsageTracking.insert_report(config, cleartext_uuids_enabled, date) do
{:ok, report} ->
report |> UsageTracking.submit_report(env[:host])
UsageTracking.submit_report(
report,
Lightning.Config.usage_tracking_host()
)

_error ->
nil
10 changes: 2 additions & 8 deletions lib/lightning/usage_tracking/run_service.ex
@@ -4,14 +4,8 @@ defmodule Lightning.UsageTracking.RunService do
"""
def finished_runs(all_runs, date) do
all_runs
|> finished_on(date)
end

def finished_steps(runs, date) do
runs
|> Enum.flat_map(& &1.steps)
def finished_steps(run, date) do
run.steps
|> finished_on(date)
end

31 changes: 31 additions & 0 deletions lib/lightning/usage_tracking/workflow_metrics_query.ex
@@ -0,0 +1,31 @@
defmodule Lightning.UsageTracking.WorkflowMetricsQuery do
@moduledoc """
Query module to support workflow metrics while allowing for fewer test
permutations.
"""
import Ecto.Query

alias Lightning.Run

def workflow_runs(%{id: workflow_id}) do
from r in Run,
join: w in assoc(r, :workflow),
on: w.id == ^workflow_id
end

def runs_finished_on(query, date) do
start_of_day = DateTime.new!(date, ~T[00:00:00])

start_of_next_day =
date
|> Date.add(1)
|> DateTime.new!(~T[00:00:00])

from r in query,
where: not is_nil(r.finished_at),
where: r.finished_at >= ^start_of_day,
where: r.finished_at < ^start_of_next_day
end
end
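The two builders compose into a single stream-friendly query; WorkflowMetricsService below uses exactly this composition. A short usage sketch (given a %Workflow{} struct; the date literal is illustrative):

alias Lightning.UsageTracking.WorkflowMetricsQuery

# All runs of one workflow that finished on a given day, ready for Repo.stream/2
query =
  workflow
  |> WorkflowMetricsQuery.workflow_runs()
  |> WorkflowMetricsQuery.runs_finished_on(~D[2024-11-15])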
70 changes: 62 additions & 8 deletions lib/lightning/usage_tracking/workflow_metrics_service.ex
@@ -4,25 +4,79 @@ defmodule Lightning.UsageTracking.WorkflowMetricsService do
"""

alias Lightning.Repo
alias Lightning.UsageTracking.RunService
alias Lightning.UsageTracking.WorkflowMetricsQuery

def find_eligible_workflows(workflows, date) do
workflows
|> Enum.filter(fn workflow -> eligible_workflow?(workflow, date) end)
end

def generate_metrics(workflow, cleartext_enabled, date) do
runs = RunService.finished_runs(workflow.runs, date)
steps = RunService.finished_steps(workflow.runs, date)
active_jobs = RunService.unique_job_ids(steps, date)
workflow
|> metrics_for_runs(date)
|> add_active_jobs()
|> Map.delete(:job_ids)
|> Map.merge(instrument_identity(workflow.id, cleartext_enabled))
end

defp metrics_for_runs(workflow, date) do
chunk_size = Lightning.Config.usage_tracking_run_chunk_size()

{:ok, updated_metrics} =
Repo.transaction(fn ->
workflow
|> WorkflowMetricsQuery.workflow_runs()
|> WorkflowMetricsQuery.runs_finished_on(date)
|> Repo.stream(max_rows: chunk_size)
|> Stream.chunk_every(chunk_size)
|> Enum.flat_map(&preload_assocs/1)
|> Enum.reduce(base_metrics(workflow), metric_updater(date))
end)

updated_metrics
end

defp preload_assocs(run_chunk), do: Repo.preload(run_chunk, steps: [:job])

defp base_metrics(%{jobs: jobs}) do
%{
no_of_active_jobs: Enum.count(active_jobs),
no_of_jobs: Enum.count(workflow.jobs),
no_of_runs: Enum.count(runs),
no_of_steps: Enum.count(steps)
no_of_jobs: Enum.count(jobs),
no_of_runs: 0,
no_of_steps: 0,
job_ids: []
}
|> Map.merge(instrument_identity(workflow.id, cleartext_enabled))
end

defp metric_updater(date) do
fn run, acc ->
%{
no_of_steps: no_of_steps,
no_of_runs: no_of_runs,
job_ids: job_ids
} = acc

steps = RunService.finished_steps(run, date)
active_jobs = RunService.unique_job_ids(steps, date)

Map.merge(
acc,
%{
no_of_runs: no_of_runs + 1,
no_of_steps: no_of_steps + Enum.count(steps),
job_ids: Enum.concat(job_ids, active_jobs)
}
)
end
end

defp add_active_jobs(%{job_ids: job_ids} = metrics) do
metrics
|> Map.merge(%{
no_of_active_jobs: job_ids |> Enum.uniq() |> Enum.count()
})
end

defp instrument_identity(identity, false = _cleartext_enabled) do
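The memory reduction comes from metrics_for_runs/2 above: the blanket preload of runs, steps, and jobs that ProjectMetricsService previously requested is gone, and runs are instead streamed inside a transaction (Ecto requires Repo.stream/2 to run in one) and preloaded one chunk of USAGE_TRACKING_RUN_CHUNK_SIZE rows at a time, so only a bounded window of runs is held in memory while the metrics accumulate. A standalone sketch of the same pattern, using illustrative module and association names rather than the ones in this commit:

# Illustrative names (MyApp.Repo, MyApp.Run, :steps); the pattern mirrors
# metrics_for_runs/2: stream in a transaction, preload per chunk, then reduce.
chunk_size = 100

{:ok, run_count} =
  MyApp.Repo.transaction(fn ->
    MyApp.Run
    |> MyApp.Repo.stream(max_rows: chunk_size)
    |> Stream.chunk_every(chunk_size)
    |> Stream.flat_map(&MyApp.Repo.preload(&1, steps: [:job]))
    |> Enum.reduce(0, fn _run, acc -> acc + 1 end)
  end)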