Skip to content

Commit

Permalink
feat: expire messages from the cache based on last seen time (#513)
Browse files Browse the repository at this point in the history
* feat: expire messages from the cache based on last seen time

* chore: minor renaming

* fix: messages should not be found after expiration

* chore: editorial

* fix: use new time cache strategy consistently

* fix: default to old time cache and add todo for background gc
  • Loading branch information
smrz2001 authored Jan 24, 2023
1 parent 3d2eab3 commit 973fef5
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 19 deletions.
12 changes: 3 additions & 9 deletions blacklist.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package pubsub

import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/whyrusleeping/timecache"

"github.com/libp2p/go-libp2p-pubsub/timecache"
)

// Blacklist is an interface for peer blacklisting.
Expand Down Expand Up @@ -34,8 +34,7 @@ func (b MapBlacklist) Contains(p peer.ID) bool {

// TimeCachedBlacklist is a blacklist implementation using a time cache
type TimeCachedBlacklist struct {
sync.RWMutex
tc *timecache.TimeCache
tc timecache.TimeCache
}

// NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
Expand All @@ -46,8 +45,6 @@ func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {

// Add returns a bool saying whether Add of peer was successful
func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
b.Lock()
defer b.Unlock()
s := p.String()
if b.tc.Has(s) {
return false
Expand All @@ -57,8 +54,5 @@ func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
}

func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
b.RLock()
defer b.RUnlock()

return b.tc.Has(p.String())
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ go 1.17

require (
github.com/benbjohnson/clock v1.3.0
github.com/emirpasic/gods v1.18.1
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/libp2p/go-msgio v0.2.0
github.com/multiformats/go-multiaddr v0.6.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
github.com/multiformats/go-varint v0.0.6
)

require (
Expand All @@ -36,7 +38,6 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
Expand All @@ -62,7 +63,6 @@ require (
github.com/multiformats/go-multicodec v0.5.0 // indirect
github.com/multiformats/go-multihash v0.2.1 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -544,8 +546,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
24 changes: 19 additions & 5 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"

logging "github.com/ipfs/go-log"
"github.com/whyrusleeping/timecache"
)

// DefaultMaximumMessageSize is 1mb.
Expand All @@ -31,6 +31,10 @@ var (
// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
TimeCacheDuration = 120 * time.Second

// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
TimeCacheStrategy = timecache.Strategy_FirstSeen

// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
// subscription has been cancelled.
ErrSubscriptionCancelled = errors.New("subscription cancelled")
Expand Down Expand Up @@ -148,9 +152,10 @@ type PubSub struct {
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream

seenMessagesMx sync.Mutex
seenMessages *timecache.TimeCache
seenMsgTTL time.Duration
seenMessagesMx sync.Mutex
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy

// generator used to compute the ID for a message
idGen *msgIDGenerator
Expand Down Expand Up @@ -286,6 +291,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMsgTTL: TimeCacheDuration,
seenMsgStrategy: TimeCacheStrategy,
idGen: newMsgIdGenerator(),
counter: uint64(time.Now().UnixNano()),
}
Expand All @@ -307,7 +313,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
}
}

ps.seenMessages = timecache.NewTimeCache(ps.seenMsgTTL)
ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)

if err := ps.disc.Start(ps); err != nil {
return nil, err
Expand Down Expand Up @@ -533,6 +539,14 @@ func WithSeenMessagesTTL(ttl time.Duration) Option {
}
}

// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
return func(ps *PubSub) error {
ps.seenMsgStrategy = strategy
return nil
}
}

// WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
// processing them. The inspector is invoked on an accepted RPC just before it
// is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
Expand Down
71 changes: 71 additions & 0 deletions timecache/first_seen_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package timecache

import (
"container/list"
"sync"
"time"
)

// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
type FirstSeenCache struct {
q *list.List
m map[string]time.Time
span time.Duration
guard *sync.RWMutex
}

func newFirstSeenCache(span time.Duration) TimeCache {
return &FirstSeenCache{
q: list.New(),
m: make(map[string]time.Time),
span: span,
guard: new(sync.RWMutex),
}
}

func (tc FirstSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()

_, ok := tc.m[s]
if ok {
panic("putting the same entry twice not supported")
}

// TODO(#515): Do GC in the background
tc.sweep()

tc.m[s] = time.Now()
tc.q.PushFront(s)
}

func (tc FirstSeenCache) sweep() {
for {
back := tc.q.Back()
if back == nil {
return
}

v := back.Value.(string)
t, ok := tc.m[v]
if !ok {
panic("inconsistent cache state")
}

if time.Since(t) > tc.span {
tc.q.Remove(back)
delete(tc.m, v)
} else {
return
}
}
}

func (tc FirstSeenCache) Has(s string) bool {
tc.guard.RLock()
defer tc.guard.RUnlock()

ts, ok := tc.m[s]
// Only consider the entry found if it was present in the cache AND hadn't already expired.
return ok && time.Since(ts) <= tc.span
}
39 changes: 39 additions & 0 deletions timecache/first_seen_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package timecache

import (
"fmt"
"testing"
"time"
)

func TestFirstSeenCacheFound(t *testing.T) {
tc := newFirstSeenCache(time.Minute)

tc.Add("test")

if !tc.Has("test") {
t.Fatal("should have this key")
}
}

func TestFirstSeenCacheExpire(t *testing.T) {
tc := newFirstSeenCache(time.Second)
for i := 0; i < 11; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
tc := newFirstSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}
84 changes: 84 additions & 0 deletions timecache/last_seen_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package timecache

import (
"sync"
"time"

"github.com/emirpasic/gods/maps/linkedhashmap"
)

// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
// "old" entries will be purged from the cache.
//
// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
// issues that might increase the number of duplicate messages in the network.
//
// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
//
// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
type LastSeenCache struct {
m *linkedhashmap.Map
span time.Duration
guard *sync.Mutex
}

func newLastSeenCache(span time.Duration) TimeCache {
return &LastSeenCache{
m: linkedhashmap.New(),
span: span,
guard: new(sync.Mutex),
}
}

func (tc *LastSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()

tc.add(s)

// Garbage collect expired entries
// TODO(#515): Do GC in the background
tc.gc()
}

func (tc *LastSeenCache) add(s string) {
// We don't need a lock here because this function is always called with the lock already acquired.

// If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
// an accurate sliding window.
tc.m.Remove(s)
now := time.Now()
tc.m.Put(s, &now)
}

func (tc *LastSeenCache) gc() {
// We don't need a lock here because this function is always called with the lock already acquired.
iter := tc.m.Iterator()
for iter.Next() {
key := iter.Key()
ts := iter.Value().(*time.Time)
// Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
// entries hereafter will be unexpired.
if time.Since(*ts) <= tc.span {
return
}
tc.m.Remove(key)
}
}

func (tc *LastSeenCache) Has(s string) bool {
tc.guard.Lock()
defer tc.guard.Unlock()

// If the entry exists and has not already expired, slide it forward.
if ts, found := tc.m.Get(s); found {
if t := ts.(*time.Time); time.Since(*t) <= tc.span {
tc.add(s)
return true
}
}
return false
}
Loading

0 comments on commit 973fef5

Please sign in to comment.