diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml index 5a2de8703bf..c96bbe3b2c3 100644 --- a/.github/workflows/changelog.yml +++ b/.github/workflows/changelog.yml @@ -9,6 +9,7 @@ on: - reopened - labeled - unlabeled + - ready_for_review paths: - '**.go' - '**/go.mod' @@ -17,7 +18,8 @@ on: jobs: changelog: if: contains(github.event.pull_request.title, '[skip changelog]') == false && - contains(github.event.pull_request.labels.*.name, 'skip/changelog') == false + contains(github.event.pull_request.labels.*.name, 'skip/changelog') == false && + github.event.pull_request.draft == false runs-on: ubuntu-latest name: Changelog steps: diff --git a/.github/workflows/pr-title-check.yml b/.github/workflows/pr-title-check.yml index 84c7cae525f..f9a1deffb24 100644 --- a/.github/workflows/pr-title-check.yml +++ b/.github/workflows/pr-title-check.yml @@ -2,13 +2,18 @@ name: PR Title Check on: pull_request_target: - types: [opened, edited, synchronize, reopened] + types: + - opened + - edited + - reopened + - ready_for_review permissions: pull-requests: write jobs: check-pr-title: + if: github.event.pull_request.draft == false name: Check PR Title runs-on: ubuntu-latest steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 14d892a7f1e..1c9a1dd2eaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,31 +2,12 @@ # UNRELEASED -- https://github.com/filecoin-project/lotus/pull/12203: Fix slice modification bug in ETH Tx Events Bloom Filter -- https://github.com/filecoin-project/lotus/pull/12221: Fix a nil reference panic in the ETH Trace API -- https://github.com/filecoin-project/lotus/pull/12112: Moved consts from build/ to build/buildconstants/ for ligher curio deps. -- https://github.com/filecoin-project/lotus/pull/12237: Upgrade to go-f3 `v0.0.4`. -- https://github.com/filecoin-project/lotus/pull/12251: Dropping support from ProveCommitSector1 method from lotus-miner -- https://github.com/filecoin-project/lotus/pull/12276: chore: deps: Update GST, Filecoin-FFI and Actors to final versions NV23 -- https://github.com/filecoin-project/lotus/pull/12278: chore: Set Mainnet upgrade epoch for NV23. -- https://github.com/filecoin-project/lotus/pull/12269 Fix `logIndex` ordering in `EthGetTransactionReceipt` by using the EventIndex to fetch logs -- https://github.com/filecoin-project/lotus/pull/12270: Feat expose `settle-deal` command for lotus miner to settle deals manually -- https://github.com/filecoin-project/lotus/pull/12285 Set up OpenTelemetry metrics reporting to prometheus -- https://github.com/filecoin-project/lotus/pull/12279 Upgrade to go-f3 v0.0.5 -- https://github.com/filecoin-project/lotus/pull/12295 Upgrade to go-f3 v0.0.6 -- https://github.com/filecoin-project/lotus/pull/12292: feat: p2p: allow overriding bootstrap nodes with environmemnt variable -- https://github.com/filecoin-project/lotus/pull/12319: feat: `lotus send CLI`: allow sending to ETH addresses -- https://github.com/filecoin-project/lotus/pull/12332: fix: ETH RPC: receipts: use correct txtype in receipts -- https://github.com/filecoin-project/lotus/pull/12335: fix: lotus-shed: store processed tipset after backfilling events -- https://github.com/filecoin-project/lotus/pull/12341: fix: miner: Fix DDO pledge math - ## ☢️ Upgrade Warnings ☢️ -- lotus-gateway behaviour, CLI arguments and APIs have received minor changes. See the improvements section below. - ## New features - feat: Add trace filter API supporting RPC method `trace_filter` ([filecoin-project/lotus#12123](https://github.com/filecoin-project/lotus/pull/12123)). Configuring `EthTraceFilterMaxResults` sets a limit on how many results are returned in any individual `trace_filter` RPC API call. +- feat: Add support for Badger version 4. The default remains Badger version 2 to ensure backward compatibility. ([filecoin-project/lotus#12316](https://github.com/filecoin-project/lotus/pull/12316)). Configuring `LOTUS_CHAINSTORE_BADGERVERSION` can configure lotus to use badger version 2 and 4. - feat: `FilecoinAddressToEthAddress` RPC can now return ETH addresses for all Filecoin address types ("f0"/"f1"/"f2"/"f3") based on client's re-org tolerance. This is a breaking change if you are using the API via the go-jsonrpc library or by using Lotus as a library, but is a non-breaking change when using the API via any other RPC method as it adds an optional second argument. ([filecoin-project/lotus#12324](https://github.com/filecoin-project/lotus/pull/12324)). - feat: Added `lotus-shed indexes inspect-events` health-check command ([filecoin-project/lotus#12346](https://github.com/filecoin-project/lotus/pull/12346)). @@ -38,16 +19,22 @@ ## Improvements -- feat!: gateway: fix rate limiting, better stateful handling ([filecoin-project/lotus#12315](https://github.com/filecoin-project/lotus/pull/12315)). - - CLI usage documentation has been improved for `lotus-gateway` - - `--per-conn-rate-limit` now works as advertised. - - `--eth-max-filters-per-conn` is new and allows you to set the maximum number of filters and subscription per connection, it defaults to 16. - - Previously, this limit was set to `16` and applied separately to filters and subscriptions. This limit is now unified and applies to both filters and subscriptions. - - Stateful Ethereum APIs (those involving filters and subscriptions) are now disabled for plain HTTP connections. A client must be using websockets to access these APIs. - - These APIs are also now automatically removed from the node by the gateway when a client disconnects. - - Some APIs have changed which may impact users consuming Lotus Gateway code as a library. - - The default value for the `Events.FilterTTL` config option has been reduced from 24h to 1h. This means that filters will expire on a Lotus node after 1 hour of not being accessed by the client. -- feat(f3): F3 has been updated with many performance improvements and additional metrics. +# 1.28.2 / 2024-08-15 + +This is a Lotus patch release v1.28.2 for Node operators and Storage Providers. + +For node operators, this patch release is HIGHLY RECOMMENDED as it fixes an issue where excessive bandwidth usage (issue #12381) was caused by a routing loop in pubsub, where small "manifest" messages were cycling repeatedly around the network due to an ineffective routing loop prevention mechanism. The new f3 release also has a couple performance improvements around CPU usage. (If you are curious about the progress of F3 testing, follow the updates [here](https://github.com/filecoin-project/lotus/discussions/12287#discussioncomment-10343447)). + +For storage providers, this patch release fixes pledge issues users have been encountering. This update addresses existing issues, including the too-small pledge in snap and the lack of DDO-awareness in PoRep Commit. + +## ☢️ Upgrade Warnings ☢️ +- The `releases` branch has been deprecated with the 202408 split of 'Lotus Node' and 'Lotus Miner'. See https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#why-is-the-releases-branch-deprecated-and-what-are-alternatives for more info and alternatives for getting the latest release for both the 'Lotus Node' and 'Lotus Miner' based on the [Branch and Tag Strategy](https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#branch-and-tag-strategy). + - To get the latest Lotus Node tag: `git tag -l 'v*' | sort -V -r | head -n 1` + - To get the latest Lotus Miner tag: `git tag -l 'miner/v*' | sort -V -r | head -n 1` +- Breaking change in Miner public APIs `storage/pipeline.NewPreCommitBatcher` and `storage/pipeline.New`. They now have an additional error return to deal with errors arising from fetching the sealing config. + +- https://github.com/filecoin-project/lotus/pull/12390: Update go-f3 to 0.2.0 +- https://github.com/filecoin-project/lotus/pull/12341: fix: miner: Fix DDO pledge math # v1.28.1 / 2024-07-24 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fa2df627df9..1eba2eb51fa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,10 +13,10 @@ Lotus is a universally open project and welcomes contributions of all kinds: cod ## Before Contributing -1. If the proposal entails a protocol change, please first submit a [Filecoin Improvement Proposal](https://github.com/filecoin-project/FIPs). -2. If the change is complex and requires prior discussion, [open an issue](github.com/filecoin-project/lotus/issues) or a [discussion](https://github.com/filecoin-project/lotus/discussions) to request feedback before you start working on a pull request. This is to avoid disappointment and sunk costs, in case the change is not actually needed or accepted. -3. Please refrain from submitting PRs to adapt existing code to subjective preferences. The changeset should contain functional or technical improvements/enhancements, bug fixes, new features, or some other clear material contribution. Simple stylistic changes are likely to be rejected in order to reduce code churn. -4. Familiarize yourself with our [release flow](LOTUS_RELEASE_FLOW.md) to understand how changes are incorporated into releases. +1. Familiarize yourself with our [release flow](LOTUS_RELEASE_FLOW.md) to understand how changes are incorporated into releases. This includes our branch strategy and where to target PRs. +2. If the proposal entails a protocol change, please first submit a [Filecoin Improvement Proposal](https://github.com/filecoin-project/FIPs). +3. If the change is complex and requires prior discussion, [open an issue](github.com/filecoin-project/lotus/issues) or a [discussion](https://github.com/filecoin-project/lotus/discussions) to request feedback before you start working on a pull request. This is to avoid disappointment and sunk costs, in case the change is not actually needed or accepted. +4. Please refrain from submitting PRs to adapt existing code to subjective preferences. The changeset should contain functional or technical improvements/enhancements, bug fixes, new features, or some other clear material contribution. Simple stylistic changes are likely to be rejected in order to reduce code churn. ## Implementing Changes diff --git a/LOTUS_RELEASE_FLOW.md b/LOTUS_RELEASE_FLOW.md index 387c8be6e38..3223603a630 100644 --- a/LOTUS_RELEASE_FLOW.md +++ b/LOTUS_RELEASE_FLOW.md @@ -107,24 +107,19 @@ Unless a security issue is actively being exploited or a significant number of u ## Branch and Tag Strategy -> [!NOTE] -> - Blue text indicates node-related information. -> - Orange text indicates miner-related information. -> - System default colored text applies to both node and miner releases. - -* Releases are branched from the `master` branch, regardless of whether they include a network upgrade or not. +* Releases are usually branched from the `master` branch, regardless of whether they include a network upgrade or not. + * For certain patch releases where we can't risk including recent `master` changes (such as for security or emergency bug-fix releases): + * Node: `release/vX.Y.Z+1` will be created from `release/vX.Y.Z` + * Miner: `release/miner/vX.Y.Z+1` will be created from `release/miner/vX.Y.Z` * PRs usually target the `master` branch, even if they need to be backported to a release branch. + * The primary exception is CHANGELOG editorializing and callouts. As part of the [release process](https://github.com/filecoin-project/lotus/blob/master/documentation/misc/RELEASE_ISSUE_TEMPLATE.md), those changes happen directly in a release branch and are cherry-picked back to `master` at the end of a release. * PRs that need to be backported should be marked with a `backport` label. -* Node release branches are named `release/vX.Y.Z` -* Miner release branches are named `release/miner/vX.Y.Z` +* Node release branches are named `release/vX.Y.Z` +* Miner release branches are named `release/miner/vX.Y.Z` * By the end of the release process: - * A `release/vX.Y.Z` branch (node) will have an associated `vX.Y.Z` tag - * A `release/miner/vX.Y.Z` branch (miner) will have an associated `miner/vX.Y.Z` tag -* Both node and miner releases may have additional `vX.Y.Z-rcN` or `miner/vX.Y.Z-rcN` tags for release candidates -* The `master` branch is typically the source for creating release branches -* For emergency patch releases where we can't risk including recent `master` changes: - * Node: `release/vX.Y.Z+1` will be created from `release/vX.Y.Z` - * Miner: `release/miner/vX.Y.Z+1` will be created from `release/miner/vX.Y.Z` + * A `release/vX.Y.Z` branch (node) will have an associated `vX.Y.Z` tag + * A `release/miner/vX.Y.Z` branch (miner) will have an associated `miner/vX.Y.Z` tag +* Both node and miner releases may have additional `vX.Y.Z-rcN` or `miner/vX.Y.Z-rcN` tags for release candidates. * As of 202408, the `releases` branch is no longer used and no longer tracks the latest release. See [Why is the `releases` branch deprecated and what are alternatives?](#why-is-the-releases-branch-deprecated-and-what-are-alternatives). ## FAQ diff --git a/README.md b/README.md index 90436576728..5c088bf706e 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Lotus is an implementation of the Filecoin Distributed Storage Network. For more ## Building & Documentation -> Note: The default `master` branch is the dev branch, please use with caution. For the latest stable version, checkout the most recent [`Latest release`](https://github.com/filecoin-project/lotus/releases). +> Note: The default `master` branch is the dev branch, please use with caution. For the latest stable version, checkout the [latest release](https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#why-is-the-releases-branch-deprecated-and-what-are-alternatives). For complete instructions on how to build, install and setup lotus, please visit [https://lotus.filecoin.io](https://lotus.filecoin.io/lotus/install/prerequisites/#supported-platforms). Basic build instructions can be found further down in this readme. @@ -83,9 +83,9 @@ Once all the dependencies are installed, you can build and install the Lotus sui cd lotus/ ``` -Note: The default branch `master` is the dev branch where the latest new features, bug fixes and improvement are in. However, if you want to run lotus on Filecoin mainnet and want to run a production-ready lotus, get the latest release[ here](https://github.com/filecoin-project/lotus/releases). +Note: The default branch `master` is the dev branch where the latest new features, bug fixes and improvement are in. However, if you want to run lotus on Filecoin mainnet and want to run a production-ready lotus, get the [latest release](https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#why-is-the-releases-branch-deprecated-and-what-are-alternatives). -2. To join mainnet, checkout the [latest release](https://github.com/filecoin-project/lotus/releases). +2. To join mainnet, checkout the [latest release](https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#why-is-the-releases-branch-deprecated-and-what-are-alternatives). If you are changing networks from a previous Lotus installation or there has been a network reset, read the [Switch networks guide](https://lotus.filecoin.io/lotus/manage/switch-networks/) before proceeding. diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 269870e2320..b3611dd2b47 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -10,9 +10,6 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/options" - badgerstruct "github.com/dgraph-io/badger/v2/pb" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -20,10 +17,16 @@ import ( pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-base32" "go.uber.org/multierr" - "go.uber.org/zap" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/blockstore/badger/versions" + badger "github.com/filecoin-project/lotus/blockstore/badger/versions" +) + +// aliases to mask badger dependencies. +const ( + defaultGCThreshold = 0.125 ) var ( @@ -39,46 +42,6 @@ var ( log = logger.Logger("badgerbs") ) -// aliases to mask badger dependencies. -const ( - // FileIO is equivalent to badger/options.FileIO. - FileIO = options.FileIO - // MemoryMap is equivalent to badger/options.MemoryMap. - MemoryMap = options.MemoryMap - // LoadToRAM is equivalent to badger/options.LoadToRAM. - LoadToRAM = options.LoadToRAM - defaultGCThreshold = 0.125 -) - -// Options embeds the badger options themselves, and augments them with -// blockstore-specific options. -type Options struct { - badger.Options - - // Prefix is an optional prefix to prepend to keys. Default: "". - Prefix string -} - -func DefaultOptions(path string) Options { - return Options{ - Options: badger.DefaultOptions(path), - Prefix: "", - } -} - -// badgerLogger is a local wrapper for go-log to make the interface -// compatible with badger.Logger (namely, aliasing Warnf to Warningf) -type badgerLogger struct { - *zap.SugaredLogger // skips 1 caller to get useful line info, skipping over badger.Options. - - skip2 *zap.SugaredLogger // skips 2 callers, just like above + this logger. -} - -// Warningf is required by the badger logger APIs. -func (b *badgerLogger) Warningf(format string, args ...interface{}) { - b.skip2.Warnf(format, args...) -} - // bsState is the current blockstore state type bsState int @@ -115,9 +78,9 @@ type Blockstore struct { moveState bsMoveState rlock int - db *badger.DB - dbNext *badger.DB // when moving - opts Options + db badger.BadgerDB + dbNext badger.BadgerDB // when moving + opts badger.Options prefixing bool prefix []byte @@ -132,13 +95,9 @@ var _ blockstore.BlockstoreSize = (*Blockstore)(nil) var _ io.Closer = (*Blockstore)(nil) // Open creates a new badger-backed blockstore, with the supplied options. -func Open(opts Options) (*Blockstore, error) { - opts.Logger = &badgerLogger{ - SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), - skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), - } +func Open(opts badger.Options) (*Blockstore, error) { - db, err := badger.Open(opts.Options) + db, err := badger.OpenBadgerDB(opts) if err != nil { return nil, fmt.Errorf("failed to open badger blockstore: %w", err) } @@ -315,10 +274,10 @@ func (b *Blockstore) movingGC(ctx context.Context) error { log.Infof("moving blockstore from %s to %s", b.opts.Dir, newPath) opts := b.opts - opts.Dir = newPath - opts.ValueDir = newPath + opts.SetDir(newPath) + opts.SetValueDir(newPath) - dbNew, err := badger.Open(opts.Options) + dbNew, err := badger.OpenBadgerDB(opts) if err != nil { return fmt.Errorf("failed to open badger blockstore in %s: %w", newPath, err) } @@ -391,65 +350,8 @@ func symlink(path, linkTo string) error { } // doCopy copies a badger blockstore to another -func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) { - batch := to.NewWriteBatch() - defer func() { - if defErr == nil { - defErr = batch.Flush() - } - if defErr != nil { - batch.Cancel() - } - }() - - return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error { - // check whether context is closed on every kv group - if err := ctx.Err(); err != nil { - return err - } - for _, kv := range kvs { - if err := batch.Set(kv.Key, kv.Value); err != nil { - return err - } - } - return nil - }) -} - -var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 ) - -func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error { - workers := IterateLSMWorkers - if workers == 0 { - workers = between(2, 8, runtime.NumCPU()/2) - } - - stream := db.NewStream() - stream.NumGo = workers - stream.LogPrefix = "iterateBadgerKVs" - stream.Send = func(kvl *badgerstruct.KVList) error { - kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv)) - for _, kv := range kvl.Kv { - if kv.Key != nil && kv.Value != nil { - kvs = append(kvs, kv) - } - } - if len(kvs) == 0 { - return nil - } - return iter(kvs) - } - return stream.Orchestrate(ctx) -} - -func between(min, max, val int) int { - if val > max { - val = max - } - if val < min { - val = min - } - return val +func (b *Blockstore) doCopy(ctx context.Context, from versions.BadgerDB, to versions.BadgerDB) error { + return from.Copy(ctx, to) } func (b *Blockstore) deleteDB(path string) { @@ -505,7 +407,7 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq } } - if err == badger.ErrNoRewrite { + if err == b.db.GetErrNoRewrite() { // not really an error in this case, it signals the end of GC return nil } @@ -578,7 +480,7 @@ func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGC // Note no compaction needed before single GC as we will hit at most one vlog anyway err := b.db.RunValueLogGC(threshold) - if err == badger.ErrNoRewrite { + if err == b.db.GetErrNoRewrite() { // not really an error in this case, it signals the end of GC return nil } @@ -636,11 +538,14 @@ func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) erro defer KeyPool.Put(k) } - return b.db.View(func(txn *badger.Txn) error { + return b.db.View(func(txn badger.Txn) error { + + errKeyNotFound := b.db.GetErrKeyNotFound() + switch item, err := txn.Get(k); err { case nil: return item.Value(fn) - case badger.ErrKeyNotFound: + case errKeyNotFound: return ipld.ErrNotFound{Cid: cid} default: return fmt.Errorf("failed to view block from badger blockstore: %w", err) @@ -683,13 +588,14 @@ func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { defer KeyPool.Put(k) } - err := b.db.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn badger.Txn) error { _, err := txn.Get(k) return err }) + errKeyNotFound := b.db.GetErrKeyNotFound() switch err { - case badger.ErrKeyNotFound: + case errKeyNotFound: return false, nil case nil: return true, nil @@ -718,12 +624,13 @@ func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) } var val []byte - err := b.db.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn badger.Txn) error { + errKeyNotFound := b.db.GetErrKeyNotFound() switch item, err := txn.Get(k); err { case nil: val, err = item.ValueCopy(nil) return err - case badger.ErrKeyNotFound: + case errKeyNotFound: return ipld.ErrNotFound{Cid: cid} default: return fmt.Errorf("failed to get block from badger blockstore: %w", err) @@ -751,11 +658,12 @@ func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { } var size int - err := b.db.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn badger.Txn) error { + errKeyNotFound := b.db.GetErrKeyNotFound() switch item, err := txn.Get(k); err { case nil: size = int(item.ValueSize()) - case badger.ErrKeyNotFound: + case errKeyNotFound: return ipld.ErrNotFound{Cid: cid} default: return fmt.Errorf("failed to get block size from badger blockstore: %w", err) @@ -805,10 +713,13 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { keys = append(keys, k) } - err := b.db.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn badger.Txn) error { + + errKeyNotFound := b.db.GetErrKeyNotFound() + for i, k := range keys { switch _, err := txn.Get(k); err { - case badger.ErrKeyNotFound: + case errKeyNotFound: case nil: keys[i] = nil default: @@ -822,7 +733,7 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { return err } - put := func(db *badger.DB) error { + put := func(db badger.BadgerDB) error { batch := db.NewWriteBatch() defer batch.Cancel() @@ -1070,6 +981,6 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte { // DB is added for lotus-shed needs // WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE -func (b *Blockstore) DB() *badger.DB { +func (b *Blockstore) DB() badger.BadgerDB { return b.db } diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index d253f37d95d..725a20f1dd4 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -16,69 +16,89 @@ import ( "golang.org/x/sync/errgroup" "github.com/filecoin-project/lotus/blockstore" + versions "github.com/filecoin-project/lotus/blockstore/badger/versions" ) +const ( + BADGER_VERSION_2 = 2 + BADGER_VERSION_4 = 4 +) + +var SUPPORTED_BADGER_VERSIONS = []int{BADGER_VERSION_2, BADGER_VERSION_4} + func TestBadgerBlockstore(t *testing.T) { //stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001 //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 - (&Suite{ - NewBlockstore: newBlockstore(DefaultOptions), - OpenBlockstore: openBlockstore(DefaultOptions), - }).RunTests(t, "non_prefixed") - - prefixed := func(path string) Options { - opts := DefaultOptions(path) - opts.Prefix = "/prefixed/" - return opts - } + for _, version := range SUPPORTED_BADGER_VERSIONS { + t.Run(fmt.Sprintf("non_prefixed_v%d", version), func(t *testing.T) { + (&Suite{ + NewBlockstore: newBlockstore(versions.DefaultOptions, version), + OpenBlockstore: openBlockstore(versions.DefaultOptions, version), + }).RunTests(t, "non_prefixed") + }) + + t.Run(fmt.Sprintf("prefixed_v%d", version), func(t *testing.T) { + prefixed := func(path string, _ bool, _ int) versions.Options { + opts := versions.DefaultOptions(path, false, version) + opts.Prefix = "/prefixed/" + return opts + } - (&Suite{ - NewBlockstore: newBlockstore(prefixed), - OpenBlockstore: openBlockstore(prefixed), - }).RunTests(t, "prefixed") + (&Suite{ + NewBlockstore: newBlockstore(prefixed, version), + OpenBlockstore: openBlockstore(prefixed, version), + }).RunTests(t, "prefixed") + }) + } } func TestStorageKey(t *testing.T) { //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 //stm: @SPLITSTORE_BADGER_STORAGE_KEY_001 - bs, _ := newBlockstore(DefaultOptions)(t) - bbs := bs.(*Blockstore) - defer bbs.Close() //nolint:errcheck - - cid1 := blocks.NewBlock([]byte("some data")).Cid() - cid2 := blocks.NewBlock([]byte("more data")).Cid() - cid3 := blocks.NewBlock([]byte("a little more data")).Cid() - require.NotEqual(t, cid1, cid2) // sanity check - require.NotEqual(t, cid2, cid3) // sanity check - - // nil slice; let StorageKey allocate for us. - k1 := bbs.StorageKey(nil, cid1) - require.Len(t, k1, 55) - require.True(t, cap(k1) == len(k1)) - - // k1's backing array is reused. - k2 := bbs.StorageKey(k1, cid2) - require.Len(t, k2, 55) - require.True(t, cap(k2) == len(k1)) - - // bring k2 to len=0, and verify that its backing array gets reused - // (i.e. k1 and k2 are overwritten) - k3 := bbs.StorageKey(k2[:0], cid3) - require.Len(t, k3, 55) - require.True(t, cap(k3) == len(k3)) - - // backing array of k1 and k2 has been modified, i.e. memory is shared. - require.Equal(t, k3, k1) - require.Equal(t, k3, k2) + for _, version := range SUPPORTED_BADGER_VERSIONS { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + bs, _ := newBlockstore(versions.DefaultOptions, version)(t) + bbs := bs.(*Blockstore) + defer bbs.Close() //nolint:errcheck + + cid1 := blocks.NewBlock([]byte("some data")).Cid() + cid2 := blocks.NewBlock([]byte("more data")).Cid() + cid3 := blocks.NewBlock([]byte("a little more data")).Cid() + require.NotEqual(t, cid1, cid2) // sanity check + require.NotEqual(t, cid2, cid3) // sanity check + + // nil slice; let StorageKey allocate for us. + k1 := bbs.StorageKey(nil, cid1) + fmt.Println(cid1) + fmt.Println(k1) + require.Len(t, k1, 55) + require.True(t, cap(k1) == len(k1)) + + // k1's backing array is reused. + k2 := bbs.StorageKey(k1, cid2) + require.Len(t, k2, 55) + require.True(t, cap(k2) == len(k1)) + + // bring k2 to len=0, and verify that its backing array gets reused + // (i.e. k1 and k2 are overwritten) + k3 := bbs.StorageKey(k2[:0], cid3) + require.Len(t, k3, 55) + require.True(t, cap(k3) == len(k3)) + + // backing array of k1 and k2 has been modified, i.e. memory is shared. + require.Equal(t, k3, k1) + require.Equal(t, k3, k2) + }) + } } -func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { +func newBlockstore(optsSupplier func(path string, readonly bool, badgerVersion int) versions.Options, version int) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { tb.Helper() path = tb.TempDir() - db, err := Open(optsSupplier(path)) + db, err := Open(optsSupplier(path, false, version)) if err != nil { tb.Fatal(err) } @@ -87,20 +107,20 @@ func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) ( } } -func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) { +func openBlockstore(optsSupplier func(path string, readonly bool, badgerVersion int) versions.Options, version int) func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) { return func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) { tb.Helper() - return Open(optsSupplier(path)) + return Open(optsSupplier(path, false, version)) } } -func testMove(t *testing.T, optsF func(string) Options) { +func testMove(t *testing.T, badgerVersion int, optsF func(string, bool, int) versions.Options) { ctx := context.Background() basePath := t.TempDir() dbPath := filepath.Join(basePath, "db") - db, err := Open(optsF(dbPath)) + db, err := Open(optsF(dbPath, false, badgerVersion)) if err != nil { t.Fatal(err) } @@ -243,7 +263,7 @@ func testMove(t *testing.T, optsF func(string) Options) { t.Fatal(err) } - db, err = Open(optsF(dbPath)) + db, err = Open(optsF(dbPath, false, badgerVersion)) if err != nil { t.Fatal(err) } @@ -257,16 +277,26 @@ func TestMoveNoPrefix(t *testing.T) { //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 //stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001 //stm: @SPLITSTORE_BADGER_DELETE_001, @SPLITSTORE_BADGER_COLLECT_GARBAGE_001 - testMove(t, DefaultOptions) + for _, version := range SUPPORTED_BADGER_VERSIONS { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + testMove(t, version, func(path string, readonly bool, badgerVersion int) versions.Options { + return versions.DefaultOptions(path, false, version) + }) + }) + } } func TestMoveWithPrefix(t *testing.T) { //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 //stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001 //stm: @SPLITSTORE_BADGER_DELETE_001, @SPLITSTORE_BADGER_COLLECT_GARBAGE_001 - testMove(t, func(path string) Options { - opts := DefaultOptions(path) - opts.Prefix = "/prefixed/" - return opts - }) + for _, version := range SUPPORTED_BADGER_VERSIONS { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + testMove(t, version, func(path string, readonly bool, badgerVersion int) versions.Options { + opts := versions.DefaultOptions(path, false, version) + opts.Prefix = "/prefixed/" + return opts + }) + }) + } } diff --git a/blockstore/badger/versions/badger.go b/blockstore/badger/versions/badger.go new file mode 100644 index 00000000000..59d9249cf65 --- /dev/null +++ b/blockstore/badger/versions/badger.go @@ -0,0 +1,49 @@ +package versions + +import ( + "fmt" + + badgerV2 "github.com/dgraph-io/badger/v2" + badgerV4 "github.com/dgraph-io/badger/v4" + "go.uber.org/zap" +) + +// BadgerLogger is a local wrapper for go-log to make the interface +// compatible with badger.Logger (namely, aliasing Warnf to Warningf) +type BadgerLogger struct { + *zap.SugaredLogger // skips 1 caller to get useful line info, skipping over badger.Options. + + Skip2 *zap.SugaredLogger // skips 2 callers, just like above + this logger. +} + +// Warningf is required by the badger logger APIs. +func (b *BadgerLogger) Warningf(format string, args ...interface{}) { + b.Skip2.Warnf(format, args...) +} + +func OpenBadgerDB(opts Options) (BadgerDB, error) { + var db BadgerDB + var err error + + switch opts.BadgerVersion { + case 4: + var dbV4 *badgerV4.DB + dbV4, err = badgerV4.Open(opts.V4Options) + if err == nil { + db = BadgerDB(&BadgerV4{dbV4}) + } + case 2: + var dbV2 *badgerV2.DB + dbV2, err = badgerV2.Open(opts.V2Options) + if err == nil { + db = BadgerDB(&BadgerV2{dbV2}) + } + default: + err = fmt.Errorf("unsupported badger version: %v", opts.BadgerVersion) + } + + if err != nil { + return nil, err + } + return db, nil +} diff --git a/blockstore/badger/versions/badger_interface.go b/blockstore/badger/versions/badger_interface.go new file mode 100644 index 00000000000..ecad48c94ab --- /dev/null +++ b/blockstore/badger/versions/badger_interface.go @@ -0,0 +1,88 @@ +package versions + +import ( + "context" + "io" + + "github.com/dgraph-io/ristretto" +) + +// BadgerDB defines the common interface for both v2 and v4 versions of Badger. +type BadgerDB interface { + Close() error + IsClosed() bool + NewStream() BadgerStream + Update(func(txn Txn) error) error + View(func(txn Txn) error) error + NewTransaction(update bool) Txn + RunValueLogGC(discardRatio float64) error + Sync() error + MaxBatchCount() int64 + MaxBatchSize() int64 + IndexCacheMetrics() *ristretto.Metrics + GetErrKeyNotFound() error + GetErrNoRewrite() error + NewWriteBatch() WriteBatch + Flatten(workers int) error + Size() (lsm int64, vlog int64) + Copy(ctx context.Context, to BadgerDB) error + Load(r io.Reader, maxPendingWrites int) error + Backup(w io.Writer, since uint64) (uint64, error) +} + +// BadgerStream defines the common interface for streaming data in Badger. +type BadgerStream interface { + SetNumGo(numGo int) + SetLogPrefix(prefix string) + + Orchestrate(ctx context.Context) error + ForEach(ctx context.Context, fn func(key string, value string) error) error +} + +// Txn defines the common interface for transactions in Badger. +type Txn interface { + Get(key []byte) (Item, error) + Set(key, val []byte) error + Delete(key []byte) error + Commit() error + Discard() + NewIterator(opts IteratorOptions) Iterator +} + +type IteratorOptions struct { + PrefetchSize int + Prefix []byte +} + +type Iterator interface { + Next() + Rewind() + Seek(key []byte) + Close() + Item() Item + Valid() bool +} + +// Item defines the common interface for items in a transaction. +type Item interface { + Value(fn func([]byte) error) error + Key() []byte + ValueCopy(dst []byte) ([]byte, error) + ValueSize() int64 +} + +type WriteBatch interface { + Set(key, val []byte) error + Delete(key []byte) error + Flush() error + Cancel() +} + +type KVList interface { + GetKV() []*KV +} + +type KV struct { + Key []byte + Value []byte +} diff --git a/blockstore/badger/versions/badger_v2.go b/blockstore/badger/versions/badger_v2.go new file mode 100644 index 00000000000..26e5f528c0f --- /dev/null +++ b/blockstore/badger/versions/badger_v2.go @@ -0,0 +1,274 @@ +package versions + +import ( + "context" + "io" + "runtime" + + badger "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dgraph-io/ristretto" + "golang.org/x/xerrors" +) + +// BadgerV2 wraps the Badger v2 database to implement the BadgerDB interface. +type BadgerV2 struct { + *badger.DB +} + +func (b *BadgerV2) Close() error { + return b.DB.Close() +} + +func (b *BadgerV2) IsClosed() bool { + return b.DB.IsClosed() +} + +func (b *BadgerV2) NewStream() BadgerStream { + return &BadgerV2Stream{b.DB.NewStream()} +} + +func (b *BadgerV2) Update(fn func(txn Txn) error) error { + return b.DB.Update(func(txn *badger.Txn) error { + return fn(&BadgerV2Txn{txn}) + }) +} + +func (b *BadgerV2) View(fn func(txn Txn) error) error { + return b.DB.View(func(txn *badger.Txn) error { + return fn(&BadgerV2Txn{txn}) + }) +} + +func (b *BadgerV2) NewTransaction(update bool) Txn { + return &BadgerV2Txn{b.DB.NewTransaction(update)} +} + +func (b *BadgerV2) RunValueLogGC(discardRatio float64) error { + return b.DB.RunValueLogGC(discardRatio) +} + +func (b *BadgerV2) Sync() error { + return b.DB.Sync() +} + +func (b *BadgerV2) MaxBatchCount() int64 { + return b.DB.MaxBatchCount() +} +func (b *BadgerV2) MaxBatchSize() int64 { + return b.DB.MaxBatchSize() +} + +func (b *BadgerV2) IndexCacheMetrics() *ristretto.Metrics { + return b.DB.IndexCacheMetrics() +} + +func (b *BadgerV2) GetErrKeyNotFound() error { + return badger.ErrKeyNotFound +} + +func (b *BadgerV2) GetErrNoRewrite() error { + return badger.ErrNoRewrite +} + +func (b *BadgerV2) NewWriteBatch() WriteBatch { + return &BadgerV2WriteBatch{b.DB.NewWriteBatch()} +} + +func (b *BadgerV2) Flatten(workers int) error { + return b.DB.Flatten(workers) +} + +func (b *BadgerV2) Size() (lsm int64, vlog int64) { + return b.DB.Size() +} + +func (b *BadgerV2) Copy(ctx context.Context, to BadgerDB) (defErr error) { + + batch := to.NewWriteBatch() + defer func() { + if defErr == nil { + defErr = batch.Flush() + } + if defErr != nil { + batch.Cancel() + } + }() + + stream := b.DB.NewStream() + + return iterateBadger(ctx, stream, func(kvs []*pb.KV) error { + // check whether context is closed on every kv group + if err := ctx.Err(); err != nil { + return err + } + for _, kv := range kvs { + if err := batch.Set(kv.Key, kv.Value); err != nil { + return err + } + } + return nil + }) +} + +var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 ) + +func iterateBadger(ctx context.Context, stream *badger.Stream, iter func([]*pb.KV) error) error { + workers := IterateLSMWorkers + if workers == 0 { + workers = between(2, 8, runtime.NumCPU()/2) + } + + stream.NumGo = workers + stream.LogPrefix = "iterateBadgerKVs" + stream.Send = func(kvl *pb.KVList) error { + kvs := make([]*pb.KV, 0, len(kvl.Kv)) + for _, kv := range kvl.Kv { + if kv.Key != nil && kv.Value != nil { + kvs = append(kvs, kv) + } + } + if len(kvs) == 0 { + return nil + } + return iter(kvs) + } + return stream.Orchestrate(ctx) +} + +func between(min, max, val int) int { + if val > max { + val = max + } + if val < min { + val = min + } + return val +} + +func (b *BadgerV2) Load(r io.Reader, maxPendingWrites int) error { + return b.DB.Load(r, maxPendingWrites) +} + +func (b *BadgerV2) Backup(w io.Writer, since uint64) (uint64, error) { + return b.DB.Backup(w, since) +} + +type BadgerV2WriteBatch struct { + *badger.WriteBatch +} + +func (wb *BadgerV2WriteBatch) Set(key, val []byte) error { + return wb.WriteBatch.Set(key, val) +} + +func (wb *BadgerV2WriteBatch) Delete(key []byte) error { + return wb.WriteBatch.Delete(key) +} + +func (wb *BadgerV2WriteBatch) Flush() error { + return wb.WriteBatch.Flush() +} + +func (wb *BadgerV2WriteBatch) Cancel() { + wb.WriteBatch.Cancel() +} + +type BadgerV2Stream struct { + *badger.Stream +} + +func (s *BadgerV2Stream) SetNumGo(numGo int) { + s.Stream.NumGo = numGo +} + +func (s *BadgerV2Stream) SetLogPrefix(prefix string) { + s.Stream.LogPrefix = prefix +} + +func (s *BadgerV2Stream) Orchestrate(ctx context.Context) error { + return s.Stream.Orchestrate(ctx) +} + +func (s *BadgerV2Stream) ForEach(ctx context.Context, fn func(key string, value string) error) error { + s.Stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + if kv.Key == nil || kv.Value == nil { + continue + } + err := fn(string(kv.Key), string(kv.Value)) + if err != nil { + return xerrors.Errorf("foreach function: %w", err) + } + + } + return nil + } + if err := s.Orchestrate(ctx); err != nil { + return xerrors.Errorf("orchestrate stream: %w", err) + } + return nil +} + +type BadgerV2Txn struct { + *badger.Txn +} + +func (txn *BadgerV2Txn) Get(key []byte) (Item, error) { + item, err := txn.Txn.Get(key) + return &BadgerV2Item{item}, err +} + +func (txn *BadgerV2Txn) Set(key, val []byte) error { + return txn.Txn.Set(key, val) +} + +func (txn *BadgerV2Txn) Delete(key []byte) error { + return txn.Txn.Delete(key) +} + +func (txn *BadgerV2Txn) Commit() error { + return txn.Txn.Commit() +} + +func (txn *BadgerV2Txn) Discard() { + txn.Txn.Discard() +} + +func (txn *BadgerV2Txn) NewIterator(opts IteratorOptions) Iterator { + badgerOpts := badger.DefaultIteratorOptions + badgerOpts.PrefetchSize = opts.PrefetchSize + badgerOpts.Prefix = opts.Prefix + return &BadgerV2Iterator{txn.Txn.NewIterator(badgerOpts)} +} + +type BadgerV2Iterator struct { + *badger.Iterator +} + +func (it *BadgerV2Iterator) Next() { it.Iterator.Next() } +func (it *BadgerV2Iterator) Rewind() { it.Iterator.Rewind() } +func (it *BadgerV2Iterator) Seek(key []byte) { it.Iterator.Seek(key) } +func (it *BadgerV2Iterator) Close() { it.Iterator.Close() } +func (it *BadgerV2Iterator) Item() Item { return &BadgerV2Item{it.Iterator.Item()} } +func (it *BadgerV2Iterator) Valid() bool { return it.Iterator.Valid() } + +type BadgerV2Item struct { + *badger.Item +} + +func (item *BadgerV2Item) Value(fn func([]byte) error) error { + return item.Item.Value(fn) +} + +func (item *BadgerV2Item) Key() []byte { + return item.Item.Key() +} + +func (item *BadgerV2Item) ValueCopy(dst []byte) ([]byte, error) { + return item.Item.ValueCopy(dst) +} + +func (item *BadgerV2Item) ValueSize() int64 { + return item.Item.ValueSize() +} diff --git a/blockstore/badger/versions/badger_v4.go b/blockstore/badger/versions/badger_v4.go new file mode 100644 index 00000000000..1b1af46d1d9 --- /dev/null +++ b/blockstore/badger/versions/badger_v4.go @@ -0,0 +1,273 @@ +package versions + +import ( + "context" + "fmt" + "io" + "runtime" + + badger "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/pb" + "github.com/dgraph-io/ristretto" + "github.com/dgraph-io/ristretto/z" + "golang.org/x/xerrors" +) + +// BadgerV4 wraps the Badger v4 database to implement the BadgerDB interface. +type BadgerV4 struct { + *badger.DB +} + +func (b *BadgerV4) Close() error { + return b.DB.Close() +} + +func (b *BadgerV4) IsClosed() bool { + return b.DB.IsClosed() +} + +func (b *BadgerV4) NewStream() BadgerStream { + return &BadgerV4Stream{b.DB.NewStream()} +} + +func (b *BadgerV4) Update(fn func(txn Txn) error) error { + return b.DB.Update(func(txn *badger.Txn) error { + return fn(&BadgerV4Txn{txn}) + }) +} + +func (b *BadgerV4) View(fn func(txn Txn) error) error { + return b.DB.View(func(txn *badger.Txn) error { + return fn(&BadgerV4Txn{txn}) + }) +} + +func (b *BadgerV4) NewTransaction(update bool) Txn { + return &BadgerV4Txn{b.DB.NewTransaction(update)} +} + +func (b *BadgerV4) RunValueLogGC(discardRatio float64) error { + return b.DB.RunValueLogGC(discardRatio) +} + +func (b *BadgerV4) Sync() error { + return b.DB.Sync() +} + +func (b *BadgerV4) MaxBatchCount() int64 { + return b.DB.MaxBatchCount() +} + +func (b *BadgerV4) MaxBatchSize() int64 { + return b.DB.MaxBatchSize() +} + +func (b *BadgerV4) IndexCacheMetrics() *ristretto.Metrics { + return b.DB.IndexCacheMetrics() +} + +func (b *BadgerV4) GetErrKeyNotFound() error { + return badger.ErrKeyNotFound +} + +func (b *BadgerV4) GetErrNoRewrite() error { + return badger.ErrNoRewrite +} + +func (b *BadgerV4) NewWriteBatch() WriteBatch { + return &BadgerV4WriteBatch{b.DB.NewWriteBatch()} +} + +func (b *BadgerV4) Flatten(workers int) error { + return b.DB.Flatten(workers) +} + +func (b *BadgerV4) Size() (lsm int64, vlog int64) { + return b.DB.Size() +} + +func (b *BadgerV4) Copy(ctx context.Context, to BadgerDB) (defErr error) { + + batch := to.NewWriteBatch() + defer func() { + if defErr == nil { + defErr = batch.Flush() + } + if defErr != nil { + batch.Cancel() + } + }() + + stream := b.DB.NewStream() + + return iterateBadgerV4(ctx, stream, func(kvs []*pb.KV) error { + // check whether context is closed on every kv group + if err := ctx.Err(); err != nil { + return err + } + for _, kv := range kvs { + if err := batch.Set(kv.Key, kv.Value); err != nil { + return err + } + } + return nil + }) +} + +func iterateBadgerV4(ctx context.Context, stream *badger.Stream, iter func([]*pb.KV) error) error { + workers := IterateLSMWorkers + if workers == 0 { + workers = between(2, 8, runtime.NumCPU()/2) + } + + stream.NumGo = workers + stream.LogPrefix = "iterateBadgerKVs" + stream.Send = func(buf *z.Buffer) error { + kvl, err := badger.BufferToKVList(buf) + if err != nil { + return fmt.Errorf("buffer to KV list conversion: %w", err) + } + + kvs := make([]*pb.KV, 0, len(kvl.Kv)) + for _, kv := range kvl.Kv { + if kv.Key != nil && kv.Value != nil { + kvs = append(kvs, kv) + } + } + if len(kvs) == 0 { + return nil + } + return iter(kvs) + } + return stream.Orchestrate(ctx) +} + +func (b *BadgerV4) Load(r io.Reader, maxPendingWrites int) error { + return b.DB.Load(r, maxPendingWrites) +} + +func (b *BadgerV4) Backup(w io.Writer, since uint64) (uint64, error) { + return b.DB.Backup(w, since) +} + +type BadgerV4WriteBatch struct { + *badger.WriteBatch +} + +func (wb *BadgerV4WriteBatch) Set(key, val []byte) error { + return wb.WriteBatch.Set(key, val) +} + +func (wb *BadgerV4WriteBatch) Delete(key []byte) error { + return wb.WriteBatch.Delete(key) +} + +func (wb *BadgerV4WriteBatch) Flush() error { + return wb.WriteBatch.Flush() +} + +func (wb *BadgerV4WriteBatch) Cancel() { + wb.WriteBatch.Cancel() +} + +type BadgerV4Stream struct { + *badger.Stream +} + +func (s *BadgerV4Stream) SetNumGo(numGo int) { + s.Stream.NumGo = numGo +} + +func (s *BadgerV4Stream) SetLogPrefix(prefix string) { + s.Stream.LogPrefix = prefix +} +func (s *BadgerV4Stream) ForEach(ctx context.Context, fn func(key string, value string) error) error { + s.Stream.Send = func(buf *z.Buffer) error { + list, err := badger.BufferToKVList(buf) + if err != nil { + return fmt.Errorf("buffer to KV list conversion: %w", err) + } + for _, kv := range list.Kv { + if kv.Key == nil || kv.Value == nil { + continue + } + err := fn(string(kv.Key), string(kv.Value)) + if err != nil { + return xerrors.Errorf("foreach function: %w", err) + } + + } + return nil + } + if err := s.Orchestrate(ctx); err != nil { + return xerrors.Errorf("orchestrate stream: %w", err) + } + return nil +} + +func (s *BadgerV4Stream) Orchestrate(ctx context.Context) error { + return s.Stream.Orchestrate(ctx) +} + +type BadgerV4Txn struct { + *badger.Txn +} + +func (txn *BadgerV4Txn) Get(key []byte) (Item, error) { + item, err := txn.Txn.Get(key) + return &BadgerV4Item{item}, err +} + +func (txn *BadgerV4Txn) Set(key, val []byte) error { + return txn.Txn.Set(key, val) +} + +func (txn *BadgerV4Txn) Delete(key []byte) error { + return txn.Txn.Delete(key) +} + +func (txn *BadgerV4Txn) Commit() error { + return txn.Txn.Commit() +} + +func (txn *BadgerV4Txn) Discard() { + txn.Txn.Discard() +} + +func (txn *BadgerV4Txn) NewIterator(opts IteratorOptions) Iterator { + badgerOpts := badger.DefaultIteratorOptions + badgerOpts.PrefetchSize = opts.PrefetchSize + badgerOpts.Prefix = opts.Prefix + return &BadgerV4Iterator{txn.Txn.NewIterator(badgerOpts)} +} + +type BadgerV4Iterator struct { + *badger.Iterator +} + +func (it *BadgerV4Iterator) Next() { it.Iterator.Next() } +func (it *BadgerV4Iterator) Rewind() { it.Iterator.Rewind() } +func (it *BadgerV4Iterator) Seek(key []byte) { it.Iterator.Seek(key) } +func (it *BadgerV4Iterator) Close() { it.Iterator.Close() } +func (it *BadgerV4Iterator) Item() Item { return &BadgerV4Item{it.Iterator.Item()} } +func (it *BadgerV4Iterator) Valid() bool { return it.Iterator.Valid() } + +type BadgerV4Item struct { + *badger.Item +} + +func (item *BadgerV4Item) Value(fn func([]byte) error) error { + return item.Item.Value(fn) +} + +func (item *BadgerV4Item) Key() []byte { + return item.Item.Key() +} + +func (item *BadgerV4Item) ValueCopy(dst []byte) ([]byte, error) { + return item.Item.ValueCopy(dst) +} + +func (item *BadgerV4Item) ValueSize() int64 { + return item.Item.ValueSize() +} diff --git a/blockstore/badger/versions/options.go b/blockstore/badger/versions/options.go new file mode 100644 index 00000000000..9a5182a5ea5 --- /dev/null +++ b/blockstore/badger/versions/options.go @@ -0,0 +1,121 @@ +package versions + +import ( + "os" + "strconv" + + badgerV2 "github.com/dgraph-io/badger/v2" + optionsV2 "github.com/dgraph-io/badger/v2/options" + badgerV4 "github.com/dgraph-io/badger/v4" + optionsV4 "github.com/dgraph-io/badger/v4/options" +) + +// Options embeds the badger options themselves, and augments them with +// blockstore-specific options. +type Options struct { + V2Options badgerV2.Options + V4Options badgerV4.Options + + // BadgerVersion sets the release version of badger to use + BadgerVersion int + + Prefix string + Dir string + ValueDir string + SyncWrites bool + + Logger BadgerLogger +} + +func (o *Options) SetDir(dir string) { + o.Dir = dir + o.V2Options.Dir = dir + o.V4Options.Dir = dir +} + +func (o *Options) SetValueDir(valueDir string) { + o.ValueDir = valueDir + o.V2Options.ValueDir = valueDir + o.V4Options.ValueDir = valueDir +} + +func BlockStoreOptions(path string, readonly bool, badgerVersion int) Options { + opts := DefaultOptions(path, readonly, badgerVersion) + + // Due to legacy usage of blockstore.Blockstore, over a datastore, all + // blocks are prefixed with this namespace. In the future, this can go away, + // in order to shorten keys, but it'll require a migration. + opts.Prefix = "/blocks/" + + // Disable Snappy Compression + opts.V4Options.Compression = optionsV4.None + + // Blockstore values are immutable; therefore we do not expect any + // conflicts to emerge. + opts.V2Options.DetectConflicts = false + opts.V4Options.DetectConflicts = false + + // This is to optimize the database on close so it can be opened + // read-only and efficiently queried. + opts.V2Options.CompactL0OnClose = true + opts.V4Options.CompactL0OnClose = true + + // The alternative is "crash on start and tell the user to fix it". This + // will truncate corrupt and unsynced data, which we don't guarantee to + // persist anyways. + // Badger V4 has no such option + opts.V2Options.Truncate = true + + // We mmap the index and the value logs; this is important to enable + // zero-copy value access. + opts.V2Options.ValueLogLoadingMode = optionsV2.MemoryMap + opts.V2Options.TableLoadingMode = optionsV2.MemoryMap + + // Embed only values < 128 bytes in the LSM tree; larger values are stored + // in value logs. + opts.V2Options.ValueThreshold = 128 + opts.V4Options.ValueThreshold = 128 + + // Default table size is already 64MiB. This is here to make it explicit. + // Badger V4 removed the option + opts.V2Options.MaxTableSize = 64 << 20 + + // NOTE: The chain blockstore doesn't require any GC (blocks are never + // deleted). This will change if we move to a tiered blockstore. + + opts.V2Options.ReadOnly = readonly + opts.V4Options.ReadOnly = readonly + + // Envvar LOTUS_CHAIN_BADGERSTORE_COMPACTIONWORKERNUM + // Allows the number of compaction workers used by BadgerDB to be adjusted + // Unset - leaves the default number of compaction workers (4) + // "0" - disables compaction + // Positive integer - enables that number of compaction workers + if badgerNumCompactors, badgerNumCompactorsSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_COMPACTIONWORKERNUM"); badgerNumCompactorsSet { + if numWorkers, err := strconv.Atoi(badgerNumCompactors); err == nil && numWorkers >= 0 { + opts.V2Options.NumCompactors = numWorkers + opts.V4Options.NumCompactors = numWorkers + } + } + + return opts +} + +func DefaultOptions(path string, readonly bool, badgerVersion int) Options { + var opts Options + + opts.BadgerVersion = badgerVersion + + opts.SetDir(path) + opts.SetValueDir(path) + + //v2 + bopts := badgerV2.DefaultOptions(path) + opts.V2Options = bopts + + //v4 + boptsv4 := badgerV4.DefaultOptions(path) + opts.V4Options = boptsv4 + + return opts +} diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index 2dac673cd76..dbb12a28091 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -416,7 +416,7 @@ func openBadgerDB(path string, recover bool) (*badger.DB, error) { } opts.Logger = &badgerLogger{ SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), - skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), + Skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), } return badger.Open(opts) @@ -443,7 +443,7 @@ func closeBadgerDB(db *badger.DB, path string, persist bool) error { // badger logging through go-log type badgerLogger struct { *zap.SugaredLogger - skip2 *zap.SugaredLogger + Skip2 *zap.SugaredLogger } func (b *badgerLogger) Warningf(format string, args ...interface{}) {} diff --git a/build/openrpc/full.json b/build/openrpc/full.json index 42d764b59af..fbb37165ab4 100644 --- a/build/openrpc/full.json +++ b/build/openrpc/full.json @@ -2,7 +2,7 @@ "openrpc": "1.2.6", "info": { "title": "Lotus RPC API", - "version": "1.28.2-dev" + "version": "1.29.1-dev" }, "methods": [ { diff --git a/build/openrpc/gateway.json b/build/openrpc/gateway.json index c48d4a27508..171febdee99 100644 --- a/build/openrpc/gateway.json +++ b/build/openrpc/gateway.json @@ -2,7 +2,7 @@ "openrpc": "1.2.6", "info": { "title": "Lotus RPC API", - "version": "1.28.2-dev" + "version": "1.29.1-dev" }, "methods": [ { diff --git a/build/openrpc/miner.json b/build/openrpc/miner.json index a2a5ae75af3..0ba3fde3e16 100644 --- a/build/openrpc/miner.json +++ b/build/openrpc/miner.json @@ -2,7 +2,7 @@ "openrpc": "1.2.6", "info": { "title": "Lotus RPC API", - "version": "1.28.2-dev" + "version": "1.29.1-dev" }, "methods": [ { diff --git a/build/openrpc/worker.json b/build/openrpc/worker.json index ed08b043887..f0a3b9e7ad5 100644 --- a/build/openrpc/worker.json +++ b/build/openrpc/worker.json @@ -2,7 +2,7 @@ "openrpc": "1.2.6", "info": { "title": "Lotus RPC API", - "version": "1.28.2-dev" + "version": "1.29.1-dev" }, "methods": [ { diff --git a/build/version.go b/build/version.go index 1da3be510aa..0a72a46e766 100644 --- a/build/version.go +++ b/build/version.go @@ -7,7 +7,7 @@ import ( ) // NodeBuildVersion is the local build version of the Lotus daemon -const NodeBuildVersion string = "1.28.2-dev" +const NodeBuildVersion string = "1.29.1-dev" func NodeUserVersion() BuildVersion { if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" { diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index 2514320ea7b..eba612e77b1 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -10,9 +10,6 @@ import ( logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/fx" "golang.org/x/xerrors" @@ -53,20 +50,6 @@ type F3Params struct { var log = logging.Logger("f3") -func init() { - // Set up otel to prometheus reporting so that F3 metrics are reported via lotus - // prometheus metrics. This bridge eventually gets picked up by opencensus - // exporter as HTTP handler. This by default registers an otel collector against - // the global prometheus registry. In the future, we should clean up metrics in - // Lotus and move it all to use otel. For now, bridge away. - if bridge, err := prometheus.New(); err != nil { - log.Errorf("could not create the otel prometheus exporter: %v", err) - } else { - provider := metric.NewMeterProvider(metric.WithReader(bridge)) - otel.SetMeterProvider(provider) - } -} - func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) { ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3")) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index cb7fc5f8239..77bd1679146 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -954,7 +954,7 @@ func TestMessageSignatureInvalid(t *testing.T) { } err = mp.Add(context.TODO(), sm) assert.Error(t, err) - assert.Contains(t, err.Error(), "invalid signature length") + assert.Contains(t, err.Error(), "invalid signature for message bafy2bz") } } diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 500ef4af3ed..66865a73e67 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -16,10 +16,9 @@ import ( "time" ocprom "contrib.go.opencensus.io/exporter/prometheus" - bdg "github.com/dgraph-io/badger/v2" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - badger "github.com/ipfs/go-ds-badger2" + badgerIpfs "github.com/ipfs/go-ds-badger2" measure "github.com/ipfs/go-ds-measure" metricsprometheus "github.com/ipfs/go-metrics-prometheus" "github.com/ipld/go-car" @@ -33,6 +32,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/blockstore" badgerbs "github.com/filecoin-project/lotus/blockstore/badger" + badger "github.com/filecoin-project/lotus/blockstore/badger/versions" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/index" @@ -46,6 +46,7 @@ import ( _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/delegated" _ "github.com/filecoin-project/lotus/lib/sigs/secp" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -171,11 +172,33 @@ var importBenchCmd = &cli.Command{ err error ) + fsrepo, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return err + } + lkrepo, err := fsrepo.Lock(repo.FullNode) + if err != nil { + return err + } + + defer lkrepo.Close() //nolint:errcheck + + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + switch { case cctx.Bool("use-native-badger"): log.Info("using native badger") - var opts badgerbs.Options - if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil { + + var opts badger.Options + if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false, badgerVersion); err != nil { return err } opts.SyncWrites = false @@ -183,14 +206,12 @@ var importBenchCmd = &cli.Command{ default: // legacy badger via datastore. log.Info("using legacy badger") - bdgOpt := badger.DefaultOptions - bdgOpt.GcInterval = 0 - bdgOpt.Options = bdg.DefaultOptions("") - bdgOpt.Options.SyncWrites = false - bdgOpt.Options.Truncate = true - bdgOpt.Options.DetectConflicts = false + var opts badger.Options + if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false, badgerVersion); err != nil { + return err + } - ds, err = badger.NewDatastore(tdir, &bdgOpt) + bs, err = badgerbs.Open(opts) } if err != nil { @@ -209,7 +230,7 @@ var importBenchCmd = &cli.Command{ var verifier proofs.Verifier = proofsffi.ProofVerifier if cctx.IsSet("syscall-cache") { - scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions) + scds, err := badgerIpfs.NewDatastore(cctx.String("syscall-cache"), &badgerIpfs.DefaultOptions) if err != nil { return xerrors.Errorf("opening syscall-cache datastore: %w", err) } diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 8e31ccc3c44..b3a1e6e65c8 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -9,7 +9,6 @@ import ( "os" "strings" - "github.com/dgraph-io/badger/v2" "github.com/docker/go-units" "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" @@ -20,8 +19,10 @@ import ( "go.uber.org/multierr" "golang.org/x/xerrors" + badger "github.com/filecoin-project/lotus/blockstore/badger/versions" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/backupds" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -321,26 +322,48 @@ var datastoreRewriteCmd = &cli.Command{ } var ( - from *badger.DB - to *badger.DB + from badger.BadgerDB + to badger.BadgerDB ) + fsrepo, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return err + } + lkrepo, err := fsrepo.Lock(repo.FullNode) + if err != nil { + return err + } + + defer lkrepo.Close() //nolint:errcheck + + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + // open the destination (to) store. - opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false) + opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false, badgerVersion) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } opts.SyncWrites = false - if to, err = badger.Open(opts.Options); err != nil { + if to, err = badger.OpenBadgerDB(opts); err != nil { + return xerrors.Errorf("opening 'to' badger store: %w", err) } // open the source (from) store. - opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true) + opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true, badgerVersion) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } - if from, err = badger.Open(opts.Options); err != nil { + if from, err = badger.OpenBadgerDB(opts); err != nil { return xerrors.Errorf("opening 'from' datastore: %w", err) } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index e6d0c4e056f..9374282f541 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -12,8 +12,6 @@ import ( "strings" "sync" - "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/pb" "github.com/dustin/go-humanize" "github.com/ipfs/boxo/blockservice" offline "github.com/ipfs/boxo/exchange/offline" @@ -31,9 +29,11 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/blockstore" + badger "github.com/filecoin-project/lotus/blockstore/badger/versions" "github.com/filecoin-project/lotus/chain/store" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -234,20 +234,31 @@ var exportRawCmd = &cli.Command{ } { + + c, err := lr.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + path := filepath.Join(lr.Path(), "datastore", "chain") - opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false) + opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false, badgerVersion) if err != nil { return err } - opts.Logger = &badgerLog{ + opts.Logger = badger.BadgerLogger{ SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), - skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), + Skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), } log.Infow("open db") - db, err := badger.Open(opts.Options) + db, err := badger.OpenBadgerDB(opts) if err != nil { return fmt.Errorf("failed to open badger blockstore: %w", err) } @@ -258,50 +269,40 @@ var exportRawCmd = &cli.Command{ var wlk sync.Mutex str := db.NewStream() - str.NumGo = 16 - str.LogPrefix = "bstream" - str.Send = func(list *pb.KVList) (err error) { - defer func() { - if err != nil { - log.Errorw("send error", "err", err) - } - }() - - for _, kv := range list.Kv { - if kv.Key == nil || kv.Value == nil { - continue - } - if !strings.HasPrefix(string(kv.Key), "/blocks/") { - log.Infow("no blocks prefix", "key", string(kv.Key)) - continue - } - - h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):])) - if err != nil { - return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err) - } - - c := cid.NewCidV1(cid.Raw, h) - - b, err := block.NewBlockWithCid(kv.Value, c) - if err != nil { - return xerrors.Errorf("readblk: %w", err) - } - - wlk.Lock() - err = consume(c, b) - wlk.Unlock() - if err != nil { - return xerrors.Errorf("consume stream block: %w", err) - } + str.SetNumGo(16) + str.SetLogPrefix("bstream") + err = str.ForEach(ctx, func(key string, value string) error { + if !strings.HasPrefix(key, "/blocks/") { + log.Infow("no blocks prefix", "key", key) + return nil } - return nil - } + h, err := base32.RawStdEncoding.DecodeString(key[len("/blocks/"):]) + if err != nil { + return xerrors.Errorf("decode b32 ds key %x: %w", key, err) + } + + c := cid.NewCidV1(cid.Raw, h) + + b, err := block.NewBlockWithCid([]byte(value), c) + if err != nil { + return xerrors.Errorf("readblk: %w", err) + } + + wlk.Lock() + err = consume(c, b) + wlk.Unlock() + if err != nil { + return xerrors.Errorf("consume stream block: %w", err) + } - if err := str.Orchestrate(ctx); err != nil { + return nil + }) + if err != nil { + log.Errorw("send error", "err", err) return xerrors.Errorf("orchestrate stream: %w", err) } + } } @@ -479,12 +480,3 @@ func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) erro } var _ blockstore.Blockstore = &rawCarb{} - -type badgerLog struct { - *zap.SugaredLogger - skip2 *zap.SugaredLogger -} - -func (b *badgerLog) Warningf(format string, args ...interface{}) { - b.skip2.Warnf(format, args...) -} diff --git a/cmd/lotus-shed/fevmanalytics.go b/cmd/lotus-shed/fevmanalytics.go index 19416b77e12..061e0c0ed87 100644 --- a/cmd/lotus-shed/fevmanalytics.go +++ b/cmd/lotus-shed/fevmanalytics.go @@ -21,6 +21,7 @@ import ( evm2 "github.com/filecoin-project/lotus/chain/actors/builtin/evm" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -80,7 +81,17 @@ var FevmBalanceCmd = &cli.Command{ return err } - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly()) + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly(), badgerVersion) if err != nil { return err } @@ -175,7 +186,17 @@ var FevmActorsCmd = &cli.Command{ return err } - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly()) + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly(), badgerVersion) if err != nil { return err } diff --git a/cmd/lotus-shed/invariants.go b/cmd/lotus-shed/invariants.go index 378f6af5843..6c77069bd67 100644 --- a/cmd/lotus-shed/invariants.go +++ b/cmd/lotus-shed/invariants.go @@ -36,6 +36,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -93,7 +94,16 @@ var invariantsCmd = &cli.Command{ return err } - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly()) + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly(), badgerVersion) if err != nil { return err } diff --git a/cmd/lotus-shed/migrations.go b/cmd/lotus-shed/migrations.go index 24110860404..5e66f008488 100644 --- a/cmd/lotus-shed/migrations.go +++ b/cmd/lotus-shed/migrations.go @@ -70,6 +70,7 @@ import ( "github.com/filecoin-project/lotus/chain/vm" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/must" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" ) @@ -142,7 +143,16 @@ var migrationsCmd = &cli.Command{ return err } - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly()) + c, err := lkrepo.Config() + if err != nil { + return err + } + lotusCfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := lotusCfg.Chainstore.BadgerVersion + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, lkrepo.Readonly(), badgerVersion) if err != nil { return err } diff --git a/cmd/lotus-shed/splitstore.go b/cmd/lotus-shed/splitstore.go index e8c45a0c5e8..e686ff3602f 100644 --- a/cmd/lotus-shed/splitstore.go +++ b/cmd/lotus-shed/splitstore.go @@ -9,7 +9,6 @@ import ( "path/filepath" "runtime" - "github.com/dgraph-io/badger/v2" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" "github.com/urfave/cli/v2" @@ -18,6 +17,7 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/xerrors" + badger "github.com/filecoin-project/lotus/blockstore/badger/versions" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo" @@ -195,31 +195,40 @@ func copyHotstoreToColdstore(lr repo.LockedRepo, gcColdstore bool) error { coldPath := filepath.Join(dataPath, "chain") hotPath := filepath.Join(dataPath, "splitstore", "hot.badger") - blog := &badgerLogger{ + blog := badger.BadgerLogger{ SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), - skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), + Skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), } - coldOpts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, coldPath, false) + c, err := lr.Config() + if err != nil { + return err + } + cfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + badgerVersion := cfg.Chainstore.BadgerVersion + coldOpts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, coldPath, false, badgerVersion) if err != nil { return xerrors.Errorf("error getting coldstore badger options: %w", err) } coldOpts.SyncWrites = false coldOpts.Logger = blog - hotOpts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, hotPath, true) + hotOpts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, hotPath, true, badgerVersion) if err != nil { return xerrors.Errorf("error getting hotstore badger options: %w", err) } hotOpts.Logger = blog - cold, err := badger.Open(coldOpts.Options) + cold, err := badger.OpenBadgerDB(coldOpts) if err != nil { return xerrors.Errorf("error opening coldstore: %w", err) } defer cold.Close() //nolint - hot, err := badger.Open(hotOpts.Options) + hot, err := badger.OpenBadgerDB(hotOpts) if err != nil { return xerrors.Errorf("error opening hotstore: %w", err) } @@ -278,7 +287,7 @@ func copyHotstoreToColdstore(lr repo.LockedRepo, gcColdstore bool) error { err = cold.RunValueLogGC(0.0625) } - if err != badger.ErrNoRewrite { + if err != cold.GetErrNoRewrite() { return xerrors.Errorf("error garbage collecting coldstore: %w", err) } } @@ -353,16 +362,6 @@ func deleteSplitstoreKeys(lr repo.LockedRepo) error { return nil } -// badger logging through go-log -type badgerLogger struct { - *zap.SugaredLogger - skip2 *zap.SugaredLogger -} - -func (b *badgerLogger) Warningf(format string, args ...interface{}) {} -func (b *badgerLogger) Infof(format string, args ...interface{}) {} -func (b *badgerLogger) Debugf(format string, args ...interface{}) {} - var splitstoreCheckCmd = &cli.Command{ Name: "check", Description: "runs a healthcheck on a splitstore installation", diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index 980b8be11fd..3996f0c3654 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -7,7 +7,7 @@ USAGE: lotus [global options] command [command options] [arguments...] VERSION: - 1.28.2-dev + 1.29.1-dev COMMANDS: daemon Start a lotus daemon process diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 7f39f23a5b8..ad418eadc60 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -149,6 +149,19 @@ [Chainstore] + # EXPERIMENTAL FEATURE. USE WITH CAUTION + # BadgerVersion switches the version of the Badger database engine. The default is 2 which has + # been well-tested and widely deployed. Switching this to version 4 will enable the new version + # of Badger, but the blockstore will be incompatible with Lotus running with version 2. Switching + # versions can only be done when the blockstore is empty and will be repopulated from a snapshot + # or chain sync. It cannot be upgraded or downgraded with existing data and there is currently + # no automatic migration tooling. + # This is an experimental feature and should not be used in production. + # + # type: int + # env var: LOTUS_CHAINSTORE_BADGERVERSION + #BadgerVersion = 2 + # type: bool # env var: LOTUS_CHAINSTORE_ENABLESPLITSTORE EnableSplitstore = true diff --git a/documentation/misc/RELEASE_ISSUE_TEMPLATE.md b/documentation/misc/RELEASE_ISSUE_TEMPLATE.md index 3a4ec3259a1..b40888b57d7 100644 --- a/documentation/misc/RELEASE_ISSUE_TEMPLATE.md +++ b/documentation/misc/RELEASE_ISSUE_TEMPLATE.md @@ -129,9 +129,9 @@ ### Post-Release -- [ ] Open a pull request against `master` with a merge of the `release/vX.Y.Z` branch. - - [ ] Conflict resolution should ignore the changes to `version.go` (keep the `-dev` version from master). +- [ ] Open a pull request against `master` cherry-picking the CHANGELOG commits from the `release/vX.Y.Z` branch. - Link to PR: + - Assuming we followed [the process of merging changes into `master` first before backporting to the release branch](https://github.com/filecoin-project/lotus/blob/master/LOTUS_RELEASE_FLOW.md#branch-and-tag-strategy), the only changes should be CHANGELOG updates. - [ ] Finish updating/merging the [RELEASE_ISSUE_TEMPLATE.md](https://github.com/filecoin-project/lotus/blob/master/documentation/misc/RELEASE_ISSUE_TEMPLATE.md) PR from `Before RC1` with any improvements determined from this latest release iteration. ## ❤️ Contributors diff --git a/go.mod b/go.mod index 047f1500084..67b95e39d4d 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,8 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e github.com/dgraph-io/badger/v2 v2.2007.4 + github.com/dgraph-io/badger/v4 v4.2.0 + github.com/dgraph-io/ristretto v0.1.1 github.com/docker/go-units v0.5.0 github.com/drand/drand v1.5.11 github.com/drand/kyber v1.3.1 @@ -41,8 +43,8 @@ require ( github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-cbor-util v0.0.1 github.com/filecoin-project/go-commp-utils/v2 v2.1.0 - github.com/filecoin-project/go-crypto v0.0.1 - github.com/filecoin-project/go-f3 v0.1.0 + github.com/filecoin-project/go-crypto v0.1.0 + github.com/filecoin-project/go-f3 v0.2.0 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 github.com/filecoin-project/go-jsonrpc v0.6.0 @@ -147,6 +149,7 @@ require ( go.opentelemetry.io/otel/bridge/opencensus v1.28.0 go.opentelemetry.io/otel/exporters/jaeger v1.14.0 go.opentelemetry.io/otel/exporters/prometheus v0.50.0 + go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 go.uber.org/atomic v1.11.0 @@ -187,7 +190,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/drand/kyber-bls12381 v0.3.1 // indirect github.com/elastic/go-windows v1.0.0 // indirect @@ -218,6 +220,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240509144519-723abb6459b7 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect @@ -238,7 +241,6 @@ require ( github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/ipld/go-codec-dagpb v1.6.0 // indirect - github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/pgx/v5 v5.6.0 // indirect @@ -323,7 +325,8 @@ require ( github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/zondax/hid v0.9.2 // indirect github.com/zondax/ledger-go v0.14.3 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect + gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect + gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/mock v0.4.0 // indirect diff --git a/go.sum b/go.sum index d2711be15e6..57c8dd7eb0d 100644 --- a/go.sum +++ b/go.sum @@ -197,6 +197,8 @@ github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrV github.com/dgraph-io/badger/v2 v2.2007.3/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= +github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= +github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= @@ -266,10 +268,10 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8 github.com/filecoin-project/go-commp-utils/v2 v2.1.0 h1:KWNRalUp2bhN1SW7STsJS2AHs9mnfGKk9LnQgzDe+gI= github.com/filecoin-project/go-commp-utils/v2 v2.1.0/go.mod h1:NbxJYlhxtWaNhlVCj/gysLNu26kYII83IV5iNrAO9iI= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= -github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-f3 v0.1.0 h1:W/AcaF3FBg00Eiyuz/HIs5hCcPtVMK7JncWXpg4MP98= -github.com/filecoin-project/go-f3 v0.1.0/go.mod h1:43fBLX0iX0+Nnw4Z91wSrdfDYAd6YEDexy7GcLnIJtk= +github.com/filecoin-project/go-crypto v0.1.0 h1:Pob2MphoipMbe/ksxZOMcQvmBHAd3sI/WEqcbpIsGI0= +github.com/filecoin-project/go-crypto v0.1.0/go.mod h1:K9UFXvvoyAVvB+0Le7oGlKiT9mgA5FHOJdYQXEE8IhI= +github.com/filecoin-project/go-f3 v0.2.0 h1:Gis44+hOrDjSUEw3IDmU7CudNILi5e+bb1pgZgp680k= +github.com/filecoin-project/go-f3 v0.2.0/go.mod h1:43fBLX0iX0+Nnw4Z91wSrdfDYAd6YEDexy7GcLnIJtk= github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88OqLYEo6roi+GiIeOh8= github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 h1:HYIUugzjq78YvV3vC6rL95+SfC/aSTVSnZSZiDV5pCk= @@ -454,6 +456,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -1342,6 +1346,10 @@ github.com/zondax/ledger-go v0.14.3 h1:wEpJt2CEcBJ428md/5MgSLsXLBos98sBOyxNmCjfU github.com/zondax/ledger-go v0.14.3/go.mod h1:IKKaoxupuB43g4NxeQmbLXv7T9AlQyie1UpHb342ycI= github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= +gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b h1:CzigHMRySiX3drau9C6Q5CAbNIApmLdat5jPMqChvDA= +gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b/go.mod h1:/y/V339mxv2sZmYYR64O07VuCpdNZqCTwO8ZcouTMI8= +gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 h1:qwDnMxjkyLmAFgcfgTnfJrmYKWhHnci3GjDqcZp1M3Q= +gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02/go.mod h1:JTnUj0mpYiAsuZLmKjTx/ex3AtMowcCgnE7YNyCEP0I= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/protobuf v1.0.11 h1:FTYVIEzY/bfl37lu3pR4lIj+F9Vp1jE8oh91VmxKgLo= diff --git a/metrics/otel_bridge.go b/metrics/otel_bridge.go new file mode 100644 index 00000000000..76572f5afb2 --- /dev/null +++ b/metrics/otel_bridge.go @@ -0,0 +1,21 @@ +package metrics + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" +) + +func init() { + // Set up otel to prometheus reporting so that F3 metrics are reported via lotus + // prometheus metrics. This bridge eventually gets picked up by opencensus + // exporter as HTTP handler. This by default registers an otel collector against + // the global prometheus registry. In the future, we should clean up metrics in + // Lotus and move it all to use otel. For now, bridge away. + if bridge, err := prometheus.New(); err != nil { + log.Errorf("could not create the otel prometheus exporter: %v", err) + } else { + provider := metric.NewMeterProvider(metric.WithReader(bridge)) + otel.SetMeterProvider(provider) + } +} diff --git a/node/builder_chain.go b/node/builder_chain.go index ffdcf3a64a2..4ac67f02950 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -210,7 +210,7 @@ func ConfigFullNode(c interface{}) Option { If(cfg.Chainstore.Splitstore.ColdStoreType == "discard", Override(new(dtypes.ColdBlockstore), modules.DiscardColdBlockstore)), If(cfg.Chainstore.Splitstore.HotStoreType == "badger", - Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)), + Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore(&cfg.Chainstore))), Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(&cfg.Chainstore)), Override(new(dtypes.BasicChainBlockstore), modules.ChainSplitBlockstore), Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore), diff --git a/node/config/def.go b/node/config/def.go index cc390371302..feea521aadc 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -71,6 +71,7 @@ func DefaultFullNode() *FullNode { Chainstore: Chainstore{ EnableSplitstore: true, + BadgerVersion: 2, Splitstore: Splitstore{ ColdStoreType: "discard", HotStoreType: "badger", diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 6420c0f5f14..ed1d84850f8 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -72,6 +72,19 @@ your node if metadata log is disabled`, }, }, "Chainstore": { + { + Name: "BadgerVersion", + Type: "int", + + Comment: `EXPERIMENTAL FEATURE. USE WITH CAUTION +BadgerVersion switches the version of the Badger database engine. The default is 2 which has +been well-tested and widely deployed. Switching this to version 4 will enable the new version +of Badger, but the blockstore will be incompatible with Lotus running with version 2. Switching +versions can only be done when the blockstore is empty and will be repopulated from a snapshot +or chain sync. It cannot be upgraded or downgraded with existing data and there is currently +no automatic migration tooling. +This is an experimental feature and should not be used in production.`, + }, { Name: "EnableSplitstore", Type: "bool", diff --git a/node/config/types.go b/node/config/types.go index d7753d4e19e..70464d5f498 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -482,6 +482,15 @@ type Pubsub struct { } type Chainstore struct { + // EXPERIMENTAL FEATURE. USE WITH CAUTION + // BadgerVersion switches the version of the Badger database engine. The default is 2 which has + // been well-tested and widely deployed. Switching this to version 4 will enable the new version + // of Badger, but the blockstore will be incompatible with Lotus running with version 2. Switching + // versions can only be done when the blockstore is empty and will be repopulated from a snapshot + // or chain sync. It cannot be upgraded or downgraded with existing data and there is currently + // no automatic migration tooling. + // This is an experimental feature and should not be used in production. + BadgerVersion int EnableSplitstore bool Splitstore Splitstore } diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 9c54d51e60f..ee4ad1df35d 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -45,33 +45,36 @@ func DiscardColdBlockstore(lc fx.Lifecycle, bs dtypes.UniversalBlockstore) (dtyp return blockstore.NewDiscardStore(bs), nil } -func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { - path, err := r.SplitstorePath() - if err != nil { - return nil, err - } +func BadgerHotBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { + path, err := r.SplitstorePath() + if err != nil { + return nil, err + } - path = filepath.Join(path, "hot.badger") - if err := os.MkdirAll(path, 0755); err != nil { - return nil, err - } + path = filepath.Join(path, "hot.badger") + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly()) - if err != nil { - return nil, err - } + badgerVersion := cfg.BadgerVersion + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly(), badgerVersion) + if err != nil { + return nil, err + } - bs, err := badgerbs.Open(opts) - if err != nil { - return nil, err - } + bs, err := badgerbs.Open(opts) + if err != nil { + return nil, err + } - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return bs.Close() - }}) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bs.Close() + }}) - return bs, nil + return bs, nil + } } func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { diff --git a/node/modules/lp2p/host.go b/node/modules/lp2p/host.go index baea4cf0656..fd723fbb2c4 100644 --- a/node/modules/lp2p/host.go +++ b/node/modules/lp2p/host.go @@ -62,12 +62,6 @@ func Host(mctx helpers.MetricsCtx, buildVersion build.BuildVersion, lc fx.Lifecy return nil, err } - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return h.Close() - }, - }) - return h, nil } diff --git a/node/modules/lp2p/metrics.go b/node/modules/lp2p/metrics.go new file mode 100644 index 00000000000..1d0d6473b52 --- /dev/null +++ b/node/modules/lp2p/metrics.go @@ -0,0 +1,30 @@ +package lp2p + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var otelmeter = otel.Meter("libp2p") + +var attrIdentity = attribute.Key("identity") +var attrProtocolID = attribute.Key("protocol") +var attrDirectionInbound = attribute.String("direction", "inbound") +var attrDirectionOutbound = attribute.String("direction", "outbound") + +var otelmetrics = struct { + bandwidth metric.Int64ObservableCounter +}{ + bandwidth: must(otelmeter.Int64ObservableCounter("lotus_libp2p_bandwidth", + metric.WithDescription("Libp2p stream bandwidth."), + metric.WithUnit("By"), + )), +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/node/modules/lp2p/transport.go b/node/modules/lp2p/transport.go index 536f612b1aa..6a95937d284 100644 --- a/node/modules/lp2p/transport.go +++ b/node/modules/lp2p/transport.go @@ -1,11 +1,16 @@ package lp2p import ( + "context" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/peer" noise "github.com/libp2p/go-libp2p/p2p/security/noise" tls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) var DefaultTransports = simpleOpt(libp2p.DefaultTransports) @@ -31,8 +36,33 @@ func Security(enabled, preferTLS bool) interface{} { } } -func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { +func BandwidthCounter(lc fx.Lifecycle, id peer.ID) (opts Libp2pOpts, reporter metrics.Reporter, err error) { reporter = metrics.NewBandwidthCounter() opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - return opts, reporter + + // Register it with open telemetry. We report by-callback instead of implementing a custom + // bandwidth counter to avoid allocating every time we read/write to a stream (and to stay + // out of the hot path). + // + // Identity is required to ensure this observer observes with unique attributes. + identityAttr := attrIdentity.String(id.String()) + registration, err := otelmeter.RegisterCallback(func(ctx context.Context, obs metric.Observer) error { + for p, bw := range reporter.GetBandwidthByProtocol() { + if p == "" { + p = "" + } + protoAttr := attrProtocolID.String(string(p)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalOut, + metric.WithAttributes(identityAttr, protoAttr, attrDirectionOutbound)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalIn, + metric.WithAttributes(identityAttr, protoAttr, attrDirectionInbound)) + } + return nil + }, otelmetrics.bandwidth) + if err != nil { + return Libp2pOpts{}, nil, err + } + lc.Append(fx.StopHook(registration.Unregister)) + + return opts, reporter, nil } diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go index 81f8b9ff416..7be35b2bc48 100644 --- a/node/repo/blockstore_opts.go +++ b/node/repo/blockstore_opts.go @@ -1,63 +1,12 @@ package repo import ( - "os" - "strconv" - - badgerbs "github.com/filecoin-project/lotus/blockstore/badger" + versions "github.com/filecoin-project/lotus/blockstore/badger/versions" ) // BadgerBlockstoreOptions returns the badger options to apply for the provided // domain. -func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) { - opts := badgerbs.DefaultOptions(path) - - // Due to legacy usage of blockstore.Blockstore, over a datastore, all - // blocks are prefixed with this namespace. In the future, this can go away, - // in order to shorten keys, but it'll require a migration. - opts.Prefix = "/blocks/" - - // Blockstore values are immutable; therefore we do not expect any - // conflicts to emerge. - opts.DetectConflicts = false - - // This is to optimize the database on close so it can be opened - // read-only and efficiently queried. - opts.CompactL0OnClose = true - - // The alternative is "crash on start and tell the user to fix it". This - // will truncate corrupt and unsynced data, which we don't guarantee to - // persist anyways. - opts.Truncate = true - - // We mmap the index and the value logs; this is important to enable - // zero-copy value access. - opts.ValueLogLoadingMode = badgerbs.MemoryMap - opts.TableLoadingMode = badgerbs.MemoryMap - - // Embed only values < 128 bytes in the LSM tree; larger values are stored - // in value logs. - opts.ValueThreshold = 128 - - // Default table size is already 64MiB. This is here to make it explicit. - opts.MaxTableSize = 64 << 20 - - // NOTE: The chain blockstore doesn't require any GC (blocks are never - // deleted). This will change if we move to a tiered blockstore. - - opts.ReadOnly = readonly - - // Envvar LOTUS_CHAIN_BADGERSTORE_COMPACTIONWORKERNUM - // Allows the number of compaction workers used by BadgerDB to be adjusted - // Unset - leaves the default number of compaction workers (4) - // "0" - disables compaction - // Positive integer - enables that number of compaction workers - if badgerNumCompactors, badgerNumCompactorsSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_COMPACTIONWORKERNUM"); badgerNumCompactorsSet { - if numWorkers, err := strconv.Atoi(badgerNumCompactors); err == nil && numWorkers >= 0 { - opts.NumCompactors = numWorkers - } - } - +func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool, badgerVersion int) (versions.Options, error) { + opts := versions.BlockStoreOptions(path, readonly, badgerVersion) return opts, nil - } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 26cbbd6b135..0e0a1b807a8 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -437,7 +437,19 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain return } - opts, err := BadgerBlockstoreOptions(domain, path, readonly) + c, err := fsr.Config() + if err != nil { + fsr.bsErr = err + return + } + cfg, ok := c.(*config.FullNode) + if !ok { + fsr.bsErr = xerrors.Errorf("invalid config for repo, got: %T", c) + return + } + badgerVersion := cfg.Chainstore.BadgerVersion + + opts, err := BadgerBlockstoreOptions(domain, path, readonly, badgerVersion) if err != nil { fsr.bsErr = err return diff --git a/storage/pipeline/pledge.go b/storage/pipeline/pledge.go index 7cdda9fa4cd..890b4e7248e 100644 --- a/storage/pipeline/pledge.go +++ b/storage/pipeline/pledge.go @@ -104,7 +104,7 @@ func (m *Sealing) sectorWeight(ctx context.Context, sector SectorInfo, expiratio alloc, err := piece.GetAllocation(ctx, m.Api, ts.Key()) if err != nil || alloc == nil { - if err == nil { + if err != nil { log.Errorw("failed to get allocation", "error", err) } w = big.Add(w, big.Mul(sectorDuration, abi.NewStoragePower(int64(piece.Piece().Size))))