From a1083f4aa76ae388e10f7f6863be700dc8a9c632 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Tue, 24 Oct 2023 10:31:50 -0400 Subject: [PATCH 1/2] fix(observer)!: observe annotations instead of spec ID Signed-off-by: Brooks Townsend feat(observer): support annotations on heartbeat Signed-off-by: Brooks Townsend reuse existing "legacy" code --- .../observed/event_processor.ex | 40 ++++++++----------- lib/lattice_observer/observed/instance.ex | 6 +-- lib/lattice_observer/observed/lattice.ex | 20 +++++----- test/observed/actors_test.exs | 2 +- test/observed/decay_test.exs | 4 +- test/observed/hosts_test.exs | 8 ++-- test/observed/providers_test.exs | 16 ++++---- 7 files changed, 45 insertions(+), 51 deletions(-) diff --git a/lib/lattice_observer/observed/event_processor.ex b/lib/lattice_observer/observed/event_processor.ex index 7ea836d..571c9ff 100644 --- a/lib/lattice_observer/observed/event_processor.ex +++ b/lib/lattice_observer/observed/event_processor.ex @@ -1,6 +1,4 @@ defmodule LatticeObserver.Observed.EventProcessor do - @annotation_app_spec "wasmcloud.dev/appspec" - alias LatticeObserver.Observed.{Lattice, Provider, Host, Actor, Instance, LinkDefinition} defp get_claim(_l = %Lattice{claims: claims}, field, pk, default \\ "") do @@ -63,15 +61,15 @@ defmodule LatticeObserver.Observed.EventProcessor do } end - def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, spec, _stamp, claims) - when is_binary(pk) and is_binary(instance_id) and is_binary(spec) do + def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, annotations, _stamp, claims) + when is_binary(pk) and is_binary(instance_id) and is_map(annotations) do actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable")) actor = merge_actor(actor, l, claims) instance = %Instance{ id: instance_id, host_id: host_id, - spec_id: spec, + annotations: annotations, version: Map.get(claims, "version", get_claim(l, :version, pk)), revision: Map.get(claims, "revision", get_claim(l, :rev, pk, 0)) } @@ -91,7 +89,7 @@ defmodule LatticeObserver.Observed.EventProcessor do link_name, contract_id, instance_id, - spec, + annotations, stamp, claims ) do @@ -103,7 +101,7 @@ defmodule LatticeObserver.Observed.EventProcessor do instance = %Instance{ id: instance_id, host_id: source_host, - spec_id: spec, + annotations: annotations, version: Map.get(claims, "version", get_claim(l, :version, pk)), # NOTE - wasmCloud Host does not yet emit provider rev revision: Map.get(claims, "revision", get_claim(l, :rev, pk, "0") |> parse_revision()) @@ -213,8 +211,6 @@ defmodule LatticeObserver.Observed.EventProcessor do def record_heartbeat(l = %Lattice{}, source_host, stamp, data) do labels = Map.get(data, "labels", %{}) friendly_name = Map.get(data, "friendly_name", "") - annotations = Map.get(data, "annotations", %{}) - spec = Map.get(annotations, @annotation_app_spec, "") uptime_seconds = Map.get(data, "uptime_seconds", 0) version = Map.get(data, "version", "v0.0.0") @@ -251,25 +247,23 @@ defmodule LatticeObserver.Observed.EventProcessor do # legacy heartbeat has a list for the actors field... # default to "new format" if this field is missing - # TODO - once a few wasmCloud OTP releases have gone by with the new heartbeat format, stop - # parsing/processing the legacy structure l = if is_list(Map.get(data, "actors", %{})) do - put_legacy_instances(l, source_host, spec, stamp, data) + put_legacy_instances(l, source_host, stamp, data) else actors_expanded = - Enum.flat_map(Map.get(data, "actors", %{}), fn {k, count} -> - Enum.map(1..count, fn _ -> k end) + Enum.flat_map(Map.get(data, "actors", %{}), fn {public_key, count} -> + Enum.map(1..count, fn _ -> {public_key, %{}} end) end) l = - List.foldl(actors_expanded, l, fn pk, acc -> + List.foldl(actors_expanded, l, fn public_key, acc -> put_actor_instance( acc, source_host, - pk, + public_key, "n/a", - spec, + %{}, stamp, %{} ) @@ -283,7 +277,7 @@ defmodule LatticeObserver.Observed.EventProcessor do x["link_name"], Map.get(x, "contract_id", "n/a"), "n/a", - spec, + Map.get(x, "annotations", %{}), stamp, %{} ) @@ -293,15 +287,15 @@ defmodule LatticeObserver.Observed.EventProcessor do l end - defp put_legacy_instances(l = %Lattice{}, source_host, spec, stamp, data) do + defp put_legacy_instances(l = %Lattice{}, source_host, stamp, data) do l = List.foldl(Map.get(data, "actors", []), l, fn x, acc -> put_actor_instance( acc, source_host, x["public_key"], - x["instance_id"], - spec, + "n/a", + x["annotations"], stamp, %{} ) @@ -314,8 +308,8 @@ defmodule LatticeObserver.Observed.EventProcessor do x["public_key"], x["link_name"], x["contract_id"], - x["instance_id"], - spec, + "n/a", + x["annotations"], stamp, %{} ) diff --git a/lib/lattice_observer/observed/instance.ex b/lib/lattice_observer/observed/instance.ex index fb28467..5187b09 100644 --- a/lib/lattice_observer/observed/instance.ex +++ b/lib/lattice_observer/observed/instance.ex @@ -1,8 +1,8 @@ defmodule LatticeObserver.Observed.Instance do alias __MODULE__ - @enforce_keys [:id, :host_id, :spec_id] - defstruct [:id, :host_id, :spec_id, :version, :revision] + @enforce_keys [:id, :host_id, :annotations] + defstruct [:id, :host_id, :annotations, :version, :revision] @typedoc """ An instance represents an observation of a unit of scalability within the lattice. Instances @@ -13,7 +13,7 @@ defmodule LatticeObserver.Observed.Instance do @type t :: %Instance{ id: binary(), host_id: binary(), - spec_id: binary(), + annotations: map(), version: binary(), revision: integer() } diff --git a/lib/lattice_observer/observed/lattice.ex b/lib/lattice_observer/observed/lattice.ex index 435b69d..37bf75d 100644 --- a/lib/lattice_observer/observed/lattice.ex +++ b/lib/lattice_observer/observed/lattice.ex @@ -389,7 +389,7 @@ defmodule LatticeObserver.Observed.Lattice do type: "com.wasmcloud.lattice.actor_started" } ) do - spec = Map.get(d, "annotations", %{}) |> Map.get(@annotation_app_spec, "") + annotations = Map.get(d, "annotations", %{}) claims = Map.get(d, "claims", %{}) l = @@ -404,7 +404,7 @@ defmodule LatticeObserver.Observed.Lattice do sub: pk }) - EventProcessor.put_actor_instance(l, source_host, pk, instance_id, spec, stamp, claims) + EventProcessor.put_actor_instance(l, source_host, pk, instance_id, annotations, stamp, claims) end def apply_event( @@ -523,9 +523,9 @@ defmodule LatticeObserver.Observed.Lattice do nil | binary(), binary() ) :: [%{id: binary(), instance_id: binary(), host_id: binary()}] - def running_instances(%Lattice{} = l, pk, spec_id) when is_binary(pk) do + def running_instances(%Lattice{} = l, pk, app_name) when is_binary(pk) do if String.starts_with?(pk, "M") do - actors = actors_in_appspec(l, spec_id) + actors = actors_in_appspec(l, app_name) actors = if pk != nil do @@ -540,7 +540,7 @@ defmodule LatticeObserver.Observed.Lattice do %{id: pk, instance_id: iid, host_id: hid} end) else - providers = providers_in_appspec(l, spec_id) + providers = providers_in_appspec(l, app_name) providers = if pk != nil do @@ -559,17 +559,17 @@ defmodule LatticeObserver.Observed.Lattice do end end - def running_instances(%Lattice{}, nil, _spec_id) do + def running_instances(%Lattice{}, nil, _app_name) do [] end @spec actors_in_appspec(LatticeObserver.Observed.Lattice.t(), binary) :: [ %{actor_id: binary(), instance_id: binary(), host_id: binary()} ] - def actors_in_appspec(%Lattice{actors: actors}, appspec) when is_binary(appspec) do + def actors_in_appspec(%Lattice{actors: actors}, app_name) when is_binary(app_name) do for {pk, %Actor{instances: instances}} <- actors, instance <- instances, - in_spec?(instance, appspec) do + in_spec?(instance, app_name) do %{ actor_id: pk, instance_id: instance.id, @@ -635,7 +635,7 @@ defmodule LatticeObserver.Observed.Lattice do Map.values(l.invocation_log) end - defp in_spec?(%Instance{spec_id: spec_id}, appspec) do - spec_id == appspec + defp in_spec?(%Instance{annotations: annotations}, appspec) do + Map.get(annotations, @annotation_app_spec, "") == appspec end end diff --git a/test/observed/actors_test.exs b/test/observed/actors_test.exs index 613f316..d3b5275 100644 --- a/test/observed/actors_test.exs +++ b/test/observed/actors_test.exs @@ -24,7 +24,7 @@ defmodule LatticeObserverTest.Observed.ActorsTest do host_id: "Nxxx", id: "abc123", revision: 0, - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, version: "1.0" } ], diff --git a/test/observed/decay_test.exs b/test/observed/decay_test.exs index 62fcf22..0912720 100644 --- a/test/observed/decay_test.exs +++ b/test/observed/decay_test.exs @@ -98,7 +98,7 @@ defmodule LatticeObserverTest.Observed.DecayTest do host_id: "Nxxx", id: "abc123", revision: 0, - spec_id: "none", + annotations: %{"wasmcloud.dev/appspec" => "none"}, version: "1.0" } ], @@ -137,7 +137,7 @@ defmodule LatticeObserverTest.Observed.DecayTest do host_id: "Nxxx", id: "abc123", revision: 0, - spec_id: "none", + annotations: %{"wasmcloud.dev/appspec" => "none"}, version: "1.0" } ], diff --git a/test/observed/hosts_test.exs b/test/observed/hosts_test.exs index e3d42aa..4c1e01b 100644 --- a/test/observed/hosts_test.exs +++ b/test/observed/hosts_test.exs @@ -209,7 +209,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do host_id: "Nxxx", id: "iid1", revision: 0, - spec_id: "", + annotations: %{}, version: "" } ], @@ -228,7 +228,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do host_id: "Nxxx", id: "iid3", revision: 0, - spec_id: "", + annotations: %{}, version: "" } ], @@ -305,7 +305,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do host_id: "Nxxx", id: "n/a", revision: 0, - spec_id: "", + annotations: %{}, version: "" } ], @@ -324,7 +324,7 @@ defmodule LatticeObserverTest.Observed.HostsTest do host_id: "Nxxx", id: "n/a", revision: 0, - spec_id: "", + annotations: %{}, version: "" } ], diff --git a/test/observed/providers_test.exs b/test/observed/providers_test.exs index 58ae800..c0cb830 100644 --- a/test/observed/providers_test.exs +++ b/test/observed/providers_test.exs @@ -3,8 +3,8 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do alias LatticeObserver.Observed.{Lattice, Instance, Provider, EventProcessor} alias TestSupport.CloudEvents - @test_spec "testapp" - @test_spec_2 "othertestapp" + @test_spec %{"wasmcloud.dev/appspec" => "testapp"} + @test_spec_2 %{"wasmcloud.dev/appspec" => "othertestapp"} @test_host "Nxxx" @test_host2 "Nxxy" @test_contract "wasmcloud:test" @@ -51,7 +51,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do host_id: "Nxxx", id: "n/a", revision: 2, - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, version: "1.0" } ], @@ -152,7 +152,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do host_id: "Nxxx", id: "n/a", revision: 2, - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, version: "1.0" } ], @@ -169,7 +169,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do host_id: "Nxxx", id: "n/a", revision: 2, - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, version: "1.0" } ], @@ -224,14 +224,14 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do %Instance{ host_id: "Nxxy", id: "n/a", - spec_id: "othertestapp", + annotations: %{"wasmcloud.dev/appspec" => "othertestapp"}, revision: 2, version: "1.0" }, %Instance{ host_id: "Nxxx", id: "n/a", - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, revision: 2, version: "1.0" } @@ -246,7 +246,7 @@ defmodule LatticeObserverTest.Observed.ProvidersTest do host_id: "Nxxx", id: "n/a", revision: 2, - spec_id: "testapp", + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, version: "1.0" } ], From 8c55daf7b1f94e14a71924b25d85f18f591e5c87 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Tue, 24 Oct 2023 12:32:07 -0400 Subject: [PATCH 2/2] chore: ignore some new events Signed-off-by: Brooks Townsend fix tests Signed-off-by: Brooks Townsend --- .../observed/event_processor.ex | 12 +++--- lib/lattice_observer/observed/lattice.ex | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/lib/lattice_observer/observed/event_processor.ex b/lib/lattice_observer/observed/event_processor.ex index 571c9ff..8ee346e 100644 --- a/lib/lattice_observer/observed/event_processor.ex +++ b/lib/lattice_observer/observed/event_processor.ex @@ -257,13 +257,13 @@ defmodule LatticeObserver.Observed.EventProcessor do end) l = - List.foldl(actors_expanded, l, fn public_key, acc -> + List.foldl(actors_expanded, l, fn {public_key, annotations}, acc -> put_actor_instance( acc, source_host, public_key, "n/a", - %{}, + annotations, stamp, %{} ) @@ -294,8 +294,8 @@ defmodule LatticeObserver.Observed.EventProcessor do acc, source_host, x["public_key"], - "n/a", - x["annotations"], + Map.get(x, "instance_id", "n/a"), + Map.get(x, "annotations", %{}), stamp, %{} ) @@ -308,8 +308,8 @@ defmodule LatticeObserver.Observed.EventProcessor do x["public_key"], x["link_name"], x["contract_id"], - "n/a", - x["annotations"], + Map.get(x, "instance_id", "n/a"), + Map.get(x, "annotations", %{}), stamp, %{} ) diff --git a/lib/lattice_observer/observed/lattice.ex b/lib/lattice_observer/observed/lattice.ex index 37bf75d..8778f16 100644 --- a/lib/lattice_observer/observed/lattice.ex +++ b/lib/lattice_observer/observed/lattice.ex @@ -328,6 +328,45 @@ defmodule LatticeObserver.Observed.Lattice do l end + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + source: _source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.health_check_status" + } + ) do + # This does not currently affect state, but shouldn't generate a warning either + l + end + + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + source: _source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.actors_started" + } + ) do + # This does not currently affect state, but shouldn't generate a warning either + l + end + + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + source: _source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.actors_stopped" + } + ) do + # This does not currently affect state, but shouldn't generate a warning either + l + end + def apply_event( l = %Lattice{}, %Cloudevents.Format.V_1_0.Event{