Skip to content

Commit

Permalink
Merge pull request #54 from kubescape/hotfix/watcher
Browse files Browse the repository at this point in the history
Revert "Merge pull request #53 from kubescape/bugfix/memory-leak"
  • Loading branch information
slashben authored Jan 14, 2024
2 parents 0220117 + a561cbe commit 5e60256
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ IMAGE_NAME ?= kapprofiler:latest


$(BINARY_NAME): $(GOFILES) go.mod go.sum Makefile
CGO_ENABLED=0 go build -o $(BINARY_NAME) -v
CGO_ENABLED=1 go build -o $(BINARY_NAME) -v

test:
$(GOTEST_SUDO_PREFIX) $(GOTEST) -v ./...
Expand Down
11 changes: 2 additions & 9 deletions pkg/collector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,8 @@ func (a DnsCalls) Equals(b DnsCalls) bool {
if len(a.Addresses) != len(b.Addresses) {
return false
}
for _, addr := range a.Addresses {
found := false
for _, baddr := range b.Addresses {
if addr == baddr {
found = true
break
}
}
if !found {
for i, addr := range a.Addresses {
if addr != b.Addresses[i] {
return false
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (c *Controller) StartController() {
},
UpdateFunc: func(obj *unstructured.Unstructured) {
c.handleApplicationProfile(obj)

},
DeleteFunc: func(obj *unstructured.Unstructured) {
c.handleApplicationProfile(obj)
Expand All @@ -76,6 +77,7 @@ func (c *Controller) StartController() {

// Set the watcher
c.watcher = appProfileWatcher

}

// Stop the AppProfile controller
Expand Down Expand Up @@ -110,7 +112,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
if err != nil {
return
}
deploymentApplicationProfile := collector.ApplicationProfile{
deploymentApplicationProfile := &collector.ApplicationProfile{
TypeMeta: metav1.TypeMeta{
Kind: collector.ApplicationProfileKind,
APIVersion: collector.ApplicationProfileApiVersion,
Expand All @@ -123,7 +125,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
Containers: applicationProfile.Spec.Containers,
},
}
deploymentApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deploymentApplicationProfile)
deploymentApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(deploymentApplicationProfile)
if err != nil {
return
}
Expand All @@ -143,7 +145,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
return
}

deploymentApplicationProfile := collector.ApplicationProfile{}
deploymentApplicationProfile := &collector.ApplicationProfile{}
deploymentApplicationProfile.Labels = applicationProfile.GetLabels()
deploymentApplicationProfile.Spec.Containers = applicationProfile.Spec.Containers
deploymentApplicationProfileRaw, _ := json.Marshal(deploymentApplicationProfile)
Expand Down Expand Up @@ -355,7 +357,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
// Fetch ApplicationProfile of the controller
existingApplicationProfile, err := c.dynamicClient.Resource(collector.AppProfileGvr).Namespace(pod.Namespace).Get(context.TODO(), applicationProfileNameForController, metav1.GetOptions{})
if err != nil { // ApplicationProfile of controller doesn't exist so create a new one
controllerApplicationProfile := collector.ApplicationProfile{
controllerApplicationProfile := &collector.ApplicationProfile{
TypeMeta: metav1.TypeMeta{
Kind: collector.ApplicationProfileKind,
APIVersion: collector.ApplicationProfileApiVersion,
Expand All @@ -368,7 +370,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
Containers: containers,
},
}
controllerApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&controllerApplicationProfile)
controllerApplicationProfileRaw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(controllerApplicationProfile)
if err != nil {
log.Printf("Error converting ApplicationProfile of controller %v", err)
return
Expand All @@ -384,7 +386,7 @@ func (c *Controller) handleApplicationProfile(applicationProfileUnstructured *un
// Don't update the application profile
return
}
controllerApplicationProfile := collector.ApplicationProfile{}
controllerApplicationProfile := &collector.ApplicationProfile{}
controllerApplicationProfile.Labels = applicationProfileUnstructured.GetLabels()
controllerApplicationProfile.Spec.Containers = containers
controllerApplicationProfileRaw, _ := json.Marshal(controllerApplicationProfile)
Expand Down
127 changes: 56 additions & 71 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,84 +90,69 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
}
w.watcher = watcher
w.running = true

// Use a context to gracefully stop the goroutine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
// Watch for events
defer func() {
// Ensure the watcher is closed when the goroutine exits
watcher.Stop()
}()

for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
if w.running {
// Need to restart the watcher: wait a bit and restart
time.Sleep(5 * time.Second)
listOptions.ResourceVersion = resourceVersion
newWatcher, err := w.client.Resource(gvr).Namespace("").Watch(context.Background(), listOptions)
if err != nil {
log.Printf("watcher restart error: %v", err)
return
}
w.watcher = newWatcher
// Restart the loop
continue
} else {
// Stop the watcher
return
event, ok := <-watcher.ResultChan()
if !ok {
if w.running {
// Need to restart the watcher: wait a bit and restart
time.Sleep(5 * time.Second)
listOptions.ResourceVersion = resourceVersion
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.Background(), listOptions)
if err != nil {
log.Printf("watcher restart error: %v", err)
}
// Restart the loop
continue
} else {
// Stop the watcher
return
}

switch event.Type {
case watch.Added:
// Convert the object to unstructured
addedObject := event.Object.(*unstructured.Unstructured)
if addedObject == nil {
log.Printf("watcher error: addedObject is nil")
continue
}
// Update the resourceVersion
if addedObject.GetResourceVersion() > resourceVersion {
resourceVersion = addedObject.GetResourceVersion()
}
notifyF.AddFunc(addedObject)
case watch.Modified:
// Convert the object to unstructured
modifiedObject := event.Object.(*unstructured.Unstructured)
if modifiedObject == nil {
log.Printf("watcher error: modifiedObject is nil")
continue
}
// Update the resourceVersion
if modifiedObject.GetResourceVersion() > resourceVersion {
resourceVersion = modifiedObject.GetResourceVersion()
}
notifyF.UpdateFunc(modifiedObject)
case watch.Deleted:
// Convert the object to unstructured
deletedObject := event.Object.(*unstructured.Unstructured)
if deletedObject == nil {
log.Printf("watcher error: deletedObject is nil")
continue
}
// Update the resourceVersion
if deletedObject.GetResourceVersion() > resourceVersion {
resourceVersion = deletedObject.GetResourceVersion()
}
notifyF.DeleteFunc(deletedObject)
case watch.Error:
log.Printf("watcher error: %v", event.Object)
}
switch event.Type {
case watch.Added:
// Convert the object to unstructured
addedObject := event.Object.(*unstructured.Unstructured)
if addedObject == nil {
log.Printf("watcher error: addedObject is nil")
continue
}

case <-ctx.Done():
// Exit the goroutine when the context is canceled
return
// Update the resourceVersion
if addedObject.GetResourceVersion() > resourceVersion {
resourceVersion = addedObject.GetResourceVersion()
}
notifyF.AddFunc(addedObject)
addedObject = nil // Make sure the item is scraped by the GC
case watch.Modified:
// Convert the object to unstructured
modifiedObject := event.Object.(*unstructured.Unstructured)
if modifiedObject == nil {
log.Printf("watcher error: modifiedObject is nil")
continue
}
// Update the resourceVersion
if modifiedObject.GetResourceVersion() > resourceVersion {
resourceVersion = modifiedObject.GetResourceVersion()
}
notifyF.UpdateFunc(modifiedObject)
modifiedObject = nil // Make sure the item is scraped by the GC
case watch.Deleted:
// Convert the object to unstructured
deletedObject := event.Object.(*unstructured.Unstructured)
if deletedObject == nil {
log.Printf("watcher error: deletedObject is nil")
continue
}
// Update the resourceVersion
if deletedObject.GetResourceVersion() > resourceVersion {
resourceVersion = deletedObject.GetResourceVersion()
}
notifyF.DeleteFunc(deletedObject)
deletedObject = nil // Make sure the item is scraped by the GC
case watch.Error:
log.Printf("watcher error: %v", event.Object)
}
}
}()
Expand Down

0 comments on commit 5e60256

Please sign in to comment.