Skip to content

Commit

Permalink
Translate all cpu -> processing, disk -> storage for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Jan 31, 2025
1 parent f595db3 commit ba3639b
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 86 deletions.
116 changes: 65 additions & 51 deletions lib/uplink/availability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ defmodule Uplink.Availability do
end

defp compute_placeability(resources, requirements) do
template = %{"cpu" => [], "memory" => [], "disk" => []}
template = %{"processing" => [], "memory" => [], "storage" => []}

inputs = Enum.reduce(resources, template, &to_inputs(&1, &2, requirements))

Expand All @@ -104,11 +104,12 @@ defmodule Uplink.Availability do

zero = Decimal.new("0")

requested_cpu =
if requirement.actual_cpu && Decimal.gt?(requirement.actual_cpu, zero) do
Decimal.to_float(requirement.actual_cpu)
requested_processing =
if requirement.actual_processing &&
Decimal.gt?(requirement.actual_processing, zero) do
Decimal.to_float(requirement.actual_processing)
else
Decimal.to_float(requirement.cpu)
Decimal.to_float(requirement.processing)
end

requested_memory =
Expand All @@ -119,17 +120,18 @@ defmodule Uplink.Availability do
Decimal.to_float(requirement.memory)
end

requested_disk =
if requirement.actual_disk && Decimal.gt?(requirement.actual_disk, zero) do
Decimal.to_float(requirement.actual_disk)
requested_storage =
if requirement.actual_storage &&
Decimal.gt?(requirement.actual_storage, zero) do
Decimal.to_float(requirement.actual_storage)
else
Decimal.to_float(requirement.disk)
Decimal.to_float(requirement.storage)
end

%{
"cpu" =>
acc["cpu"] ++
[[requested_cpu, Decimal.to_float(resource.used.load_norm_5)]],
acc["processing"] ++
[[requested_processing, Decimal.to_float(resource.used.load_norm_5)]],
"memory" =>
acc["memory"] ++
[
Expand All @@ -140,12 +142,18 @@ defmodule Uplink.Availability do
]
],
"disk" =>
acc["disk"] ++
[[requested_disk, Decimal.to_float(resource.used.storage)]]
acc["storage"] ++
[[requested_storage, Decimal.to_float(resource.used.storage)]]
}
end

defp patch_with_placeability({resource, prediction}) do
prediction = %{
"processing" => prediction.cpu,
"memory" => prediction.memory,
"storage" => prediction.disk
}

%{resource | placeability: Placeability.parse(prediction)}
end

Expand Down Expand Up @@ -173,59 +181,65 @@ defmodule Uplink.Availability do
|> Enum.filter(fn {key, _} -> key in requirement.instances end)

summed_metrics =
Enum.reduce(metrics, %{cpu: 0.0, memory: 0.0, disk: 0.0}, fn {_key,
values},
acc ->
cpu = Enum.find(values, fn v -> Map.has_key?(v, "load_norm_5") end)

memory =
Enum.find(values, fn v -> Map.has_key?(v, "memory_used_bytes") end)

disk =
Enum.find(values, fn v -> Map.has_key?(v, "filesystem_used_bytes") end)

cpu = cpu["load_norm_5"]
memory = memory["memory_used_bytes"]
disk = disk["filesystem_used_bytes"]

cpu = cpu["top"]
memory = memory["top"]
disk = disk["top"]

cpu = List.first(cpu)
memory = List.first(memory)
disk = List.first(disk)

cpu = cpu["metrics"]["system.load.norm.5"] || 0.0
memory = memory["metrics"]["system.memory.actual.used.bytes"] || 0.0
disk = disk["metrics"]["system.filesystem.used.bytes"] || 0.0

Map.merge(acc, %{
cpu: acc.cpu + cpu,
memory: acc.memory + memory,
disk: acc.disk + disk
})
end)
Enum.reduce(
metrics,
%{processing: 0.0, memory: 0.0, storage: 0.0},
fn {_key, values}, acc ->
processing =
Enum.find(values, fn v -> Map.has_key?(v, "load_norm_5") end)

memory =
Enum.find(values, fn v -> Map.has_key?(v, "memory_used_bytes") end)

storage =
Enum.find(values, fn v ->
Map.has_key?(v, "filesystem_used_bytes")
end)

processing = processing["load_norm_5"]
memory = memory["memory_used_bytes"]
storage = storage["filesystem_used_bytes"]

processing = processing["top"]
memory = memory["top"]
storage = storage["top"]

processing = List.first(processing)
memory = List.first(memory)
storage = List.first(storage)

processing = processing["metrics"]["system.load.norm.5"] || 0.0
memory = memory["metrics"]["system.memory.actual.used.bytes"] || 0.0
storage = storage["metrics"]["system.filesystem.used.bytes"] || 0.0

Map.merge(acc, %{
processing: acc.processing + processing,
memory: acc.memory + memory,
storage: acc.storage + storage
})
end
)

node = Enum.find(nodes, fn n -> n.name == requirement.node end)

mean_metrics =
if length(metrics) > 0 do
%{
cpu: summed_metrics.cpu / length(metrics),
processing: summed_metrics.processing / length(metrics),
memory: summed_metrics.memory / length(metrics),
disk: summed_metrics.disk / length(metrics)
storage: summed_metrics.storage / length(metrics)
}
else
%{cpu: 0.0, memory: 0.0, disk: 0.0}
%{processing: 0.0, memory: 0.0, storage: 0.0}
end

%{
requirement
| actual_cpu: Decimal.new("#{mean_metrics.cpu}"),
| actual_processing: Decimal.new("#{mean_metrics.processing}"),
actual_memory:
Decimal.new("#{mean_metrics.memory / node.total_memory}"),
actual_disk: Decimal.new("#{mean_metrics.disk / node.total_storage}")
actual_storage:
Decimal.new("#{mean_metrics.storage / node.total_storage}")
}
end
end
13 changes: 10 additions & 3 deletions lib/uplink/availability/placeability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@ defmodule Uplink.Availability.Placeability do
use Ecto.Schema
import Ecto.Changeset

@valid_attrs [
:processing,
:memory,
:storage
]

@derive Jason.Encoder

@primary_key false
embedded_schema do
field :cpu, :decimal
field :processing, :decimal
field :memory, :decimal
field :disk, :decimal
field :storage, :decimal
end

def changeset(placeability, params) do
placeability
|> cast(params, [:cpu, :memory, :disk])
|> cast(params, @valid_attrs)
|> validate_required(@valid_attrs)
end

def parse(params) do
Expand Down
31 changes: 25 additions & 6 deletions lib/uplink/availability/requirement.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,45 @@ defmodule Uplink.Availability.Requirement do
use Ecto.Schema
import Ecto.Changeset

@valid_attrs [
:node,
:instances,
:processing,
:memory,
:storage,
:actual_processing,
:actual_memory,
:actual_storage
]

@required_attrs [
:node,
:instances,
:processing,
:memory,
:storage
]

@derive Jason.Encoder

@primary_key false
embedded_schema do
field :node, :string
field :instances, {:array, :string}

field :cpu, :decimal
field :processing, :decimal
field :memory, :decimal
field :disk, :decimal
field :storage, :decimal

field :actual_cpu, :decimal
field :actual_processing, :decimal
field :actual_memory, :decimal
field :actual_disk, :decimal
field :actual_storage, :decimal
end

def changeset(requirement, params) do
requirement
|> cast(params, [:node, :instances, :cpu, :memory, :disk])
|> validate_required([:node, :instances, :cpu, :memory, :disk])
|> cast(params, @valid_attrs)
|> validate_required(@required_attrs)
end

def parse(params) do
Expand Down
5 changes: 3 additions & 2 deletions lib/uplink/availability/requirement/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ defmodule Uplink.Availability.Requirement.Manager do
%{
"node" => node.name,
"instances" => instance_names,
"cpu" => normalize(params.cpu, node.cpu_cores_count),
"processing" =>
normalize(params.processing, node.cpu_cores_count),
"memory" => normalize(params.memory, node.total_memory),
"disk" => normalize(params.disk, node.total_storage)
"storage" => normalize(params.storage, node.total_storage)
}
end)
|> Enum.map(&Requirement.parse(&1))
Expand Down
8 changes: 4 additions & 4 deletions lib/uplink/availability/requirement/params.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ defmodule Uplink.Availability.Requirement.Params do

embedded_schema do
field :project, :string
field :cpu, :decimal
field :processing, :decimal
field :memory, :decimal
field :disk, :decimal
field :storage, :decimal
end

def changeset(params_struct, params) do
params_struct
|> cast(params, [:project, :cpu, :memory, :disk])
|> validate_required([:project, :cpu, :memory, :disk])
|> cast(params, [:project, :processing, :memory, :storage])
|> validate_required([:project, :processing, :memory, :storage])
end

def parse(params) do
Expand Down
8 changes: 4 additions & 4 deletions test/uplink/availability/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ defmodule Uplink.Availability.RouterTest do
},
"requirement" => %{
"project" => "test",
"cpu" => 1,
"processing" => 1,
"memory" => 128_000_000,
"disk" => 300_000_000
"storage" => 300_000_000
}
})

Expand Down Expand Up @@ -200,8 +200,8 @@ defmodule Uplink.Availability.RouterTest do

assert %{
"errors" => %{
"cpu" => ["can't be blank"],
"disk" => ["can't be blank"],
"processing" => ["can't be blank"],
"storage" => ["can't be blank"],
"memory" => ["can't be blank"]
}
} = errors
Expand Down
Loading

0 comments on commit ba3639b

Please sign in to comment.