Skip to content

Commit

Permalink
2279 usage tracker fixes (#2280)
Browse files Browse the repository at this point in the history
* Address steps no longer requiring current job ids

* Add UsageTracking crons to Oban plugins

* Update Changelog

* Update test and fix typo
  • Loading branch information
rorymckinley authored Jul 11, 2024
1 parent 628f2ff commit e960141
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 34 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ and this project adheres to
### Changed

### Fixed
- UsageTracking crons are enabled again (if config is enabled)
[#2276](https://github.com/OpenFn/lightning/issues/2276)
- UsageTracking metrics absorb the fact that a step's job\_id may not currently
exist when counting unique jobs.
[#2279](https://github.com/OpenFn/lightning/issues/2279)

## [v2.7.5] - 2024-07-10

Expand Down
19 changes: 12 additions & 7 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,18 @@ defmodule Lightning.Application do
def oban_opts do
opts = Application.get_env(:lightning, Oban)

opts[:plugins]
|> List.keyfind(Oban.Plugins.Cron, 0)
|> then(fn {mod, cron_opts} ->
{mod, put_usage_tracking_cron_opts(cron_opts)}
end)

opts
{_keyword, new_opts} =
opts[:plugins]
|> List.keyfind(Oban.Plugins.Cron, 0)
|> then(fn {mod, cron_opts} ->
{mod, put_usage_tracking_cron_opts(cron_opts)}
end)

updated_plugins =
opts[:plugins]
|> Keyword.merge([{Oban.Plugins.Cron, new_opts}])

opts |> Keyword.put(:plugins, updated_plugins)
end

defp put_usage_tracking_cron_opts(cron_opts) do
Expand Down
6 changes: 3 additions & 3 deletions lib/lightning/usage_tracking/run_service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ defmodule Lightning.UsageTracking.RunService do
|> finished_on(date)
end

def unique_jobs(steps, date) do
def unique_job_ids(steps, date) do
steps
|> finished_on(date)
|> Enum.map(& &1.job)
|> Enum.uniq_by(& &1.id)
|> Enum.map(& &1.job_id)
|> Enum.uniq_by(& &1)
end

defp finished_on(collection, date) do
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning/usage_tracking/workflow_metrics_service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Lightning.UsageTracking.WorkflowMetricsService do
def generate_metrics(workflow, cleartext_enabled, date) do
runs = RunService.finished_runs(workflow.runs, date)
steps = RunService.finished_steps(workflow.runs, date)
active_jobs = RunService.unique_jobs(steps, date)
active_jobs = RunService.unique_job_ids(steps, date)

%{
no_of_active_jobs: Enum.count(active_jobs),
Expand Down
38 changes: 38 additions & 0 deletions test/lightning/application_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Lightning.ApplicationTest do
use Lightning.DataCase, async: true

alias Lightning.Config

import Lightning.ApplicationHelpers, only: [put_temporary_env: 3]

describe ".oban_opts/0" do
test "returns the Oban configuration with usage tracking cron options" do
put_temporary_env(
:lightning,
:usage_tracking,
enabled: true,
daily_batch_size: 100,
resubmission_batch_size: 100
)

before_config = Application.get_env(:lightning, Oban)
plugins = before_config |> Keyword.fetch!(:plugins)
before_cron_plugin = plugins |> Keyword.fetch!(Oban.Plugins.Cron)
before_crontab = before_cron_plugin |> Keyword.fetch!(:crontab)

expected_crontab = before_crontab ++ Config.usage_tracking_cron_opts()

expected_plugins =
Keyword.put(
plugins,
Oban.Plugins.Cron,
crontab: expected_crontab
)

after_config = Lightning.Application.oban_opts()
after_plugins = after_config |> Keyword.fetch!(:plugins)

assert after_plugins == expected_plugins
end
end
end
57 changes: 34 additions & 23 deletions test/lightning/usage_tracking/run_service_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,18 @@ defmodule Lightning.UsageTracking.RunServiceTest do
end
end

describe "unique_jobs/2" do
describe "unique_job_ids/2" do
test "returns unique jobs across all steps finished on report date" do
job_1 = insert(:job)
job_2 = insert(:job)
job_3 = insert(:job)
job_4 = insert(:job)
job_1_id = Ecto.UUID.generate()
job_2_id = Ecto.UUID.generate()
job_3_id = Ecto.UUID.generate()
job_4_id = Ecto.UUID.generate()

finished_on_date_1 = insert_step(job_1, @finished_at)
finished_on_date_2 = insert_step(job_2, @finished_at)
finished_on_date_3 = insert_step(job_1, @finished_at)
finished_on_another_date = insert_step(job_3, @other_finished_at)
unfinished = insert_step(job_4, nil)
finished_on_date_1 = insert_step(job_1_id, @finished_at)
finished_on_date_2 = insert_step(job_2_id, @finished_at)
finished_on_date_3 = insert_step(job_1_id, @finished_at)
finished_on_another_date = insert_step(job_3_id, @other_finished_at)
unfinished = insert_step(job_4_id, nil)

steps = [
finished_on_date_1,
Expand All @@ -149,21 +149,32 @@ defmodule Lightning.UsageTracking.RunServiceTest do
unfinished
]

assert RunService.unique_jobs(steps, @date) == [job_1, job_2]
assert RunService.unique_job_ids(steps, @date) == [job_1_id, job_2_id]
end

defp insert_step(job, finished_at) do
insert(
:run_step,
run:
build(
:run,
work_order: build(:workorder),
starting_job: job,
dataclip: build(:dataclip)
),
step: build(:step, finished_at: finished_at, job: job)
).step
defp insert_step(job_id, finished_at) do
step = %Lightning.Invocation.Step{
job_id: job_id,
input_dataclip: build(:dataclip),
snapshot: build(:snapshot),
finished_at: finished_at
}

run_step =
insert(
:run_step,
run:
build(
:run,
work_order: build(:workorder),
starting_job: build(:job),
dataclip: build(:dataclip)
),
step: step
)
|> Repo.preload(step: :job)

run_step.step
end
end
end

0 comments on commit e960141

Please sign in to comment.