Skip to content

Commit

Permalink
Merge pull request #60 from kubescape/feature/namespaced-profiles
Browse files Browse the repository at this point in the history
Adding namespaced application profiles support
  • Loading branch information
amitschendel authored Feb 5, 2024
2 parents 8cc628f + 4ba621f commit ae7398e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 40 deletions.
11 changes: 8 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,22 @@ func main() {
if os.Getenv("OPEN_IGNORE_PREFIXES") != "" {
ignorePrefixes = strings.Split(os.Getenv("OPEN_IGNORE_PREFIXES"), ",")
}
storeNamespace := ""
if os.Getenv("STORE_NAMESPACE") != "" {
storeNamespace = os.Getenv("STORE_NAMESPACE")
}
collectorManagerConfig := &collector.CollectorManagerConfig{
EventSink: eventSink,
Tracer: tracer,
Interval: 60, // 60 seconds for now, TODO: make it configurable
FinalizeTime: 0, // 0 seconds to disable finalization
FinalizeJitter: 0, // 0 seconds to disable finalization jitter
FinalizeTime: 80, // 0 seconds to disable finalization
FinalizeJitter: 10, // 0 seconds to disable finalization jitter
K8sConfig: k8sConfig,
RecordStrategy: collector.RecordStrategyOnlyIfNotExists,
NodeName: NodeName,
IgnoreMounts: ignoreMounts,
IgnorePrefixes: ignorePrefixes,
StoreNamespace: storeNamespace,
}
cm, err := collector.StartCollectorManager(collectorManagerConfig)
if err != nil {
Expand All @@ -132,7 +137,7 @@ func main() {
defer tracer.Stop()

// Start AppProfile controller
appProfileController := controller.NewController(k8sConfig)
appProfileController := controller.NewController(k8sConfig, storeNamespace)
appProfileController.StartController()
defer appProfileController.StopController()

Expand Down
50 changes: 37 additions & 13 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type CollectorManagerConfig struct {
IgnoreMounts bool
// Should ignore prefixes
IgnorePrefixes []string
// Should store profiles in the same namespace
StoreNamespace string
}

type TotalEvents struct {
Expand Down Expand Up @@ -163,7 +165,7 @@ func (cm *CollectorManager) ContainerStarted(id *ContainerId, attach bool) {
// Check if applicaton profile already exists
appProfileExists, err := cm.doesApplicationProfileExists(id.Namespace, id.PodName, true, true)
if err != nil {
//log.Printf("error checking if application profile exists: %s\n", err)
// log.Printf("error checking if application profile exists: %s\n", err)
} else if appProfileExists {
// If application profile exists, check if record strategy is RecordStrategyOnlyIfNotExists
if cm.config.RecordStrategy == RecordStrategyOnlyIfNotExists {
Expand Down Expand Up @@ -415,10 +417,14 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
}

// The name of the ApplicationProfile you're looking for.
appProfileName := fmt.Sprintf("pod-%s", id.PodName)
namespace := id.Namespace
appProfileName := cm.GetApplicationProfileName(id.Namespace, "pod", id.PodName)
if cm.config.StoreNamespace != "" {
namespace = cm.config.StoreNamespace
}

// Get the ApplicationProfile object with the name specified above.
existingApplicationProfile, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Get(context.Background(), appProfileName, v1.GetOptions{})
existingApplicationProfile, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Get(context.Background(), appProfileName, v1.GetOptions{})
if err != nil {
// it does not exist, create it
appProfile := &ApplicationProfile{
Expand All @@ -433,18 +439,23 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
Containers: []ContainerProfile{containerProfile},
},
}
labels := map[string]string{}
if containerState.attached {
appProfile.ObjectMeta.Labels = map[string]string{"kapprofiler.kubescape.io/partial": "true"}
labels["kapprofiler.kubescape.io/partial"] = "true"
}
// Check if we have over the limit of open events, if so, mark as failed.
if len(containerProfile.Opens) >= MaxOpenEvents {
appProfile.ObjectMeta.Labels = map[string]string{"kapprofiler.kubescape.io/failed": "true"}
labels["kapprofiler.kubescape.io/failed"] = "true"
}
if cm.config.StoreNamespace != "" {
labels["kapprofiler.kubescape.io/namespace"] = id.Namespace
}
appProfile.ObjectMeta.SetLabels(labels)
appProfileRawNew, err := runtime.DefaultUnstructuredConverter.ToUnstructured(appProfile)
if err != nil {
log.Printf("error converting application profile: %s\n", err)
}
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Create(
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Create(
context.Background(),
&unstructured.Unstructured{
Object: appProfileRawNew,
Expand Down Expand Up @@ -475,7 +486,7 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
// Check if we have over the limit of open events, if so, mark as failed.
if len(containerProfile.Opens) >= MaxOpenEvents {
// Mark as failed
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Patch(context.Background(),
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Patch(context.Background(),
appProfileName, apitypes.MergePatchType, []byte("{\"metadata\":{\"labels\":{\"kapprofiler.kubescape.io/failed\":\"true\"}}}"), v1.PatchOptions{})
if err != nil {
log.Printf("error patching application profile: %s\n", err)
Expand All @@ -500,7 +511,7 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
if err != nil {
log.Printf("error converting application profile: %s\n", err)
}
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Update(
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Update(
context.Background(),
&unstructured.Unstructured{
Object: unstructuredAppProfile,
Expand All @@ -513,7 +524,6 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
cm.eventSink.RemoveFilter(&eventsink.EventSinkFilter{EventType: tracing.AllEventType, ContainerID: id.ContainerID})
// Stop tracing container
cm.tracer.StopTraceContainer(id.NsMntId, id.Pid, tracing.AllEventType)

// Mark stop recording
cm.MarkPodNotRecording(id.PodName, id.Namespace)

Expand All @@ -523,7 +533,7 @@ func (cm *CollectorManager) CollectContainerEvents(id *ContainerId) {
cm.containersMutex.Unlock()

// Mark pod as failed recording
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Patch(context.Background(),
_, err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Patch(context.Background(),
appProfileName, apitypes.MergePatchType, []byte("{\"metadata\":{\"labels\":{\"kapprofiler.kubescape.io/failed\":\"true\"}}}"), v1.PatchOptions{})
if err != nil {
log.Printf("error patching application profile: %s\n", err)
Expand Down Expand Up @@ -637,8 +647,12 @@ func (cm *CollectorManager) FinalizeApplicationProfile(id *ContainerId) {
if _, ok := cm.containers[*id]; ok {
cm.containersMutex.Unlock()
// Patch the application profile to make it immutable with the final label
appProfileName := fmt.Sprintf("pod-%s", id.PodName)
_, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(id.Namespace).Patch(context.Background(),
namespace := id.Namespace
appProfileName := cm.GetApplicationProfileName(id.Namespace, "pod", id.PodName)
if cm.config.StoreNamespace != "" {
namespace = cm.config.StoreNamespace
}
_, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Patch(context.Background(),
appProfileName, apitypes.MergePatchType, []byte("{\"metadata\":{\"labels\":{\"kapprofiler.kubescape.io/final\":\"true\"}}}"), v1.PatchOptions{})
if err != nil {
log.Printf("error patching application profile: %s\n", err)
Expand Down Expand Up @@ -687,7 +701,10 @@ func (cm *CollectorManager) doesApplicationProfileExists(namespace string, podNa
}

// The name of the ApplicationProfile you're looking for.
appProfileName := fmt.Sprintf("%s-%s", strings.ToLower(workloadKind), strings.ToLower(workloadName))
appProfileName := cm.GetApplicationProfileName(namespace, workloadKind, workloadName)
if cm.config.StoreNamespace != "" {
namespace = cm.config.StoreNamespace
}

// Get the ApplicationProfile object with the name specified above.
existingApplicationProfile, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Get(context.Background(), appProfileName, v1.GetOptions{})
Expand Down Expand Up @@ -865,3 +882,10 @@ func (cm *CollectorManager) getPodMounts(podName, namespace string) ([]string, e

return mounts, nil
}

func (cm *CollectorManager) GetApplicationProfileName(namespace, kind, name string) string {
if cm.config.StoreNamespace != "" {
return fmt.Sprintf("%s-%s-%s", strings.ToLower(kind), strings.ToLower(name), namespace)
}
return fmt.Sprintf("%s-%s", strings.ToLower(kind), strings.ToLower(name))
}
16 changes: 12 additions & 4 deletions pkg/collector/pod_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,13 @@ func (cm *CollectorManager) startFinalizationTimer(pod *v1.Pod) *time.Timer {

func (cm *CollectorManager) finalizePodProfile(pod *v1.Pod) {
// Generate pod application profile name
appProfileName := fmt.Sprintf("pod-%s", pod.GetName())
namespace := pod.GetNamespace()
appProfileName := cm.GetApplicationProfileName(namespace, "pod", pod.GetName())
// Put label on pod application profile to mark it as finalized
_, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(pod.GetNamespace()).Patch(context.Background(),
if cm.config.StoreNamespace != "" {
namespace = cm.config.StoreNamespace
}
_, err := cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Patch(context.Background(),
appProfileName, apitypes.MergePatchType, []byte("{\"metadata\":{\"labels\":{\"kapprofiler.kubescape.io/final\":\"true\"}}}"), metav1.PatchOptions{})
if err != nil {
log.Printf("error patching application profile: %s\n", err)
Expand All @@ -191,9 +195,13 @@ func (cm *CollectorManager) handlePodDeleteEvent(obj interface{}) {
cm.stopTimer(&pod.ObjectMeta)

// Generate pod application profile name
appProfileName := fmt.Sprintf("pod-%s", pod.Name)
appProfileName := cm.GetApplicationProfileName(pod.Namespace, "pod", pod.Name)
namespace := pod.Namespace
if cm.config.StoreNamespace != "" {
namespace = cm.config.StoreNamespace
}
// Delete pod application profile CRD
err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(pod.GetNamespace()).Delete(context.TODO(), appProfileName, metav1.DeleteOptions{})
err = cm.dynamicClient.Resource(AppProfileGvr).Namespace(namespace).Delete(context.TODO(), appProfileName, metav1.DeleteOptions{})
if err != nil {
log.Printf("Error deleting pod application profile: %v", err)
return
Expand Down
10 changes: 10 additions & 0 deletions pkg/collector/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: kubescape.io/v1
kind: ApplicationProfile
metadata:
name: pod-nginx
namespace: default
spec:
containers:
- name: app
syscalls:
- open
67 changes: 47 additions & 20 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,27 @@ import (

// AppProfile controller struct
type Controller struct {
config *rest.Config
staticClient *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
appProfileGvr schema.GroupVersionResource
watcher watcher.WatcherInterface
config *rest.Config
staticClient *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
appProfileGvr schema.GroupVersionResource
watcher watcher.WatcherInterface
storeNamespace string
}

// Create a new controller based on given config
func NewController(config *rest.Config) *Controller {
func NewController(config *rest.Config, storeNamespace string) *Controller {

// Initialize clients and channels
staticClient, _ := kubernetes.NewForConfig(config)
dynamicClient, _ := dynamic.NewForConfig(config)

return &Controller{
config: config,
staticClient: staticClient,
dynamicClient: dynamicClient,
appProfileGvr: collector.AppProfileGvr,
config: config,
staticClient: staticClient,
dynamicClient: dynamicClient,
appProfileGvr: collector.AppProfileGvr,
storeNamespace: storeNamespace,
}
}

Expand Down Expand Up @@ -94,19 +96,34 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
}

// Get Object name from ApplicationProfile. Application profile name has the kind in as the the prefix like deployment-nginx
objectName := strings.Join(strings.Split(applicationProfileUnstructured.GetName(), "-")[1:], "-")
var objectName string
var namespace string
if c.storeNamespace == "" {
objectName = strings.Join(strings.Split(applicationProfileUnstructured.GetName(), "-")[1:], "-")
namespace = applicationProfileUnstructured.GetNamespace()
} else {
namespace = applicationProfileUnstructured.GetLabels()["kapprofiler.kubescape.io/namespace"]
// Trim the namespace from the application profile name
objectName = strings.Join(strings.Split(applicationProfileUnstructured.GetName(), "-")[1:], "-")
objectName = strings.TrimSuffix(objectName, fmt.Sprintf("-%s", namespace))
}

// Get pod to which the ApplicationProfile belongs to
pod, err := c.staticClient.CoreV1().Pods(applicationProfileUnstructured.GetNamespace()).Get(context.TODO(), objectName, metav1.GetOptions{})
pod, err := c.staticClient.CoreV1().Pods(namespace).Get(context.TODO(), objectName, metav1.GetOptions{})
if err != nil { // Ensures that the ApplicationProfile belongs to a pod or a replicaset
replicaSet, err := c.staticClient.AppsV1().ReplicaSets(applicationProfileUnstructured.GetNamespace()).Get(context.TODO(), objectName, metav1.GetOptions{})
replicaSet, err := c.staticClient.AppsV1().ReplicaSets(namespace).Get(context.TODO(), objectName, metav1.GetOptions{})
if err != nil { // ApplicationProfile belongs to neither
return
}

if len(replicaSet.OwnerReferences) > 0 && replicaSet.OwnerReferences[0].Kind == "Deployment" { // If owner of replicaset is a deployment
profileName := fmt.Sprintf("deployment-%v", replicaSet.OwnerReferences[0].Name)
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSet.Namespace).Get(context.TODO(), profileName, metav1.GetOptions{})
replicaSetNamespace := replicaSet.GetNamespace()
if c.storeNamespace != "" {
profileName = fmt.Sprintf("%s-%s", profileName, replicaSet.Namespace)
replicaSetNamespace = c.storeNamespace
}
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSetNamespace).Get(context.TODO(), profileName, metav1.GetOptions{})
if err != nil { // ApplicationProfile doesn't exist for deployment
applicationProfile, err := getApplicationProfileFromUnstructured(applicationProfileUnstructured)
if err != nil {
Expand All @@ -129,7 +146,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
if err != nil {
return
}
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSet.Namespace).Create(context.TODO(), &unstructured.Unstructured{Object: deploymentApplicationProfileRaw}, metav1.CreateOptions{})
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSetNamespace).Create(context.TODO(), &unstructured.Unstructured{Object: deploymentApplicationProfileRaw}, metav1.CreateOptions{})
if err != nil {
return
}
Expand All @@ -149,7 +166,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
deploymentApplicationProfile.Labels = applicationProfile.GetLabels()
deploymentApplicationProfile.Spec.Containers = applicationProfile.Spec.Containers
deploymentApplicationProfileRaw, _ := json.Marshal(deploymentApplicationProfile)
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSet.Namespace).Patch(context.TODO(), profileName, apitypes.MergePatchType, deploymentApplicationProfileRaw, metav1.PatchOptions{})
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(replicaSetNamespace).Patch(context.TODO(), profileName, apitypes.MergePatchType, deploymentApplicationProfileRaw, metav1.PatchOptions{})
if err != nil {
return
}
Expand Down Expand Up @@ -236,7 +253,12 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
// Merge all the container information of all the pods
for i := 0; i < len(pods.Items); i++ {
appProfileNameForPod := fmt.Sprintf("pod-%s", pods.Items[i].GetName())
typedObj, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pods.Items[i].GetNamespace()).Get(context.TODO(), appProfileNameForPod, metav1.GetOptions{})
podNamespace := pods.Items[i].GetNamespace()
if c.storeNamespace != "" {
appProfileNameForPod = fmt.Sprintf("%s-%s", appProfileNameForPod, podNamespace)
podNamespace = c.storeNamespace
}
typedObj, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(podNamespace).Get(context.TODO(), appProfileNameForPod, metav1.GetOptions{})
if err != nil {
log.Printf("ApplicationProfile for pod %v doesn't exist", pods.Items[i].GetName())
return
Expand Down Expand Up @@ -354,8 +376,13 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
}

applicationProfileNameForController := fmt.Sprintf("%s-%s", podControllerKind, podControllerName)
controllerApplicationProfileNamespace := pod.Namespace
if c.storeNamespace != "" {
applicationProfileNameForController = fmt.Sprintf("%s-%s", applicationProfileNameForController, controllerApplicationProfileNamespace)
controllerApplicationProfileNamespace = c.storeNamespace
}
// Fetch ApplicationProfile of the controller
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pod.Namespace).Get(context.TODO(), applicationProfileNameForController, metav1.GetOptions{})
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(controllerApplicationProfileNamespace).Get(context.TODO(), applicationProfileNameForController, metav1.GetOptions{})
if err != nil { // ApplicationProfile of controller doesn't exist so create a new one
controllerApplicationProfile := &collector.ApplicationProfile{
TypeMeta: metav1.TypeMeta{
Expand All @@ -375,7 +402,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
log.Printf("Error converting ApplicationProfile of controller %v", err)
return
}
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pod.Namespace).Create(context.TODO(), &unstructured.Unstructured{Object: controllerApplicationProfileRaw}, metav1.CreateOptions{})
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(controllerApplicationProfileNamespace).Create(context.TODO(), &unstructured.Unstructured{Object: controllerApplicationProfileRaw}, metav1.CreateOptions{})
if err != nil {
log.Printf("Error creating ApplicationProfile of controller %v", err)
return
Expand All @@ -390,7 +417,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
controllerApplicationProfile.Labels = applicationProfileUnstructured.GetLabels()
controllerApplicationProfile.Spec.Containers = containers
controllerApplicationProfileRaw, _ := json.Marshal(controllerApplicationProfile)
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pod.Namespace).Patch(context.TODO(), applicationProfileNameForController, apitypes.MergePatchType, controllerApplicationProfileRaw, metav1.PatchOptions{})
_, err = c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(controllerApplicationProfileNamespace).Patch(context.TODO(), applicationProfileNameForController, apitypes.MergePatchType, controllerApplicationProfileRaw, metav1.PatchOptions{})
if err != nil {
log.Printf("Error updating ApplicationProfile of controller %v", err)
return
Expand Down

0 comments on commit ae7398e

Please sign in to comment.