From 3f57a7a3841536cc78375d76282912797d1fe445 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 11 Oct 2016 12:57:56 -0400 Subject: [PATCH] enable periodic resync of containers with -r flag --- docker.go | 104 ++++++++++++++++++++++++++++++++++++++++++++---------- handle.go | 3 +- main.go | 7 +++- 3 files changed, 94 insertions(+), 20 deletions(-) diff --git a/docker.go b/docker.go index e13ddf4..6867493 100644 --- a/docker.go +++ b/docker.go @@ -19,12 +19,17 @@ import ( const dockerVersion = "v1.23" var ( - containers map[string]dockertypes.ContainerJSON + containers map[string]containerData containerlock sync.RWMutex ) -func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool) { - containers = make(map[string]dockertypes.ContainerJSON) +type containerData struct { + Cjson dockertypes.ContainerJSON + DockerHostID string +} + +func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool, resync int) { + containers = make(map[string]containerData) records = make(map[string][]net.IP) rev_records = make(map[string]string) @@ -47,31 +52,64 @@ func monDocker(dockerEndpoints []string, ca, cert, key string, verify bool) { log.WithError(err).Error("Error connecting to docker socket") continue } - go dockerWatch(dockerClient) + go dockerWatch(dockerClient, resync) + } +} + +func syncAllContainers(dockerClient *dockerclient.Client, cxt context.Context, hostID string) error { + dockerContainers, err := dockerClient.ContainerList(cxt, dockertypes.ContainerListOptions{}) + if err != nil { + return err + } + containerlock.Lock() + for _, dc := range dockerContainers { + cjson, err := dockerClient.ContainerInspect(context.Background(), dc.ID) + if err != nil { + log.WithError(err).WithField("Container ID", dc.ID).Error("Error inspecting container") + continue + } + containers[dc.ID] = containerData{Cjson: cjson, DockerHostID: hostID} } + for id, c := range containers { + if c.DockerHostID != hostID { + continue + } + r := false + for _, dc := range dockerContainers { + if c.Cjson.ID == dc.ID { + r = true + break + } + } + if !r { + delete(containers, id) + } + } + containerlock.Unlock() + go updateRecords() + return nil } -func dockerWatch(dockerClient *dockerclient.Client) { +func dockerWatch(dockerClient *dockerclient.Client, resync int) { for { cxt, cancel := context.WithCancel(context.Background()) - // on startup populate containers - dockerContainers, err := dockerClient.ContainerList(cxt, dockertypes.ContainerListOptions{}) + // Get host ID + dockerInfo, err := dockerClient.Info(cxt) if err != nil { - log.WithError(err).Error("Error getting container list") + log.WithError(err).Error("Error getting host id") cancel() time.Sleep(2 * time.Second) continue } - containerlock.Lock() - for _, dc := range dockerContainers { - cjson, err := dockerClient.ContainerInspect(context.Background(), dc.ID) - if err != nil { - log.WithError(err).WithField("Container ID", dc.ID).Error("Error inspecting container") - } - containers[dc.ID] = cjson + hostID := dockerInfo.ID + // on startup populate containers + err = syncAllContainers(dockerClient, cxt, hostID) + if err != nil { + log.WithError(err).Error("Error syncing all containers") + cancel() + time.Sleep(2 * time.Second) + continue } - containerlock.Unlock() - go updateRecords() // Start monitoring docker events dockerEventErr := events.Monitor(cxt, dockerClient, dockertypes.EventsOptions{}, func(event dockerevents.Message) { @@ -96,14 +134,44 @@ func dockerWatch(dockerClient *dockerclient.Client) { } containerlock.Lock() defer containerlock.Unlock() - containers[cid] = cjson + containers[cid] = containerData{Cjson: cjson, DockerHostID: hostID} go updateRecords() return }) + + endResync := make(chan struct{}) + if resync != 0 { + go func() { + t := time.NewTicker(time.Duration(resync) * time.Second) + for { + err = syncAllContainers(dockerClient, cxt, hostID) + if err != nil { + log.WithError(err).Error("Error syncing all containers") + } + select { + case <-t.C: + continue + case <-endResync: + t.Stop() + break + } + } + }() + } + // wait for an error from the monitoring, if we get one. Log and start over. err = <-dockerEventErr log.WithError(err).Error("Error from docker event subscription") + close(endResync) cancel() + // cleanup containers + containerlock.Lock() + for id, c := range containers { + if c.DockerHostID == hostID { + delete(containers, id) + } + } + containerlock.Unlock() } } diff --git a/handle.go b/handle.go index 942061c..89b060d 100644 --- a/handle.go +++ b/handle.go @@ -27,7 +27,8 @@ func updateRecords() { for k := range records { delete(records, k) } - for _, cjson := range containers { + for _, cdata := range containers { + cjson := cdata.Cjson for _, es := range cjson.NetworkSettings.Networks { if es.IPAddress != "" { ip := net.ParseIP(es.IPAddress) diff --git a/main.go b/main.go index 3f8b249..c5fa1ae 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,11 @@ func main() { Name: "debug, d", Usage: "Enable debugging.", }, + cli.IntFlag{ + Name: "resync, r", + Value: 0, + Usage: "Periodically resync all containers from docker hosts.", + }, cli.StringFlag{ Name: "domain, dom", Value: "dkdns.", @@ -118,7 +123,7 @@ func Run(ctx *cli.Context) error { endpoints = []string{"unix:///var/run/docker.sock"} } - go monDocker(endpoints, ctx.String("ca"), ctx.String("cert"), ctx.String("key"), !ctx.Bool("no-validate")) + go monDocker(endpoints, ctx.String("ca"), ctx.String("cert"), ctx.String("key"), !ctx.Bool("no-validate"), ctx.Int("resync")) sig := make(chan os.Signal) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)