Skip to content

Commit

Permalink
Add PushPull consistency test
Browse files Browse the repository at this point in the history
  • Loading branch information
sejongk committed Sep 6, 2024
1 parent b77c25a commit d78f6ff
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 10 deletions.
11 changes: 11 additions & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package packs

import (
"context"
"errors"
"fmt"
"strconv"
gotime "time"
Expand All @@ -37,6 +38,10 @@ import (
"github.com/yorkie-team/yorkie/server/logging"
)

var (
ErrCheckpointTest = errors.New("failure for checkpoint testing purpose")

Check failure on line 42 in server/packs/packs.go

View workflow job for this annotation

GitHub Actions / build

exported: exported var ErrCheckpointTest should have comment or be unexported (revive)
)

// PushPullKey creates a new sync.Key of PushPull for the given document.
func PushPullKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey))
Expand Down Expand Up @@ -69,6 +74,7 @@ func PushPull(
docInfo *database.DocInfo,
reqPack *change.Pack,
opts PushPullOptions,
cpTest bool,
) (*ServerPack, error) {
start := gotime.Now()
defer func() {
Expand Down Expand Up @@ -123,6 +129,11 @@ func PushPull(
}
}

// For consistency testing purposes
if cpTest {
return nil, ErrCheckpointTest
}

if err := be.DB.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo); err != nil {
return nil, err
}
Expand Down
232 changes: 232 additions & 0 deletions server/packs/packs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package packs_test

import (
"context"
"encoding/hex"
"fmt"
"log"
"net/http"
"os"
"testing"

"connectrpc.com/connect"
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect"
"github.com/yorkie-team/yorkie/client"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/mongo"
"github.com/yorkie-team/yorkie/server/backend/housekeeping"
"github.com/yorkie-team/yorkie/server/clients"
"github.com/yorkie-team/yorkie/server/documents"
"github.com/yorkie-team/yorkie/server/packs"
"github.com/yorkie-team/yorkie/server/profiling/prometheus"
"github.com/yorkie-team/yorkie/server/rpc"
"github.com/yorkie-team/yorkie/test/helper"
)

var (
testRPCServer *rpc.Server
testRPCAddr = fmt.Sprintf("localhost:%d", helper.RPCPort)
testClient v1connect.YorkieServiceClient
testBackend *backend.Backend
)

func TestMain(m *testing.M) {
met, err := prometheus.NewMetrics()
if err != nil {
log.Fatal(err)
}

testBackend, err = backend.New(&backend.Config{
AdminUser: helper.AdminUser,
AdminPassword: helper.AdminPassword,
UseDefaultProject: helper.UseDefaultProject,
ClientDeactivateThreshold: helper.ClientDeactivateThreshold,
SnapshotThreshold: helper.SnapshotThreshold,
AuthWebhookCacheSize: helper.AuthWebhookSize,
ProjectInfoCacheSize: helper.ProjectInfoCacheSize,
ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(),
AdminTokenDuration: helper.AdminTokenDuration,
}, &mongo.Config{
ConnectionURI: helper.MongoConnectionURI,
YorkieDatabase: helper.TestDBName(),
ConnectionTimeout: helper.MongoConnectionTimeout,
PingTimeout: helper.MongoPingTimeout,
}, &housekeeping.Config{
Interval: helper.HousekeepingInterval.String(),
CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject,
ProjectFetchSize: helper.HousekeepingProjectFetchSize,
}, met)
if err != nil {
log.Fatal(err)
}

project, err := testBackend.DB.FindProjectInfoByID(
context.Background(),
database.DefaultProjectID,
)
if err != nil {
log.Fatal(err)
}

testRPCServer, err = rpc.NewServer(&rpc.Config{
Port: helper.RPCPort,
}, testBackend)
if err != nil {
log.Fatal(err)
}

if err = testRPCServer.Start(); err != nil {
log.Fatalf("failed rpc listen: %s\n", err)
}
if err = helper.WaitForServerToStart(testRPCAddr); err != nil {
log.Fatal(err)
}

authInterceptor := client.NewAuthInterceptor(project.PublicKey, "")

conn := http.DefaultClient
testClient = v1connect.NewYorkieServiceClient(
conn,
"http://"+testRPCAddr,
connect.WithInterceptors(authInterceptor),
)

code := m.Run()

if err := testBackend.Shutdown(); err != nil {
log.Fatal(err)
}
testRPCServer.Shutdown(true)
os.Exit(code)
}

func TestPacks(t *testing.T) {
t.Run("pushpull consistency test", func(t *testing.T) {
ctx := context.Background()

projectInfo, err := testBackend.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)
assert.NoError(t, err)
project := projectInfo.ToProject()

activateResp, err := testClient.ActivateClient(
context.Background(),
connect.NewRequest(&api.ActivateClientRequest{ClientKey: helper.TestDocKey(t).String()}))
assert.NoError(t, err)

clientID, _ := hex.DecodeString(activateResp.Msg.ClientId)
resPack, err := testClient.AttachDocument(
context.Background(),
connect.NewRequest(&api.AttachDocumentRequest{
ClientId: activateResp.Msg.ClientId,
ChangePack: &api.ChangePack{
DocumentKey: helper.TestDocKey(t).String(),
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 1},
Changes: []*api.Change{{
Id: &api.ChangeID{
ClientSeq: 1,
Lamport: 1,
ActorId: clientID,
},
}},
},
},
))
assert.NoError(t, err)

actorID, err := time.ActorIDFromBytes(clientID)
assert.NoError(t, err)

docID := types.ID(resPack.Msg.DocumentId)
docRefKey := types.DocRefKey{
ProjectID: project.ID,
DocID: docID,
}

// 0. Check docInfo.ServerSeq and clientInfo.Checkpoint
docInfo, err := documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey)
assert.NoError(t, err)
assert.Equal(t, int64(1), docInfo.ServerSeq)

clientInfo, err := clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
})
assert.NoError(t, err)
assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq)
assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq)

// 1. Create a ChangePack with a single Change
pack, err := converter.FromChangePack(&api.ChangePack{
DocumentKey: helper.TestDocKey(t).String(),
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2},
Changes: []*api.Change{{
Id: &api.ChangeID{
ClientSeq: 2,
Lamport: 2,
ActorId: clientID,
},
}},
})
assert.NoError(t, err)

// 2-1. An arbitrary failure occurs while updating clientInfo
_, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
}, true)
assert.ErrorIs(t, err, packs.ErrCheckpointTest)

// 2-2. docInfo.ServerSeq increases from 1 to 2
docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey)
assert.NoError(t, err)
assert.Equal(t, int64(2), docInfo.ServerSeq)

// 2-3. clientInfo.Checkpoint has not been updated
clientInfo, err = clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
})
assert.NoError(t, err)
assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq)
assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq)

// 3-1. A duplicate request is sent
_, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
}, false)
assert.NoError(t, err)

// 3-2. The server should detect the duplication and not update docInfo.ServerSeq
docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey)
assert.NoError(t, err)
assert.Equal(t, int64(2), docInfo.ServerSeq)
})
}
8 changes: 4 additions & 4 deletions server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *yorkieServer) AttachDocument(
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *yorkieServer) DetachDocument(
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: status,
})
}, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *yorkieServer) PushPullChanges(
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: syncMode,
Status: document.StatusAttached,
})
}, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func (s *yorkieServer) RemoveDocument(
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusRemoved,
})
}, false)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions test/bench/push_pull_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func benchmarkPushChanges(
_, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)
}
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func benchmarkPullChanges(
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
Expand All @@ -187,7 +187,7 @@ func benchmarkPullChanges(
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)
}
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func benchmarkPushSnapshots(
pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)

b.StopTimer()
Expand Down Expand Up @@ -259,7 +259,7 @@ func benchmarkPullSnapshot(
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
Expand All @@ -269,7 +269,7 @@ func benchmarkPullSnapshot(
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
}, false)
assert.NoError(b, err)
}
}
Expand Down

0 comments on commit d78f6ff

Please sign in to comment.