Skip to content

Commit

Permalink
Separate inotify handling into its own object.
Browse files Browse the repository at this point in the history
This makes it easier to ensure the error conditions are handled well and
that we don't leak watches.

Fixes #703
  • Loading branch information
vmarmol committed May 21, 2015
1 parent a55aece commit 36a6e68
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 35 deletions.
70 changes: 35 additions & 35 deletions container/raw/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,11 @@ type rawContainerHandler struct {
machineInfoFactory info.MachineInfoFactory

// Inotify event watcher.
watcher *inotify.Watcher
watcher *InotifyWatcher

// Signal for watcher thread to stop.
stopWatcher chan error

// Containers being watched for new subcontainers.
watches map[string]struct{}

// Cgroup paths being watched for new subcontainers
cgroupWatches map[string]struct{}

// Absolute path to the cgroup hierarchies of this container.
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
Expand Down Expand Up @@ -107,8 +101,6 @@ func newRawContainerHandler(name string, cgroupSubsystems *libcontainer.CgroupSu
cgroupSubsystems: cgroupSubsystems,
machineInfoFactory: machineInfoFactory,
stopWatcher: make(chan error),
watches: make(map[string]struct{}),
cgroupWatches: make(map[string]struct{}),
cgroupPaths: cgroupPaths,
cgroupManager: cgroupManager,
fsInfo: fsInfo,
Expand Down Expand Up @@ -402,29 +394,43 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i
return libcontainer.GetProcesses(self.cgroupManager)
}

func (self *rawContainerHandler) watchDirectory(dir string, containerName string) error {
err := self.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
// Watches the specified directory and all subdirectories. Returns whether the path was
// already being watched and an error (if any).
func (self *rawContainerHandler) watchDirectory(dir string, containerName string) (bool, error) {
alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
if err != nil {
return err
return alreadyWatching, err
}
self.watches[containerName] = struct{}{}
self.cgroupWatches[dir] = struct{}{}

// Remove the watch if further operations failed.
cleanup := true
defer func() {
if cleanup {
_, err := self.watcher.RemoveWatch(containerName, dir)
if err != nil {
glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
}
}
}()

// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
// Watch subdirectories as well.
entries, err := ioutil.ReadDir(dir)
if err != nil {
return err
return alreadyWatching, err
}
for _, entry := range entries {
if entry.IsDir() {
err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
if err != nil {
return err
return alreadyWatching, err
}
}
}
return nil

cleanup = false
return alreadyWatching, nil
}

func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error {
Expand Down Expand Up @@ -460,33 +466,27 @@ func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan
// Maintain the watch for the new or deleted container.
switch {
case eventType == container.SubcontainerAdd:
_, alreadyWatched := self.watches[containerName]

// New container was created, watch it.
err := self.watchDirectory(event.Name, containerName)
alreadyWatched, err := self.watchDirectory(event.Name, containerName)
if err != nil {
return err
}

// Only report container creation once.
if alreadyWatched {
if !alreadyWatched {
return nil
}
case eventType == container.SubcontainerDelete:
// Container was deleted, stop watching for it. Only delete the event if we registered it.
if _, ok := self.cgroupWatches[event.Name]; ok {
err := self.watcher.RemoveWatch(event.Name)
if err != nil {
return err
}
delete(self.cgroupWatches, event.Name)
// Container was deleted, stop watching for it.
wasWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
if err != nil {
return err
}

// Only report container deletion once.
if _, ok := self.watches[containerName]; !ok {
if wasWatched {
return nil
}
delete(self.watches, containerName)
default:
return fmt.Errorf("unknown event type %v", eventType)
}
Expand All @@ -503,7 +503,7 @@ func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan
func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
// Lazily initialize the watcher so we don't use it when not asked to.
if self.watcher == nil {
w, err := inotify.NewWatcher()
w, err := NewInotifyWatcher()
if err != nil {
return err
}
Expand All @@ -512,7 +512,7 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon

// Watch this container (all its cgroups) and all subdirectories.
for _, cgroupPath := range self.cgroupPaths {
err := self.watchDirectory(cgroupPath, self.name)
_, err := self.watchDirectory(cgroupPath, self.name)
if err != nil {
return err
}
Expand All @@ -522,12 +522,12 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon
go func() {
for {
select {
case event := <-self.watcher.Event:
case event := <-self.watcher.Event():
err := self.processEvent(event, events)
if err != nil {
glog.Warningf("Error while processing event (%+v): %v", event, err)
}
case err := <-self.watcher.Error:
case err := <-self.watcher.Error():
glog.Warningf("Error while watching %q:", self.name, err)
case <-self.stopWatcher:
err := self.watcher.Close()
Expand Down
128 changes: 128 additions & 0 deletions container/raw/inotify_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raw

import (
"sync"

"golang.org/x/exp/inotify"
)

// Watcher for container-related inotify events in the cgroup hierarchy.
//
// Implementation is thread-safe.
type InotifyWatcher struct {
// Underlying inotify watcher.
watcher *inotify.Watcher

// Containers being watched.
containersWatched map[string]bool

// Full cgroup paths being watched.
cgroupsWatched map[string]bool

// Lock for all datastructure access.
lock sync.Mutex
}

func NewInotifyWatcher() (*InotifyWatcher, error) {
w, err := inotify.NewWatcher()
if err != nil {
return nil, err
}

return &InotifyWatcher{
watcher: w,
containersWatched: make(map[string]bool),
cgroupsWatched: make(map[string]bool),
}, nil
}

// Add a watch to the specified directory. Returns if the container was already being watched.
func (iw *InotifyWatcher) AddWatch(containerName, dir string) (bool, error) {
iw.lock.Lock()
defer iw.lock.Unlock()

alreadyWatched := iw.containersWatched[containerName]

// Register an inotify notification.
if !iw.cgroupsWatched[dir] {
err := iw.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
if err != nil {
return alreadyWatched, err
}
iw.cgroupsWatched[dir] = true
}

// Record our watching of the container.
if !alreadyWatched {
iw.containersWatched[containerName] = true
}
return alreadyWatched, nil
}

// Remove watch from the specified directory. Returns if the container was already being watched.
func (iw *InotifyWatcher) RemoveWatch(containerName, dir string) (bool, error) {
iw.lock.Lock()
defer iw.lock.Unlock()

alreadyWatched := iw.containersWatched[containerName]

// Remove the inotify watch if it exists.
if iw.cgroupsWatched[dir] {
err := iw.watcher.RemoveWatch(dir)
if err != nil {
return alreadyWatched, nil
}
delete(iw.cgroupsWatched, dir)
}

// Record the container as no longer being watched.
if alreadyWatched {
delete(iw.containersWatched, containerName)
}

return alreadyWatched, nil
}

// Errors are returned on this channel.
func (iw *InotifyWatcher) Error() chan error {
return iw.watcher.Error
}

// Events are returned on this channel.
func (iw *InotifyWatcher) Event() chan *inotify.Event {
return iw.watcher.Event
}

// Closes the inotify watcher.
func (iw *InotifyWatcher) Close() error {
return iw.watcher.Close()
}

// Returns a list of:
// - Containers being watched.
// - Cgroup paths being watched.
func (iw *InotifyWatcher) GetWatches() ([]string, []string) {
return mapToSlice(iw.containersWatched), mapToSlice(iw.cgroupsWatched)
}

func mapToSlice(m map[string]bool) []string {
out := make([]string, 0, len(m))
for k := range m {
out = append(out, k)
}
return out
}

0 comments on commit 36a6e68

Please sign in to comment.