Skip to content

Commit

Permalink
Merge pull request #53 from kubescape/bugfix/memory-leak
Browse files Browse the repository at this point in the history
Bugfix/memory leak
  • Loading branch information
amitschendel authored Jan 14, 2024
2 parents 4f9b43f + e138fd3 commit 0220117
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ IMAGE_NAME ?= kapprofiler:latest


$(BINARY_NAME): $(GOFILES) go.mod go.sum Makefile
CGO_ENABLED=1 go build -o $(BINARY_NAME) -v
CGO_ENABLED=0 go build -o $(BINARY_NAME) -v

test:
$(GOTEST_SUDO_PREFIX) $(GOTEST) -v ./...
Expand Down
11 changes: 9 additions & 2 deletions pkg/collector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,15 @@ func (a DnsCalls) Equals(b DnsCalls) bool {
if len(a.Addresses) != len(b.Addresses) {
return false
}
for i, addr := range a.Addresses {
if addr != b.Addresses[i] {
for _, addr := range a.Addresses {
found := false
for _, baddr := range b.Addresses {
if addr == baddr {
found = true
break
}
}
if !found {
return false
}
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (c *Controller) StartController() {
},
UpdateFunc: func(obj *unstructured.Unstructured) {
c.handleApplicationProfile(obj)

},
DeleteFunc: func(obj *unstructured.Unstructured) {
c.handleApplicationProfile(obj)
Expand All @@ -77,7 +76,6 @@ func (c *Controller) StartController() {

// Set the watcher
c.watcher = appProfileWatcher

}

// Stop the AppProfile controller
Expand Down Expand Up @@ -112,7 +110,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
if err != nil {
return
}
deploymentApplicationProfile := &collector.ApplicationProfile{
deploymentApplicationProfile := collector.ApplicationProfile{
TypeMeta: metav1.TypeMeta{
Kind: collector.ApplicationProfileKind,
APIVersion: collector.ApplicationProfileApiVersion,
Expand All @@ -125,7 +123,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
Containers: applicationProfile.Spec.Containers,
},
}
deploymentApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(deploymentApplicationProfile)
deploymentApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deploymentApplicationProfile)
if err != nil {
return
}
Expand All @@ -145,7 +143,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
return
}

deploymentApplicationProfile := &collector.ApplicationProfile{}
deploymentApplicationProfile := collector.ApplicationProfile{}
deploymentApplicationProfile.Labels = applicationProfile.GetLabels()
deploymentApplicationProfile.Spec.Containers = applicationProfile.Spec.Containers
deploymentApplicationProfileRaw, _ := json.Marshal(deploymentApplicationProfile)
Expand Down Expand Up @@ -357,7 +355,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
// Fetch ApplicationProfile of the controller
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pod.Namespace).Get(context.TODO(), applicationProfileNameForController, metav1.GetOptions{})
if err != nil { // ApplicationProfile of controller doesn't exist so create a new one
controllerApplicationProfile := &collector.ApplicationProfile{
controllerApplicationProfile := collector.ApplicationProfile{
TypeMeta: metav1.TypeMeta{
Kind: collector.ApplicationProfileKind,
APIVersion: collector.ApplicationProfileApiVersion,
Expand All @@ -370,7 +368,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
Containers: containers,
},
}
controllerApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(controllerApplicationProfile)
controllerApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&controllerApplicationProfile)
if err != nil {
log.Printf("Error converting ApplicationProfile of controller %v", err)
return
Expand All @@ -386,7 +384,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
// Don't update the application profile
return
}
controllerApplicationProfile := &collector.ApplicationProfile{}
controllerApplicationProfile := collector.ApplicationProfile{}
controllerApplicationProfile.Labels = applicationProfileUnstructured.GetLabels()
controllerApplicationProfile.Spec.Containers = containers
controllerApplicationProfileRaw, _ := json.Marshal(controllerApplicationProfile)
Expand Down
127 changes: 71 additions & 56 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,69 +90,84 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
}
w.watcher = watcher
w.running = true

// Use a context to gracefully stop the goroutine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
// Watch for events
defer func() {
// Ensure the watcher is closed when the goroutine exits
watcher.Stop()
}()

for {
event, ok := <-watcher.ResultChan()
if !ok {
if w.running {
// Need to restart the watcher: wait a bit and restart
time.Sleep(5 * time.Second)
listOptions.ResourceVersion = resourceVersion
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.Background(), listOptions)
if err != nil {
log.Printf("watcher restart error: %v", err)
select {
case event, ok := <-watcher.ResultChan():
if !ok {
if w.running {
// Need to restart the watcher: wait a bit and restart
time.Sleep(5 * time.Second)
listOptions.ResourceVersion = resourceVersion
newWatcher, err := w.client.Resource(gvr).Namespace("").Watch(context.Background(), listOptions)
if err != nil {
log.Printf("watcher restart error: %v", err)
return
}
w.watcher = newWatcher
// Restart the loop
continue
} else {
// Stop the watcher
return
}
// Restart the loop
continue
} else {
// Stop the watcher
return
}
}
switch event.Type {
case watch.Added:
// Convert the object to unstructured
addedObject := event.Object.(*unstructured.Unstructured)
if addedObject == nil {
log.Printf("watcher error: addedObject is nil")
continue
}
// Update the resourceVersion
if addedObject.GetResourceVersion() > resourceVersion {
resourceVersion = addedObject.GetResourceVersion()
}
notifyF.AddFunc(addedObject)
addedObject = nil // Make sure the item is scraped by the GC
case watch.Modified:
// Convert the object to unstructured
modifiedObject := event.Object.(*unstructured.Unstructured)
if modifiedObject == nil {
log.Printf("watcher error: modifiedObject is nil")
continue
}
// Update the resourceVersion
if modifiedObject.GetResourceVersion() > resourceVersion {
resourceVersion = modifiedObject.GetResourceVersion()
}
notifyF.UpdateFunc(modifiedObject)
modifiedObject = nil // Make sure the item is scraped by the GC
case watch.Deleted:
// Convert the object to unstructured
deletedObject := event.Object.(*unstructured.Unstructured)
if deletedObject == nil {
log.Printf("watcher error: deletedObject is nil")
continue
}
// Update the resourceVersion
if deletedObject.GetResourceVersion() > resourceVersion {
resourceVersion = deletedObject.GetResourceVersion()

switch event.Type {
case watch.Added:
// Convert the object to unstructured
addedObject := event.Object.(*unstructured.Unstructured)
if addedObject == nil {
log.Printf("watcher error: addedObject is nil")
continue
}
// Update the resourceVersion
if addedObject.GetResourceVersion() > resourceVersion {
resourceVersion = addedObject.GetResourceVersion()
}
notifyF.AddFunc(addedObject)
case watch.Modified:
// Convert the object to unstructured
modifiedObject := event.Object.(*unstructured.Unstructured)
if modifiedObject == nil {
log.Printf("watcher error: modifiedObject is nil")
continue
}
// Update the resourceVersion
if modifiedObject.GetResourceVersion() > resourceVersion {
resourceVersion = modifiedObject.GetResourceVersion()
}
notifyF.UpdateFunc(modifiedObject)
case watch.Deleted:
// Convert the object to unstructured
deletedObject := event.Object.(*unstructured.Unstructured)
if deletedObject == nil {
log.Printf("watcher error: deletedObject is nil")
continue
}
// Update the resourceVersion
if deletedObject.GetResourceVersion() > resourceVersion {
resourceVersion = deletedObject.GetResourceVersion()
}
notifyF.DeleteFunc(deletedObject)
case watch.Error:
log.Printf("watcher error: %v", event.Object)
}
notifyF.DeleteFunc(deletedObject)
deletedObject = nil // Make sure the item is scraped by the GC
case watch.Error:
log.Printf("watcher error: %v", event.Object)

case <-ctx.Done():
// Exit the goroutine when the context is canceled
return
}
}
}()
Expand Down

0 comments on commit 0220117

Please sign in to comment.