diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 9bd9d9be..e34b08ed 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -37,6 +37,7 @@ var ( listen = flag.String("listen", ":2024", "Address:port to listen on") spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") signer = flag.String("signer", "", "Note signer to use to sign checkpoints") + persistentDedup = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage") additionalSigners = []string{} ) @@ -65,7 +66,18 @@ func main() { if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } - dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256) + + // Handle dedup configuration + addDelegate := storage.Add + + // PersistentDedup is currently experimental, so there's no terraform or documentation yet! + if *persistentDedup { + addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate) + if err != nil { + klog.Exitf("Failed to create new GCP dedupe: %v", err) + } + } + dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256) // Expose a HTTP handler for the conformance test writes. // This should accept arbitrary bytes POSTed to /add, and return an ascii diff --git a/dedupe.go b/dedupe.go index 23b9b2fa..20a31917 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,9 +16,10 @@ package tessera import ( "context" + "fmt" "sync" - "github.com/hashicorp/golang-lru/v2/expirable" + lru "github.com/hashicorp/golang-lru/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -35,30 +36,38 @@ import ( // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { + c, err := lru.New[string, func() IndexFuture](int(size)) + if err != nil { + panic(fmt.Errorf("lru.New(%d): %v", size, err)) + } dedupe := &inMemoryDedupe{ delegate: delegate, - cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0), + cache: c, } return dedupe.add } type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + cache *lru.Cache[string, func() IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - d.mu.Lock() - defer d.mu.Unlock() - f, ok := d.cache.Get(id) - if !ok { - f = d.delegate(ctx, e) - d.cache.Add(id, f) + // However many calls with the same entry come in and are deduped, we should only call delegate + // once for each unique entry: + f := sync.OnceValue(func() IndexFuture { + return d.delegate(ctx, e) + }) + + // if we've seen this entry before, discard our f and replace + // with the one we created last time, otherwise store f against id. + if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok { + f = prev } - return f + + return f() } diff --git a/dedupe_test.go b/dedupe_test.go index 8bc2aa1d..c29b0851 100644 --- a/dedupe_test.go +++ b/dedupe_test.go @@ -60,13 +60,15 @@ func TestDedupe(t *testing.T) { dedupeAdd := tessera.InMemoryDedupe(delegate, 256) // Add foo, bar, baz to prime the cache to make things interesting - dedupeAdd(ctx, tessera.NewEntry([]byte("foo"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("bar"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("baz"))) + for _, s := range []string{"foo", "bar", "baz"} { + if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil { + t.Fatalf("dedupeAdd(%q): %v", s, err) + } + } idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))() if err != nil { - t.Fatal(err) + t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err) } if idx != tC.wantIdx { t.Errorf("got != want (%d != %d)", idx, tC.wantIdx) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 6b5fce2f..aa4a374a 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -37,11 +37,13 @@ import ( "io" "net/http" "os" + "sync/atomic" "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + "github.com/globocom/go-buffer" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" @@ -783,3 +785,170 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e } return r.Attrs.LastModified, r.Close() } + +// NewDedupe returns wrapped Add func which will use Spanner to maintain a mapping of +// previously seen entries and their assigned indices. Future calls with the same entry +// will return the previously assigned index, as yet unseen entries will be passed to the provided +// delegate function to have an index assigned. +// +// For performance reasons, the ID -> index associations returned by the delegate are buffered before +// being flushed to Spanner. This can result in duplicates occuring in some circumstances, but in +// general this should not be a problem. +// +// Note that the storage for this mapping is entirely separate and unconnected to the storage used for +// maintaining the Merkle tree. +// +// This functionality is experimental! +func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) { + /* + Schema for reference: + + CREATE TABLE IDSeq ( + id INT64 NOT NULL, + h BYTES(MAX) NOT NULL, + idx INT64 NOT NULL, + ) PRIMARY KEY (id, h); + */ + dedupDB, err := spanner.NewClient(ctx, spannerDB) + if err != nil { + return nil, fmt.Errorf("failed to connect to Spanner: %v", err) + } + + r := &dedupStorage{ + ctx: ctx, + dbPool: dedupDB, + delegate: delegate, + } + + // TODO(al): Make these configurable + r.buf = buffer.New( + buffer.WithSize(64), + buffer.WithFlushInterval(200*time.Millisecond), + buffer.WithFlusher(buffer.FlusherFunc(r.flush)), + buffer.WithPushTimeout(15*time.Second), + ) + go func(ctx context.Context) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load()) + } + } + }(ctx) + return r.add, nil +} + +type dedupStorage struct { + ctx context.Context + dbPool *spanner.Client + delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture + + numLookups atomic.Uint64 + numWrites atomic.Uint64 + numDBDedups atomic.Uint64 + numPushErrs atomic.Uint64 + + buf *buffer.Buffer +} + +// index returns the index (if any) previously associated with the provided hash +func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) { + d.numLookups.Add(1) + var idx int64 + if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil { + if c := spanner.ErrCode(err); c == codes.NotFound { + return nil, nil + } + return nil, err + } else { + if err := row.Column(0, &idx); err != nil { + return nil, fmt.Errorf("failed to read dedup index: %v", err) + } + idx := uint64(idx) + d.numDBDedups.Add(1) + return &idx, nil + } +} + +// storeMappings stores the associations between the keys and IDs in a non-atomic fashion +// (i.e. it does not store all or none in a transactional sense). +// +// Returns an error if one or more mappings cannot be stored. +func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error { + m := make([]*spanner.MutationGroup, 0, len(entries)) + for _, e := range entries { + m = append(m, &spanner.MutationGroup{ + Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})}, + }) + } + + i := d.dbPool.BatchWrite(ctx, m) + return i.Do(func(r *spannerpb.BatchWriteResponse) error { + s := r.GetStatus() + if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists { + return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c) + } + return nil + }) +} + +// dedupeMapping represents an ID -> index mapping. +type dedupeMapping struct { + ID []byte + Idx uint64 +} + +// add adds the entry to the underlying delegate only if e isn't already known. In either case, +// an IndexFuture will be returned that the client can use to get the sequence number of this entry. +func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + idx, err := d.index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + i, err := d.delegate(ctx, e)() + if err != nil { + return func() (uint64, error) { return 0, err } + } + + err = d.enqueueMapping(ctx, e.Identity(), i) + return func() (uint64, error) { + return i, err + } +} + +// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage. +func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error { + err := d.buf.Push(dedupeMapping{ID: h, Idx: idx}) + if err != nil { + d.numPushErrs.Add(1) + // This means there's pressure flushing dedup writes out, so discard this write. + if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +// flush writes enqueued mappings to storage. +func (d *dedupStorage) flush(items []interface{}) { + entries := make([]dedupeMapping, len(items)) + for i := range items { + entries[i] = items[i].(dedupeMapping) + } + + ctx, c := context.WithTimeout(d.ctx, 15*time.Second) + defer c() + + if err := d.storeMappings(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + d.numWrites.Add(uint64(len(entries))) +}