Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upgrade complete #158

Merged
merged 1 commit into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,914 changes: 1,091 additions & 823 deletions Cargo.lock

Large diffs are not rendered by default.

64 changes: 31 additions & 33 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,53 +27,51 @@ sk-api = { version = "1.1.1", path = "sk-api" }
sk-core = { version = "1.1.1", path = "sk-core" }
sk-store = { version = "1.1.1", path = "sk-store" }

anyhow = { version = "1.0.75", features = ["backtrace"] }
async-recursion = "1.0.5"
async-trait = "0.1.80"
bytes = "1.5.0"
anyhow = { version = "1.0.93", features = ["backtrace"] }
async-recursion = "1.1.1"
async-trait = "0.1.83"
bytes = "1.8.0"
chrono = "0.4.38"
clap = { version = "4.3.21", features = ["cargo", "derive", "string"] }
clap_complete = "4.5.6"
clockabilly = "0.1.0"
clap = { version = "4.5.21", features = ["cargo", "derive", "string"] }
clap_complete = "4.5.38"
clockabilly = "0.1.1"
derive_setters = "0.1.6"
dirs = "5.0.1"
either = "1.12.0"
futures = "0.3.28"
# remove this when we bump kube-rs
json-patch = { version = "1.0.0" }
either = "1.13.0"
futures = "0.3.31"
json-patch-ext = "0.1.0"
k8s-openapi = { version = "0.19.0", features = ["v1_27"] }
k8s-openapi = { version = "0.23.0", features = ["v1_27"] }
lazy_static = "1.5.0"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
# remove this fork once https://github.com/uutils/parse_datetime/pull/80 is merged and a new version released
parse_datetime_fork = { version = "0.6.0-custom" }
paste = "1.0.14"
object_store = { version = "0.11.1", features = ["aws", "gcp", "azure", "http"] }
parse_datetime = { git="https://github.com/uutils/parse_datetime", rev = "5e3fc53" }
paste = "1.0.15"
ratatui = "0.28.1"
regex = "1.10.2"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls"] }
rmp-serde = "1.1.2"
rocket = { version = "0.5.0", features = ["json", "tls"] }
schemars = { version = "0.8.12", features = ["chrono"] }
serde = "1.0.188"
serde_json = "1.0.105"
serde_yaml = "0.9.25"
thiserror = "1.0.46"
tokio = { version = "1.28.2", features = ["io-util", "macros", "process", "rt-multi-thread", "signal"] }
tracing = "0.1.37"
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"] }
rmp-serde = "1.3.0"
rocket = { version = "0.5.1", features = ["json", "tls"] }
schemars = { version = "0.8.21", features = ["chrono"] }
serde = "1.0.215"
serde_json = "1.0.132"
serde_yaml = "0.9.34"
thiserror = "1.0.69"
tokio = { version = "1.41.1", features = ["io-util", "macros", "process", "rt-multi-thread", "signal"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.4.1"
url = "2.5.3"

# test dependencies
assertables = "8.18.0"
http = "0.2.9"
http = "1.1.0"
httpmock = "0.6.8"
hyper = "0.14.27"
insta = "1.40.0"
hyper = "1.5.0"
insta = "1.41.1"
mockall = "0.11.4"
rstest = "0.18.2"
tracing-test = "0.2.4"
tracing-test = "0.2.5"

[workspace.dependencies.kube]
version = "0.85.0"
rev = "8b5230f"
git = "https://github.com/kube-rs/kube"
features = ["client", "derive", "runtime", "rustls-tls", "admission", "unstable-runtime"]
default-features = false
2 changes: 2 additions & 0 deletions examples/tracer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
trackedObjects:
apps/v1.Deployment:
podSpecTemplatePath: /spec/template
v1.ServiceAccount: {}
v1.ConfigMap: {}
34 changes: 19 additions & 15 deletions sk-cli/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use sk_api::v1::ExportFilters;
use sk_core::k8s::ApiSet;
use sk_core::prelude::*;
use sk_store::watchers::{
DynObjWatcher,
PodWatcher,
DynObjHandler,
ObjWatcher,
PodHandler,
};
use sk_store::{
TraceStore,
TracerConfig,
};
use tokio::task::JoinSet;

#[derive(clap::Args)]
pub struct Args {
Expand Down Expand Up @@ -53,24 +55,26 @@ pub async fn cmd(args: &Args) -> EmptyResult {

println!("Loading snapshot into store...");
let store = Arc::new(Mutex::new(TraceStore::new(config.clone())));
let (dyn_obj_watcher, do_ready_rx) =
DynObjWatcher::new(store.clone(), &mut apiset, &config.tracked_objects).await?;
let (pod_watcher, pod_ready_rx) = PodWatcher::new(client, store.clone(), apiset);
let mut js = JoinSet::new();
let mut do_ready_rxs = vec![];
for gvk in config.tracked_objects.keys() {
let (dyn_obj_handler, dyn_obj_stream) = DynObjHandler::new_with_stream(gvk, &mut apiset).await?;
let (dyn_obj_watcher, do_ready_rx) = ObjWatcher::new(dyn_obj_handler, dyn_obj_stream, store.clone());
do_ready_rxs.push(do_ready_rx);
js.spawn(dyn_obj_watcher.start());
}

let do_handle = tokio::spawn(dyn_obj_watcher.start());
let pod_handle = tokio::spawn(pod_watcher.start());
let (pod_handler, pod_stream) = PodHandler::new_with_stream(client, apiset);
let (pod_watcher, pod_ready_rx) = ObjWatcher::new(pod_handler, pod_stream, store.clone());
js.spawn(pod_watcher.start());

// the receivers block until they get a message, so don't actually care about the value
let _ = do_ready_rx.recv();
for do_ready_rx in do_ready_rxs {
let _ = do_ready_rx.recv();
}
let _ = pod_ready_rx.recv();

do_handle.abort();
pod_handle.abort();

// When I don't await the tasks, it seems like it hangs. I'm not 100% this was actually
// the issue though, it seemed a bit erratic.
let _ = do_handle.await;
let _ = pod_handle.await;
js.shutdown().await;

println!("Exporting snapshot data from store...");
let filters = ExportFilters::new(args.excluded_namespaces.clone(), vec![], true);
Expand Down
2 changes: 1 addition & 1 deletion sk-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ clockabilly = { workspace = true }
kube = { workspace = true }
k8s-openapi = { workspace = true }
object_store = { workspace = true }
parse_datetime_fork = { workspace = true }
parse_datetime = { workspace = true }
paste = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions sk-core/src/external_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use std::path::{
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
#[cfg(feature = "testutils")]
use mockall::automock;
use object_store::path::Path;
use object_store::{
DynObjectStore,
Expand Down Expand Up @@ -113,6 +111,9 @@ fn parse_path(path_str: &str) -> anyhow::Result<(ObjectStoreScheme, Path)> {
Ok(ObjectStoreScheme::parse(&url)?)
}

#[cfg(feature = "testutils")]
use mockall::automock;

#[cfg(test)]
mod test {
use rstest::*;
Expand Down
4 changes: 1 addition & 3 deletions sk-core/src/k8s/pod_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::cmp::{
Ordering,
};

use clockabilly::Clockable;
use tracing::*;

use super::*;
Expand Down Expand Up @@ -109,10 +108,9 @@ impl PodLifecycleData {
pub fn guess_finished_lifecycle(
pod: &corev1::Pod,
current_lifecycle_data: &PodLifecycleData,
clock: &(dyn Clockable + Send),
now: i64,
) -> anyhow::Result<PodLifecycleData> {
let new_lifecycle_data = PodLifecycleData::new_for(pod).unwrap_or(PodLifecycleData::Empty);
let now = clock.now_ts();

match new_lifecycle_data {
PodLifecycleData::Finished(..) => Ok(new_lifecycle_data),
Expand Down
14 changes: 1 addition & 13 deletions sk-core/src/k8s/testutils/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,7 @@ impl MockServerBuilder {

pub fn make_fake_apiserver() -> (MockServerBuilder, kube::Client) {
let builder = MockServerBuilder::new();
let config = kube::Config {
cluster_url: builder.url(),
default_namespace: "default".into(),
root_cert: None,
connect_timeout: None,
read_timeout: None,
write_timeout: None,
accept_invalid_certs: true,
auth_info: Default::default(),
proxy_url: None,
tls_server_name: None,
};

let config = kube::Config::new(builder.url());
let client = kube::Client::try_from(config).unwrap();
(builder, client)
}
Expand Down
2 changes: 1 addition & 1 deletion sk-core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clockabilly::{
DateTime,
Local,
};
use parse_datetime_fork::{
use parse_datetime::{
parse_datetime,
parse_datetime_at_date,
};
Expand Down
2 changes: 1 addition & 1 deletion sk-ctrl/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub fn build_driver_job(
secrets_list
.iter()
.map(|s| corev1::EnvFromSource {
secret_ref: Some(corev1::SecretEnvSource { name: Some(s.clone()), optional: Some(false) }),
secret_ref: Some(corev1::SecretEnvSource { name: s.clone(), optional: Some(false) }),
..Default::default()
})
.collect()
Expand Down
1 change: 0 additions & 1 deletion sk-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ clockabilly = { workspace = true }
either = { workspace = true }
kube = { workspace = true }
k8s-openapi = { workspace = true }
json-patch = { workspace = true }
json-patch-ext = { workspace = true }
rocket = { workspace = true }
serde_json = { workspace = true }
Expand Down
51 changes: 17 additions & 34 deletions sk-driver/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::sync::Mutex;

use json_patch::{
AddOperation,
use json_patch_ext::{
add_operation,
escape,
format_ptr,
Patch,
PatchOperation,
};
use json_patch_ext::escape;
use kube::core::admission::{
AdmissionRequest,
AdmissionResponse,
Expand Down Expand Up @@ -90,26 +91,14 @@ pub async fn mutate_pod(
return Ok(resp);
}

let mut patches = vec![];
add_simulation_labels(ctx, pod, &mut patches)?;
let mut patches =
vec![add_operation(format_ptr!("/metadata/labels/{}", escape(SIMULATION_LABEL_KEY)), json!(ctx.name))];
add_lifecycle_annotation(ctx, pod, &owners, mut_data, &mut patches)?;
add_node_selector_tolerations(pod, &mut patches)?;

Ok(resp.with_patch(Patch(patches))?)
}

fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.metadata.labels.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}));

Ok(())
}

fn add_lifecycle_annotation(
ctx: &DriverContext,
pod: &corev1::Pod,
Expand All @@ -132,10 +121,7 @@ fn add_lifecycle_annotation(
if let Some(patch) = to_annotation_patch(&lifecycle) {
info!("applying lifecycle annotations (hash={hash}, seq={seq})");
if pod.metadata.annotations.is_none() {
patches.push(PatchOperation::Add(AddOperation {
path: "/metadata/annotations".into(),
value: json!({}),
}));
patches.push(add_operation(format_ptr!("/metadata/annotations"), json!({})));
}
patches.push(patch);
break;
Expand All @@ -150,16 +136,13 @@ fn add_lifecycle_annotation(

fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.spec()?.tolerations.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) }));
patches.push(add_operation(format_ptr!("/spec/tolerations"), json!([])));
}
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}));
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "operator": "Exists", "effect": "NoSchedule"}),
}));
patches.push(add_operation(format_ptr!("/spec/nodeSelector"), json!({"type": "virtual"})));
patches.push(add_operation(
format_ptr!("/spec/tolerations/-"),
json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "operator": "Exists", "effect": "NoSchedule"}),
));

Ok(())
}
Expand All @@ -177,9 +160,9 @@ fn into_pod_review(resp: AdmissionResponse) -> AdmissionReview<corev1::Pod> {
fn to_annotation_patch(pld: &PodLifecycleData) -> Option<PatchOperation> {
match pld {
PodLifecycleData::Empty | PodLifecycleData::Running(_) => None,
PodLifecycleData::Finished(start_ts, end_ts) => Some(PatchOperation::Add(AddOperation {
path: format!("/metadata/annotations/{}", escape(LIFETIME_ANNOTATION_KEY)),
value: Value::String(format!("{}", end_ts - start_ts)),
})),
PodLifecycleData::Finished(start_ts, end_ts) => Some(add_operation(
format_ptr!("/metadata/annotations/{}", escape(LIFETIME_ANNOTATION_KEY)),
Value::String(format!("{}", end_ts - start_ts)),
)),
}
}
6 changes: 3 additions & 3 deletions sk-driver/src/tests/mutation_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use json_patch::{
patch,
use json_patch_ext::{
patch_ext,
Patch,
};
use kube::api::TypeMeta;
Expand Down Expand Up @@ -140,5 +140,5 @@ async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionRespo
adm_resp = mutate_pod(&ctx, adm_resp, &test_pod, &MutationData::new()).await.unwrap();
let mut json_pod = serde_json::to_value(&test_pod).unwrap();
let pod_patch: Patch = serde_json::from_slice(&adm_resp.patch.unwrap()).unwrap();
patch(&mut json_pod, &pod_patch).unwrap();
patch_ext(&mut json_pod, pod_patch.0[0].clone()).unwrap();
}
1 change: 1 addition & 0 deletions sk-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ testutils = ["dep:mockall"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clockabilly = { workspace = true }
futures = { workspace = true }
kube = { workspace = true }
Expand Down
Loading
Loading