Skip to content

Commit

Permalink
chore: upgrade complete
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Nov 17, 2024
1 parent 80dbad6 commit c2f14c8
Show file tree
Hide file tree
Showing 21 changed files with 1,596 additions and 1,330 deletions.
1,914 changes: 1,091 additions & 823 deletions Cargo.lock

Large diffs are not rendered by default.

64 changes: 31 additions & 33 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,53 +27,51 @@ sk-api = { version = "1.1.1", path = "sk-api" }
sk-core = { version = "1.1.1", path = "sk-core" }
sk-store = { version = "1.1.1", path = "sk-store" }

anyhow = { version = "1.0.75", features = ["backtrace"] }
async-recursion = "1.0.5"
async-trait = "0.1.80"
bytes = "1.5.0"
anyhow = { version = "1.0.93", features = ["backtrace"] }
async-recursion = "1.1.1"
async-trait = "0.1.83"
bytes = "1.8.0"
chrono = "0.4.38"
clap = { version = "4.3.21", features = ["cargo", "derive", "string"] }
clap_complete = "4.5.6"
clockabilly = "0.1.0"
clap = { version = "4.5.21", features = ["cargo", "derive", "string"] }
clap_complete = "4.5.38"
clockabilly = "0.1.1"
derive_setters = "0.1.6"
dirs = "5.0.1"
either = "1.12.0"
futures = "0.3.28"
# remove this when we bump kube-rs
json-patch = { version = "1.0.0" }
either = "1.13.0"
futures = "0.3.31"
json-patch-ext = "0.1.0"
k8s-openapi = { version = "0.19.0", features = ["v1_27"] }
k8s-openapi = { version = "0.23.0", features = ["v1_27"] }
lazy_static = "1.5.0"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
# remove this fork once https://github.com/uutils/parse_datetime/pull/80 is merged and a new version released
parse_datetime_fork = { version = "0.6.0-custom" }
paste = "1.0.14"
object_store = { version = "0.11.1", features = ["aws", "gcp", "azure", "http"] }
parse_datetime = { git="https://github.com/uutils/parse_datetime", rev = "5e3fc53" }
paste = "1.0.15"
ratatui = "0.28.1"
regex = "1.10.2"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls"] }
rmp-serde = "1.1.2"
rocket = { version = "0.5.0", features = ["json", "tls"] }
schemars = { version = "0.8.12", features = ["chrono"] }
serde = "1.0.188"
serde_json = "1.0.105"
serde_yaml = "0.9.25"
thiserror = "1.0.46"
tokio = { version = "1.28.2", features = ["io-util", "macros", "process", "rt-multi-thread", "signal"] }
tracing = "0.1.37"
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"] }
rmp-serde = "1.3.0"
rocket = { version = "0.5.1", features = ["json", "tls"] }
schemars = { version = "0.8.21", features = ["chrono"] }
serde = "1.0.215"
serde_json = "1.0.132"
serde_yaml = "0.9.34"
thiserror = "1.0.69"
tokio = { version = "1.41.1", features = ["io-util", "macros", "process", "rt-multi-thread", "signal"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.4.1"
url = "2.5.3"

# test dependencies
assertables = "8.18.0"
http = "0.2.9"
http = "1.1.0"
httpmock = "0.6.8"
hyper = "0.14.27"
insta = "1.40.0"
hyper = "1.5.0"
insta = "1.41.1"
mockall = "0.11.4"
rstest = "0.18.2"
tracing-test = "0.2.4"
tracing-test = "0.2.5"

[workspace.dependencies.kube]
version = "0.85.0"
rev = "8b5230f"
git = "https://github.com/kube-rs/kube"
features = ["client", "derive", "runtime", "rustls-tls", "admission", "unstable-runtime"]
default-features = false
2 changes: 2 additions & 0 deletions examples/tracer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
trackedObjects:
apps/v1.Deployment:
podSpecTemplatePath: /spec/template
v1.ServiceAccount: {}
v1.ConfigMap: {}
34 changes: 19 additions & 15 deletions sk-cli/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use sk_api::v1::ExportFilters;
use sk_core::k8s::ApiSet;
use sk_core::prelude::*;
use sk_store::watchers::{
DynObjWatcher,
PodWatcher,
DynObjHandler,
ObjWatcher,
PodHandler,
};
use sk_store::{
TraceStore,
TracerConfig,
};
use tokio::task::JoinSet;

#[derive(clap::Args)]
pub struct Args {
Expand Down Expand Up @@ -53,24 +55,26 @@ pub async fn cmd(args: &Args) -> EmptyResult {

println!("Loading snapshot into store...");
let store = Arc::new(Mutex::new(TraceStore::new(config.clone())));
let (dyn_obj_watcher, do_ready_rx) =
DynObjWatcher::new(store.clone(), &mut apiset, &config.tracked_objects).await?;
let (pod_watcher, pod_ready_rx) = PodWatcher::new(client, store.clone(), apiset);
let mut js = JoinSet::new();
let mut do_ready_rxs = vec![];
for gvk in config.tracked_objects.keys() {
let (dyn_obj_handler, dyn_obj_stream) = DynObjHandler::new_with_stream(gvk, &mut apiset).await?;
let (dyn_obj_watcher, do_ready_rx) = ObjWatcher::new(dyn_obj_handler, dyn_obj_stream, store.clone());
do_ready_rxs.push(do_ready_rx);
js.spawn(dyn_obj_watcher.start());
}

let do_handle = tokio::spawn(dyn_obj_watcher.start());
let pod_handle = tokio::spawn(pod_watcher.start());
let (pod_handler, pod_stream) = PodHandler::new_with_stream(client, apiset);
let (pod_watcher, pod_ready_rx) = ObjWatcher::new(pod_handler, pod_stream, store.clone());
js.spawn(pod_watcher.start());

// the receivers block until they get a message, so don't actually care about the value
let _ = do_ready_rx.recv();
for do_ready_rx in do_ready_rxs {
let _ = do_ready_rx.recv();
}
let _ = pod_ready_rx.recv();

do_handle.abort();
pod_handle.abort();

// When I don't await the tasks, it seems like it hangs. I'm not 100% this was actually
// the issue though, it seemed a bit erratic.
let _ = do_handle.await;
let _ = pod_handle.await;
js.shutdown().await;

println!("Exporting snapshot data from store...");
let filters = ExportFilters::new(args.excluded_namespaces.clone(), vec![], true);
Expand Down
2 changes: 1 addition & 1 deletion sk-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ clockabilly = { workspace = true }
kube = { workspace = true }
k8s-openapi = { workspace = true }
object_store = { workspace = true }
parse_datetime_fork = { workspace = true }
parse_datetime = { workspace = true }
paste = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions sk-core/src/external_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use std::path::{
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
#[cfg(feature = "testutils")]
use mockall::automock;
use object_store::path::Path;
use object_store::{
DynObjectStore,
Expand Down Expand Up @@ -113,6 +111,9 @@ fn parse_path(path_str: &str) -> anyhow::Result<(ObjectStoreScheme, Path)> {
Ok(ObjectStoreScheme::parse(&url)?)
}

#[cfg(feature = "testutils")]
use mockall::automock;

#[cfg(test)]
mod test {
use rstest::*;
Expand Down
4 changes: 1 addition & 3 deletions sk-core/src/k8s/pod_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::cmp::{
Ordering,
};

use clockabilly::Clockable;
use tracing::*;

use super::*;
Expand Down Expand Up @@ -109,10 +108,9 @@ impl PodLifecycleData {
pub fn guess_finished_lifecycle(
pod: &corev1::Pod,
current_lifecycle_data: &PodLifecycleData,
clock: &(dyn Clockable + Send),
now: i64,
) -> anyhow::Result<PodLifecycleData> {
let new_lifecycle_data = PodLifecycleData::new_for(pod).unwrap_or(PodLifecycleData::Empty);
let now = clock.now_ts();

match new_lifecycle_data {
PodLifecycleData::Finished(..) => Ok(new_lifecycle_data),
Expand Down
14 changes: 1 addition & 13 deletions sk-core/src/k8s/testutils/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,7 @@ impl MockServerBuilder {

pub fn make_fake_apiserver() -> (MockServerBuilder, kube::Client) {
let builder = MockServerBuilder::new();
let config = kube::Config {
cluster_url: builder.url(),
default_namespace: "default".into(),
root_cert: None,
connect_timeout: None,
read_timeout: None,
write_timeout: None,
accept_invalid_certs: true,
auth_info: Default::default(),
proxy_url: None,
tls_server_name: None,
};

let config = kube::Config::new(builder.url());
let client = kube::Client::try_from(config).unwrap();
(builder, client)
}
Expand Down
2 changes: 1 addition & 1 deletion sk-core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clockabilly::{
DateTime,
Local,
};
use parse_datetime_fork::{
use parse_datetime::{
parse_datetime,
parse_datetime_at_date,
};
Expand Down
2 changes: 1 addition & 1 deletion sk-ctrl/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub fn build_driver_job(
secrets_list
.iter()
.map(|s| corev1::EnvFromSource {
secret_ref: Some(corev1::SecretEnvSource { name: Some(s.clone()), optional: Some(false) }),
secret_ref: Some(corev1::SecretEnvSource { name: s.clone(), optional: Some(false) }),
..Default::default()
})
.collect()
Expand Down
1 change: 0 additions & 1 deletion sk-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ clockabilly = { workspace = true }
either = { workspace = true }
kube = { workspace = true }
k8s-openapi = { workspace = true }
json-patch = { workspace = true }
json-patch-ext = { workspace = true }
rocket = { workspace = true }
serde_json = { workspace = true }
Expand Down
51 changes: 17 additions & 34 deletions sk-driver/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::sync::Mutex;

use json_patch::{
AddOperation,
use json_patch_ext::{
add_operation,
escape,
format_ptr,
Patch,
PatchOperation,
};
use json_patch_ext::escape;
use kube::core::admission::{
AdmissionRequest,
AdmissionResponse,
Expand Down Expand Up @@ -90,26 +91,14 @@ pub async fn mutate_pod(
return Ok(resp);
}

let mut patches = vec![];
add_simulation_labels(ctx, pod, &mut patches)?;
let mut patches =
vec![add_operation(format_ptr!("/metadata/labels/{}", escape(SIMULATION_LABEL_KEY)), json!(ctx.name))];
add_lifecycle_annotation(ctx, pod, &owners, mut_data, &mut patches)?;
add_node_selector_tolerations(pod, &mut patches)?;

Ok(resp.with_patch(Patch(patches))?)
}

fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.metadata.labels.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}));

Ok(())
}

fn add_lifecycle_annotation(
ctx: &DriverContext,
pod: &corev1::Pod,
Expand All @@ -132,10 +121,7 @@ fn add_lifecycle_annotation(
if let Some(patch) = to_annotation_patch(&lifecycle) {
info!("applying lifecycle annotations (hash={hash}, seq={seq})");
if pod.metadata.annotations.is_none() {
patches.push(PatchOperation::Add(AddOperation {
path: "/metadata/annotations".into(),
value: json!({}),
}));
patches.push(add_operation(format_ptr!("/metadata/annotations"), json!({})));
}
patches.push(patch);
break;
Expand All @@ -150,16 +136,13 @@ fn add_lifecycle_annotation(

fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.spec()?.tolerations.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) }));
patches.push(add_operation(format_ptr!("/spec/tolerations"), json!([])));
}
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}));
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "operator": "Exists", "effect": "NoSchedule"}),
}));
patches.push(add_operation(format_ptr!("/spec/nodeSelector"), json!({"type": "virtual"})));
patches.push(add_operation(
format_ptr!("/spec/tolerations/-"),
json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "operator": "Exists", "effect": "NoSchedule"}),
));

Ok(())
}
Expand All @@ -177,9 +160,9 @@ fn into_pod_review(resp: AdmissionResponse) -> AdmissionReview<corev1::Pod> {
fn to_annotation_patch(pld: &PodLifecycleData) -> Option<PatchOperation> {
match pld {
PodLifecycleData::Empty | PodLifecycleData::Running(_) => None,
PodLifecycleData::Finished(start_ts, end_ts) => Some(PatchOperation::Add(AddOperation {
path: format!("/metadata/annotations/{}", escape(LIFETIME_ANNOTATION_KEY)),
value: Value::String(format!("{}", end_ts - start_ts)),
})),
PodLifecycleData::Finished(start_ts, end_ts) => Some(add_operation(
format_ptr!("/metadata/annotations/{}", escape(LIFETIME_ANNOTATION_KEY)),
Value::String(format!("{}", end_ts - start_ts)),
)),
}
}
6 changes: 3 additions & 3 deletions sk-driver/src/tests/mutation_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use json_patch::{
patch,
use json_patch_ext::{
patch_ext,
Patch,
};
use kube::api::TypeMeta;
Expand Down Expand Up @@ -140,5 +140,5 @@ async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionRespo
adm_resp = mutate_pod(&ctx, adm_resp, &test_pod, &MutationData::new()).await.unwrap();
let mut json_pod = serde_json::to_value(&test_pod).unwrap();
let pod_patch: Patch = serde_json::from_slice(&adm_resp.patch.unwrap()).unwrap();
patch(&mut json_pod, &pod_patch).unwrap();
patch_ext(&mut json_pod, pod_patch.0[0].clone()).unwrap();
}
1 change: 1 addition & 0 deletions sk-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ testutils = ["dep:mockall"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clockabilly = { workspace = true }
futures = { workspace = true }
kube = { workspace = true }
Expand Down
Loading

0 comments on commit c2f14c8

Please sign in to comment.