chore: Better abstract secondary storage #182
base: main
Conversation
…ondary insertions
I like the refactor! I think there are ways to make the API for the secondary struct easier to understand and use, though! I'm not a big fan of async APIs. See my reasoning here: https://www.notion.so/eigen-labs/Bls-Agg-Service-2-0-synchronous-API-98abef46040a48fc8d044e7de0781839
Unit tests can be run by invoking `make test`. Please make sure all test containers are downloaded locally before running, via:
```
docker pull redis
docker pull minio
```
is this really needed? I would think testcontainer would still pull the images when attempting to run them if they are not present locally?
server/load_store.go
Outdated
```go
fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
secondary, err := store.NewSecondaryRouter(log, caches, fallbacks)
if err != nil {
	return nil, err
}
```
wrap error
```go
// primary storage backends
eigenda GeneratedKeyStore  // ALT DA commitment type for OP mode && simple commitment mode for standard /client
s3      PrecomputedKeyStore // OP commitment mode && keccak256 commitment type
```

Suggested change:

```go
// primary storage backends:
// GeneratedKeyStore is used for simple commitment mode && generic OP commitment mode
// PrecomputedKeyStore is used for keccak256 OP commitment mode
eigenda GeneratedKeyStore
s3      PrecomputedKeyStore
```
Not sure if this is correct. I tried to make your comments more precise (I was confused by them) but might have misunderstood them.
store/secondary.go
Outdated
```go
// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
// vs a fallback read type
```
can you rephrase? very long sentence, I don't understand it.
store/secondary.go
Outdated
```go
for _, src := range sources {
	err := src.Put(ctx, key, value)
	if err != nil {
		r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err)
	} else {
		successes++
	}
}

if successes == 0 {
	return errors.New("failed to write blob to any redundant targets")
}
```
should we use an errgroup here instead? If there are a bunch of sources, we should prob write to them in parallel?
store/secondary.go
Outdated
```go
Ingress() chan<- PutNotif
CachingEnabled() bool
FallbackEnabled() bool
HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
StreamProcess(context.Context)
```
add comments to explain the interface. Not sure how this works from first reading. Ingress, StreamProcess, etc. are not super descriptive. Might want to use longer more descriptive names?
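One possible shape for the documented interface. The doc comments below are guesses at intent (to be corrected by the author), and the `SecondaryManager`/`noopSecondary` names are hypothetical:

```go
package main

import (
	"context"
	"fmt"
)

// PutNotif mirrors the PR's notification payload (hypothetical shape).
type PutNotif struct {
	Commitment []byte
	Value      []byte
}

// SecondaryManager sketches how the interface could be documented.
type SecondaryManager interface {
	// Ingress returns the write side of the queue that the request path
	// pushes successful primary writes onto for async replication.
	Ingress() chan<- PutNotif
	// CachingEnabled reports whether any cache targets are configured.
	CachingEnabled() bool
	// FallbackEnabled reports whether any fallback targets are configured.
	FallbackEnabled() bool
	// HandleRedundantWrites synchronously writes the commitment/value pair
	// to every configured cache and fallback target.
	HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
	// MultiSourceRead reads from cache targets (or fallback targets when the
	// bool is true), verifying each candidate with the supplied function.
	MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
	// StreamProcess consumes queued notifications until ctx is done.
	StreamProcess(context.Context)
}

// noopSecondary is a trivial implementation proving the interface compiles.
type noopSecondary struct{ ch chan PutNotif }

func (n *noopSecondary) Ingress() chan<- PutNotif { return n.ch }
func (n *noopSecondary) CachingEnabled() bool     { return false }
func (n *noopSecondary) FallbackEnabled() bool    { return false }
func (n *noopSecondary) HandleRedundantWrites(ctx context.Context, c, v []byte) error {
	return nil
}
func (n *noopSecondary) MultiSourceRead(ctx context.Context, key []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) {
	return nil, nil
}
func (n *noopSecondary) StreamProcess(ctx context.Context) {}

var _ SecondaryManager = (*noopSecondary)(nil)

func main() {
	s := &noopSecondary{ch: make(chan PutNotif, 1)}
	fmt.Println(s.CachingEnabled()) // → false
}
```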
server/load_store.go
Outdated
```go
for i := 0; i < 10; i++ {
	go secondary.StreamProcess(ctx)
}
```
am I understanding correctly that you're spinning up 10 workers to pull from the queue? Are we sure that 1 is not enough? Why is 10 sufficient? Can we make it a config parameter instead. Also probably change the name StreamProcess to RequestProcessor or something more explicit?
Also, maybe if you want to go this route, expose an `init()` function or a `startWorkers(numWorkers uint64)` function which spins up the goroutines?
```go
if r.secondary.FallbackEnabled() {
	data, err = r.secondary.MultiSourceRead(ctx, key, true, r.eigenda.Verify)
```
Thinking through fallbacks some more. Do we really need them? I can't think of a use case where I would want a fallback instead of a cache. Why would I only want to read after eigenDA, instead of always reading before eigenDA? It would greatly simplify the code if we could just get rid of fallbacks. But I might be missing something here still..
…via metrics - cleanups
…via metrics - refactors and lints
…via metrics - ensure thread safety for secondary stores
Fixes Issue
Related to #164
First PR of likely a few. Initially I want to refactor so that secondary storage can be concurrently decoupled from the request processing flow. E.g., with 2 secondaries the dispersal latency could increase by a few seconds. With EigenDAV2 we expect average request latency to be around 30 seconds, but someone overlaying secondaries could increase this to e.g. 32-45 seconds, which would amount to a difference of 100-200 kb/s (post-compression && serial submission strategy).
Changes proposed
Screenshots (Optional)
Note to reviewers