Skip to content

Commit

Permalink
Setup availability check structure and modules
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Jan 9, 2025
1 parent b9ddbb6 commit b966cd6
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 12 deletions.
46 changes: 46 additions & 0 deletions lib/uplink/availability.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Uplink.Availability do
alias Uplink.Metrics
alias Uplink.Pipelines

alias Uplink.Clients.LXD
alias Uplink.Clients.Instellar

alias __MODULE__.Query

def check! do
case get_monitor() do
%{"attributes" => _attributes} = monitor ->
check(monitor)

{:ok, monitors} ->
monitors
|> List.first()
|> check()

_ ->
raise "No monitor found"
end
end

def check(%{"attributes" => _} = monitor) when is_map(monitor) do
indices =
Query.index_types()
|> Enum.map(&Metrics.index/1)

members = LXD.list_cluster_members()

query = Query.build(members, indices)

Metrics.query!(monitor, query)
end

defp get_monitor do
monitors = Pipelines.get_monitors(:metrics)

if monitors == [] do
Instellar.list_monitors()
else
List.first(monitors)
end
end
end
3 changes: 3 additions & 0 deletions lib/uplink/availability/attribute.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Uplink.Availability.Attribute do
defstruct [:field, :name]
end
121 changes: 121 additions & 0 deletions lib/uplink/availability/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
defmodule Uplink.Availability.Query do
alias Uplink.Availability.Attribute
alias Uplink.Clients.LXD.Cluster.Member

@metrics_mappings %{
"metrics-system.memory-" => :memory,
"metrics-system.load-" => :load,
"metrics-system.filesystem-" => :filesystem
}

@metrics_aggregate_attributes %{
memory: [
%Attribute{
name: "memory_used_bytes",
field: "system.memory.actual.used.bytes"
}
],
load: [
%Attribute{
name: "load_norm_5",
field: "system.load.norm.5"
}
],
filesystem: [
%Attribute{
name: "filesystem_used_bytes",
field: "system.filesystem.used.bytes"
}
]
}

def index_types do
Map.values(@metrics_mappings)
end

@spec build([Member.t()] | Member.t(), [String.t()]) :: String.t()
def build(members, indices) when is_list(members) do
members
|> Enum.flat_map(fn node ->
build(node, indices)
end)
|> Enum.join("\n")
end

def build(%Member{server_name: key}, indices)
when is_list(indices) do
valid_prefixes = Map.keys(@metrics_mappings)

indices
|> Enum.filter(fn index ->
Enum.any?(valid_prefixes, fn prefix ->
index =~ prefix
end)
end)
|> Enum.flat_map(fn index ->
{_prefix, type} =
Enum.find(@metrics_mappings, fn {prefix, _type} ->
index =~ prefix
end)

attributes = Map.fetch!(@metrics_aggregate_attributes, type)

aggregates = %{
key => %{
terms: %{
field: "host.name",
size: 1000
},
aggs: Enum.reduce(attributes, %{}, &build_aggregate/2)
}
}

query = %{
size: 0,
query: %{
term: %{
"cloud.instance.id" => key
}
},
aggs: Enum.reduce(attributes, aggregates, &build_sum(key, &1, &2))
}

[%{index: index}, query]
end)
|> Enum.map(&Jason.encode_to_iodata!/1)
|> Enum.join("\n")
end

def retrieval_keys do
@metrics_aggregate_attributes
|> Map.values()
|> List.flatten()
|> Enum.map(& &1.name)
end

defp build_aggregate(attribute, acc) do
query = %{
top_metrics: %{
metrics: [
%{field: attribute.field}
],
sort: %{
"@timestamp" => "desc"
},
size: 1
}
}

Map.put(acc, attribute.name, query)
end

defp build_sum(key, attribute, acc) do
sum = %{
sum_bucket: %{
buckets_path: "#{key}>#{attribute.name}[#{attribute.field}]"
}
}

Map.put(acc, attribute.name, sum)
end
end
Empty file.
2 changes: 2 additions & 0 deletions lib/uplink/clients/lxd/cluster/member.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Uplink.Clients.LXD.Cluster.Member do
use Ecto.Schema
import Ecto.Changeset

@type t :: %__MODULE__{}

@valid_attrs ~w(
roles
failure_domain
Expand Down
38 changes: 26 additions & 12 deletions lib/uplink/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ defmodule Uplink.Metrics do
to: __MODULE__.Instance,
as: :metrics

def query!(%{"attributes" => attributes} = monitor, query) do
headers = headers(monitor)
endpoint = Map.fetch!(attributes, "endpoint")

request = request(endpoint, headers)

Req.post!(request, url: "/_msearch", body: query)
end

def query!(_, _), do: {:error, :invalid_monitor}

def push!(%{"attributes" => attributes} = monitor, documents) do
headers = headers(monitor)
endpoint = Map.fetch!(attributes, "endpoint")

request =
Req.new(
base_url: endpoint,
connect_options: [
protocols: [:http1],
transport_opts: [
verify: :verify_none
]
],
headers: headers
)
request = request(endpoint, headers)

Req.post!(request, url: "/_bulk", body: documents)
end
Expand All @@ -30,7 +31,20 @@ defmodule Uplink.Metrics do
"metrics-system.#{type}-uplink-#{uplink_id}"
end

defp headers(%{"attributes" => %{"uid" => uid, "token" => token}}) do
def request(endpoint, headers) do
Req.new(
base_url: endpoint,
connect_options: [
protocols: [:http1],
transport_opts: [
verify: :verify_none
]
],
headers: headers
)
end

def headers(%{"attributes" => %{"uid" => uid, "token" => token}}) do
encoded_token = Base.encode64("#{uid}:#{token}")

[
Expand Down

0 comments on commit b966cd6

Please sign in to comment.