Skip to content

Commit

Permalink
go/{libraries, store}: wip, trying to get paging working
Browse files Browse the repository at this point in the history
  • Loading branch information
coffeegoddd committed Jan 16, 2025
1 parent 03a5157 commit df34756
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 38 deletions.
111 changes: 78 additions & 33 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ var InMemDoltDB = "mem://"
var ErrNoRootValAtHash = errors.New("there is no dolt root value at that hash")
var ErrCannotDeleteLastBranch = errors.New("cannot delete the last branch")

// RefPageToken is a token for a page of refs.
type RefPageToken struct {
Key string
}

const DefaultRefPageSize = 25

// DoltDB wraps access to the underlying noms database and hides some of the details of the underlying storage.
type DoltDB struct {
db hooksDatabase
Expand Down Expand Up @@ -1076,7 +1083,7 @@ func (ddb *DoltDB) GetTags(ctx context.Context) ([]ref.DoltRef, error) {
// GetTagsPage returns a page of tags in the database, along with a token for retrieving the next page.
// If pageToken is empty, returns the first page. Returns an empty string for nextPageToken when there
// are no more pages.
func (ddb *DoltDB) GetTagsPage(ctx context.Context, pageToken string) ([]ref.DoltRef, string, error) {
func (ddb *DoltDB) GetTagsPage(ctx context.Context, pageToken *RefPageToken) ([]ref.DoltRef, *RefPageToken, error) {
return ddb.GetRefsOfTypePage(ctx, tagsRefFilter, pageToken)
}

Expand Down Expand Up @@ -1201,10 +1208,10 @@ func (ddb *DoltDB) VisitRefsOfTypeByNomsRoot(ctx context.Context, refTypeFilter
return visitDatasets(ctx, refTypeFilter, visit, dss)
}

func (ddb *DoltDB) VisitRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken string, visit func(r ref.DoltRef, addr hash.Hash) error) (nextPageToken string, err error) {
func (ddb *DoltDB) VisitRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken *RefPageToken, visit func(r ref.DoltRef, addr hash.Hash) error) (nextPageToken *RefPageToken, err error) {
dss, err := ddb.db.Datasets(ctx)
if err != nil {
return "", err
return nil, err
}

return visitDatasetsPage(ctx, refTypeFilter, visit, dss, pageToken)
Expand Down Expand Up @@ -1232,48 +1239,86 @@ func visitDatasets(ctx context.Context, refTypeFilter map[ref.RefType]struct{},
})
}

func visitDatasetsPage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, visit func(r ref.DoltRef, addr hash.Hash) error, dss datas.DatasetsMap, pageToken string) (nextPageToken string, err error) {
// Since DatasetsMap interface doesn't have IterPage, we need to use IterAll and handle pagination ourselves
var lastProcessedKey string
var count int
const pageSize = 100
func visitDatasetsPage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, visit func(r ref.DoltRef, addr hash.Hash) error, dss datas.DatasetsMap, pageToken *RefPageToken) (nextPageToken *RefPageToken, err error) {
var startKey string
if pageToken != nil {
startKey = pageToken.Key
}

err = dss.IterAll(ctx, func(key string, addr hash.Hash) error {
// Skip keys until we reach the page token
if pageToken != "" && key <= pageToken {
if refTypeFilter == nil || len(refTypeFilter) == 0 {
var count int
err = dss.IterFromCount(ctx, startKey, uint64(DefaultRefPageSize+1), func(key string, addr hash.Hash) error {
if ref.IsRef(key) {
dref, err := ref.Parse(key)
if err != nil {
return err
}

if count < DefaultRefPageSize {
err = visit(dref, addr)
if err != nil {
return err
}
count++
} else {
nextPageToken = &RefPageToken{Key: key}
return io.EOF
}
}
return nil
}
})

// Process pageSize number of items
if count >= pageSize {
nextPageToken = lastProcessedKey
return io.EOF // Use EOF to break out of iteration
if err == io.EOF {
err = nil
}
return nextPageToken, err
}

if ref.IsRef(key) {
dref, err := ref.Parse(key)
if err != nil {
return err
}
var lastProcessedKey string
var count int
var batchSize uint64 = 100 // Process in smaller batches

if _, ok := refTypeFilter[dref.GetType()]; ok {
err = visit(dref, addr)
for count < DefaultRefPageSize {
var batchCount int
err = dss.IterFromCount(ctx, startKey, batchSize, func(key string, addr hash.Hash) error {
if ref.IsRef(key) {
dref, err := ref.Parse(key)
if err != nil {
return err
}
count++
lastProcessedKey = key
}
}

return nil
})
if _, ok := refTypeFilter[dref.GetType()]; ok {
if count < DefaultRefPageSize {
err = visit(dref, addr)
if err != nil {
return err
}
count++
} else {
nextPageToken = &RefPageToken{Key: key}
return io.EOF // Break out of iteration
}
}
}
lastProcessedKey = key
batchCount++
return nil
})

if err == io.EOF {
err = nil
if err == io.EOF {
err = nil
break
}
if err != nil {
return nil, err
}
if batchCount < int(batchSize) {
break // No more data to process
}
startKey = lastProcessedKey
}

return nextPageToken, err
return nextPageToken, nil
}

// GetRefByNameInsensitive searches this Dolt database's branch, tag, and head refs for a case-insensitive
Expand Down Expand Up @@ -1321,7 +1366,7 @@ func (ddb *DoltDB) GetRefsOfType(ctx context.Context, refTypeFilter map[ref.RefT
return refs, err
}

func (ddb *DoltDB) GetRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken string) ([]ref.DoltRef, string, error) {
func (ddb *DoltDB) GetRefsOfTypePage(ctx context.Context, refTypeFilter map[ref.RefType]struct{}, pageToken *RefPageToken) ([]ref.DoltRef, *RefPageToken, error) {
var refs []ref.DoltRef
nextPageToken, err := ddb.VisitRefsOfTypePage(ctx, refTypeFilter, pageToken, func(r ref.DoltRef, _ hash.Hash) error {
refs = append(refs, r)
Expand Down
10 changes: 5 additions & 5 deletions go/libraries/doltcore/env/actions/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,23 @@ func IterResolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *dolt
// IterResolvedTagsPage iterates over a page of tags in dEnv.DoltDB from newest to oldest, resolving the tag to a commit and calling cb().
// Returns a token for retrieving the next page. If pageToken is empty, returns the first page. Returns an empty string for nextPageToken
// when there are no more pages.
func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken string, cb func(tag *doltdb.Tag) (stop bool, err error)) (nextPageToken string, err error) {
func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken *doltdb.RefPageToken, cb func(tag *doltdb.Tag) (stop bool, err error)) (nextPageToken *doltdb.RefPageToken, err error) {
tagRefs, nextToken, err := ddb.GetTagsPage(ctx, pageToken)

if err != nil {
return "", err
return nil, err
}

var resolved []*doltdb.Tag
for _, r := range tagRefs {
tr, ok := r.(ref.TagRef)
if !ok {
return "", fmt.Errorf("DoltDB.GetTagsPage() returned non-tag DoltRef")
return nil, fmt.Errorf("DoltDB.GetTagsPage() returned non-tag DoltRef")
}

tag, err := ddb.ResolveTag(ctx, tr)
if err != nil {
return "", err
return nil, err
}

resolved = append(resolved, tag)
Expand All @@ -173,7 +173,7 @@ func IterResolvedTagsPage(ctx context.Context, ddb *doltdb.DoltDB, pageToken str
stop, err := cb(tag)

if err != nil {
return "", err
return nil, err
}
if stop {
break
Expand Down
145 changes: 145 additions & 0 deletions go/libraries/doltcore/env/actions/tag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package actions

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"testing"

"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/concurrentmap"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/types"
"github.com/stretchr/testify/require"
)

const (
testHomeDir = "/user/bheni"
workingDir = "/user/bheni/datasets/addresses"
credsDir = "creds"

configFile = "config.json"
GlobalConfigFile = "config_global.json"

repoStateFile = "repo_state.json"
)

func testHomeDirFunc() (string, error) {
return testHomeDir, nil
}

func createTestEnv(isInitialized bool, hasLocalConfig bool) (*env.DoltEnv, *filesys.InMemFS) {
initialDirs := []string{testHomeDir, workingDir}
initialFiles := map[string][]byte{}

if isInitialized {
doltDir := filepath.Join(workingDir, dbfactory.DoltDir)
doltDataDir := filepath.Join(workingDir, dbfactory.DoltDataDir)
initialDirs = append(initialDirs, doltDir)
initialDirs = append(initialDirs, doltDataDir)

mainRef := ref.NewBranchRef(env.DefaultInitBranch)
repoState := &env.RepoState{Head: ref.MarshalableRef{Ref: mainRef}, Remotes: concurrentmap.New[string, env.Remote](), Backups: concurrentmap.New[string, env.Remote](), Branches: concurrentmap.New[string, env.BranchConfig]()}
repoStateData, err := json.Marshal(repoState)

if err != nil {
panic("Could not setup test. Could not marshall repostate struct")
}

initialFiles[getRepoStateFile()] = []byte(repoStateData)

if hasLocalConfig {
initialFiles[getLocalConfigPath()] = []byte(`{"user.name":"bheni"}`)
}
} else if hasLocalConfig {
panic("Bad test. Cant have a local config in a non initialized directory.")
}

fs := filesys.NewInMemFS(initialDirs, initialFiles, workingDir)
dEnv := env.Load(context.Background(), testHomeDirFunc, fs, doltdb.InMemDoltDB, "test")

return dEnv, fs
}

func TestIterResolvedTagsPage(t *testing.T) {
dEnv, _ := createTestEnv(false, false)
ctx := context.Background()

// Initialize repo
err := dEnv.InitRepo(ctx, types.Format_Default, "test user", "[email protected]", "main")
require.NoError(t, err)

totalTags := doltdb.DefaultRefPageSize + doltdb.DefaultRefPageSize

expectedFirstPage := make(map[string]string)
expectedSecondPage := make(map[string]string)
// Create some commits and tags
for i := 0; i < totalTags; i++ {
name := fmt.Sprintf("tag%d", i)
msg := fmt.Sprintf("tag message %d", i)
err := CreateTag(ctx, dEnv, name, "main", TagProps{TaggerName: "test user", TaggerEmail: "[email protected]", Description: msg})
require.NoError(t, err)
if i < doltdb.DefaultRefPageSize {
expectedFirstPage[name] = name
} else {
expectedSecondPage[name] = name
}
}

var tags []string
var pageToken *doltdb.RefPageToken

// Test first page
_, err = IterResolvedTagsPage(ctx, dEnv.DoltDB, nil, func(tag *doltdb.Tag) (bool, error) {
tags = append(tags, tag.Name)
return false, nil
})
require.NoError(t, err)

// Verify tags are in the first page
for _, tag := range tags {
require.Contains(t, expectedFirstPage, tag)
}

// Test with small page size to verify pagination
tags = []string{}
pageToken = nil

var nextToken *doltdb.RefPageToken
for {
nextToken, err = IterResolvedTagsPage(ctx, dEnv.DoltDB, pageToken, func(tag *doltdb.Tag) (bool, error) {
tags = append(tags, tag.Name)
return false, nil
})
require.NoError(t, err)

if nextToken == nil {
break
}
pageToken = nextToken
}

require.Equal(t, len(tags), totalTags)

// Verify tags are in the first page
for _, tag := range expectedFirstPage {
require.Contains(t, tags, tag)
}

// Verify tags are in the second page
for _, tag := range expectedSecondPage {
require.Contains(t, tags, tag)
}
}

func getLocalConfigPath() string {
return filepath.Join(dbfactory.DoltDir, configFile)
}

func getRepoStateFile() string {
return filepath.Join(dbfactory.DoltDir, repoStateFile)
}
1 change: 1 addition & 0 deletions go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DatasetsMap interface {
Len() (uint64, error)

IterAll(ctx context.Context, cb func(id string, addr hash.Hash) error) error
IterFromCount(ctx context.Context, startKey string, count uint64, cb func(id string, addr hash.Hash) error) error
}

// Database provides versioned storage for noms values. While Values can be
Expand Down
10 changes: 10 additions & 0 deletions go/store/datas/database_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (m refmapDatasetsMap) IterAll(ctx context.Context, cb func(string, hash.Has
return m.am.IterAll(ctx, cb)
}

// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning.
func (m refmapDatasetsMap) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(string, hash.Hash) error) error {
return m.am.IterFromCount(ctx, startKey, count, cb)
}

type nomsDatasetsMap struct {
m types.Map
}
Expand All @@ -150,6 +155,11 @@ func (m nomsDatasetsMap) IterAll(ctx context.Context, cb func(string, hash.Hash)
})
}

// IterFromCount iterates over count entries in the map starting from startKey. If startKey is empty, iteration starts from the beginning.
func (m nomsDatasetsMap) IterFromCount(ctx context.Context, startKey string, count uint64, cb func(string, hash.Hash) error) error {
return m.m.IterFromCount(ctx, startKey, count, cb)
}

// Datasets returns the Map of Datasets in the current root. If you intend to edit the map and commit changes back,
// then you should fetch the current root, then call DatasetsInRoot with that hash. Otherwise another writer could
// change the root value between when you get the root hash and call this method.
Expand Down
Loading

0 comments on commit df34756

Please sign in to comment.