Skip to content

Commit

Permalink
Add Pagination to Listing Projects for Housekeeping
Browse files Browse the repository at this point in the history
To avoid all project info being loaded from DB, pagination is added for projects and perform housekeeping in smaller portions.

* Add HousekeepingProjectFetchSize in config

Fix checking the last page

Add housekeeping project fetch size option to the CLI flag

* set project fetch size to 100 in config.sample.yml

fix lint

Move housekeepingProjectPage variable into the Housekeeping.run function

Add keyset pagination to find projects

* update housekeeping test not to set project id to zero value
  • Loading branch information
tedkimdev committed Aug 2, 2023
1 parent 036a991 commit a271e36
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 49 deletions.
6 changes: 6 additions & 0 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func init() {
server.DefaultHousekeepingCandidatesLimitPerProject,
"candidates limit per project for a single housekeeping run",
)
cmd.Flags().IntVar(
&conf.Housekeeping.HousekeepingProjectFetchSize,
"housekeeping-project-fetch-size",
server.DefaultHousekeepingProjectFetchSize,
"housekeeping project fetch size for a single housekeeping run",
)
cmd.Flags().StringVar(
&mongoConnectionURI,
"mongo-connection-uri",
Expand Down
2 changes: 2 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type Database interface {
FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*ClientInfo, error)

// FindDocInfoByKey finds the document of the given key.
Expand Down
43 changes: 34 additions & 9 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package memory

import (
"bytes"
"context"
"fmt"
gotime "time"
Expand Down Expand Up @@ -224,16 +225,15 @@ func (d *DB) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (d *DB) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (d *DB) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()

// TODO(krapie): txn.Get() loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
iter, err := txn.Get(
tblProjects,
"id",
Expand All @@ -242,12 +242,35 @@ func (d *DB) listAllProjectInfos(
return nil, fmt.Errorf("fetch all projects: %w", err)
}

lastIDBytes, err := housekeepingLastProjectID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode last project id: %w", err)
}

var infos []*database.ProjectInfo
for raw := iter.Next(); raw != nil; raw = iter.Next() {
for i := 0; i < pageSize; {
raw := iter.Next()
if raw == nil {
*housekeepingLastProjectID = database.DefaultProjectID
break
}

info := raw.(*database.ProjectInfo).DeepCopy()
infos = append(infos, info)
}

idBytes, err := info.ID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode project id: %w", err)
}

if bytes.Compare(idBytes, lastIDBytes) > 0 {
infos = append(infos, info)
i++
}

if i == pageSize {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}
}
return infos, nil
}

Expand Down Expand Up @@ -599,8 +622,10 @@ func (d *DB) findDeactivateCandidatesPerProject(
func (d *DB) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := d.listAllProjectInfos(ctx)
projects, err := d.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion server/backend/database/memory/housekeeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"

"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/memory"
)

Expand All @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) {
ctx := context.Background()

clientDeactivateThreshold := "23h"
_, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold)

userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)
project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

yesterday := gotime.Now().Add(-24 * gotime.Hour)
Expand All @@ -59,9 +63,12 @@ func TestHousekeeping(t *testing.T) {
clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name()))
assert.NoError(t, err)

housekeepingLastProjectID := database.DefaultProjectID
candidates, err := memdb.FindDeactivateCandidates(
ctx,
10,
10,
&housekeepingLastProjectID,
)
assert.NoError(t, err)
assert.Len(t, candidates, 2)
Expand Down
37 changes: 28 additions & 9 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,38 @@ func (c *Client) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (c *Client) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (c *Client) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
// TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}})
encodedID, err := encodeID(*housekeepingLastProjectID)
if err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, err
}

opts := options.Find()
opts.SetLimit(int64(pageSize))

cursor, err := c.collection(colProjects).Find(ctx, bson.M{
"_id": bson.M{
"$gt": encodedID,
},
}, opts)
if err != nil {
return nil, fmt.Errorf("find project infos: %w", err)
}

var infos []*database.ProjectInfo
if err := cursor.All(ctx, &infos); err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("fetch project infos: %w", err)
}

if len(infos) < pageSize {
*housekeepingLastProjectID = database.DefaultProjectID
} else if len(infos) > 0 {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}

return infos, nil
Expand Down Expand Up @@ -657,8 +674,10 @@ func (c *Client) findDeactivateCandidatesPerProject(
func (c *Client) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := c.listAllProjectInfos(ctx)
projects, err := c.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
61 changes: 61 additions & 0 deletions server/backend/housekeeping/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package housekeeping

import (
"fmt"
"time"
)

// Config is the configuration for the housekeeping service.
type Config struct {
// Interval is the time between housekeeping runs.
Interval string `yaml:"Interval"`

// CandidatesLimitPerProject is the maximum number of candidates to be returned per project.
CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"`

// HousekeepingProjectFetchSize is the maximum number of projects to be returned to deactivate candidates.
HousekeepingProjectFetchSize int `yaml:"HousekeepingProjectFetchSize"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if _, err := time.ParseDuration(c.Interval); err != nil {
return fmt.Errorf(
`invalid argument %s for "--housekeeping-interval" flag: %w`,
c.Interval,
err,
)
}

if c.CandidatesLimitPerProject <= 0 {
return fmt.Errorf(
`invalid argument %d for "--housekeeping-candidates-limit-per-project" flag`,
c.HousekeepingProjectFetchSize,
)
}

if c.HousekeepingProjectFetchSize <= 0 {
return fmt.Errorf(
`invalid argument %d for "--housekeeping-project-fetc-size" flag`,
c.HousekeepingProjectFetchSize,
)
}

return nil
}
48 changes: 48 additions & 0 deletions server/backend/housekeeping/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package housekeeping_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/server/backend/housekeeping"
)

func TestConfig(t *testing.T) {
t.Run("validate test", func(t *testing.T) {
validConf := housekeeping.Config{
Interval: "1m",
CandidatesLimitPerProject: 100,
HousekeepingProjectFetchSize: 100,
}
assert.NoError(t, validConf.Validate())

conf1 := validConf
conf1.Interval = "hour"
assert.Error(t, conf1.Validate())

conf2 := validConf
conf2.CandidatesLimitPerProject = 0
assert.Error(t, conf2.Validate())

conf3 := validConf
conf3.HousekeepingProjectFetchSize = -1
assert.Error(t, conf3.Validate())
})
}
33 changes: 9 additions & 24 deletions server/backend/housekeeping/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/clients"
Expand All @@ -34,28 +35,6 @@ const (
deactivateCandidatesKey = "housekeeping/deactivateCandidates"
)

// Config is the configuration for the housekeeping service.
type Config struct {
// Interval is the time between housekeeping runs.
Interval string `yaml:"Interval"`

// CandidatesLimitPerProject is the maximum number of candidates to be returned per project.
CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if _, err := time.ParseDuration(c.Interval); err != nil {
return fmt.Errorf(
`invalid argument %s for "--housekeeping-interval" flag: %w`,
c.Interval,
err,
)
}

return nil
}

// Housekeeping is the housekeeping service. It periodically runs housekeeping
// tasks. It is responsible for deactivating clients that have not been active
// for a long time.
Expand All @@ -65,6 +44,7 @@ type Housekeeping struct {

interval time.Duration
candidatesLimitPerProject int
projectFetchSize int

ctx context.Context
cancelFunc context.CancelFunc
Expand Down Expand Up @@ -106,6 +86,7 @@ func New(

interval: interval,
candidatesLimitPerProject: conf.CandidatesLimitPerProject,
projectFetchSize: conf.HousekeepingProjectFetchSize,

ctx: ctx,
cancelFunc: cancelFunc,
Expand All @@ -127,9 +108,11 @@ func (h *Housekeeping) Stop() error {

// run is the housekeeping loop.
func (h *Housekeeping) run() {
housekeepingLastProjectID := database.DefaultProjectID

for {
ctx := context.Background()
if err := h.deactivateCandidates(ctx); err != nil {
if err := h.deactivateCandidates(ctx, &housekeepingLastProjectID); err != nil {
continue
}

Expand All @@ -142,7 +125,7 @@ func (h *Housekeeping) run() {
}

// deactivateCandidates deactivates candidates.
func (h *Housekeeping) deactivateCandidates(ctx context.Context) error {
func (h *Housekeeping) deactivateCandidates(ctx context.Context, housekeepingLastProjectID *types.ID) error {
start := time.Now()
locker, err := h.coordinator.NewLocker(ctx, deactivateCandidatesKey)
if err != nil {
Expand All @@ -162,6 +145,8 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error {
candidates, err := h.database.FindDeactivateCandidates(
ctx,
h.candidatesLimitPerProject,
h.projectFetchSize,
housekeepingLastProjectID,
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit a271e36

Please sign in to comment.