diff --git a/charts/bridgekeeper/templates/crds.yaml b/charts/bridgekeeper/templates/crds.yaml index 0e54ce5..658343e 100644 --- a/charts/bridgekeeper/templates/crds.yaml +++ b/charts/bridgekeeper/templates/crds.yaml @@ -1,5 +1,4 @@ {{- if .Values.installCRDs }} ---- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: @@ -14,88 +13,129 @@ spec: singular: policy scope: Cluster versions: - - additionalPrinterColumns: [] - name: v1alpha1 - schema: - openAPIV3Schema: - description: "Auto-generated derived type for PolicySpec via `CustomResource`" - properties: - spec: - properties: - audit: - nullable: true - type: boolean - enforce: - nullable: true - type: boolean - rule: - properties: - python: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for PolicySpec via `CustomResource` + properties: + spec: + properties: + audit: + nullable: true + type: boolean + enforce: + nullable: true + type: boolean + rule: + properties: + modules: + items: + type: string + nullable: true + type: array + python: + type: string + required: + - python + type: object + target: + properties: + excludedNamespaces: + items: type: string - required: - - python - type: object - target: - properties: - excludedNamespaces: - items: - type: string - nullable: true - type: array - matches: - items: - properties: - apiGroup: - type: string - kind: - type: string - required: - - apiGroup - - kind - type: object - type: array - namespaces: - items: - type: string - nullable: true - type: array - required: - - matches - type: object - required: - - rule - - target - type: object - status: - nullable: true - properties: - audit: - nullable: true - properties: - timestamp: - nullable: true + nullable: true + type: array + matches: + items: + properties: + apiGroup: + type: string + kind: + type: string + required: + - apiGroup + - kind + type: object + type: array + namespaces: + items: type: string - violations: - items: - properties: - identifier: - type: string - message: - type: string - required: - - identifier - - message - type: object - nullable: true - type: array - type: object - type: object - required: - - spec - title: Policy - type: object - served: true - storage: true - subresources: - status: {} + nullable: true + type: array + required: + - matches + type: object + required: + - rule + - target + type: object + status: + nullable: true + properties: + audit: + nullable: true + properties: + timestamp: + nullable: true + type: string + violations: + items: + properties: + identifier: + type: string + message: + type: string + required: + - identifier + - message + type: object + nullable: true + type: array + type: object + type: object + required: + - spec + title: Policy + type: object + served: true + storage: true + subresources: + status: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: modules.bridgekeeper.maibornwolff.de +spec: + group: bridgekeeper.maibornwolff.de + names: + categories: [] + kind: Module + plural: modules + shortNames: [] + singular: module + scope: Cluster + versions: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for ModuleSpec via `CustomResource` + properties: + spec: + properties: + python: + type: string + required: + - python + type: object + required: + - spec + title: Module + type: object + served: true + storage: true + subresources: {} +--- {{- end }} diff --git a/charts/bridgekeeper/templates/rbac.yaml b/charts/bridgekeeper/templates/rbac.yaml index 5ee2d5f..4bfc286 100644 --- a/charts/bridgekeeper/templates/rbac.yaml +++ b/charts/bridgekeeper/templates/rbac.yaml @@ -5,7 +5,7 @@ metadata: {{- include "bridgekeeper.labels" . | nindent 4 }} name: bridgekeeper-role rules: -# During audit runs bridgekeeper needs to potentially access all resources in the cluster +# During audit runs bridgekeeper needs to potentially access all resources in the cluster - apiGroups: - '*' resources: @@ -42,6 +42,8 @@ rules: resources: - policies - policies/status + - modules + - modules/status verbs: - patch - update diff --git a/src/api.rs b/src/api.rs index fbc3211..ed51150 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,5 @@ use crate::crd::Policy; -use crate::evaluator::{validate_policy_admission, EvaluationResult, PolicyEvaluatorRef}; +use crate::evaluator::{EvaluationResult, PolicyEvaluatorRef}; use crate::util::cert::CertKeyPair; use kube::{ api::DynamicObject, @@ -84,6 +84,7 @@ async fn admission_mutate( #[rocket::post("/validate-policy", data = "")] async fn api_validate_policy( data: Json>, + evaluator: &State, ) -> Result>, ApiError> { HTTP_REQUEST_COUNTER .with_label_values(&["/validate-policy"]) @@ -94,7 +95,7 @@ async fn api_validate_policy( })?; let mut response: AdmissionResponse = AdmissionResponse::from(&admission_request); - let (allowed, reason) = validate_policy_admission(&admission_request).await; + let (allowed, reason) = evaluator.validate_policy_admission(&admission_request); response.allowed = allowed; if !allowed { response.result.message = reason.unwrap_or_default(); diff --git a/src/audit.rs b/src/audit.rs index 90934df..bae5360 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -1,6 +1,7 @@ use crate::crd::{Policy, PolicyStatus, Violation}; use crate::events::init_event_watcher; use crate::manager::Manager; +use crate::module::{ModuleStore, ModuleStoreRef, ModuleInfo}; use crate::policy::{load_policies_from_file, PolicyInfo, PolicyStore, PolicyStoreRef}; use crate::util::error::{kube_err, load_err, BridgekeeperError, Result}; use crate::util::k8s::{ @@ -21,6 +22,7 @@ use prometheus::{ }; use serde::Serialize; use serde_json::json; +use std::collections::HashMap; use std::time::SystemTime; use tokio::task; use tokio::time::{sleep, Duration}; @@ -134,6 +136,7 @@ impl AuditViolation { struct Auditor { k8s_client: Client, policies: PolicyStoreRef, + modules: ModuleStoreRef, //event_sender: EventSender, } @@ -141,12 +144,14 @@ impl Auditor { pub fn new( client: Client, policies: PolicyStoreRef, + modules: ModuleStoreRef, //event_sender: EventSender, ) -> Auditor { pyo3::prepare_freethreaded_python(); Auditor { k8s_client: client, policies, + modules, //event_sender, } } @@ -159,18 +164,25 @@ impl Auditor { ) -> Result> { let mut violations = Vec::new(); let mut policies = Vec::new(); - // While holding the lock only collect the policies, directly auditing them would make the future of the method not implement Send which breaks the task spawn + let mut modules = HashMap::new(); + // While holding the lock only collect the policies or modules, directly auditing them would make the future of the method not implement Send which breaks the task spawn { let policy_store = self.policies.lock().expect("lock failed. Cannot continue"); - for policy in policy_store.policies.values() { + for policy in policy_store.get_objects().values() { if all || policy.policy.audit.unwrap_or(false) { policies.push(policy.clone()); } } } + + { + let module_store = self.modules.lock().expect("lock failed. Cannot continue"); + modules.extend(module_store.get_objects()); + } + for policy in policies.iter() { match self - .audit_policy(policy, print_violations, update_status) + .audit_policy(policy, &modules, print_violations, update_status) .await { Ok(mut result) => { @@ -191,13 +203,31 @@ impl Auditor { async fn audit_policy( &self, policy: &PolicyInfo, + modules: &HashMap, print_violations: bool, update_status: bool, ) -> Result> { if print_violations { println!("Auditing policy {}", policy.name); } - let (valid, reason) = crate::evaluator::validate_policy(&policy.name, &policy.policy).await; + + let mut module_code = String::new(); + + if let Some(used_modules) = &policy.policy.rule.modules { + for module_name in used_modules.iter() { + match modules.get(module_name) { + Some(module_info) => { + module_code.push_str(&module_info.module.python); + module_code.push_str("\n"); + }, + None => { + log::warn!("Could not find module '{}'", module_name); + } + }; + } + } + + let (valid, reason) = crate::evaluator::validate_policy(&policy.name, &policy.policy, &module_code); if !valid { if print_violations { println!( @@ -248,7 +278,7 @@ impl Auditor { for object in objects { let target = EvaluationTarget::new(resource_description, &object); let (result, message, _patch) = - crate::evaluator::evaluate_policy_audit(policy, object); + crate::evaluator::evaluate_policy_audit(policy, object, &module_code); NUM_CHECKED_OBJECTS .with_label_values(&[policy.name.as_str(), namespace.as_str()]) .inc(); @@ -280,7 +310,7 @@ impl Auditor { for object in objects { let target = EvaluationTarget::new(resource_description, &object); let (result, message, _patch) = - crate::evaluator::evaluate_policy_audit(policy, object); + crate::evaluator::evaluate_policy_audit(policy, object, &module_code); NUM_CHECKED_OBJECTS .with_label_values(&[policy.name.as_str(), ""]) .inc(); @@ -362,6 +392,7 @@ pub async fn run(args: Args) { .await .expect("Fail early if kube client cannot be created"); let policies = PolicyStore::new(); + let modules = ModuleStore::new(); let event_sender = init_event_watcher(&client); // Load policies either from kubernetes or from file if !args.file.is_empty() { @@ -369,14 +400,18 @@ pub async fn run(args: Args) { load_policies_from_file(policies.clone(), filename).expect("failed to load policy"); } } else { - let mut manager = Manager::new(client.clone(), policies.clone(), event_sender.clone()); + let mut manager = Manager::new(client.clone(), policies.clone(), modules.clone(), event_sender.clone()); manager .load_existing_policies() .await .expect("Could not load existing policies"); + manager + .load_existing_modules() + .await + .expect("Could not load existing policies"); } // Run audit - let auditor = Auditor::new(client, policies); + let auditor = Auditor::new(client, policies, modules); match auditor .audit_policies(!args.silent, args.status, args.all) .await @@ -401,9 +436,9 @@ fn json_result(violations: Vec) { println!("{}", json); } -pub async fn launch_loop(client: kube::Client, policies: PolicyStoreRef, interval: u32) { +pub async fn launch_loop(client: kube::Client, policies: PolicyStoreRef, modules: ModuleStoreRef, interval: u32) { task::spawn(async move { - let auditor = Auditor::new(client, policies); + let auditor = Auditor::new(client, policies, modules); loop { sleep(Duration::from_secs(interval as u64)).await; info!("Starting audit run"); diff --git a/src/crd.rs b/src/crd.rs index 9b1ae9b..014eb27 100644 --- a/src/crd.rs +++ b/src/crd.rs @@ -18,6 +18,18 @@ pub struct PolicySpec { pub enforce: Option, } +#[derive( + CustomResource, Serialize, Deserialize, Debug, Default, Clone, Hash, PartialEq, Eq, JsonSchema, +)] +#[kube( + group = "bridgekeeper.maibornwolff.de", + version = "v1alpha1", + kind = "Module" +)] +pub struct ModuleSpec { + pub python: String, +} + #[derive(Serialize, Deserialize, Debug, Default, Clone, Hash, PartialEq, Eq, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Target { @@ -37,6 +49,7 @@ pub struct Match { #[serde(rename_all = "camelCase")] pub struct Rule { pub python: String, + pub modules: Option>, } #[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] @@ -113,7 +126,7 @@ impl PolicySpec { audit: Some(false), enforce: Some(true), target: Default::default(), - rule: Rule { python }, + rule: Rule { python, modules: None }, } } } diff --git a/src/evaluator.rs b/src/evaluator.rs index 888e196..f06273b 100644 --- a/src/evaluator.rs +++ b/src/evaluator.rs @@ -1,21 +1,20 @@ use crate::{ crd::{Policy, PolicySpec}, - events::{EventSender, PolicyEvent, PolicyEventData}, + events::{EventSender, Event, PolicyEventData, EventType}, policy::{PolicyInfo, PolicyStoreRef}, - util::k8s::find_k8s_resource_matches, + module::{ModuleStoreRef, ModuleInfo}, }; use kube::{ core::{ admission::{self, Operation}, DynamicObject, }, - Client, }; use lazy_static::lazy_static; use prometheus::{register_counter_vec, CounterVec}; use pyo3::prelude::*; use serde_derive::Serialize; -use std::sync::Arc; +use std::{sync::Arc, collections::HashMap}; use tracing::{info, warn}; lazy_static! { @@ -74,6 +73,7 @@ impl ValidationRequest { pub struct PolicyEvaluator { policies: PolicyStoreRef, + modules: ModuleStoreRef, event_sender: EventSender, } @@ -87,9 +87,10 @@ pub struct EvaluationResult { pub type PolicyEvaluatorRef = Arc; impl PolicyEvaluator { - pub fn new(policies: PolicyStoreRef, event_sender: EventSender) -> PolicyEvaluatorRef { + pub fn new(policies: PolicyStoreRef, modules: ModuleStoreRef, event_sender: EventSender) -> PolicyEvaluatorRef { let evaluator = PolicyEvaluator { policies, + modules, event_sender, }; pyo3::prepare_freethreaded_python(); @@ -116,11 +117,13 @@ impl PolicyEvaluator { } }; let policies = self.policies.lock().expect("lock failed. Cannot continue"); + let modules = self.modules.lock().expect("lock failed. Cannot continue"); + let mut matching_policies = Vec::new(); // Collect all matching policies - for value in policies.policies.values() { + for value in policies.get_objects().values() { if value.is_match(&gvk, &namespace) { MATCHED_POLICIES .with_label_values(&[value.name.as_str()]) @@ -149,7 +152,29 @@ impl PolicyEvaluator { namespace.clone().unwrap_or_else(|| "-".to_string()), name ); - let res = evaluate_policy(value, &request); + + let mut module_code = String::new(); + + if let Some(used_modules) = &value.policy.rule.modules { + for module_name in used_modules.iter() { + match modules.get_objects().get(module_name) { + Some(module_info) => { + module_code.push_str(&module_info.module.python); + module_code.push_str("\n"); + }, + None => { + return EvaluationResult { + allowed: false, + patch: None, + reason: Some(format!("Could not find module '{}'", module_name)), + warnings + } + } + }; + } + } + + let res = evaluate_policy(value, &request, &module_code); if let Some(mut patch) = res.2 { if let Some(patches) = patches.as_mut() { patches.0.append(&mut patch.0); @@ -158,13 +183,15 @@ impl PolicyEvaluator { } } self.event_sender - .send(PolicyEvent { - policy_reference: value.ref_info.clone(), - event_data: PolicyEventData::Evaluated { - target_identifier, - result: res.0, - reason: res.1.clone(), - }, + .send(Event { + object_reference: value.ref_info.clone(), + event_data: EventType::Policy( + PolicyEventData::Evaluated { + target_identifier, + result: res.0, + reason: res.1.clone(), + } + ), }) .unwrap_or_else(|err| warn!("Could not send event: {:?}", err)); if res.0 { @@ -204,48 +231,52 @@ impl PolicyEvaluator { patch: patches, } } -} -pub async fn validate_policy_admission( - request: &admission::AdmissionRequest, -) -> (bool, Option) { - if let Some(policy) = request.object.as_ref() { - let name = match policy.metadata.name.as_ref() { - Some(name) => name.as_str(), - None => "-invalidname-", - }; - validate_policy(name, &policy.spec).await - } else { - (false, Some("No rule found".to_string())) + pub fn get_available_modules(&self) -> HashMap { + let module_store = self.modules.lock().expect("lock failed. Cannot continue"); + + return module_store.get_objects(); } -} -pub async fn validate_policy(name: &str, policy: &PolicySpec) -> (bool, Option) { - let client = Client::try_default() - .await - .expect("failed to create kube client"); - - // Iterate through match items and check whether specified resources exist in the cluster - for match_item in policy.target.matches.iter() { - let api_resource_exists = - match find_k8s_resource_matches(&match_item.api_group, &match_item.kind, &client).await - { - Ok(resources) => !resources.is_empty(), - Err(_) => false, + pub fn validate_policy_admission( + &self, + request: &admission::AdmissionRequest + ) -> (bool, Option) { + if let Some(policy) = request.object.as_ref() { + let name = match policy.metadata.name.as_ref() { + Some(name) => name.as_str(), + None => "-invalidname-", }; - if !api_resource_exists { - return ( - false, - Some(format!( - "Specified target {}/{} is not available", - match_item.api_group, match_item.kind - )), - ); + let mut module_code = String::new(); + + if let Some(used_modules) = policy.spec.rule.modules.clone() { + let modules = self.get_available_modules(); + + for module_name in used_modules.iter() { + match modules.get(module_name) { + Some(module_info) => { + module_code.push_str(&module_info.module.python); + module_code.push_str("\n"); + }, + None => { + log::warn!("Could not find module '{}'", module_name) + } + }; + } + } + + validate_policy(name, &policy.spec, &module_code) + } else { + (false, Some("No rule found".to_string())) } } +} + +pub fn validate_policy(name: &str, policy: &PolicySpec, module_code: &str) -> (bool, Option) { + let mut python_code = String::from(module_code); + python_code.push_str(&policy.rule.python.clone()); - let python_code = policy.rule.python.clone(); Python::with_gil(|py| { if let Err(err) = PyModule::from_code(py, &python_code, "rule.py", "bridgekeeper") { POLICY_VALIDATIONS_FAIL.with_label_values(&[name]).inc(); @@ -259,6 +290,7 @@ pub async fn validate_policy(name: &str, policy: &PolicySpec) -> (bool, Option (bool, Option, Option) { let name = &policy.name; Python::with_gil(|py| { @@ -267,7 +299,10 @@ fn evaluate_policy( Err(err) => return fail(name, &format!("Failed to initialize python: {}", err)), }; - match PyModule::from_code(py, &policy.policy.rule.python, "rule.py", "bridgekeeper") { + let mut full_code = String::from(module_code); + full_code.push_str(&policy.policy.rule.python); + + match PyModule::from_code(py, &full_code, "rule.py", "bridgekeeper") { Ok(rule_code) => { if let Ok(validation_function) = rule_code.getattr("validate") { match validation_function.call1((obj,)) { @@ -289,12 +324,13 @@ fn evaluate_policy( pub fn evaluate_policy_audit( policy: &PolicyInfo, object: DynamicObject, + module_code: &str, ) -> (bool, Option, Option) { let request = ValidationRequest { object, operation: Operation::Update, }; - evaluate_policy(policy, &request) + evaluate_policy(policy, &request, module_code) } fn extract_result( @@ -365,7 +401,7 @@ def validate(request): operation: Operation::Create, }; - let (res, reason, patch) = evaluate_policy(&policy, &request); + let (res, reason, patch) = evaluate_policy(&policy, &request, ""); assert!(res, "validate function failed: {}", reason.unwrap()); assert!(reason.is_none()); assert!(patch.is_none()); @@ -391,7 +427,7 @@ def validate(request): operation: Operation::Create, }; - let (res, reason, patch) = evaluate_policy(&policy, &request); + let (res, reason, patch) = evaluate_policy(&policy, &request, ""); assert!(!res); assert!(reason.is_some()); assert_eq!("foobar".to_string(), reason.unwrap()); @@ -418,7 +454,7 @@ def validate(request): operation: Operation::Create, }; - let (res, reason, patch) = evaluate_policy(&policy, &request); + let (res, reason, patch) = evaluate_policy(&policy, &request, ""); assert!(!res); assert!(reason.is_some()); assert_eq!( @@ -451,7 +487,7 @@ def validate(request): operation: Operation::Create, }; - let (res, reason, patch) = evaluate_policy(&policy, &request); + let (res, reason, patch) = evaluate_policy(&policy, &request, ""); assert!(res, "validate function failed: {}", reason.unwrap()); assert!(reason.is_none()); assert!(patch.is_some()); @@ -465,4 +501,36 @@ def validate(request): serde_json::to_value(patch.0).unwrap() ); } + + #[test] + fn test_use_of_module() { + pyo3::prepare_freethreaded_python(); + let python = r#" +def validate(request): + return bool_from_module() + "#; + + let module_code = r#" +def bool_from_module(): + return True + "#; + + let policy_spec = PolicySpec::from_python(python.to_string()); + let policy = PolicyInfo::new("test".to_string(), policy_spec, Default::default()); + + let object = DynamicObject { + types: None, + metadata: ObjectMeta::default(), + data: serde_json::Value::Null, + }; + let request = ValidationRequest { + object, + operation: Operation::Create, + }; + + let (res, reason, patch) = evaluate_policy(&policy, &request, &module_code); + assert!(res, "validate function failed: {}", reason.unwrap()); + assert!(reason.is_none()); + assert!(patch.is_none()); + } } diff --git a/src/events.rs b/src/events.rs index f4001ce..6c653ef 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,4 +1,3 @@ -use crate::policy::PolicyObjectReference; use k8s_openapi::api::core::v1::{Event as KubeEvent, EventSource as KubeEventSource}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; use k8s_openapi::chrono::offset::Utc; @@ -10,13 +9,9 @@ use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::task; use tracing::warn; -pub type EventSender = UnboundedSender; +use crate::util::types::ObjectReference; -#[derive(Debug)] -pub struct PolicyEvent { - pub policy_reference: PolicyObjectReference, - pub event_data: PolicyEventData, -} +pub type EventSender = UnboundedSender; #[derive(Debug, PartialEq, Eq)] pub enum PolicyEventData { @@ -28,45 +23,77 @@ pub enum PolicyEventData { }, } +#[derive(Debug, PartialEq, Eq)] +pub enum ModuleEventData { + Loaded, +} + +#[derive(Debug)] +pub enum EventType { + Policy(PolicyEventData), + Module(ModuleEventData), +} + +#[derive(Debug)] +pub struct Event { + pub object_reference: ObjectReference, + pub event_data: EventType, +} + pub fn init_event_watcher(client: &Client) -> EventSender { - let (sender, mut receiver) = mpsc::unbounded_channel::(); + let (sender, mut receiver) = mpsc::unbounded_channel::(); let events_api: Api = Api::namespaced(client.clone(), "default"); task::spawn(async move { let instance = std::env::var("POD_NAME").unwrap_or_else(|_| "dev".to_string()); while let Some(event) = receiver.recv().await { let mut kube_event = KubeEvent::default(); - kube_event.metadata.generate_name = event.policy_reference.name.clone(); - kube_event.involved_object = event.policy_reference.to_object_reference(); + + kube_event.metadata.generate_name = event.object_reference.name.clone(); + kube_event.involved_object = event.object_reference.to_k8s_object_reference(); kube_event.type_ = Some("Normal".to_string()); kube_event.first_timestamp = Some(Time(Utc::now())); kube_event.source = Some(KubeEventSource { component: Some(format!("bridgekeeper/{}", instance)), host: None, }); + match event.event_data { - PolicyEventData::Loaded => { - kube_event.reason = Some("Loaded".to_string()); - kube_event.message = Some("Policy loaded by bridgekeeper".to_string()); - } - PolicyEventData::Evaluated { - target_identifier, - result, - reason, - } => { - kube_event.reason = Some("Evaluated".to_string()); - kube_event.message = Some(format!( - "Target: {}, Result: {}, Reason: {}", - target_identifier, - result, - reason.unwrap_or_else(|| "-".to_string()) - )); + EventType::Policy(event_data) => { + match event_data { + PolicyEventData::Loaded => { + kube_event.reason = Some("Loaded".to_string()); + kube_event.message = Some("Policy loaded by bridgekeeper".to_string()); + } + PolicyEventData::Evaluated { + target_identifier, + result, + reason, + } => { + kube_event.reason = Some("Evaluated".to_string()); + kube_event.message = Some(format!( + "Target: {}, Result: {}, Reason: {}", + target_identifier, + result, + reason.unwrap_or_else(|| "-".to_string()) + )); + } + } + }, + EventType::Module(event_data) => { + match event_data { + ModuleEventData::Loaded => { + kube_event.reason = Some("Loaded".to_string()); + kube_event.message = Some("Module loaded by bridgekeeper".to_string()); + } + } } } + if let Err(err) = events_api.create(&PostParams::default(), &kube_event).await { warn!( "Could not create event for policy {}. Reason: {}", event - .policy_reference + .object_reference .name .clone() .unwrap_or_else(|| "-".to_string()), diff --git a/src/helper/gencrd.rs b/src/helper/gencrd.rs index 2a3b27b..10cc357 100644 --- a/src/helper/gencrd.rs +++ b/src/helper/gencrd.rs @@ -1,7 +1,8 @@ -use crate::{constants::CRD_FILEPATH, crd::Policy}; +use crate::{constants::CRD_FILEPATH, crd::Policy, crd::Module}; use argh::FromArgs; use kube::CustomResourceExt; use std::fs; +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; #[derive(FromArgs, PartialEq, Eq, Debug)] #[argh(subcommand, name = "gencrd")] @@ -16,20 +17,37 @@ pub struct Args { } pub fn run(args: Args) { - let data = - serde_yaml::to_string(&Policy::crd()).expect("Could not generate yaml from CRD definition"); + let policy_crd = Policy::crd(); + let module_crd = Module::crd(); let filepath = args.file.unwrap_or_else(|| CRD_FILEPATH.to_string()); - let wrapped_data = "{{- if .Values.installCRDs }}\n".to_string() + &data + "{{- end }}\n"; + + let mut crd = String::new(); + crd.push_str(&gen_crd_data(&policy_crd)); + crd.push_str(&gen_crd_data(&module_crd)); + write_crd_str(&filepath, &crd, args.no_wrapping); +} + +fn gen_crd_data(data: &CustomResourceDefinition) -> String { + let mut string_data = serde_yaml::to_string(data).expect("Could not generate yaml from CRD definition"); + string_data.push_str("---\n"); + + string_data +} + +fn write_crd_str(filepath: &str, data: &str, no_wrapping: bool) { if filepath == "-" { println!("{}\n", data); } else { + + let wrapped_data = "{{- if .Values.installCRDs }}\n".to_string() + &data + "{{- end }}\n"; + fs::write( filepath, - match args.no_wrapping { + match no_wrapping { true => data, - false => wrapped_data, + false => &wrapped_data, }, ) .expect("Unable to write crd yaml"); } -} +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 6067302..2866034 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ mod events; mod helper; mod manager; mod policy; +mod module; mod server; mod util; diff --git a/src/manager.rs b/src/manager.rs index 41e235e..687c01b 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,11 +1,14 @@ +use crate::events::{EventType}; use crate::util::error::{kube_err, Result}; use crate::{ crd::Policy, - events::{EventSender, PolicyEvent, PolicyEventData}, + crd::Module, + events::{EventSender, Event, PolicyEventData, ModuleEventData}, policy::PolicyStoreRef, + module::ModuleStoreRef }; use futures::StreamExt; -use kube::runtime::{watcher, watcher::Event}; +use kube::runtime::{watcher, watcher::Event as KubeEvent}; use kube::{ api::{Api, ListParams}, Client, @@ -16,14 +19,16 @@ use tracing::warn; pub struct Manager { k8s_client: Client, policies: PolicyStoreRef, + modules: ModuleStoreRef, event_sender: EventSender, } impl Manager { - pub fn new(client: Client, policies: PolicyStoreRef, event_sender: EventSender) -> Manager { + pub fn new(client: Client, policies: PolicyStoreRef, modules: ModuleStoreRef, event_sender: EventSender) -> Manager { Manager { k8s_client: client, policies, + modules, event_sender, } } @@ -37,11 +42,33 @@ impl Manager { { let mut policies = self.policies.lock().expect("lock failed. Cannot continue"); for policy in res { - if let Some(ref_info) = policies.add_policy(policy) { + if let Some(ref_info) = policies.add_object(policy) { self.event_sender - .send(PolicyEvent { - policy_reference: ref_info, - event_data: PolicyEventData::Loaded, + .send(Event { + object_reference: ref_info, + event_data: EventType::Policy(PolicyEventData::Loaded), + }) + .unwrap_or_else(|err| log::warn!("Could not send event: {:?}", err)); + } + } + } + Ok(()) + } + + pub async fn load_existing_modules(&mut self) -> Result<()> { + let modules_api = self.modules_api(); + let res = modules_api + .list(&ListParams::default()) + .await + .map_err(kube_err)?; + { + let mut modules = self.modules.lock().expect("lock failed. Cannot continue"); + for module in res { + if let Some(ref_info) = modules.add_object(module) { + self.event_sender + .send(Event { + object_reference: ref_info, + event_data: EventType::Module(ModuleEventData::Loaded), }) .unwrap_or_else(|err| warn!("Could not send event: {:?}", err)); } @@ -51,10 +78,16 @@ impl Manager { } pub async fn start(&mut self) { + self.watch_policies(); + self.watch_modules(); + } + + fn watch_policies(&mut self) { let policies_api = self.policies_api(); let policies = self.policies.clone(); let event_sender = self.event_sender.clone(); + // Watcher for policies task::spawn(async move { let watcher = watcher(policies_api.clone(), ListParams::default()); let mut pinned_watcher = Box::pin(watcher); @@ -62,22 +95,61 @@ impl Manager { let res = pinned_watcher.next().await; if let Some(Ok(event)) = res { match event { - Event::Applied(policy) => { + KubeEvent::Applied(policy) => { let mut policies = policies.lock().expect("lock failed. Cannot continue"); - if let Some(ref_info) = policies.add_policy(policy) { + if let Some(ref_info) = policies.add_object(policy) { event_sender - .send(PolicyEvent { - policy_reference: ref_info, - event_data: PolicyEventData::Loaded, + .send(Event { + object_reference: ref_info, + event_data: EventType::Policy(PolicyEventData::Loaded), }) .unwrap_or_else(|err| warn!("Could not send event: {:?}", err)); } } - Event::Deleted(policy) => { + KubeEvent::Deleted(policy) => { let mut policies = policies.lock().expect("lock failed. Cannot continue"); - policies.remove_policy(policy); + policies.remove_object(policy); + } + _ => (), + } + } + } + }); + } + + fn watch_modules(&mut self) { + let modules_api = self.modules_api(); + let modules = self.modules.clone(); + let event_sender = self.event_sender.clone(); + + // Watcher for modules + task::spawn(async move { + let watcher = watcher(modules_api.clone(), ListParams::default()); + let mut pinned_watcher = Box::pin(watcher); + loop { + let res = pinned_watcher.next().await; + if let Some(Ok(event)) = res { + match event { + KubeEvent::Applied(module) => { + let mut modules = + modules.lock().expect("lock failed. Cannot continue"); + if let Some(ref_info) = modules.add_object(module) { + event_sender + .send(Event { + object_reference: ref_info, + event_data: EventType::Module(ModuleEventData::Loaded), + }) + .unwrap_or_else(|err| { + log::warn!("Could not send event: {:?}", err) + }); + } + } + KubeEvent::Deleted(module) => { + let mut modules = + modules.lock().expect("lock failed. Cannot continue"); + modules.remove_object(module); } _ => (), } @@ -89,4 +161,8 @@ impl Manager { fn policies_api(&mut self) -> Api { Api::all(self.k8s_client.clone()) } + + fn modules_api(&mut self) -> Api { + Api::all(self.k8s_client.clone()) + } } diff --git a/src/module.rs b/src/module.rs new file mode 100644 index 0000000..979efb0 --- /dev/null +++ b/src/module.rs @@ -0,0 +1,86 @@ +use crate::crd::{Module, ModuleSpec}; +use crate::util::types::ObjectReference; +use kube::core::Resource; +use lazy_static::lazy_static; +use prometheus::{register_gauge, Gauge}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +lazy_static! { + static ref ACTIVE_MODULES: Gauge = + register_gauge!("bridgekeeper_modules_active", "Number of active modules.") + .expect("creating metric always works"); +} + +pub struct ModuleStore { + pub modules: HashMap, +} + +pub type ModuleStoreRef = Arc>; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ModuleInfo { + pub name: String, + pub module: ModuleSpec, + pub ref_info: ObjectReference, +} + +impl ModuleStore { + pub fn new() -> ModuleStoreRef { + let store = ModuleStore { + modules: HashMap::new(), + }; + Arc::new(Mutex::new(store)) + } + + pub fn add_object(&mut self, module: Module) -> Option { + let ref_info = create_object_reference(&module); + let name = module.metadata.name.expect("name is always set"); + if let Some(existing_module_info) = self.modules.get(&name) { + if existing_module_info.module != module.spec { + let module_info = ModuleInfo::new(name.clone(), module.spec, ref_info.clone()); + log::info!("Module '{}' updated", name); + self.modules.insert(name, module_info); + Some(ref_info) + } else { + None + } + } else { + let module_info = ModuleInfo::new(name.clone(), module.spec, ref_info.clone()); + log::info!("Module '{}' added", name); + self.modules.insert(name, module_info); + ACTIVE_MODULES.inc(); + Some(ref_info) + } + } + + pub fn remove_object(&mut self, module: Module) { + let name = module.metadata.name.expect("name is always set"); + log::info!("Module '{}' removed", name); + self.modules.remove(&name); + ACTIVE_MODULES.dec(); + } + + pub fn get_objects(&self) -> HashMap { + return self.modules.clone(); + } +} + +fn create_object_reference(obj: &Module) -> ObjectReference { + ObjectReference { + api_version: Some(Module::api_version(&()).to_string()), + kind: Some(Module::kind(&()).to_string()), + name: obj.metadata.name.clone(), + uid: obj.metadata.uid.clone(), + } +} + +impl ModuleInfo { + pub fn new(name: String, module: ModuleSpec, ref_info: ObjectReference) -> ModuleInfo { + ModuleInfo { + name, + module, + ref_info, + } + } +} diff --git a/src/policy.rs b/src/policy.rs index a3bf5df..9a2c1d4 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -1,6 +1,6 @@ use crate::crd::{Policy, PolicySpec}; use crate::util::error::{load_err, Result}; -use k8s_openapi::api::core::v1::ObjectReference as KubeObjectReference; +use crate::util::types::ObjectReference; use kube::api::GroupVersionKind; use kube::core::Resource; use lazy_static::lazy_static; @@ -8,7 +8,6 @@ use prometheus::{register_gauge, Gauge}; use serde::Deserialize; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use tracing::info; lazy_static! { static ref ACTIVE_POLICIES: Gauge = @@ -26,27 +25,7 @@ pub type PolicyStoreRef = Arc>; pub struct PolicyInfo { pub name: String, pub policy: PolicySpec, - pub ref_info: PolicyObjectReference, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] -pub struct PolicyObjectReference { - pub api_version: Option, - pub kind: Option, - pub name: Option, - pub uid: Option, -} - -impl PolicyObjectReference { - pub fn to_object_reference(&self) -> KubeObjectReference { - KubeObjectReference { - api_version: self.api_version.clone(), - kind: self.kind.clone(), - name: self.name.clone(), - uid: self.uid.clone(), - ..Default::default() - } - } + pub ref_info: ObjectReference, } impl PolicyStore { @@ -56,10 +35,42 @@ impl PolicyStore { }; Arc::new(Mutex::new(store)) } + + pub fn add_object(&mut self, policy: Policy) -> Option { + let ref_info = create_object_reference(&policy); + let name = policy.metadata.name.expect("name is always set"); + if let Some(existing_policy_info) = self.policies.get(&name) { + if existing_policy_info.policy != policy.spec { + let policy_info = PolicyInfo::new(name.clone(), policy.spec, ref_info.clone()); + log::info!("Policy '{}' updated", name); + self.policies.insert(name, policy_info); + Some(ref_info) + } else { + None + } + } else { + let policy_info = PolicyInfo::new(name.clone(), policy.spec, ref_info.clone()); + log::info!("Policy '{}' added", name); + self.policies.insert(name, policy_info); + ACTIVE_POLICIES.inc(); + Some(ref_info) + } + } + + pub fn remove_object(&mut self, policy: Policy) { + let name = policy.metadata.name.expect("name is always set"); + log::info!("Policy '{}' removed", name); + self.policies.remove(&name); + ACTIVE_POLICIES.dec(); + } + + pub fn get_objects(&self) -> HashMap { + return self.policies.clone(); + } } -fn create_object_reference(obj: &Policy) -> PolicyObjectReference { - PolicyObjectReference { +fn create_object_reference(obj: &Policy) -> ObjectReference { + ObjectReference { api_version: Some(Policy::api_version(&()).to_string()), kind: Some(Policy::kind(&()).to_string()), name: obj.metadata.name.clone(), @@ -68,7 +79,7 @@ fn create_object_reference(obj: &Policy) -> PolicyObjectReference { } impl PolicyInfo { - pub fn new(name: String, policy: PolicySpec, ref_info: PolicyObjectReference) -> PolicyInfo { + pub fn new(name: String, policy: PolicySpec, ref_info: ObjectReference) -> PolicyInfo { PolicyInfo { name, policy, @@ -116,36 +127,6 @@ impl PolicyInfo { } } -impl PolicyStore { - pub fn add_policy(&mut self, policy: Policy) -> Option { - let ref_info = create_object_reference(&policy); - let name = policy.metadata.name.expect("name is always set"); - if let Some(existing_policy_info) = self.policies.get(&name) { - if existing_policy_info.policy != policy.spec { - let policy_info = PolicyInfo::new(name.clone(), policy.spec, ref_info.clone()); - info!("Policy '{}' updated", name); - self.policies.insert(name, policy_info); - Some(ref_info) - } else { - None - } - } else { - let policy_info = PolicyInfo::new(name.clone(), policy.spec, ref_info.clone()); - info!("Policy '{}' added", name); - self.policies.insert(name, policy_info); - ACTIVE_POLICIES.inc(); - Some(ref_info) - } - } - - pub fn remove_policy(&mut self, policy: Policy) { - let name = policy.metadata.name.expect("name is always set"); - info!("Policy '{}' removed", name); - self.policies.remove(&name); - ACTIVE_POLICIES.dec(); - } -} - pub fn load_policies_from_file(policies: PolicyStoreRef, filename: &str) -> Result { let mut policies = policies.lock().expect("Lock failed"); let data = std::fs::read_to_string(filename).map_err(load_err)?; @@ -153,7 +134,7 @@ pub fn load_policies_from_file(policies: PolicyStoreRef, filename: &str) -> Resu let mut count = 0; for document in serde_yaml::Deserializer::from_str(&data) { let policy = Policy::deserialize(document).map_err(load_err)?; - policies.add_policy(policy); + policies.add_object(policy); count += 1; } Ok(count) diff --git a/src/server.rs b/src/server.rs index 84a9d20..7201c1f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ use crate::constants::POD_CERTS_DIR; use crate::evaluator::PolicyEvaluator; use crate::events::init_event_watcher; use crate::manager::Manager; -use crate::policy::PolicyStore; +use crate::{module::ModuleStore, policy::PolicyStore}; #[derive(FromArgs, PartialEq, Eq, Debug)] #[argh(subcommand, name = "server")] @@ -50,17 +50,22 @@ pub async fn run(args: Args) { // Initiate services let policies = PolicyStore::new(); + let modules = ModuleStore::new(); let event_sender = init_event_watcher(&client); - let mut manager = Manager::new(client.clone(), policies.clone(), event_sender.clone()); - let evaluator = PolicyEvaluator::new(policies.clone(), event_sender.clone()); + let mut manager = Manager::new(client.clone(), policies.clone(), modules.clone(), event_sender.clone()); + let evaluator = PolicyEvaluator::new(policies.clone(), modules.clone(), event_sender.clone()); manager.start().await; manager .load_existing_policies() .await .expect("Could not load existing policies"); + manager + .load_existing_modules() + .await + .expect("Could not load existing policies"); if args.audit { - crate::audit::launch_loop(client, policies, args.audit_interval.unwrap_or(600)).await; + crate::audit::launch_loop(client, policies, modules, args.audit_interval.unwrap_or(600)).await; } // Launch API with webhook endpoint diff --git a/src/util/mod.rs b/src/util/mod.rs index 353fe81..e37bd18 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -2,3 +2,4 @@ pub mod cert; pub mod error; pub mod k8s; pub mod webhook; +pub mod types; \ No newline at end of file diff --git a/src/util/types.rs b/src/util/types.rs new file mode 100644 index 0000000..7e81f5a --- /dev/null +++ b/src/util/types.rs @@ -0,0 +1,21 @@ +use k8s_openapi::api::core::v1::ObjectReference as KubeObjectReference; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct ObjectReference { + pub api_version: Option, + pub kind: Option, + pub name: Option, + pub uid: Option, +} + +impl ObjectReference { + pub fn to_k8s_object_reference(&self) -> KubeObjectReference { + KubeObjectReference { + api_version: self.api_version.clone(), + kind: self.kind.clone(), + name: self.name.clone(), + uid: self.uid.clone(), + ..Default::default() + } + } +} \ No newline at end of file