From df347563ff54c6b60b7c8b12072c5a6051420d65 Mon Sep 17 00:00:00 2001
From: coffeegoddd
Date: Thu, 16 Jan 2025 13:18:38 -0800
Subject: [PATCH] go/{libraries, store}: wip, trying to get paging working

---
 go/libraries/doltcore/doltdb/doltdb.go | 111 ++++++++++----
 go/libraries/doltcore/env/actions/tag.go | 10 +-
 go/libraries/doltcore/env/actions/tag_test.go | 145 ++++++++++++++++++
 go/store/datas/database.go | 1 +
 go/store/datas/database_common.go | 10 ++
 go/store/prolly/address_map.go | 26 ++++
 go/store/prolly/tree/map.go | 26 ++++
 go/store/types/map.go | 39 +++++
 8 files changed, 330 insertions(+), 38 deletions(-)
 create mode 100644 go/libraries/doltcore/env/actions/tag_test.go

diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index c6e936423e..1d1bf1112d 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -74,6 +74,13 @@ var InMemDoltDB = "mem://"
 var ErrNoRootValAtHash = errors.New("there is no dolt root value at that hash")
 var ErrCannotDeleteLastBranch = errors.New("cannot delete the last branch")
 
+// RefPageToken is a token for a page of refs.
+type RefPageToken struct {
+	Key string
+}
+
+const DefaultRefPageSize = 25
+
 // DoltDB wraps access to the underlying noms database and hides some of the details of the underlying storage.
 type DoltDB struct {
 	db hooksDatabase
@@ -1076,7 +1083,7 @@ func (ddb *DoltDB) GetTags(ctx context.Context) ([]ref.DoltRef, error) {
 // GetTagsPage returns a page of tags in the database, along with a token for retrieving the next page.
-// If pageToken is empty, returns the first page. Returns an empty string for nextPageToken when there
+// If pageToken is nil, returns the first page. Returns a nil nextPageToken when there
 // are no more pages.
-func (ddb *DoltDB) GetTagsPage(ctx context.Context, pageToken string) ([]ref.DoltRef, string, error) {
+func (ddb *DoltDB) GetTagsPage(ctx context.Context, pageToken *RefPageToken) ([]ref.DoltRef, *RefPageToken, error) {
 	return ddb.GetRefsOfTypePage(ctx, tagsRefFilter, pageToken)
 }
 
@@ -1201,10 +1208,10 @@ func (ddb *DoltDB) VisitRefsOfTypeByNomsRoot(ctx context.Context, refTypeFilter
 	return visitDatasets(ctx, refTypeFilter, visit, dss)
 }
 
-func (ddb *DoltDB) VisitRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken string, visit func(r ref.DoltRef, addr hash.Hash) error) (nextPageToken string, err error) {
+func (ddb *DoltDB) VisitRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken *RefPageToken, visit func(r ref.DoltRef, addr hash.Hash) error) (nextPageToken *RefPageToken, err error) {
 	dss, err := ddb.db.Datasets(ctx)
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 
 	return visitDatasetsPage(ctx, refTypeFilter, visit, dss, pageToken)
@@ -1232,48 +1239,86 @@ func visitDatasets(ctx context.Context, refTypeFilter map[ref.RefType]struct{},
 	})
 }
 
-func visitDatasetsPage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, visit func(r ref.DoltRef, addr hash.Hash) error, dss datas.DatasetsMap, pageToken string) (nextPageToken string, err error) {
-	// Since DatasetsMap interface doesn't have IterPage, we need to use IterAll and handle pagination ourselves
-	var lastProcessedKey string
-	var count int
-	const pageSize = 100
+func visitDatasetsPage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, visit func(r ref.DoltRef, addr hash.Hash) error, dss datas.DatasetsMap, pageToken *RefPageToken) (nextPageToken *RefPageToken, err error) {
+	var startKey string
+	if pageToken != nil {
+		startKey = pageToken.Key
+	}
 
-	err = 
dss.IterAll(ctx, func(key string, addr hash.Hash) error { - // Skip keys until we reach the page token - if pageToken != "" && key <= pageToken { + if refTypeFilter == nil || len(refTypeFilter) == 0 { + var count int + err = dss.IterFromCount(ctx, startKey, uint64(DefaultRefPageSize+1), func(key string, addr hash.Hash) error { + if ref.IsRef(key) { + dref, err := ref.Parse(key) + if err != nil { + return err + } + + if count < DefaultRefPageSize { + err = visit(dref, addr) + if err != nil { + return err + } + count++ + } else { + nextPageToken = &RefPageToken{Key: key} + return io.EOF + } + } return nil - } + }) - // Process pageSize number of items - if count >= pageSize { - nextPageToken = lastProcessedKey - return io.EOF // Use EOF to break out of iteration + if err == io.EOF { + err = nil } + return nextPageToken, err + } - if ref.IsRef(key) { - dref, err := ref.Parse(key) - if err != nil { - return err - } + var lastProcessedKey string + var count int + var batchSize uint64 = 100 // Process in smaller batches - if _, ok := refTypeFilter[dref.GetType()]; ok { - err = visit(dref, addr) + for count < DefaultRefPageSize { + var batchCount int + err = dss.IterFromCount(ctx, startKey, batchSize, func(key string, addr hash.Hash) error { + if ref.IsRef(key) { + dref, err := ref.Parse(key) if err != nil { return err } - count++ - lastProcessedKey = key - } - } - return nil - }) + if _, ok := refTypeFilter[dref.GetType()]; ok { + if count < DefaultRefPageSize { + err = visit(dref, addr) + if err != nil { + return err + } + count++ + } else { + nextPageToken = &RefPageToken{Key: key} + return io.EOF // Break out of iteration + } + } + } + lastProcessedKey = key + batchCount++ + return nil + }) - if err == io.EOF { - err = nil + if err == io.EOF { + err = nil + break + } + if err != nil { + return nil, err + } + if batchCount < int(batchSize) { + break // No more data to process + } + startKey = lastProcessedKey } - return nextPageToken, err + return nextPageToken, nil } // GetRefByNameInsensitive searches this Dolt database's branch, tag, and head refs for a case-insensitive @@ -1321,7 +1366,7 @@ func (ddb *DoltDB) GetRefsOfType(ctx context.Context, refTypeFilter map[ref.RefT return refs, err } -func (ddb *DoltDB) GetRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken string) ([]ref.DoltRef, string, error) { +func (ddb *DoltDB) GetRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken *RefPageToken) ([]ref.DoltRef, *RefPageToken, error) { var refs []ref.DoltRef nextPageToken, err := ddb.VisitRefsOfTypePage(ctx, refTypeFilter, pageToken, func(r ref.DoltRef, _ hash.Hash) error { refs = append(refs, r) diff --git a/go/libraries/doltcore/env/actions/tag.go b/go/libraries/doltcore/env/actions/tag.go index 074a56cc01..d553db7142 100644 --- a/go/libraries/doltcore/env/actions/tag.go +++ b/go/libraries/doltcore/env/actions/tag.go @@ -142,23 +142,23 @@ func IterResolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *dolt // IterResolvedTagsPage iterates over a page of tags in dEnv.DoltDB from newest to oldest, resolving the tag to a commit and calling cb(). // Returns a token for retrieving the next page. If pageToken is empty, returns the first page. Returns an empty string for nextPageToken // when there are no more pages. 
-func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken string, cb func(tag *doltdb.Tag) (stop bool, err error)) (nextPageToken string, err error) {
+func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken *doltdb.RefPageToken, cb func(tag *doltdb.Tag) (stop bool, err error)) (nextPageToken *doltdb.RefPageToken, err error) {
 	tagRefs, nextToken, err := ddb.GetTagsPage(ctx, pageToken)
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 
 	var resolved []*doltdb.Tag
 
 	for _, r := range tagRefs {
 		tr, ok := r.(ref.TagRef)
 		if !ok {
-			return "", fmt.Errorf("DoltDB.GetTagsPage() returned non-tag DoltRef")
+			return nil, fmt.Errorf("DoltDB.GetTagsPage() returned non-tag DoltRef")
 		}
 
 		tag, err := ddb.ResolveTag(ctx, tr)
 		if err != nil {
-			return "", err
+			return nil, err
 		}
 
 		resolved = append(resolved, tag)
@@ -173,7 +173,7 @@ func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken str
 
 		stop, err := cb(tag)
 		if err != nil {
-			return "", err
+			return nil, err
 		}
 		if stop {
 			break
diff --git a/go/libraries/doltcore/env/actions/tag_test.go b/go/libraries/doltcore/env/actions/tag_test.go
new file mode 100644
index 0000000000..92b4098458
--- /dev/null
+++ b/go/libraries/doltcore/env/actions/tag_test.go
@@ -0,0 +1,145 @@
+package actions
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"path/filepath"
+	"testing"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
+	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
+	"github.com/dolthub/dolt/go/libraries/doltcore/env"
+	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
+	"github.com/dolthub/dolt/go/libraries/utils/concurrentmap"
+	"github.com/dolthub/dolt/go/libraries/utils/filesys"
+	"github.com/dolthub/dolt/go/store/types"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	testHomeDir = "/user/bheni"
+	workingDir  = "/user/bheni/datasets/addresses"
+	credsDir    = "creds"
+
+	configFile       = "config.json"
+	GlobalConfigFile = "config_global.json"
+
+	repoStateFile = "repo_state.json"
+)
+
+func testHomeDirFunc() (string, error) {
+	return testHomeDir, nil
+}
+
+func createTestEnv(isInitialized bool, hasLocalConfig bool) (*env.DoltEnv, *filesys.InMemFS) {
+	initialDirs := []string{testHomeDir, workingDir}
+	initialFiles := map[string][]byte{}
+
+	if isInitialized {
+		doltDir := filepath.Join(workingDir, dbfactory.DoltDir)
+		doltDataDir := filepath.Join(workingDir, dbfactory.DoltDataDir)
+		initialDirs = append(initialDirs, doltDir)
+		initialDirs = append(initialDirs, doltDataDir)
+
+		mainRef := ref.NewBranchRef(env.DefaultInitBranch)
+		repoState := &env.RepoState{Head: ref.MarshalableRef{Ref: mainRef}, Remotes: concurrentmap.New[string, env.Remote](), Backups: concurrentmap.New[string, env.Remote](), Branches: concurrentmap.New[string, env.BranchConfig]()}
+		repoStateData, err := json.Marshal(repoState)
+
+		if err != nil {
+			panic("Could not set up test. Could not marshal RepoState struct")
+		}
+
+		initialFiles[getRepoStateFile()] = []byte(repoStateData)
+
+		if hasLocalConfig {
+			initialFiles[getLocalConfigPath()] = []byte(`{"user.name":"bheni"}`)
+		}
+	} else if hasLocalConfig {
+		panic("Bad test. 
Can't have a local config in a non-initialized directory.")
+	}
+
+	fs := filesys.NewInMemFS(initialDirs, initialFiles, workingDir)
+	dEnv := env.Load(context.Background(), testHomeDirFunc, fs, doltdb.InMemDoltDB, "test")
+
+	return dEnv, fs
+}
+
+func TestIterResolvedTagsPage(t *testing.T) {
+	dEnv, _ := createTestEnv(false, false)
+	ctx := context.Background()
+
+	// Initialize repo
+	err := dEnv.InitRepo(ctx, types.Format_Default, "test user", "test@test.com", "main")
+	require.NoError(t, err)
+
+	totalTags := doltdb.DefaultRefPageSize + doltdb.DefaultRefPageSize
+
+	expectedFirstPage := make(map[string]string)
+	expectedSecondPage := make(map[string]string)
+	// Create tags, split across an expected first and second page
+	for i := 0; i < totalTags; i++ {
+		name := fmt.Sprintf("tag%d", i)
+		msg := fmt.Sprintf("tag message %d", i)
+		err := CreateTag(ctx, dEnv, name, "main", TagProps{TaggerName: "test user", TaggerEmail: "test@test.com", Description: msg})
+		require.NoError(t, err)
+		if i < doltdb.DefaultRefPageSize {
+			expectedFirstPage[name] = name
+		} else {
+			expectedSecondPage[name] = name
+		}
+	}
+
+	var tags []string
+	var pageToken *doltdb.RefPageToken
+
+	// Test first page
+	_, err = IterResolvedTagsPage(ctx, dEnv.DoltDB, nil, func(tag *doltdb.Tag) (bool, error) {
+		tags = append(tags, tag.Name)
+		return false, nil
+	})
+	require.NoError(t, err)
+
+	// Verify every tag seen so far belongs to the expected first page
+	for _, tag := range tags {
+		require.Contains(t, expectedFirstPage, tag)
+	}
+
+	// Page through all tags to verify pagination
+	tags = []string{}
+	pageToken = nil
+
+	var nextToken *doltdb.RefPageToken
+	for {
+		nextToken, err = IterResolvedTagsPage(ctx, dEnv.DoltDB, pageToken, func(tag *doltdb.Tag) (bool, error) {
+			tags = append(tags, tag.Name)
+			return false, nil
+		})
+		require.NoError(t, err)
+
+		if nextToken == nil {
+			break
+		}
+		pageToken = nextToken
+	}
+
+	require.Equal(t, totalTags, len(tags))
+
+	// Verify all first page tags were iterated
+	for _, tag := range expectedFirstPage {
+		require.Contains(t, tags, tag)
+	}
+
+	// Verify all second page tags were iterated
+	for _, tag := range expectedSecondPage {
+		require.Contains(t, tags, tag)
+	}
+}
+
+func getLocalConfigPath() string {
+	return filepath.Join(dbfactory.DoltDir, configFile)
+}
+
+func getRepoStateFile() string {
+	return filepath.Join(dbfactory.DoltDir, repoStateFile)
+}
diff --git a/go/store/datas/database.go b/go/store/datas/database.go
index fff64fbe73..76fd9034ac 100644
--- a/go/store/datas/database.go
+++ b/go/store/datas/database.go
@@ -37,6 +37,7 @@ type DatasetsMap interface {
 	Len() (uint64, error)
 
 	IterAll(ctx context.Context, cb func(id string, addr hash.Hash) error) error
+	IterFromCount(ctx context.Context, startKey string, count uint64, cb func(id string, addr hash.Hash) error) error
 }
 
 // Database provides versioned storage for noms values. While Values can be
diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go
index 29dc88f4be..7f532ae8fe 100644
--- a/go/store/datas/database_common.go
+++ b/go/store/datas/database_common.go
@@ -135,6 +135,11 @@ func (m refmapDatasetsMap) IterAll(ctx context.Context, cb func(string, hash.Has
 	return m.am.IterAll(ctx, cb)
 }
 
+// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning.
+func (m refmapDatasetsMap) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(string, hash.Hash) error) error { + return m.am.IterFromCount(ctx, startKey, count, cb) +} + type nomsDatasetsMap struct { m types.Map } @@ -150,6 +155,11 @@ func (m nomsDatasetsMap) IterAll(ctx context.Context, cb func(string, hash.Hash) }) } +// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning. +func (m nomsDatasetsMap) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(string, hash.Hash) error) error { + return m.m.IterFromCount(ctx, startKey, count, cb) +} + // Datasets returns the Map of Datasets in the current root. If you intend to edit the map and commit changes back, // then you should fetch the current root, then call DatasetsInRoot with that hash. Otherwise another writer could // change the root value between when you get the root hash and call this method. diff --git a/go/store/prolly/address_map.go b/go/store/prolly/address_map.go index 4b1987f046..feeeb82f30 100644 --- a/go/store/prolly/address_map.go +++ b/go/store/prolly/address_map.go @@ -127,6 +127,32 @@ func (c AddressMap) IterAll(ctx context.Context, cb func(name string, address ha return nil } +// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning. +func (c AddressMap) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(id string, addr hash.Hash) error) error { + iter, err := c.addresses.IterFromCount(ctx, stringSlice(startKey), count) + if err != nil { + return err + } + + var n stringSlice + var a address + for { + n, a, err = iter.Next(ctx) + if err == io.EOF { + break + } + if err != nil { + return err + } + + if err = cb(string(n), hash.New(a)); err != nil { + return err + } + } + + return nil +} + func (c AddressMap) Editor() AddressMapEditor { return AddressMapEditor{ addresses: c.addresses.Mutate(), diff --git a/go/store/prolly/tree/map.go b/go/store/prolly/tree/map.go index 555d235d39..0218b1c164 100644 --- a/go/store/prolly/tree/map.go +++ b/go/store/prolly/tree/map.go @@ -334,6 +334,32 @@ func (t StaticMap[K, V, O]) IterAll(ctx context.Context) (*OrderedTreeIter[K, V] return &OrderedTreeIter[K, V]{curr: c, stop: stop, step: c.advance}, nil } +// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning. 
+func (t StaticMap[K, V, O]) IterFromCount(ctx context.Context, startKey K, count uint64) (*OrderedTreeIter[K, V], error) { + // Get cursor at start key + start, err := newLeafCursorAtKey(ctx, t.NodeStore, t.Root, startKey, t.Order) + if err != nil { + return nil, err + } + + // If start cursor is invalid, return empty iterator + if !start.Valid() { + return &OrderedTreeIter[K, V]{curr: nil}, nil + } + + // Track number of items seen + var seen uint64 + stop := func(curr *cursor) bool { + if seen >= count { + return true + } + seen++ + return false + } + + return &OrderedTreeIter[K, V]{curr: &start, stop: stop, step: start.advance}, nil +} + func (t StaticMap[K, V, O]) IterAllReverse(ctx context.Context) (*OrderedTreeIter[K, V], error) { beginning, err := newCursorAtStart(ctx, t.NodeStore, t.Root) if err != nil { diff --git a/go/store/types/map.go b/go/store/types/map.go index 17469fd6b6..fb5dee2deb 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -471,6 +471,45 @@ func (m Map) IterAll(ctx context.Context, cb mapIterAllCallback) error { return nil } +// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning. +func (m Map) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(id string, addr hash.Hash) error) error { + if count == 0 { + return nil + } + + var startVal Value + if startKey != "" { + startVal = String(startKey) + } + + cur, err := newCursorAtValue(ctx, m.orderedSequence, startVal, false, false) + if err != nil { + return err + } + + var processed uint64 + for ; cur.valid() && processed < count; processed++ { + item, err := cur.current() + if err != nil { + return err + } + + entry := item.(mapEntry) + key := string(entry.key.(String)) + val := entry.value.(Ref) + + if err := cb(key, val.TargetHash()); err != nil { + return err + } + + if _, err := cur.advance(ctx); err != nil { + return err + } + } + + return nil +} + func (m Map) IterRange(ctx context.Context, startIdx, endIdx uint64, cb mapIterAllCallback) error { var k Value _, err := iterRange(ctx, m, startIdx, endIdx, func(v Value) error {
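
Usage note (not part of the patch): a minimal sketch of how a caller might page through every tag with the API introduced above, assuming the IterResolvedTagsPage and RefPageToken signatures from this diff. It mirrors the paging loop in tag_test.go; the printAllTags helper and the package main wrapper are illustrative only.

package main

import (
	"context"
	"fmt"

	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
	"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
)

// printAllTags keeps requesting pages until IterResolvedTagsPage reports
// there is no next page (a nil token), printing each resolved tag name.
func printAllTags(ctx context.Context, ddb *doltdb.DoltDB) error {
	var pageToken *doltdb.RefPageToken
	for {
		nextToken, err := actions.IterResolvedTagsPage(ctx, ddb, pageToken, func(tag *doltdb.Tag) (bool, error) {
			fmt.Println(tag.Name)
			return false, nil // false = keep iterating within this page
		})
		if err != nil {
			return err
		}
		if nextToken == nil {
			return nil // no more pages
		}
		pageToken = nextToken
	}
}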