diff --git a/rest/attachment_test.go b/rest/attachment_test.go index 872a10abfa..964315ac47 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -2428,9 +2428,7 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) { btcRunner.WaitForVersion(btc.id, docID, version) // Add an attachment to client - btcRunner.AttachmentsLock(btc.id).Lock() - btcRunner.Attachments(btc.id)["sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="] = []byte("goodbye cruel world") - btcRunner.AttachmentsLock(btc.id).Unlock() + btcRunner.saveAttachment(btc.id, base64.StdEncoding.EncodeToString([]byte("goodbye cruel world"))) // Put doc with an erroneous revpos 1 but with a different digest, referring to the above attachment updatedVersion, err := btcRunner.PushRevWithHistory(btc.id, docID, &version, []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 2ce8812b0b..72ef418d0e 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1948,7 +1948,7 @@ func TestSendReplacementRevision(t *testing.T) { // one shot or else we'll carry on to send rev 2-... normally, and we can't assert correctly on the final state of the client rt.WaitForPendingChanges() - btcRunner.StartOneshotPullFiltered(btc.id, replicationChannels) + btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: replicationChannels, Continuous: false}) // block until we've written the update and got the new version to use in assertions version2 := <-updatedVersion @@ -2270,7 +2270,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi assert.Equal(t, docID, changes.Results[0].ID) RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) - btcRunner.StartOneshotPullFiltered(btc.id, "A") + btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, docID, version) _ = rt.UpdateDoc(docID, version, `{"channels": ["B"]}`) @@ -2285,7 +2285,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi assert.Equal(t, "doc", changes.Results[0].ID) assert.Equal(t, markerID, changes.Results[1].ID) - btcRunner.StartOneshotPullFiltered(btc.id, "A") + btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, markerID, markerVersion) messages := btc.pullReplication.GetMessages() @@ -2879,7 +2879,7 @@ func TestRequestPlusPull(t *testing.T) { caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() // Start a regular one-shot pull - btcRunner.StartOneshotPullRequestPlus(client.id) + btcRunner.StartPullSince(client.id, BlipTesterPullOptions{Continuous: true, RequestPlus: true}) // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1)) diff --git a/rest/blip_client_test.go b/rest/blip_client_test.go index 2966e117be..11f2f63131 100644 --- a/rest/blip_client_test.go +++ b/rest/blip_client_test.go @@ -73,44 +73,44 @@ type BlipTesterClient struct { } // getClientDocForSeq returns the clientDoc for the given sequence number, if it exists. -func (c *BlipTesterCollectionClient) getClientDocForSeq(seq clientSeq) (*clientDoc, bool) { - c.seqLock.RLock() - defer c.seqLock.RUnlock() - doc, ok := c._seqStore[seq] +func (btcc *BlipTesterCollectionClient) getClientDocForSeq(seq clientSeq) (*clientDoc, bool) { + btcc.seqLock.RLock() + defer btcc.seqLock.RUnlock() + doc, ok := btcc._seqStore[seq] return doc, ok } // OneShotDocsSince is an iterator that yields client sequence and document pairs that are newer than the given since value. -func (c *BlipTesterCollectionClient) OneShotDocsSince(ctx context.Context, since clientSeq) iter.Seq2[clientSeq, *clientDoc] { +func (btcc *BlipTesterCollectionClient) OneShotDocsSince(ctx context.Context, since clientSeq) iter.Seq2[clientSeq, *clientDoc] { return func(yield func(clientSeq, *clientDoc) bool) { - c.seqLock.Lock() - seqLast := c._seqLast - for c._seqLast <= since { + btcc.seqLock.Lock() + seqLast := btcc._seqLast + for btcc._seqLast <= since { if ctx.Err() != nil { - c.seqLock.Unlock() + btcc.seqLock.Unlock() return } // block until new seq - base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - waiting for new sequence", since, c._seqLast) - c._seqCond.Wait() + base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - waiting for new sequence", since, btcc._seqLast) + btcc._seqCond.Wait() // Check to see if we were woken because of Close() if ctx.Err() != nil { - c.seqLock.Unlock() + btcc.seqLock.Unlock() return } - seqLast = c._seqLast - base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - woke up", since, c._seqLast) + seqLast = btcc._seqLast + base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - woke up", since, btcc._seqLast) } - c.seqLock.Unlock() + btcc.seqLock.Unlock() base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - iterating", since, seqLast) for seq := since; seq <= seqLast; seq++ { - doc, ok := c.getClientDocForSeq(seq) + doc, ok := btcc.getClientDocForSeq(seq) // filter non-latest entries in cases where we haven't pruned _seqStore if !ok { continue } // make sure that seq is latestseq - require.Equal(c.TB(), doc.latestSeq(), seq, "this should've been pruned out!") + require.Equal(btcc.TB(), doc.latestSeq(), seq, "this should've been pruned out!") if !yield(seq, doc) { base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: since=%d, _seqLast=%d - stopping iteration", since, seqLast) return @@ -122,11 +122,11 @@ func (c *BlipTesterCollectionClient) OneShotDocsSince(ctx context.Context, since // docsSince returns a channel which will yield client documents that are newer than the given since value. // The channel will be closed when the iteration is finished. In the case of a continuous iteration, the channel will remain open until the context is cancelled. -func (c *BlipTesterCollectionClient) docsSince(ctx context.Context, since clientSeq, continuous bool) chan *clientDoc { +func (btcc *BlipTesterCollectionClient) docsSince(ctx context.Context, since clientSeq, continuous bool) chan *clientDoc { ch := make(chan *clientDoc) - c.goroutineWg.Add(1) + btcc.goroutineWg.Add(1) go func() { - defer c.goroutineWg.Done() + defer btcc.goroutineWg.Done() sinceVal := since defer close(ch) for { @@ -134,7 +134,7 @@ func (c *BlipTesterCollectionClient) docsSince(ctx context.Context, since client return } base.DebugfCtx(ctx, base.KeySGTest, "OneShotDocsSince: sinceVal=%d", sinceVal) - for _, doc := range c.OneShotDocsSince(ctx, sinceVal) { + for _, doc := range btcc.OneShotDocsSince(ctx, sinceVal) { select { case <-ctx.Done(): return @@ -181,6 +181,7 @@ func (cd *clientDoc) docRevSeqsNewestToOldest() []clientSeq { return cd._docRevSeqsNewestToOldest() } +// _docRevSeqsNewestToOldest returns a list of sequences associated with this document, ordered newest to oldest. func (cd *clientDoc) _docRevSeqsNewestToOldest() []clientSeq { seqs := make([]clientSeq, 0, len(cd._revisionsBySeq)) for _, rev := range cd._revisionsBySeq { @@ -296,6 +297,7 @@ func (btcc *BlipTesterCollectionClient) getClientDoc(docID string) (*clientDoc, return btcc._getClientDoc(docID) } +// _getClientDoc returns the clientDoc for the given docID, if it exists. Requires BlipTesterCollectionClient.seqLock read lock to be held. func (btcc *BlipTesterCollectionClient) _getClientDoc(docID string) (*clientDoc, bool) { seq, ok := btcc._seqFromDocID[docID] if !ok { @@ -333,6 +335,7 @@ func NewBlipTesterClientRunner(t *testing.T) *BlipTestClientRunner { } } +// Close shuts down all the clients and clears all messages stored. func (btr *BlipTesterReplicator) Close() { btr.bt.Close() btr.messagesLock.Lock() @@ -340,28 +343,38 @@ func (btr *BlipTesterReplicator) Close() { btr.messagesLock.Unlock() } +// initHandlers sets up the blip client side handles for each message type. func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { - revsLimit := base.IntDefault(btc.revsLimit, defaultBlipTesterClientRevsLimit) if btr.replicationStats == nil { btr.replicationStats = db.NewBlipSyncStats() } ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil) - btr.bt.blipContext.HandlerForProfile[db.MessageProveAttachment] = func(msg *blip.Message) { - btr.storeMessage(msg) + btr.bt.blipContext.DefaultHandler = btr.defaultHandler() + btr.bt.blipContext.HandlerForProfile[db.MessageNoRev] = btr.handleNoRev(btc) + btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = btr.handleGetAttachment(btc) + btr.bt.blipContext.HandlerForProfile[db.MessageRev] = btr.handleRev(ctx, btc) + btr.bt.blipContext.HandlerForProfile[db.MessageProposeChanges] = btr.handleProposeChanges(btc) + btr.bt.blipContext.HandlerForProfile[db.MessageChanges] = btr.handleChanges(btc) + btr.bt.blipContext.HandlerForProfile[db.MessageProveAttachment] = btr.handleProveAttachment(ctx, btc) +} + +// handleProveAttachment handles proveAttachment received by blip client +func (btr *BlipTesterReplicator) handleProveAttachment(ctx context.Context, btc *BlipTesterClient) func(*blip.Message) { + return func(msg *blip.Message) { + defer btr.storeMessage(msg) nonce, err := msg.Body() require.NoError(btr.TB(), err) - require.NotEmpty(btr.TB(), nonce, "no nonce sent with proveAttachment") digest, ok := msg.Properties[db.ProveAttachmentDigest] require.True(btr.TB(), ok, "no digest sent with proveAttachment") - btcr := btc.getCollectionClientFromMessage(msg) + btcc := btc.getCollectionClientFromMessage(msg) - attData := btcr.getAttachment(digest) + attData := btcc.getAttachment(digest) proof := db.ProveAttachment(ctx, attData, nonce) @@ -370,10 +383,15 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { btr.replicationStats.ProveAttachment.Add(1) } - btr.bt.blipContext.HandlerForProfile[db.MessageChanges] = func(msg *blip.Message) { - btr.storeMessage(msg) +} + +// handleChanges handles changes messages on the blip tester client +func (btr *BlipTesterReplicator) handleChanges(btc *BlipTesterClient) func(*blip.Message) { + revsLimit := base.IntDefault(btc.revsLimit, defaultBlipTesterClientRevsLimit) + return func(msg *blip.Message) { + defer btr.storeMessage(msg) - btcr := btc.getCollectionClientFromMessage(msg) + btcc := btc.getCollectionClientFromMessage(msg) // Exit early when there's nothing to do if msg.NoReply() { @@ -411,7 +429,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // Build up a list of revisions known to the client for each change // The first element of each revision list must be the parent revision of the change - if doc, haveDoc := btcr.getClientDoc(docID); haveDoc { + if doc, haveDoc := btcc.getClientDoc(docID); haveDoc { docSeqs := doc.docRevSeqsNewestToOldest() revList := make([]string, 0, revsLimit) @@ -454,15 +472,21 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { response.SetBody(b) } +} - btr.bt.blipContext.HandlerForProfile[db.MessageProposeChanges] = func(msg *blip.Message) { +// handleProposeChanges handles proposeChanges messages on the blip tester client +func (btr *BlipTesterReplicator) handleProposeChanges(btc *BlipTesterClient) func(msg *blip.Message) { + return func(msg *blip.Message) { btc.pullReplication.storeMessage(msg) } +} - btr.bt.blipContext.HandlerForProfile[db.MessageRev] = func(msg *blip.Message) { - btc.pullReplication.storeMessage(msg) +// handleRev handles rev messages on the blip tester client +func (btr *BlipTesterReplicator) handleRev(ctx context.Context, btc *BlipTesterClient) func(msg *blip.Message) { + return func(msg *blip.Message) { + defer btc.pullReplication.storeMessage(msg) - btcr := btc.getCollectionClientFromMessage(msg) + btcc := btc.getCollectionClientFromMessage(msg) docID := msg.Properties[db.RevMessageID] revID := msg.Properties[db.RevMessageRev] @@ -473,10 +497,10 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { require.NoError(btr.TB(), err) if msg.Properties[db.RevMessageDeleted] == "1" { - btcr.seqLock.Lock() - defer btcr.seqLock.Unlock() - btcr._seqLast++ - newClientSeq := btcr._seqLast + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + btcc._seqLast++ + newClientSeq := btcc._seqLast newVersion := DocVersion{RevID: revID} docRev := clientDocRev{ @@ -487,7 +511,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { message: msg, } - doc, ok := btcr._getClientDoc(docID) + doc, ok := btcc._getClientDoc(docID) if !ok { doc = &clientDoc{ id: docID, @@ -501,11 +525,11 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { } } else { // remove existing entry and replace with new seq - delete(btcr._seqStore, doc.latestSeq()) + delete(btcc._seqStore, doc.latestSeq()) doc.addNewRev(docRev) } - btcr._seqStore[newClientSeq] = doc - btcr._seqFromDocID[docID] = newClientSeq + btcc._seqStore[newClientSeq] = doc + btcc._seqFromDocID[docID] = newClientSeq if replacedRev != "" { // store the new sequence for a replaced rev for tests waiting for this specific rev @@ -540,7 +564,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { require.NoError(btc.TB(), err) var old db.Body - doc, ok := btcr.getClientDoc(docID) + doc, ok := btcc.getClientDoc(docID) require.True(btc.TB(), ok, "docID %q not found in _seqFromDocID", docID) oldRev, err := doc.getRev(DocVersion{RevID: deltaSrc}) require.NoError(btc.TB(), err) @@ -569,25 +593,23 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var missingDigests []string var knownDigests []string - btcr.attachmentsLock.RLock() + btcc.attachmentsLock.RLock() for _, attachment := range attsMap { attMap, ok := attachment.(map[string]interface{}) require.True(btr.TB(), ok, "att in doc wasn't map[string]interface{}") digest := attMap["digest"].(string) - if _, found := btcr._attachments[digest]; !found { + if _, found := btcc._attachments[digest]; !found { missingDigests = append(missingDigests, digest) - } else { - if btr.bt.activeSubprotocol == db.CBMobileReplicationV2 { - // only v2 clients care about proveAttachments - knownDigests = append(knownDigests, digest) - } + } else if btr.bt.activeSubprotocol == db.CBMobileReplicationV2 { + // only v2 clients care about proveAttachments + knownDigests = append(knownDigests, digest) } } - btcr.attachmentsLock.RUnlock() + btcc.attachmentsLock.RUnlock() for _, digest := range knownDigests { - attData := btcr.getAttachment(digest) + attData := btcc.getAttachment(digest) nonce, proof, err := db.GenerateProofOfAttachment(ctx, attData) require.NoError(btr.TB(), err) @@ -597,7 +619,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { outrq.Properties[db.ProveAttachmentDigest] = digest outrq.SetBody(nonce) - btcr.sendPullMsg(outrq) + btcc.sendPullMsg(outrq) resp := outrq.Response() btc.pullReplication.storeMessage(resp) @@ -632,7 +654,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { outrq.Properties[db.GetAttachmentID] = docID } - btcr.sendPullMsg(outrq) + btcc.sendPullMsg(outrq) resp := outrq.Response() btc.pullReplication.storeMessage(resp) @@ -649,9 +671,9 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { } } - btcr.attachmentsLock.Lock() - btcr._attachments[digest] = respBody - btcr.attachmentsLock.Unlock() + btcc.attachmentsLock.Lock() + btcc._attachments[digest] = respBody + btcc.attachmentsLock.Unlock() } } @@ -663,10 +685,10 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { } // TODO: Duplicated code from the deleted case above - factor into shared function? - btcr.seqLock.Lock() - defer btcr.seqLock.Unlock() - btcr._seqLast++ - newClientSeq := btcr._seqLast + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + btcc._seqLast++ + newClientSeq := btcc._seqLast newVersion := DocVersion{RevID: revID} docRev := clientDocRev{ @@ -676,7 +698,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { message: msg, } - doc, ok := btcr._getClientDoc(docID) + doc, ok := btcc._getClientDoc(docID) if !ok { doc = &clientDoc{ id: docID, @@ -690,11 +712,11 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { } } else { // remove existing entry and replace with new seq - delete(btcr._seqStore, doc.latestSeq()) + delete(btcc._seqStore, doc.latestSeq()) doc.addNewRev(docRev) } - btcr._seqStore[newClientSeq] = doc - btcr._seqFromDocID[docID] = newClientSeq + btcc._seqStore[newClientSeq] = doc + btcc._seqFromDocID[docID] = newClientSeq if replacedRev != "" { // store the new sequence for a replaced rev for tests waiting for this specific rev @@ -709,35 +731,42 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { response.SetBody([]byte(`[]`)) } } +} - btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) { - btr.storeMessage(msg) +// handleGetAttachment handles getAttachment messages on the blip tester client +func (btr *BlipTesterReplicator) handleGetAttachment(btc *BlipTesterClient) func(msg *blip.Message) { + return func(msg *blip.Message) { + defer btr.storeMessage(msg) digest, ok := msg.Properties[db.GetAttachmentDigest] require.True(btr.TB(), ok, "couldn't find digest in getAttachment message properties") - btcr := btc.getCollectionClientFromMessage(msg) + btcc := btc.getCollectionClientFromMessage(msg) - attachment := btcr.getAttachment(digest) + attachment := btcc.getAttachment(digest) response := msg.Response() response.SetBody(attachment) btr.replicationStats.GetAttachment.Add(1) } - btr.bt.blipContext.HandlerForProfile[db.MessageNoRev] = func(msg *blip.Message) { - btr.storeMessage(msg) +} - btcr := btc.getCollectionClientFromMessage(msg) +// handleNoRev handles noRev messages on the blip tester client +func (btr *BlipTesterReplicator) handleNoRev(btc *BlipTesterClient) func(msg *blip.Message) { + return func(msg *blip.Message) { + defer btr.storeMessage(msg) + + btcc := btc.getCollectionClientFromMessage(msg) docID := msg.Properties[db.NorevMessageId] revID := msg.Properties[db.NorevMessageRev] - btcr.seqLock.Lock() - defer btcr.seqLock.Unlock() - btcr._seqLast++ - newSeq := btcr._seqLast - doc, ok := btcr._getClientDoc(docID) + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + btcc._seqLast++ + newSeq := btcc._seqLast + doc, ok := btcc._getClientDoc(docID) if !ok { doc = &clientDoc{ id: docID, @@ -753,13 +782,17 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { isDelete: false, message: msg, }) - btcr._seqStore[newSeq] = doc - btcr._seqFromDocID[docID] = newSeq + btcc._seqStore[newSeq] = doc + btcc._seqFromDocID[docID] = newSeq } - btr.bt.blipContext.DefaultHandler = func(msg *blip.Message) { +} + +// defaultHandler is the default handler for the blip tester client, this will fail the test harness +func (btr *BlipTesterReplicator) defaultHandler() func(msg *blip.Message) { + return func(msg *blip.Message) { btr.storeMessage(msg) - base.PanicfCtx(ctx, "Unknown profile: %s caught by client DefaultHandler - msg: %#v", msg.Profile(), msg) + require.FailNow(btr.TB(), fmt.Sprintf("Unknown profile: %s caught by client DefaultHandler - msg: %#v", msg.Profile(), msg)) } } @@ -769,53 +802,56 @@ func (btr *BlipTesterReplicator) TB() testing.TB { } // TB returns testing.TB for the current test -func (btc *BlipTesterCollectionClient) TB() testing.TB { - return btc.parent.rt.TB() +func (btcc *BlipTesterCollectionClient) TB() testing.TB { + return btcc.parent.rt.TB() } // saveAttachment takes base64 encoded data and stores the attachment on the client. -func (btc *BlipTesterCollectionClient) saveAttachment(base64data string) (dataLength int, digest string) { - btc.attachmentsLock.Lock() - defer btc.attachmentsLock.Unlock() +func (btcc *BlipTesterCollectionClient) saveAttachment(base64data string) (dataLength int, digest string) { + btcc.attachmentsLock.Lock() + defer btcc.attachmentsLock.Unlock() - ctx := base.DatabaseLogCtx(base.TestCtx(btc.parent.rt.TB()), btc.parent.rt.GetDatabase().Name, nil) + ctx := base.DatabaseLogCtx(base.TestCtx(btcc.parent.rt.TB()), btcc.parent.rt.GetDatabase().Name, nil) data, err := base64.StdEncoding.DecodeString(base64data) - require.NoError(btc.TB(), err) + require.NoError(btcc.TB(), err) digest = db.Sha1DigestKey(data) - if _, found := btc._attachments[digest]; found { + if _, found := btcc._attachments[digest]; found { base.InfofCtx(ctx, base.KeySync, "attachment with digest %s already exists", digest) } else { - btc._attachments[digest] = data + btcc._attachments[digest] = data } return len(data), digest } -func (btc *BlipTesterCollectionClient) getAttachment(digest string) (attachment []byte) { - btc.attachmentsLock.RLock() - defer btc.attachmentsLock.RUnlock() +// getAttachment returns the attachment data for the given digest. The test will fail if the attachment is not found. +func (btcc *BlipTesterCollectionClient) getAttachment(digest string) (attachment []byte) { + btcc.attachmentsLock.RLock() + defer btcc.attachmentsLock.RUnlock() - attachment, found := btc._attachments[digest] - require.True(btc.TB(), found, "attachment with digest %s not found", digest) + attachment, found := btcc._attachments[digest] + require.True(btcc.TB(), found, "attachment with digest %s not found", digest) return attachment } -func (btc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, version DocVersion) { - btc.seqLock.Lock() - defer btc.seqLock.Unlock() - doc, ok := btc._getClientDoc(docID) - require.True(btc.TB(), ok, "docID %q not found in _seqFromDocID", docID) +// updateLastReplicatedRev stores this version as the last version replicated to Sync Gateway. +func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, version DocVersion) { + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + doc, ok := btcc._getClientDoc(docID) + require.True(btcc.TB(), ok, "docID %q not found in _seqFromDocID", docID) doc.setLatestServerVersion(version) } -func (btc *BlipTesterCollectionClient) getLastReplicatedRev(docID string) (version DocVersion, ok bool) { - btc.seqLock.Lock() - defer btc.seqLock.Unlock() - doc, ok := btc._getClientDoc(docID) - require.True(btc.TB(), ok, "docID %q not found in _seqFromDocID", docID) +// getLastReplicatedRev returns the last version replicated to Sync Gateway for the given docID. +func (btcc *BlipTesterCollectionClient) getLastReplicatedRev(docID string) (version DocVersion, ok bool) { + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + doc, ok := btcc._getClientDoc(docID) + require.True(btcc.TB(), ok, "docID %q not found in _seqFromDocID", docID) doc.lock.RLock() latestServerVersion := doc._latestServerVersion doc.lock.RUnlock() @@ -883,6 +919,7 @@ func (btc *BlipTesterClient) TB() testing.TB { return btc.rt.TB() } +// Close shuts down all the clients and clears all messages stored. func (btc *BlipTesterClient) Close() { btc.tearDownBlipClientReplications() for _, collectionClient := range btc.collectionClients { @@ -898,6 +935,7 @@ func (btcRunner *BlipTestClientRunner) TB() testing.TB { return btcRunner.t } +// Run is the main entry point for running the blip tester client and its associated methods in test framework and should be used instead of t.Run func (btcRunner *BlipTestClientRunner) Run(test func(t *testing.T, SupportedBLIPProtocols []string)) { btcRunner.initialisedInsideRunnerCode = true // reset to protect against someone creating a new client after Run() is run @@ -907,11 +945,13 @@ func (btcRunner *BlipTestClientRunner) Run(test func(t *testing.T, SupportedBLIP }) } +// tearDownBlipClientReplications closes the push and pull replications for the client. func (btc *BlipTesterClient) tearDownBlipClientReplications() { btc.pullReplication.Close() btc.pushReplication.Close() } +// createBlipTesterReplications creates the push and pull replications for the client. func (btc *BlipTesterClient) createBlipTesterReplications() { id, err := uuid.NewRandom() require.NoError(btc.TB(), err) @@ -933,6 +973,7 @@ func (btc *BlipTesterClient) createBlipTesterReplications() { btc.pushReplication.bt.avoidRestTesterClose = true } +// initCollectionReplication initializes a BlipTesterCollectionClient for the given collection. func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) { btcReplicator := NewBlipTesterCollectionClient(btc) btcReplicator.collection = collection @@ -940,6 +981,7 @@ func (btc *BlipTesterClient) initCollectionReplication(collection string, collec btc.collectionClients[collectionIdx] = btcReplicator } +// waitForReplicationMessage waits for a replication message with the given serial number. func (btc *BlipTesterClient) waitForReplicationMessage(collection *db.DatabaseCollection, serialNumber blip.MessageNumber) *blip.Message { if base.IsDefaultCollection(collection.ScopeName, collection.Name) { return btc.pushReplication.WaitForMessage(serialNumber) @@ -976,9 +1018,9 @@ type BlipTesterPushOptions struct { Since string // TODO: Not Implemented - //Channels string - //DocIDs []string - //changesBatchSize int + // Channels string + // DocIDs []string + // changesBatchSize int } // StartPush will begin a continuous push replication since 0 between the client and server @@ -1159,18 +1201,11 @@ func (btcc *BlipTesterCollectionClient) StartPull() { btcc.StartPullSince(BlipTesterPullOptions{Continuous: true, Since: "0"}) } +// StartOneShotPull will begin a one-shot pull replication since 0 and continuous=false between the client and server func (btcc *BlipTesterCollectionClient) StartOneshotPull() { btcc.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0"}) } -func (btcc *BlipTesterCollectionClient) StartOneshotPullFiltered(channels string) { - btcc.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", Channels: channels}) -} - -func (btcc *BlipTesterCollectionClient) StartOneshotPullRequestPlus() { - btcc.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", RequestPlus: true}) -} - // BlipTesterPullOptions represents options passed to StartPull (SubChanges) functions type BlipTesterPullOptions struct { ActiveOnly bool @@ -1182,7 +1217,7 @@ type BlipTesterPullOptions struct { } // StartPullSince will begin a pull replication between the client and server with the given params. -func (btc *BlipTesterCollectionClient) StartPullSince(options BlipTesterPullOptions) { +func (btcc *BlipTesterCollectionClient) StartPullSince(options BlipTesterPullOptions) { subChangesRequest := blip.NewRequest() subChangesRequest.SetProfile(db.MessageSubChanges) subChangesRequest.Properties[db.SubChangesContinuous] = fmt.Sprintf("%t", options.Continuous) @@ -1197,34 +1232,34 @@ func (btc *BlipTesterCollectionClient) StartPullSince(options BlipTesterPullOpti } subChangesRequest.SetNoReply(true) - if btc.parent.BlipTesterClientOpts.SendRevocations { + if btcc.parent.BlipTesterClientOpts.SendRevocations { subChangesRequest.Properties[db.SubChangesRevocations] = "true" } - if btc.parent.BlipTesterClientOpts.sendReplacementRevs { + if btcc.parent.BlipTesterClientOpts.sendReplacementRevs { subChangesRequest.Properties[db.SubChangesSendReplacementRevs] = "true" } if len(options.DocIDs) > 0 { - subChangesRequest.SetBody(base.MustJSONMarshal(btc.TB(), + subChangesRequest.SetBody(base.MustJSONMarshal(btcc.TB(), db.SubChangesBody{ DocIDs: options.DocIDs, }, )) } - btc.sendPullMsg(subChangesRequest) + btcc.sendPullMsg(subChangesRequest) } // UnsubPullChanges will send an UnsubChanges message to the server to stop the pull replication. Fails test harness if Sync Gateway responds with an error. -func (btc *BlipTesterCollectionClient) UnsubPullChanges() { +func (btcc *BlipTesterCollectionClient) UnsubPullChanges() { unsubChangesRequest := blip.NewRequest() unsubChangesRequest.SetProfile(db.MessageUnsubChanges) - btc.sendPullMsg(unsubChangesRequest) + btcc.sendPullMsg(unsubChangesRequest) response, err := unsubChangesRequest.Response().Body() - require.NoError(btc.TB(), err) - require.Empty(btc.TB(), response) + require.NoError(btcc.TB(), err) + require.Empty(btcc.TB(), response) } // NewBlipTesterCollectionClient creates a collection specific client from a BlipTesterClient @@ -1246,41 +1281,42 @@ func NewBlipTesterCollectionClient(btc *BlipTesterClient) *BlipTesterCollectionC } // Close will empty the stored docs and close the underlying replications. -func (btc *BlipTesterCollectionClient) Close() { - btc.ctxCancel() +func (btcc *BlipTesterCollectionClient) Close() { + btcc.ctxCancel() // wake up changes feeds to exit - don't need lock for sync.Cond - btc._seqCond.Broadcast() + btcc._seqCond.Broadcast() - btc.seqLock.Lock() - defer btc.seqLock.Unlock() + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() // empty storage - btc._seqStore = make(map[clientSeq]*clientDoc, 0) - btc._seqFromDocID = make(map[string]clientSeq, 0) + btcc._seqStore = make(map[clientSeq]*clientDoc, 0) + btcc._seqFromDocID = make(map[string]clientSeq, 0) - btc.attachmentsLock.Lock() - defer btc.attachmentsLock.Unlock() - btc._attachments = make(map[string][]byte, 0) - globalBlipTesterClients.remove(btc.TB(), btc.TB().Name()) + btcc.attachmentsLock.Lock() + defer btcc.attachmentsLock.Unlock() + btcc._attachments = make(map[string][]byte, 0) + globalBlipTesterClients.remove(btcc.TB(), btcc.TB().Name()) } +// sendMsg sends a blip message to the server and stores it on BlipTesterReplicator. The response is not read unless the caller calls msg.Response() func (btr *BlipTesterReplicator) sendMsg(msg *blip.Message) { require.True(btr.TB(), btr.bt.sender.Send(msg)) btr.storeMessage(msg) } // upsertDoc will create or update the doc based on whether parentVersion is passed or not. Enforces MVCC update. -func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *DocVersion, body []byte) *clientDocRev { - btc.seqLock.Lock() - defer btc.seqLock.Unlock() - oldSeq, ok := btc._seqFromDocID[docID] +func (btcc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *DocVersion, body []byte) *clientDocRev { + btcc.seqLock.Lock() + defer btcc.seqLock.Unlock() + oldSeq, ok := btcc._seqFromDocID[docID] var doc *clientDoc if ok { - require.NotNil(btc.TB(), parentVersion, "docID: %v already exists on the client with seq: %v - expecting to create doc based on not nil parentVersion", docID, oldSeq) - doc, ok = btc._seqStore[oldSeq] - require.True(btc.TB(), ok, "seq %q for docID %q found but no doc in _seqStore", oldSeq, docID) + require.NotNil(btcc.TB(), parentVersion, "docID: %v already exists on the client with seq: %v - expecting to create doc based on not nil parentVersion", docID, oldSeq) + doc, ok = btcc._seqStore[oldSeq] + require.True(btcc.TB(), ok, "seq %q for docID %q found but no doc in _seqStore", oldSeq, docID) } else { - require.Nil(btc.TB(), parentVersion, "docID: %v was not found on the client - expecting to create doc based on nil parentVersion, parentVersion=%v", docID, parentVersion) + require.Nil(btcc.TB(), parentVersion, "docID: %v was not found on the client - expecting to create doc based on nil parentVersion, parentVersion=%v", docID, parentVersion) doc = &clientDoc{ id: docID, _latestSeq: 0, @@ -1292,46 +1328,47 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do if parentVersion != nil { // grab latest version for this doc and make sure we're doing an upsert on top of it to avoid branching revisions latestRev, err := doc.latestRev() - require.NoError(btc.TB(), err) + require.NoError(btcc.TB(), err) latestVersion := latestRev.version - require.Equal(btc.TB(), *parentVersion, latestVersion, "latest version for docID: %v is %v, expected parentVersion: %v", docID, latestVersion, parentVersion) + require.Equal(btcc.TB(), *parentVersion, latestVersion, "latest version for docID: %v is %v, expected parentVersion: %v", docID, latestVersion, parentVersion) newGen = parentVersion.RevIDGeneration() + 1 } - body = btc.ProcessInlineAttachments(body, newGen) + body = btcc.ProcessInlineAttachments(body, newGen) digest := "abc" // TODO: Generate rev ID digest based on body hash? newRevID := fmt.Sprintf("%d-%s", newGen, digest) - btc._seqLast++ - newSeq := btc._seqLast + btcc._seqLast++ + newSeq := btcc._seqLast rev := clientDocRev{clientSeq: newSeq, version: DocVersion{RevID: newRevID}, body: body} doc.addNewRev(rev) - btc._seqStore[newSeq] = doc - btc._seqFromDocID[docID] = newSeq - delete(btc._seqStore, oldSeq) + btcc._seqStore[newSeq] = doc + btcc._seqFromDocID[docID] = newSeq + delete(btcc._seqStore, oldSeq) // new sequence written, wake up changes feeds - btc._seqCond.Broadcast() + btcc._seqCond.Broadcast() return &rev } // AddRev creates a revision on the client. // The rev ID is always: "N-abc", where N is rev generation for predictability. -func (btc *BlipTesterCollectionClient) AddRev(docID string, parentVersion *DocVersion, body []byte) DocVersion { - newRev := btc.upsertDoc(docID, parentVersion, body) +func (btcc *BlipTesterCollectionClient) AddRev(docID string, parentVersion *DocVersion, body []byte) DocVersion { + newRev := btcc.upsertDoc(docID, parentVersion, body) return newRev.version } -func (btc *BlipTesterCollectionClient) PushUnsolicitedRev(docID string, parentRev *DocVersion, body []byte) (version *DocVersion, err error) { - return btc.PushRevWithHistory(docID, parentRev, body, 1, 0) +// PushUnsolicitedRev creates a revision on the client, and immediately sends a changes request for it. This is only intended for use when there is no push replication running. +func (btcc *BlipTesterCollectionClient) PushUnsolicitedRev(docID string, parentRev *DocVersion, body []byte) (version *DocVersion, err error) { + return btcc.PushRevWithHistory(docID, parentRev, body, 1, 0) } // PushRevWithHistory creates a revision on the client with history, and immediately sends a changes request for it. -func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVersion *DocVersion, body []byte, revCount, prunedRevCount int) (version *DocVersion, err error) { - ctx := base.DatabaseLogCtx(base.TestCtx(btc.parent.rt.TB()), btc.parent.rt.GetDatabase().Name, nil) +func (btcc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVersion *DocVersion, body []byte, revCount, prunedRevCount int) (version *DocVersion, err error) { + ctx := base.DatabaseLogCtx(base.TestCtx(btcc.parent.rt.TB()), btcc.parent.rt.GetDatabase().Name, nil) parentRevGen := parentVersion.RevIDGeneration() revGen := parentRevGen + revCount + prunedRevCount @@ -1342,11 +1379,11 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe } // Inline attachment processing - body = btc.ProcessInlineAttachments(body, revGen) + body = btcc.ProcessInlineAttachments(body, revGen) var parentDocBody []byte if parentVersion != nil { - doc, ok := btc.getClientDoc(docID) + doc, ok := btcc.getClientDoc(docID) if !ok { return nil, fmt.Errorf("doc %s not found in client", docID) } @@ -1356,7 +1393,7 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe } newRevID := fmt.Sprintf("%d-%s", revGen, "abc") - newRev := btc.upsertDoc(docID, parentVersion, body) + newRev := btcc.upsertDoc(docID, parentVersion, body) // send a proposeChanges message with the single rev we just created on the client proposeChangesRequest := blip.NewRequest() @@ -1367,16 +1404,16 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe } proposeChangesRequest.SetBody([]byte(fmt.Sprintf(`[["%s","%s"%s]]`, docID, newRevID, serverVersionComponent))) - btc.addCollectionProperty(proposeChangesRequest) + btcc.addCollectionProperty(proposeChangesRequest) - btc.sendPushMsg(proposeChangesRequest) + btcc.sendPushMsg(proposeChangesRequest) proposeChangesResponse := proposeChangesRequest.Response() rspBody, err := proposeChangesResponse.Body() - require.NoError(btc.TB(), err) - require.NotContains(btc.TB(), proposeChangesResponse.Properties, "Error-Domain", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody) - require.NotContains(btc.TB(), proposeChangesResponse.Properties, "Error-Code", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody) - require.Equal(btc.TB(), "[]", string(rspBody)) + require.NoError(btcc.TB(), err) + require.NotContains(btcc.TB(), proposeChangesResponse.Properties, "Error-Domain", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody) + require.NotContains(btcc.TB(), proposeChangesResponse.Properties, "Error-Code", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody) + require.Equal(btcc.TB(), "[]", string(rspBody)) // send msg rev with new doc revRequest := blip.NewRequest() @@ -1385,14 +1422,14 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe revRequest.Properties[db.RevMessageRev] = newRevID revRequest.Properties[db.RevMessageHistory] = strings.Join(revisionHistory, ",") - btc.addCollectionProperty(revRequest) - if btc.parent.ClientDeltas && proposeChangesResponse.Properties[db.ProposeChangesResponseDeltas] == "true" && parentVersion != nil { + btcc.addCollectionProperty(revRequest) + if btcc.parent.ClientDeltas && proposeChangesResponse.Properties[db.ProposeChangesResponseDeltas] == "true" && parentVersion != nil { base.DebugfCtx(ctx, base.KeySync, "Sending deltas from test client from parent %v", parentVersion) var parentDocJSON, newDocJSON db.Body - require.NoError(btc.TB(), parentDocJSON.Unmarshal(parentDocBody)) - require.NoError(btc.TB(), newDocJSON.Unmarshal(body)) + require.NoError(btcc.TB(), parentDocJSON.Unmarshal(parentDocBody)) + require.NoError(btcc.TB(), newDocJSON.Unmarshal(body)) delta, err := base.Diff(parentDocJSON, newDocJSON) - require.NoError(btc.TB(), err) + require.NoError(btcc.TB(), err) revRequest.Properties[db.RevMessageDeltaSrc] = parentVersion.RevID body = delta } else { @@ -1401,47 +1438,47 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe revRequest.SetBody(body) - btc.sendPushMsg(revRequest) + btcc.sendPushMsg(revRequest) revResponse := revRequest.Response() rspBody, err = revResponse.Body() - require.NoError(btc.TB(), err) + require.NoError(btcc.TB(), err) if revResponse.Type() == blip.ErrorType { return nil, fmt.Errorf("error %s %s from revResponse: %s", revResponse.Properties["Error-Domain"], revResponse.Properties["Error-Code"], rspBody) } - btc.updateLastReplicatedRev(docID, newRev.version) + btcc.updateLastReplicatedRev(docID, newRev.version) return &newRev.version, nil } -func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte, revGen int) (outputBody []byte) { +func (btcc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte, revGen int) (outputBody []byte) { if !bytes.Contains(inputBody, []byte(db.BodyAttachments)) { return inputBody } var newDocJSON map[string]interface{} - require.NoError(btc.TB(), base.JSONUnmarshal(inputBody, &newDocJSON)) + require.NoError(btcc.TB(), base.JSONUnmarshal(inputBody, &newDocJSON)) attachments, ok := newDocJSON[db.BodyAttachments] if !ok { return inputBody } attachmentMap, ok := attachments.(map[string]interface{}) - require.True(btc.TB(), ok) + require.True(btcc.TB(), ok) for attachmentName, inlineAttachment := range attachmentMap { inlineAttachmentMap := inlineAttachment.(map[string]interface{}) attachmentData, ok := inlineAttachmentMap["data"] if !ok { isStub, _ := inlineAttachmentMap["stub"].(bool) - require.True(btc.TB(), isStub, "couldn't find data and stub property for inline attachment %#v : %v", attachmentName, inlineAttachmentMap) + require.True(btcc.TB(), isStub, "couldn't find data and stub property for inline attachment %#v : %v", attachmentName, inlineAttachmentMap) // push the stub as-is continue } // Transform inline attachment data into metadata data, ok := attachmentData.(string) - require.True(btc.TB(), ok, "inline attachment data was not a string, got %T", attachmentData) + require.True(btcc.TB(), ok, "inline attachment data was not a string, got %T", attachmentData) - length, digest := btc.saveAttachment(data) + length, digest := btcc.saveAttachment(data) attachmentMap[attachmentName] = map[string]interface{}{ "digest": digest, @@ -1451,12 +1488,12 @@ func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte } newDocJSON[db.BodyAttachments] = attachmentMap } - return base.MustJSONMarshal(btc.TB(), newDocJSON) + return base.MustJSONMarshal(btcc.TB(), newDocJSON) } // GetVersion returns the data stored in the Client under the given docID and version -func (btc *BlipTesterCollectionClient) GetVersion(docID string, docVersion DocVersion) (data []byte, found bool) { - doc, ok := btc.getClientDoc(docID) +func (btcc *BlipTesterCollectionClient) GetVersion(docID string, docVersion DocVersion) (data []byte, found bool) { + doc, ok := btcc.getClientDoc(docID) if !ok { return nil, false } @@ -1468,33 +1505,33 @@ func (btc *BlipTesterCollectionClient) GetVersion(docID string, docVersion DocVe } rev, ok := doc._revisionsBySeq[revSeq] - require.True(btc.TB(), ok, "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) + require.True(btcc.TB(), ok, "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) return rev.body, true } // WaitForVersion blocks until the given document version has been stored by the client, and returns the data when found. The test will fail after 10 seconds if a matching document is not found. -func (btc *BlipTesterCollectionClient) WaitForVersion(docID string, docVersion DocVersion) (data []byte) { - if data, found := btc.GetVersion(docID, docVersion); found { +func (btcc *BlipTesterCollectionClient) WaitForVersion(docID string, docVersion DocVersion) (data []byte) { + if data, found := btcc.GetVersion(docID, docVersion); found { return data } - require.EventuallyWithT(btc.TB(), func(c *assert.CollectT) { + require.EventuallyWithT(btcc.TB(), func(c *assert.CollectT) { var found bool - data, found = btc.GetVersion(docID, docVersion) + data, found = btcc.GetVersion(docID, docVersion) assert.True(c, found, "Could not find docID:%+v Version %+v", docID, docVersion) }, 10*time.Second, 5*time.Millisecond, "BlipTesterClient timed out waiting for doc %+v Version %+v", docID, docVersion) return data } // GetDoc returns a rev stored in the Client under the given docID. (if multiple revs are present, rev body returned is non-deterministic) -func (btc *BlipTesterCollectionClient) GetDoc(docID string) (data []byte, found bool) { - doc, ok := btc.getClientDoc(docID) +func (btcc *BlipTesterCollectionClient) GetDoc(docID string) (data []byte, found bool) { + doc, ok := btcc.getClientDoc(docID) if !ok { return nil, false } latestRev, err := doc.latestRev() - require.NoError(btc.TB(), err) + require.NoError(btcc.TB(), err) if latestRev == nil { return nil, false } @@ -1503,14 +1540,14 @@ func (btc *BlipTesterCollectionClient) GetDoc(docID string) (data []byte, found } // WaitForDoc blocks until any document with the doc ID has been stored by the client, and returns the document body when found. If a document will be reported multiple times, the latest copy of the document is returned (not necessarily the first). The test will fail after 10 seconds if the document -func (btc *BlipTesterCollectionClient) WaitForDoc(docID string) (data []byte) { +func (btcc *BlipTesterCollectionClient) WaitForDoc(docID string) (data []byte) { - if data, found := btc.GetDoc(docID); found { + if data, found := btcc.GetDoc(docID); found { return data } - require.EventuallyWithT(btc.TB(), func(c *assert.CollectT) { + require.EventuallyWithT(btcc.TB(), func(c *assert.CollectT) { var found bool - data, found = btc.GetDoc(docID) + data, found = btcc.GetDoc(docID) assert.True(c, found, "Could not find docID:%+v", docID) }, 10*time.Second, 5*time.Millisecond, "BlipTesterClient timed out waiting for doc %+v", docID) return data @@ -1560,27 +1597,27 @@ func (btr *BlipTesterReplicator) storeMessage(msg *blip.Message) { } // WaitForBlipRevMessage blocks until the given doc ID and rev ID has been stored by the client, and returns the message when found. If not found after 10 seconds, test will fail. -func (btc *BlipTesterCollectionClient) WaitForBlipRevMessage(docID string, docVersion DocVersion) (msg *blip.Message) { - require.EventuallyWithT(btc.TB(), func(c *assert.CollectT) { +func (btcc *BlipTesterCollectionClient) WaitForBlipRevMessage(docID string, docVersion DocVersion) (msg *blip.Message) { + require.EventuallyWithT(btcc.TB(), func(c *assert.CollectT) { var ok bool - msg, ok = btc.GetBlipRevMessage(docID, docVersion) + msg, ok = btcc.GetBlipRevMessage(docID, docVersion) assert.True(c, ok, "Could not find docID:%+v, RevID: %+v", docID, docVersion.RevID) }, 10*time.Second, 5*time.Millisecond, "BlipTesterReplicator timed out waiting for BLIP message") - require.NotNil(btc.TB(), msg) + require.NotNil(btcc.TB(), msg) return msg } // GetBLipRevMessage returns the rev message that wrote the given docID/DocVersion on the client. -func (btc *BlipTesterCollectionClient) GetBlipRevMessage(docID string, version DocVersion) (msg *blip.Message, found bool) { - btc.seqLock.RLock() - defer btc.seqLock.RUnlock() +func (btcc *BlipTesterCollectionClient) GetBlipRevMessage(docID string, version DocVersion) (msg *blip.Message, found bool) { + btcc.seqLock.RLock() + defer btcc.seqLock.RUnlock() - if doc, ok := btc._getClientDoc(docID); ok { + if doc, ok := btcc._getClientDoc(docID); ok { doc.lock.RLock() defer doc.lock.RUnlock() if seq, ok := doc._seqsByVersions[version]; ok { if rev, ok := doc._revisionsBySeq[seq]; ok { - require.NotNil(btc.TB(), rev.message, "rev.message is nil for docID:%+v, version: %+v", docID, version) + require.NotNil(btcc.TB(), rev.message, "rev.message is nil for docID:%+v, version: %+v", docID, version) return rev.message, true } } @@ -1620,14 +1657,6 @@ func (btcRunner *BlipTestClientRunner) StartOneshotPull(clientID uint32) { btcRunner.SingleCollection(clientID).StartOneshotPull() } -func (btcRunner *BlipTestClientRunner) StartOneshotPullFiltered(clientID uint32, channels string) { - btcRunner.SingleCollection(clientID).StartOneshotPullFiltered(channels) -} - -func (btcRunner *BlipTestClientRunner) StartOneshotPullRequestPlus(clientID uint32) { - btcRunner.SingleCollection(clientID).StartOneshotPullRequestPlus() -} - // AddRev creates a revision on the client. // The rev ID is always: "N-abc", where N is rev generation for predictability. func (btcRunner *BlipTestClientRunner) AddRev(clientID uint32, docID string, version *DocVersion, body []byte) DocVersion { @@ -1651,45 +1680,29 @@ func (btcRunner *BlipTestClientRunner) saveAttachment(clientID uint32, attachmen return btcRunner.SingleCollection(clientID).saveAttachment(attachmentData) } +// PushRevWithHistory creates a revision on the client with history, and immediately sends a changes request for it. func (btcRunner *BlipTestClientRunner) PushRevWithHistory(clientID uint32, docID string, parentVersion *DocVersion, body []byte, revCount, prunedRevCount int) (*DocVersion, error) { return btcRunner.SingleCollection(clientID).PushRevWithHistory(docID, parentVersion, body, revCount, prunedRevCount) } -func (btcRunner *BlipTestClientRunner) AttachmentsLock(clientID uint32) *sync.RWMutex { - return &btcRunner.SingleCollection(clientID).attachmentsLock -} - -func (btc *BlipTesterCollectionClient) AttachmentsLock() *sync.RWMutex { - return &btc.attachmentsLock -} - -func (btcRunner *BlipTestClientRunner) Attachments(clientID uint32) map[string][]byte { - return btcRunner.SingleCollection(clientID)._attachments -} - -func (btc *BlipTesterCollectionClient) Attachments() map[string][]byte { - return btc._attachments -} - // UnsubPullChanges will send an UnsubChanges message to the server to stop the pull replication. Fails test harness if Sync Gateway responds with an error. func (btcRunner *BlipTestClientRunner) UnsubPullChanges(clientID uint32) { btcRunner.SingleCollection(clientID).UnsubPullChanges() } -func (btc *BlipTesterCollectionClient) addCollectionProperty(msg *blip.Message) { - if btc.collection != "" { - msg.Properties[db.BlipCollection] = strconv.Itoa(btc.collectionIdx) +// addCollectionProperty adds a collection index to the message properties. +func (btcc *BlipTesterCollectionClient) addCollectionProperty(msg *blip.Message) { + if btcc.collection != "" { + msg.Properties[db.BlipCollection] = strconv.Itoa(btcc.collectionIdx) } } // addCollectionProperty will automatically add a collection. If we are running with the default collection, or a single named collection, automatically add the right value. If there are multiple collections on the database, the test will fatally exit, since the behavior is undefined. -func (bt *BlipTesterClient) addCollectionProperty(msg *blip.Message) *blip.Message { - if bt.nonCollectionAwareClient == nil { - require.Equal(bt.TB(), 1, len(bt.collectionClients), "Multiple collection clients, exist so assuming that the only named collection is the single element of an array is not valid") +func (btc *BlipTesterClient) addCollectionProperty(msg *blip.Message) { + if btc.nonCollectionAwareClient == nil { + require.Equal(btc.TB(), 1, len(btc.collectionClients), "Multiple collection clients, exist so assuming that the only named collection is the single element of an array is not valid") msg.Properties[db.BlipCollection] = "0" } - - return msg } // getCollectionClientFromMessage returns a the right blip tester client. This works automatically when BlipTesterClient is initialized when skipCollectionsInitialization is false. @@ -1710,20 +1723,23 @@ func (btc *BlipTesterClient) getCollectionClientFromMessage(msg *blip.Message) * return btc.collectionClients[idx] } -func (btc *BlipTesterCollectionClient) sendPullMsg(msg *blip.Message) { - btc.addCollectionProperty(msg) - btc.parent.pullReplication.sendMsg(msg) +// sendPullMsg sends a message to the server and stores the message locally. This function does not wait for a response. +func (btcc *BlipTesterCollectionClient) sendPullMsg(msg *blip.Message) { + btcc.addCollectionProperty(msg) + btcc.parent.pullReplication.sendMsg(msg) } -func (btc *BlipTesterCollectionClient) sendPushMsg(msg *blip.Message) { - btc.addCollectionProperty(msg) - btc.parent.pushReplication.sendMsg(msg) +// sendPushMsg sends a message to the server and stores the message locally. This function does not wait for a response. +func (btcc *BlipTesterCollectionClient) sendPushMsg(msg *blip.Message) { + btcc.addCollectionProperty(msg) + btcc.parent.pushReplication.sendMsg(msg) } -func (c *BlipTesterCollectionClient) lastSeq() clientSeq { - c.seqLock.RLock() - defer c.seqLock.RUnlock() - return c._seqLast +// lastSeq returns the latest sequence number for this collection. +func (btcc *BlipTesterCollectionClient) lastSeq() clientSeq { + btcc.seqLock.RLock() + defer btcc.seqLock.RUnlock() + return btcc._seqLast } // pruneVersion removes the given version from the specified doc. This is not allowed for the latest version of a document.