Skip to content

Commit

Permalink
enable periodic resync of containers with -r flag
Browse files Browse the repository at this point in the history
  • Loading branch information
clinta committed Oct 11, 2016
1 parent 959ef77 commit 3f57a7a
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 20 deletions.
104 changes: 86 additions & 18 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ import (
const dockerVersion = "v1.23"

var (
containers map[string]dockertypes.ContainerJSON
containers map[string]containerData
containerlock sync.RWMutex
)

func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool) {
containers = make(map[string]dockertypes.ContainerJSON)
type containerData struct {
Cjson dockertypes.ContainerJSON
DockerHostID string
}

func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool, resync int) {
containers = make(map[string]containerData)
records = make(map[string][]net.IP)
rev_records = make(map[string]string)

Expand All @@ -47,31 +52,64 @@ func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool) {
log.WithError(err).Error("Error connecting to docker socket")
continue
}
go dockerWatch(dockerClient)
go dockerWatch(dockerClient, resync)
}
}

func syncAllContainers(dockerClient *dockerclient.Client, cxt context.Context, hostID string) error {
dockerContainers, err := dockerClient.ContainerList(cxt, dockertypes.ContainerListOptions{})
if err != nil {
return err
}
containerlock.Lock()
for _, dc := range dockerContainers {
cjson, err := dockerClient.ContainerInspect(context.Background(), dc.ID)
if err != nil {
log.WithError(err).WithField("Container ID", dc.ID).Error("Error inspecting container")
continue
}
containers[dc.ID] = containerData{Cjson: cjson, DockerHostID: hostID}
}
for id, c := range containers {
if c.DockerHostID != hostID {
continue
}
r := false
for _, dc := range dockerContainers {
if c.Cjson.ID == dc.ID {
r = true
break
}
}
if !r {
delete(containers, id)
}
}
containerlock.Unlock()
go updateRecords()
return nil
}

func dockerWatch(dockerClient *dockerclient.Client) {
func dockerWatch(dockerClient *dockerclient.Client, resync int) {
for {
cxt, cancel := context.WithCancel(context.Background())
// on startup populate containers
dockerContainers, err := dockerClient.ContainerList(cxt, dockertypes.ContainerListOptions{})
// Get host ID
dockerInfo, err := dockerClient.Info(cxt)
if err != nil {
log.WithError(err).Error("Error getting container list")
log.WithError(err).Error("Error getting host id")
cancel()
time.Sleep(2 * time.Second)
continue
}
containerlock.Lock()
for _, dc := range dockerContainers {
cjson, err := dockerClient.ContainerInspect(context.Background(), dc.ID)
if err != nil {
log.WithError(err).WithField("Container ID", dc.ID).Error("Error inspecting container")
}
containers[dc.ID] = cjson
hostID := dockerInfo.ID
// on startup populate containers
err = syncAllContainers(dockerClient, cxt, hostID)
if err != nil {
log.WithError(err).Error("Error syncing all containers")
cancel()
time.Sleep(2 * time.Second)
continue
}
containerlock.Unlock()
go updateRecords()

// Start monitoring docker events
dockerEventErr := events.Monitor(cxt, dockerClient, dockertypes.EventsOptions{}, func(event dockerevents.Message) {
Expand All @@ -96,14 +134,44 @@ func dockerWatch(dockerClient *dockerclient.Client) {
}
containerlock.Lock()
defer containerlock.Unlock()
containers[cid] = cjson
containers[cid] = containerData{Cjson: cjson, DockerHostID: hostID}
go updateRecords()
return
})

endResync := make(chan struct{})
if resync != 0 {
go func() {
t := time.NewTicker(time.Duration(resync) * time.Second)
for {
err = syncAllContainers(dockerClient, cxt, hostID)
if err != nil {
log.WithError(err).Error("Error syncing all containers")
}
select {
case <-t.C:
continue
case <-endResync:
t.Stop()
break
}
}
}()
}

// wait for an error from the monitoring, if we get one. Log and start over.
err = <-dockerEventErr
log.WithError(err).Error("Error from docker event subscription")
close(endResync)
cancel()
// cleanup containers
containerlock.Lock()
for id, c := range containers {
if c.DockerHostID == hostID {
delete(containers, id)
}
}
containerlock.Unlock()
}
}

Expand Down
3 changes: 2 additions & 1 deletion handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func updateRecords() {
for k := range records {
delete(records, k)
}
for _, cjson := range containers {
for _, cdata := range containers {
cjson := cdata.Cjson
for _, es := range cjson.NetworkSettings.Networks {
if es.IPAddress != "" {
ip := net.ParseIP(es.IPAddress)
Expand Down
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func main() {
Name: "debug, d",
Usage: "Enable debugging.",
},
cli.IntFlag{
Name: "resync, r",
Value: 0,
Usage: "Periodically resync all containers from docker hosts.",
},
cli.StringFlag{
Name: "domain, dom",
Value: "dkdns.",
Expand Down Expand Up @@ -118,7 +123,7 @@ func Run(ctx *cli.Context) error {
endpoints = []string{"unix:///var/run/docker.sock"}
}

go monDocker(endpoints, ctx.String("ca"), ctx.String("cert"), ctx.String("key"), !ctx.Bool("no-validate"))
go monDocker(endpoints, ctx.String("ca"), ctx.String("cert"), ctx.String("key"), !ctx.Bool("no-validate"), ctx.Int("resync"))

sig := make(chan os.Signal)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
Expand Down

0 comments on commit 3f57a7a

Please sign in to comment.