Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(observer)!: observe actor annotations instead of spec ID #34

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions lib/lattice_observer/observed/event_processor.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
defmodule LatticeObserver.Observed.EventProcessor do
@annotation_app_spec "wasmcloud.dev/appspec"

alias LatticeObserver.Observed.{Lattice, Provider, Host, Actor, Instance, LinkDefinition}

defp get_claim(_l = %Lattice{claims: claims}, field, pk, default \\ "") do
Expand Down Expand Up @@ -63,15 +61,15 @@ defmodule LatticeObserver.Observed.EventProcessor do
}
end

def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, spec, _stamp, claims)
when is_binary(pk) and is_binary(instance_id) and is_binary(spec) do
def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, annotations, _stamp, claims)
when is_binary(pk) and is_binary(instance_id) and is_map(annotations) do
actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable"))
actor = merge_actor(actor, l, claims)

instance = %Instance{
id: instance_id,
host_id: host_id,
spec_id: spec,
annotations: annotations,
version: Map.get(claims, "version", get_claim(l, :version, pk)),
revision: Map.get(claims, "revision", get_claim(l, :rev, pk, 0))
}
Expand All @@ -91,7 +89,7 @@ defmodule LatticeObserver.Observed.EventProcessor do
link_name,
contract_id,
instance_id,
spec,
annotations,
stamp,
claims
) do
Expand All @@ -103,7 +101,7 @@ defmodule LatticeObserver.Observed.EventProcessor do
instance = %Instance{
id: instance_id,
host_id: source_host,
spec_id: spec,
annotations: annotations,
version: Map.get(claims, "version", get_claim(l, :version, pk)),
# NOTE - wasmCloud Host does not yet emit provider rev
revision: Map.get(claims, "revision", get_claim(l, :rev, pk, "0") |> parse_revision())
Expand Down Expand Up @@ -213,8 +211,6 @@ defmodule LatticeObserver.Observed.EventProcessor do
def record_heartbeat(l = %Lattice{}, source_host, stamp, data) do
labels = Map.get(data, "labels", %{})
friendly_name = Map.get(data, "friendly_name", "")
annotations = Map.get(data, "annotations", %{})
spec = Map.get(annotations, @annotation_app_spec, "")
uptime_seconds = Map.get(data, "uptime_seconds", 0)
version = Map.get(data, "version", "v0.0.0")

Expand Down Expand Up @@ -251,25 +247,23 @@ defmodule LatticeObserver.Observed.EventProcessor do

# legacy heartbeat has a list for the actors field...
# default to "new format" if this field is missing
# TODO - once a few wasmCloud OTP releases have gone by with the new heartbeat format, stop
# parsing/processing the legacy structure
l =
if is_list(Map.get(data, "actors", %{})) do
put_legacy_instances(l, source_host, spec, stamp, data)
put_legacy_instances(l, source_host, stamp, data)
else
actors_expanded =
Enum.flat_map(Map.get(data, "actors", %{}), fn {k, count} ->
Enum.map(1..count, fn _ -> k end)
Enum.flat_map(Map.get(data, "actors", %{}), fn {public_key, count} ->
Enum.map(1..count, fn _ -> {public_key, %{}} end)
end)

l =
List.foldl(actors_expanded, l, fn pk, acc ->
List.foldl(actors_expanded, l, fn {public_key, annotations}, acc ->
put_actor_instance(
acc,
source_host,
pk,
public_key,
"n/a",
spec,
annotations,
stamp,
%{}
)
Expand All @@ -283,7 +277,7 @@ defmodule LatticeObserver.Observed.EventProcessor do
x["link_name"],
Map.get(x, "contract_id", "n/a"),
"n/a",
spec,
Map.get(x, "annotations", %{}),
stamp,
%{}
)
Expand All @@ -293,15 +287,15 @@ defmodule LatticeObserver.Observed.EventProcessor do
l
end

defp put_legacy_instances(l = %Lattice{}, source_host, spec, stamp, data) do
defp put_legacy_instances(l = %Lattice{}, source_host, stamp, data) do
l =
List.foldl(Map.get(data, "actors", []), l, fn x, acc ->
put_actor_instance(
acc,
source_host,
x["public_key"],
x["instance_id"],
spec,
Map.get(x, "instance_id", "n/a"),
Map.get(x, "annotations", %{}),
stamp,
%{}
)
Expand All @@ -314,8 +308,8 @@ defmodule LatticeObserver.Observed.EventProcessor do
x["public_key"],
x["link_name"],
x["contract_id"],
x["instance_id"],
spec,
Map.get(x, "instance_id", "n/a"),
Map.get(x, "annotations", %{}),
stamp,
%{}
)
Expand Down
6 changes: 3 additions & 3 deletions lib/lattice_observer/observed/instance.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule LatticeObserver.Observed.Instance do
alias __MODULE__

@enforce_keys [:id, :host_id, :spec_id]
defstruct [:id, :host_id, :spec_id, :version, :revision]
@enforce_keys [:id, :host_id, :annotations]
defstruct [:id, :host_id, :annotations, :version, :revision]

@typedoc """
An instance represents an observation of a unit of scalability within the lattice. Instances
Expand All @@ -13,7 +13,7 @@ defmodule LatticeObserver.Observed.Instance do
@type t :: %Instance{
id: binary(),
host_id: binary(),
spec_id: binary(),
annotations: map(),
version: binary(),
revision: integer()
}
Expand Down
59 changes: 49 additions & 10 deletions lib/lattice_observer/observed/lattice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,45 @@ defmodule LatticeObserver.Observed.Lattice do
l
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
source: _source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.health_check_status"
}
) do
# This does not currently affect state, but shouldn't generate a warning either
l
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
source: _source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.actors_started"
}
) do
# This does not currently affect state, but shouldn't generate a warning either
l
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
source: _source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.actors_stopped"
}
) do
# This does not currently affect state, but shouldn't generate a warning either
l
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
Expand Down Expand Up @@ -389,7 +428,7 @@ defmodule LatticeObserver.Observed.Lattice do
type: "com.wasmcloud.lattice.actor_started"
}
) do
spec = Map.get(d, "annotations", %{}) |> Map.get(@annotation_app_spec, "")
annotations = Map.get(d, "annotations", %{})
claims = Map.get(d, "claims", %{})

l =
Expand All @@ -404,7 +443,7 @@ defmodule LatticeObserver.Observed.Lattice do
sub: pk
})

EventProcessor.put_actor_instance(l, source_host, pk, instance_id, spec, stamp, claims)
EventProcessor.put_actor_instance(l, source_host, pk, instance_id, annotations, stamp, claims)
end

def apply_event(
Expand Down Expand Up @@ -523,9 +562,9 @@ defmodule LatticeObserver.Observed.Lattice do
nil | binary(),
binary()
) :: [%{id: binary(), instance_id: binary(), host_id: binary()}]
def running_instances(%Lattice{} = l, pk, spec_id) when is_binary(pk) do
def running_instances(%Lattice{} = l, pk, app_name) when is_binary(pk) do
if String.starts_with?(pk, "M") do
actors = actors_in_appspec(l, spec_id)
actors = actors_in_appspec(l, app_name)

actors =
if pk != nil do
Expand All @@ -540,7 +579,7 @@ defmodule LatticeObserver.Observed.Lattice do
%{id: pk, instance_id: iid, host_id: hid}
end)
else
providers = providers_in_appspec(l, spec_id)
providers = providers_in_appspec(l, app_name)

providers =
if pk != nil do
Expand All @@ -559,17 +598,17 @@ defmodule LatticeObserver.Observed.Lattice do
end
end

def running_instances(%Lattice{}, nil, _spec_id) do
def running_instances(%Lattice{}, nil, _app_name) do
[]
end

@spec actors_in_appspec(LatticeObserver.Observed.Lattice.t(), binary) :: [
%{actor_id: binary(), instance_id: binary(), host_id: binary()}
]
def actors_in_appspec(%Lattice{actors: actors}, appspec) when is_binary(appspec) do
def actors_in_appspec(%Lattice{actors: actors}, app_name) when is_binary(app_name) do
for {pk, %Actor{instances: instances}} <- actors,
instance <- instances,
in_spec?(instance, appspec) do
in_spec?(instance, app_name) do
%{
actor_id: pk,
instance_id: instance.id,
Expand Down Expand Up @@ -635,7 +674,7 @@ defmodule LatticeObserver.Observed.Lattice do
Map.values(l.invocation_log)
end

defp in_spec?(%Instance{spec_id: spec_id}, appspec) do
spec_id == appspec
defp in_spec?(%Instance{annotations: annotations}, appspec) do
Map.get(annotations, @annotation_app_spec, "") == appspec
end
end
2 changes: 1 addition & 1 deletion test/observed/actors_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule LatticeObserverTest.Observed.ActorsTest do
host_id: "Nxxx",
id: "abc123",
revision: 0,
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
Expand Down
4 changes: 2 additions & 2 deletions test/observed/decay_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ defmodule LatticeObserverTest.Observed.DecayTest do
host_id: "Nxxx",
id: "abc123",
revision: 0,
spec_id: "none",
annotations: %{"wasmcloud.dev/appspec" => "none"},
version: "1.0"
}
],
Expand Down Expand Up @@ -137,7 +137,7 @@ defmodule LatticeObserverTest.Observed.DecayTest do
host_id: "Nxxx",
id: "abc123",
revision: 0,
spec_id: "none",
annotations: %{"wasmcloud.dev/appspec" => "none"},
version: "1.0"
}
],
Expand Down
8 changes: 4 additions & 4 deletions test/observed/hosts_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do
host_id: "Nxxx",
id: "iid1",
revision: 0,
spec_id: "",
annotations: %{},
version: ""
}
],
Expand All @@ -228,7 +228,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do
host_id: "Nxxx",
id: "iid3",
revision: 0,
spec_id: "",
annotations: %{},
version: ""
}
],
Expand Down Expand Up @@ -305,7 +305,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do
host_id: "Nxxx",
id: "n/a",
revision: 0,
spec_id: "",
annotations: %{},
version: ""
}
],
Expand All @@ -324,7 +324,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do
host_id: "Nxxx",
id: "n/a",
revision: 0,
spec_id: "",
annotations: %{},
version: ""
}
],
Expand Down
16 changes: 8 additions & 8 deletions test/observed/providers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
alias LatticeObserver.Observed.{Lattice, Instance, Provider, EventProcessor}
alias TestSupport.CloudEvents

@test_spec "testapp"
@test_spec_2 "othertestapp"
@test_spec %{"wasmcloud.dev/appspec" => "testapp"}
@test_spec_2 %{"wasmcloud.dev/appspec" => "othertestapp"}
@test_host "Nxxx"
@test_host2 "Nxxy"
@test_contract "wasmcloud:test"
Expand Down Expand Up @@ -51,7 +51,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
host_id: "Nxxx",
id: "n/a",
revision: 2,
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
Expand Down Expand Up @@ -152,7 +152,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
host_id: "Nxxx",
id: "n/a",
revision: 2,
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
Expand All @@ -169,7 +169,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
host_id: "Nxxx",
id: "n/a",
revision: 2,
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
Expand Down Expand Up @@ -224,14 +224,14 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
%Instance{
host_id: "Nxxy",
id: "n/a",
spec_id: "othertestapp",
annotations: %{"wasmcloud.dev/appspec" => "othertestapp"},
revision: 2,
version: "1.0"
},
%Instance{
host_id: "Nxxx",
id: "n/a",
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
revision: 2,
version: "1.0"
}
Expand All @@ -246,7 +246,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do
host_id: "Nxxx",
id: "n/a",
revision: 2,
spec_id: "testapp",
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
Expand Down
Loading