Skip to content

Commit

Permalink
fix: pod lifecycle data using gvks
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Nov 15, 2024
1 parent 7678afc commit 80dbad6
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 88 deletions.
5 changes: 5 additions & 0 deletions sk-core/src/k8s/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ where
build_object_meta_helper(Some(namespace.into()), name, sim_name, owner)
}

pub fn format_gvk_name(gvk: &GVK, ns_name: &str) -> String {
format!("{gvk}:{ns_name}")
}


pub fn sanitize_obj(obj: &mut DynamicObject, api_version: &str, kind: &str) {
obj.metadata.creation_timestamp = None;
obj.metadata.deletion_timestamp = None;
Expand Down
6 changes: 3 additions & 3 deletions sk-driver/src/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@ fn add_lifecycle_annotation(
) -> EmptyResult {
if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) {
for owner in owners {
let gvk = GVK::from_owner_ref(owner)?;
let owner_gvk = GVK::from_owner_ref(owner)?;
let owner_ns_name = format!("{}/{}", orig_ns, owner.name);
if !ctx.store.has_obj(&gvk, &owner_ns_name) {
if !ctx.store.has_obj(&owner_gvk, &owner_ns_name) {
continue;
}

let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?);
let seq = mut_data.count(hash);

let lifecycle = ctx.store.lookup_pod_lifecycle(&owner_ns_name, hash, seq);
let lifecycle = ctx.store.lookup_pod_lifecycle(&owner_gvk, &owner_ns_name, hash, seq);
if let Some(patch) = to_annotation_patch(&lifecycle) {
info!("applying lifecycle annotations (hash={hash}, seq={seq})");
if pod.metadata.annotations.is_none() {
Expand Down
4 changes: 2 additions & 2 deletions sk-driver/src/tests/mutation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionRespo
let mut store = MockTraceStore::new();
let _ = store
.expect_lookup_pod_lifecycle()
.with(predicate::always(), predicate::eq(EMPTY_POD_SPEC_HASH), predicate::eq(0))
.returning(|_, _, _| PodLifecycleData::Finished(1, 2))
.with(predicate::always(), predicate::always(), predicate::eq(EMPTY_POD_SPEC_HASH), predicate::eq(0))
.returning(|_, _, _, _| PodLifecycleData::Finished(1, 2))
.once();
let _ = store.expect_has_obj().returning(move |_gvk, o| o == owner_ns_name);

Expand Down
7 changes: 5 additions & 2 deletions sk-store/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use serde::{
Deserialize,
Serialize,
};
use sk_core::k8s::GVK;
use sk_core::k8s::{
format_gvk_name,
GVK,
};

#[derive(Default, Deserialize, Serialize)]
pub struct TraceIndex {
Expand All @@ -25,7 +28,7 @@ impl TraceIndex {
pub fn flattened_keys(&self) -> Vec<String> {
self.index
.iter()
.flat_map(|(gvk, gvk_hash)| gvk_hash.keys().map(move |k| format!("{gvk}:{k}")))
.flat_map(|(gvk, gvk_hash)| gvk_hash.keys().map(move |ns_name| format_gvk_name(gvk, ns_name)))
.collect()
}

Expand Down
6 changes: 3 additions & 3 deletions sk-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub struct ExportedTrace {
config: TracerConfig,
events: Vec<TraceEvent>,
index: TraceIndex,
pod_lifecycles: HashMap<String, PodLifecyclesMap>,
pod_lifecycles: HashMap<(GVK, String), PodLifecyclesMap>,
}

pub trait TraceStorable {
fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option<u64>) -> EmptyResult;
fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult;
fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult;
fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData;
fn lookup_pod_lifecycle(&self, gvk: &GVK, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData;
fn record_pod_lifecycle(
&mut self,
ns_name: &str,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub mod mock {
fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option<u64>) -> EmptyResult;
fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult;
fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult;
fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData;
fn lookup_pod_lifecycle(&self, owner_gvk: &GVK, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData;
fn record_pod_lifecycle(
&mut self,
ns_name: &str,
Expand Down
73 changes: 44 additions & 29 deletions sk-store/src/pod_owners_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;

use sk_core::errors::*;
use sk_core::k8s::PodLifecycleData;
use sk_core::k8s::{
format_gvk_name,
PodLifecycleData,
GVK,
};
use tracing::*;

use crate::TraceIndex;

// The PodOwnersMap tracks lifecycle data for all pods that are owned by some object that we care
// about (e.g., if we are tracking Deployments, the owners map will track the lifecycle data for
// all pods that are owned by a Deployment).
Expand Down Expand Up @@ -46,14 +52,14 @@ pub type PodLifecyclesMap = HashMap<u64, Vec<PodLifecycleData>>;

#[derive(Default)]
pub(crate) struct PodOwnersMap {
m: HashMap<String, PodLifecyclesMap>,
index: HashMap<String, (String, u64, usize)>,
m: HashMap<(GVK, String), PodLifecyclesMap>,
index: HashMap<String, ((GVK, String), u64, usize)>,
}

impl PodOwnersMap {
pub(crate) fn new_from_parts(
m: HashMap<String, PodLifecyclesMap>,
index: HashMap<String, (String, u64, usize)>,
m: HashMap<(GVK, String), PodLifecyclesMap>,
index: HashMap<String, ((GVK, String), u64, usize)>,
) -> PodOwnersMap {
PodOwnersMap { m, index }
}
Expand All @@ -64,20 +70,23 @@ impl PodOwnersMap {

pub(crate) fn lifecycle_data_for<'a>(
&'a self,
owner_gvk: &GVK,
owner_ns_name: &str,
pod_hash: u64,
) -> Option<&'a Vec<PodLifecycleData>> {
self.m.get(owner_ns_name)?.get(&pod_hash)
self.m.get(&(owner_gvk.clone(), owner_ns_name.into()))?.get(&pod_hash)
}

pub(crate) fn store_new_pod_lifecycle(
&mut self,
ns_name: &str,
pod_ns_name: &str,
owner_gvk: &GVK,
owner_ns_name: &str,
hash: u64,
lifecycle_data: &PodLifecycleData,
) {
let idx = match self.m.entry(owner_ns_name.into()) {
let owner_gvk_name = format_gvk_name(owner_gvk, owner_ns_name);
let idx = match self.m.entry((owner_gvk.clone(), owner_ns_name.into())) {
Entry::Vacant(e) => {
e.insert([(hash, vec![lifecycle_data.clone()])].into());
0
Expand All @@ -89,30 +98,33 @@ impl PodOwnersMap {
},
};

info!("inserting pod {ns_name} owned by {owner_ns_name} with hash {hash}: {lifecycle_data:?}");
self.index.insert(ns_name.into(), (owner_ns_name.into(), hash, idx));
info!("inserting pod {pod_ns_name} owned by {owner_gvk_name} with hash {hash}: {lifecycle_data:?}");
self.index
.insert(pod_ns_name.into(), ((owner_gvk.clone(), owner_ns_name.into()), hash, idx));
}

pub(crate) fn update_pod_lifecycle(&mut self, ns_name: &str, lifecycle_data: &PodLifecycleData) -> EmptyResult {
match self.index.get(ns_name) {
None => bail!("pod {} not present in index", ns_name),
Some((owner_ns_name, hash, sequence_idx)) => {
pub(crate) fn update_pod_lifecycle(&mut self, pod_ns_name: &str, lifecycle_data: &PodLifecycleData) -> EmptyResult {
match self.index.get(pod_ns_name) {
None => bail!("pod {} not present in index", pod_ns_name),
Some(((owner_gvk, owner_ns_name), hash, sequence_idx)) => {
let owner_entry = self
.m
.get_mut(owner_ns_name)
.ok_or(anyhow!("no owner entry for pod {}", ns_name))?;
let pods =
owner_entry
.get_mut(hash)
.ok_or(anyhow!("no entry for pod {} matching hash {}", ns_name, hash))?;
.get_mut(&(owner_gvk.clone(), owner_ns_name.into()))
.ok_or(anyhow!("no owner entry for pod {}", pod_ns_name))?;
let pods = owner_entry.get_mut(hash).ok_or(anyhow!(
"no entry for pod {} matching hash {}",
pod_ns_name,
hash
))?;
let pod_entry = pods.get_mut(*sequence_idx).ok_or(anyhow!(
"no sequence index {} for pod {} matching hash {}",
sequence_idx,
ns_name,
pod_ns_name,
hash
))?;

info!("updating pod {ns_name} owned by {owner_ns_name} with hash {hash}: {lifecycle_data:?}");
let owner_gvk_name = format_gvk_name(owner_gvk, owner_ns_name);
info!("updating pod {pod_ns_name} owned by {owner_gvk_name} with hash {hash}: {lifecycle_data:?}");
*pod_entry = lifecycle_data.clone();
Ok(())
},
Expand All @@ -126,21 +138,24 @@ impl PodOwnersMap {
&self,
start_ts: i64,
end_ts: i64,
index: &HashMap<String, u64>,
) -> HashMap<String, PodLifecyclesMap> {
index: &TraceIndex,
) -> HashMap<(GVK, String), PodLifecyclesMap> {
self.m
.iter()
// The filtering is a little complicated here; if the owning object isn't in the index,
// we discard it. Also, if none of the pods belonging to the owning object land
// within the given time window, we want to discard it. Otherwise, we want to filter
// down the list of pods to the ones that fall between the given time window.
.filter_map(|(owner, lifecycles_map)| {
if !index.contains_key(owner) {
.filter_map(|((owner_gvk, owner_ns_name), lifecycles_map)| {
if !index.contains(owner_gvk, owner_ns_name) {
return None;
}

// Note the question mark here, doing a bunch of heavy lifting
Some((owner.clone(), filter_lifecycles_map(start_ts, end_ts, lifecycles_map)?))
Some((
(owner_gvk.clone(), owner_ns_name.clone()),
filter_lifecycles_map(start_ts, end_ts, lifecycles_map)?,
))
})
.collect()
}
Expand Down Expand Up @@ -170,7 +185,7 @@ pub(crate) fn filter_lifecycles_map(

#[cfg(test)]
impl PodOwnersMap {
pub(crate) fn pod_owner_meta(&self, ns_name: &str) -> Option<&(String, u64, usize)> {
self.index.get(ns_name)
pub(crate) fn pod_owner_meta(&self, pod_ns_name: &str) -> Option<&((GVK, String), u64, usize)> {
self.index.get(pod_ns_name)
}
}
22 changes: 14 additions & 8 deletions sk-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl TraceStore {

// Collect all pod lifecycle data that is a) between the start and end times, and b) is
// owned by some object contained in the trace
// let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
let data = rmp_serde::to_vec_named(&ExportedTrace {
version: CURRENT_TRACE_VERSION,
config: self.config.clone(),
events,
index,
pod_lifecycles: HashMap::new(), // TODO pod lifecycles don't work in SK2.0
pod_lifecycles,
})?;

info!("Exported {} events", num_events);
Expand Down Expand Up @@ -230,8 +230,14 @@ impl TraceStorable for TraceStore {
Ok(())
}

fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData {
let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_ns_name, pod_hash);
fn lookup_pod_lifecycle(
&self,
owner_gvk: &GVK,
owner_ns_name: &str,
pod_hash: u64,
seq: usize,
) -> PodLifecycleData {
let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_gvk, owner_ns_name, pod_hash);
match maybe_lifecycle_data {
Some(data) => data[seq % data.len()].clone(),
_ => PodLifecycleData::Empty,
Expand Down Expand Up @@ -260,12 +266,12 @@ impl TraceStorable for TraceStore {
for owner in &owners {
// Pods are guaranteed to have namespaces, so the unwrap is fine
let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name);
let gvk = GVK::from_owner_ref(owner)?;
if !self.has_obj(&gvk, &owner_ns_name) {
let owner_gvk = GVK::from_owner_ref(owner)?;
if !self.has_obj(&owner_gvk, &owner_ns_name) {
continue;
}

if !self.config.track_lifecycle_for(&gvk) {
if !self.config.track_lifecycle_for(&owner_gvk) {
continue;
}

Expand All @@ -280,7 +286,7 @@ impl TraceStorable for TraceStore {
// more things out from this and/or allow users to specify what is filtered out.
let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?);
self.pod_owners
.store_new_pod_lifecycle(ns_name, &owner_ns_name, hash, lifecycle_data);
.store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);
break;
}
} else {
Expand Down
46 changes: 30 additions & 16 deletions sk-store/src/tests/pod_owners_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,48 @@ fn owners_map() -> PodOwnersMap {

#[rstest]
fn test_store_new_pod_lifecycle(mut owners_map: PodOwnersMap) {
owners_map.store_new_pod_lifecycle("podA", "deployment1", 1234, &PodLifecycleData::Running(5));
owners_map.store_new_pod_lifecycle("podB", "deployment1", 1234, &PodLifecycleData::Running(7));
owners_map.store_new_pod_lifecycle("podC", "deployment1", 5678, &PodLifecycleData::Running(9));
owners_map.store_new_pod_lifecycle("podD", "deployment2", 5678, &PodLifecycleData::Running(13));
owners_map.store_new_pod_lifecycle("podA", &DEPL_GVK, "deployment1", 1234, &PodLifecycleData::Running(5));
owners_map.store_new_pod_lifecycle("podB", &DEPL_GVK, "deployment1", 1234, &PodLifecycleData::Running(7));
owners_map.store_new_pod_lifecycle("podC", &DEPL_GVK, "deployment1", 5678, &PodLifecycleData::Running(9));
owners_map.store_new_pod_lifecycle("podD", &DEPL_GVK, "deployment2", 5678, &PodLifecycleData::Running(13));
assert_eq!(
owners_map.lifecycle_data_for("deployment1", 1234).unwrap(),
owners_map.lifecycle_data_for(&DEPL_GVK, "deployment1", 1234).unwrap(),
&vec![PodLifecycleData::Running(5), PodLifecycleData::Running(7)]
);
assert_eq!(owners_map.lifecycle_data_for("deployment1", 5678).unwrap(), &vec![PodLifecycleData::Running(9)]);
assert_eq!(owners_map.lifecycle_data_for("deployment2", 5678).unwrap(), &vec![PodLifecycleData::Running(13)]);
assert_eq!(
owners_map.lifecycle_data_for(&DEPL_GVK, "deployment1", 5678).unwrap(),
&vec![PodLifecycleData::Running(9)]
);
assert_eq!(
owners_map.lifecycle_data_for(&DEPL_GVK, "deployment2", 5678).unwrap(),
&vec![PodLifecycleData::Running(13)]
);

assert_eq!(*owners_map.pod_owner_meta("podA").unwrap(), ("deployment1".to_string(), 1234, 0));
assert_eq!(*owners_map.pod_owner_meta("podB").unwrap(), ("deployment1".to_string(), 1234, 1));
assert_eq!(*owners_map.pod_owner_meta("podC").unwrap(), ("deployment1".to_string(), 5678, 0));
assert_eq!(*owners_map.pod_owner_meta("podD").unwrap(), ("deployment2".to_string(), 5678, 0));
assert_eq!(*owners_map.pod_owner_meta("podA").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 1234, 0));
assert_eq!(*owners_map.pod_owner_meta("podB").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 1234, 1));
assert_eq!(*owners_map.pod_owner_meta("podC").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 5678, 0));
assert_eq!(*owners_map.pod_owner_meta("podD").unwrap(), ((DEPL_GVK.clone(), "deployment2".into()), 5678, 0));
}

#[rstest]
fn test_filter_owners_map() {
let index = HashMap::from([("test/deployment1".into(), 9876), ("test/deployment2".into(), 5432)]);
let mut index = TraceIndex::new();
index.insert(DEPL_GVK.clone(), "test/deployment1".into(), 9876);
index.insert(DEPL_GVK.clone(), "test/deployment2".into(), 5432);
let owners_map = PodOwnersMap::new_from_parts(
HashMap::from([
("test/deployment1".into(), PodLifecyclesMap::from([(1234, vec![PodLifecycleData::Finished(1, 2)])])),
(
"test/deployment2".into(),
(DEPL_GVK.clone(), "test/deployment1".into()),
PodLifecyclesMap::from([(1234, vec![PodLifecycleData::Finished(1, 2)])]),
),
(
(DEPL_GVK.clone(), "test/deployment2".into()),
PodLifecyclesMap::from([(5678, vec![PodLifecycleData::Running(6), PodLifecycleData::Running(11)])]),
),
("test/deployment3".into(), PodLifecyclesMap::from([(9999, vec![PodLifecycleData::Finished(1, 2)])])),
(
(DEPL_GVK.clone(), "test/deployment3".into()),
PodLifecyclesMap::from([(9999, vec![PodLifecycleData::Finished(1, 2)])]),
),
]),
HashMap::new(),
);
Expand All @@ -53,7 +67,7 @@ fn test_filter_owners_map() {
assert_eq!(
res,
HashMap::from([(
"test/deployment2".into(),
(DEPL_GVK.clone(), "test/deployment2".into()),
PodLifecyclesMap::from([(5678, vec![PodLifecycleData::Running(6)])]),
)])
);
Expand Down
Loading

0 comments on commit 80dbad6

Please sign in to comment.