Skip to content

Commit

Permalink
invalidate service if node was not updated
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Oct 28, 2024
1 parent b318b7f commit cdb6a0d
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion registry/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type cache struct {
sg singleflight.Group
cache map[string][]*registry.Service
ttls map[string]time.Time
nttls map[string]map[string]time.Time // node ttls
watched map[string]bool

// used to stop the cache
Expand Down Expand Up @@ -94,6 +95,17 @@ func (c *cache) isValid(services []*registry.Service, ttl time.Time) bool {
return false
}

// a node did not get updated
for _, s := range services {
for _, n := range s.Nodes {
nttl := c.nttls[s.Name][n.Id]
if time.Since(nttl) > 0 {
delete(c.nttls, s.Name)
return false
}
}
}

// ok
return true
}
Expand All @@ -115,6 +127,7 @@ func (c *cache) del(service string) {
// otherwise delete entries
delete(c.cache, service)
delete(c.ttls, service)
delete(c.nttls, service)
}

func (c *cache) get(service string) ([]*registry.Service, error) {
Expand All @@ -128,7 +141,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
// make a copy
cp := util.Copy(services)

// got services && within ttl so return cache
// got services, nodes && within ttl so return cache
if c.isValid(cp, ttl) {
c.RUnlock()
// return services
Expand Down Expand Up @@ -197,6 +210,14 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
func (c *cache) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.opts.TTL)
for _, s := range services {
for _, n := range s.Nodes {
if c.nttls[s.Name] == nil {
c.nttls[s.Name] = make(map[string]time.Time)
}
c.nttls[s.Name][n.Id] = time.Now().Add(c.opts.TTL)
}
}
}

func (c *cache) update(res *registry.Result) {
Expand Down Expand Up @@ -483,6 +504,7 @@ func New(r registry.Registry, opts ...Option) Cache {
watchedRunning: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
nttls: make(map[string]map[string]time.Time),
exit: make(chan bool),
}
}

0 comments on commit cdb6a0d

Please sign in to comment.