Skip to content

Commit

Permalink
chore: Better abstract secondary storage - observe secondary storage …
Browse files Browse the repository at this point in the history
…via metrics - refactors and lints
  • Loading branch information
epociask committed Oct 13, 2024
1 parent 13f221b commit 95790f2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 39 deletions.
12 changes: 5 additions & 7 deletions server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri
// create EigenDA backend store
var eigenDA store.GeneratedKeyStore
if cfg.EigenDAConfig.MemstoreEnabled {
log.Info("Using mem-store backend for EigenDA")
log.Info("Using memstore backend for EigenDA")
eigenDA, err = memstore.New(ctx, verifier, log, cfg.EigenDAConfig.MemstoreConfig)
} else {
var client *clients.EigenDAClient
Expand Down Expand Up @@ -120,14 +120,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri
// create secondary storage router
fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
secondary, err := store.NewSecondaryRouter(log, m, caches, fallbacks)
if err != nil {
return nil, err
}
secondary := store.NewSecondaryRouter(log, m, caches, fallbacks)

if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
log.Debug("Starting secondary stream processing routines")
go secondary.SubscribeToPutNotif(ctx)
// NOTE: in the future the number of worker goroutines could be made configurable via env
log.Debug("Starting secondary write loop")
go secondary.WriteLoop(ctx)
}

log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
Expand Down
4 changes: 2 additions & 2 deletions store/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va
return nil, err
}

if r.secondary.Enabled() {
r.secondary.Topic() <- PutNotify{
if r.secondary.Enabled() { // publish a PutNotify event to the secondary router's subscription channel
r.secondary.Subscription() <- PutNotify{
Commitment: commit,
Value: value,
}
Expand Down
49 changes: 19 additions & 30 deletions store/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (
)

type ISecondary interface {
Fallbacks() []PrecomputedKeyStore
Caches() []PrecomputedKeyStore
Enabled() bool
Topic() chan<- PutNotify
Subscription() chan<- PutNotify
CachingEnabled() bool
FallbackEnabled() bool
HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
SubscribeToPutNotif(context.Context)
WriteLoop(context.Context)
}

type PutNotify struct {
Expand All @@ -31,29 +29,29 @@ type PutNotify struct {

// SecondaryRouter ... routing abstraction for secondary storage backends
type SecondaryRouter struct {
stream chan PutNotify
log log.Logger
m metrics.Metricer
log log.Logger
m metrics.Metricer

caches []PrecomputedKeyStore
fallbacks []PrecomputedKeyStore

verifyLock sync.RWMutex
verifyLock sync.RWMutex
subscription chan PutNotify
}

// NewSecondaryRouter ... creates a new secondary storage router
func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) {
func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) ISecondary {
return &SecondaryRouter{
stream: make(chan PutNotify),
log: log,
m: m,
caches: caches,
fallbacks: fallbacks,
}, nil
subscription: make(chan PutNotify), // unbuffered channel is critical to ensure that secondary bottlenecks don't impact /put latency for EigenDA blob dispersals
log: log,
m: m,
caches: caches,
fallbacks: fallbacks,
}
}

func (r *SecondaryRouter) Topic() chan<- PutNotify {
return r.stream
func (r *SecondaryRouter) Subscription() chan<- PutNotify {
return r.subscription
}

func (r *SecondaryRouter) Enabled() bool {
Expand All @@ -70,9 +68,6 @@ func (r *SecondaryRouter) FallbackEnabled() bool {

// HandleRedundantWrites ... writes to both sets of backends (i.e., fallback, cache)
// and returns an error if NONE of them succeed
// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
// vs a fallback read type
func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
sources := r.caches
sources = append(sources, r.fallbacks...)
Expand Down Expand Up @@ -100,16 +95,18 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
return nil
}

func (r *SecondaryRouter) SubscribeToPutNotif(ctx context.Context) {
// WriteLoop ... waits for notifications published to the subscription channel and performs the backend writes
func (r *SecondaryRouter) WriteLoop(ctx context.Context) {
for {
select {
case notif := <-r.stream:
case notif := <-r.subscription:
err := r.HandleRedundantWrites(context.Background(), notif.Commitment, notif.Value)
if err != nil {
r.log.Error("Failed to write to redundant targets", "err", err)
}

case <-ctx.Done():
r.log.Debug("Terminating secondary event loop")
return
}
}
Expand Down Expand Up @@ -157,11 +154,3 @@ func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte
}
return nil, errors.New("no data found in any redundant backend")
}

func (r *SecondaryRouter) Fallbacks() []PrecomputedKeyStore {
return r.fallbacks
}

func (r *SecondaryRouter) Caches() []PrecomputedKeyStore {
return r.caches
}

0 comments on commit 95790f2

Please sign in to comment.