Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing socket path #406

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/component-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
Test_02_AllAlertsFromMaliciousApp,
Test_03_BasicLoadActivities,
# Test_04_MemoryLeak,
Test_05_MemoryLeak_10K_Alerts,
# Test_05_MemoryLeak_10K_Alerts,
Test_06_KillProcessInTheMiddle,
Test_07_RuleBindingApplyTest,
Test_08_ApplicationProfilePatching,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
k8s.io/kubectl v0.31.0
k8s.io/kubelet v0.31.1
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -252,7 +253,6 @@ require (
k8s.io/cri-api v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240812233141-91dab695df6f // indirect
k8s.io/kubelet v0.31.1 // indirect
oras.land/oras-go/v2 v2.4.0 // indirect
sigs.k8s.io/controller-runtime v0.19.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
10 changes: 1 addition & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,6 @@ func main() {

nodeName := os.Getenv(config.NodeNameEnvVar)

// Detect the container containerRuntime of the node
containerRuntime, err := utils.DetectContainerRuntimeViaK8sAPI(ctx, k8sClient, nodeName)
if err != nil {
logger.L().Ctx(ctx).Fatal("error detecting the container runtime", helpers.Error(err))
}

logger.L().Ctx(ctx).Info("Detected container runtime", helpers.String("containerRuntime", containerRuntime.Name.String()))

// Create watchers
dWatcher := dynamicwatcher.NewWatchHandler(k8sClient, storageClient.StorageClient, cfg.SkipNamespace)
// create k8sObject cache
Expand Down Expand Up @@ -283,7 +275,7 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, processManager)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, nil, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
containerutilsTypes "github.com/inspektor-gadget/inspektor-gadget/pkg/container-utils/types"
tracerseccomp "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/advise/seccomp/tracer"
tracercapabilities "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/capabilities/tracer"
tracercapabilitiestype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/capabilities/types"
Expand Down Expand Up @@ -152,15 +151,13 @@ type IGContainerWatcher struct {
metrics metricsmanager.MetricsManager
// cache
ruleBindingPodNotify *chan rulebinding.RuleBindingNotify
// container runtime
runtime *containerutilsTypes.RuntimeConfig
// process manager
processManager processmanager.ProcessManagerClient
}

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
Expand Down Expand Up @@ -450,7 +447,6 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
ruleBindingPodNotify: ruleBindingPodNotify,
timeBasedContainers: mapset.NewSet[string](),
ruleManagedPods: mapset.NewSet[string](),
runtime: runtime,
thirdPartyTracers: mapset.NewSet[containerwatcher.CustomTracer](),
thirdPartyContainerReceivers: mapset.NewSet[containerwatcher.ContainerReceiver](),
processManager: processManager,
Expand Down
23 changes: 14 additions & 9 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,25 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
// Enrich events with Linux namespaces information, it is needed for per container filtering
containercollection.WithLinuxNamespaceEnrichment(),

// Get containers created with container runtimes
containercollection.WithContainerRuntimeEnrichment(ch.runtime),

// Get containers created with ebpf (works also if hostPid=false)
containercollection.WithContainerFanotifyEbpf(),
// Enrich those containers with data from the Kubernetes API
containercollection.WithKubernetesEnrichment(ch.nodeName, ch.k8sClient.K8SConfig),

// WithTracerCollection enables the interaction between the TracerCollection and ContainerCollection packages.
containercollection.WithTracerCollection(ch.tracerCollection),

// Enrich those containers with data from the Kubernetes API
containercollection.WithKubernetesEnrichment(ch.nodeName, ch.k8sClient.K8SConfig),
// WithPodInformer uses a pod informer to get both initial containers and the stream of container events.
containercollection.WithPodInformer(ch.nodeName),
}

// Initialize the container collection
if err := ch.containerCollection.Initialize(opts...); err != nil {
return fmt.Errorf("initializing container collection: %w", err)
}

// add containers that are already running
// Add pre-running containers to preRunningContainersIDs and ruleManagedPods, this is needed for the following reasons:
// 1. For runtime threat detection we want to keep monitoring the containers and not stop monitoring them after the max sniffing time.
// 2. To know in different parts of the code if a container "learning" is partial or complete.
// In addition, this routine will keep monitoring for rule bindings notifications for the entire lifecycle of the node-agent.
go ch.startRunningContainers()

return nil
Expand Down Expand Up @@ -182,7 +183,11 @@ func (ch *IGContainerWatcher) addRunningContainers(k8sClient IGK8sClient, notf *
helpers.String("containerName", runningContainers[i].K8s.ContainerName))

ch.preRunningContainersIDs.Add(runningContainers[i].Runtime.ContainerID)
ch.containerCollection.AddContainer(&runningContainers[i])
if ch.containerCollection.GetContainer(runningContainers[i].Runtime.ContainerID) == nil {
// This should not happen, but just in case (it should be added by the podInformer).
logger.L().Info("adding container - pod managed by rules", helpers.String("ContainerName", runningContainers[i].K8s.ContainerName))
ch.containerCollection.AddContainer(&runningContainers[i])
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/containerwatcher/v1/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func BenchmarkIGContainerWatcher_openEventCallback(b *testing.B) {
assert.NoError(b, err)
mockExporter := metricsmanager.NewMetricsMock()

mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil, nil)
mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil)
assert.NoError(b, err)
event := &traceropentype.Event{
Event: types.Event{
Expand Down
52 changes: 52 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
igtypes "github.com/inspektor-gadget/inspektor-gadget/pkg/types"

apitypes "github.com/armosec/armoapi-go/armotypes"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
)

var (
Expand Down Expand Up @@ -745,6 +747,11 @@ func DetectContainerRuntimeViaK8sAPI(ctx context.Context, k8sClient *k8sinterfac
if runtimeConfig.Name == igtypes.RuntimeNameUnknown {
return nil, fmt.Errorf("unknown container runtime: %s", node.Status.NodeInfo.ContainerRuntimeVersion)
}
realSocketPath, err := getContainerRuntimeSocketPath(k8sClient, nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get container runtime socket path from Kubelet configz: %v", err)
}
runtimeConfig.SocketPath = realSocketPath

return runtimeConfig, nil
}
Expand Down Expand Up @@ -783,3 +790,48 @@ func parseRuntimeInfo(version string) *containerutilsTypes.RuntimeConfig {
}
}
}

// getContainerRuntimeSocketPath returns the container runtime (CRI) socket
// path that the kubelet on nodeName is actually configured with, read from
// the kubelet's /configz endpoint.
//
// Only unix:// endpoints are supported; the "unix://" scheme prefix is
// stripped from the returned path. An error is returned if the kubelet
// config cannot be fetched or the endpoint does not use the unix scheme.
func getContainerRuntimeSocketPath(clientset *k8sinterface.KubernetesApi, nodeName string) (string, error) {
	kubeletConfig, err := getCurrentKubeletConfig(clientset, nodeName)
	if err != nil {
		return "", fmt.Errorf("getting /configz: %w", err)
	}
	socketPath, found := strings.CutPrefix(kubeletConfig.ContainerRuntimeEndpoint, "unix://")
	if !found {
		// Include the offending endpoint so misconfigurations (empty value,
		// npipe:// on Windows nodes, ...) are diagnosable from the error alone.
		return "", fmt.Errorf("container runtime endpoint %q does not start with unix://", kubeletConfig.ContainerRuntimeEndpoint)
	}
	logger.L().Info("using the detected container runtime socket path from Kubelet's config", helpers.String("socketPath", socketPath))
	return socketPath, nil
}

// getCurrentKubeletConfig fetches the live kubelet configuration of nodeName
// by calling the node's /proxy/configz endpoint and decoding the response.
//
// The /configz endpoint isn't officially documented; it has been around for a
// long time, as stated in
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/configz/OWNERS
func getCurrentKubeletConfig(clientset *k8sinterface.KubernetesApi, nodeName string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
	raw, err := clientset.GetKubernetesClient().CoreV1().RESTClient().
		Get().
		Resource("nodes").
		Name(nodeName).
		Suffix("proxy", "configz").
		DoRaw(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("fetching /configz from %q: %w", nodeName, err)
	}
	return decodeConfigz(raw)
}

// decodeConfigz parses the raw /configz response body and returns the
// kubelet configuration it contains.
//
// The endpoint wraps its payload one level deep:
//
//	{"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
//
// so we unmarshal through a wrapper to reach the inner object.
func decodeConfigz(respBody []byte) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
	var wrapper struct {
		KubeletConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
	}
	if err := json.Unmarshal(respBody, &wrapper); err != nil {
		return nil, err
	}
	return &wrapper.KubeletConfig, nil
}
2 changes: 1 addition & 1 deletion tests/chart/templates/node-agent/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
kubescape.io/ignore: "true"
rules:
- apiGroups: [""]
resources: ["pods", "nodes", "services", "endpoints", "namespaces"]
resources: ["pods", "nodes", "nodes/proxy", "services", "endpoints", "namespaces"]
verbs: ["get", "watch", "list"]
- apiGroups: [""]
resources: ["events"]
Expand Down
Loading