Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4452 avoid checking revpos on attachments #7382

Merged
merged 6 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions db/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type AttachmentCallback func(name string, digest string, knownData []byte, meta
// The callback is told whether the attachment body is known to the database, according
// to its digest. If the attachment isn't known, the callback can return data for it, which will
// be added to the metadata as a "data" property.
func (c *DatabaseCollection) ForEachStubAttachment(body Body, minRevpos int, docID string, existingDigests map[string]string, callback AttachmentCallback) error {
func (c *DatabaseCollection) ForEachStubAttachment(body Body, docID string, existingDigests map[string]string, callback AttachmentCallback) error {
atts := GetBodyAttachments(body)
if atts == nil && body[BodyAttachments] != nil {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid _attachments")
Expand All @@ -293,9 +293,6 @@ func (c *DatabaseCollection) ForEachStubAttachment(body Body, minRevpos int, doc
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}
if meta["data"] == nil {
if revpos, ok := base.ToInt64(meta["revpos"]); revpos < int64(minRevpos) || !ok {
continue
}
digest, ok := meta["digest"].(string)
if !ok {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
Expand Down
26 changes: 9 additions & 17 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,60 +408,52 @@ func TestForEachStubAttachmentErrors(t *testing.T) {
docID := "foo"
existingDigests := make(map[string]string)
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err := collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err := collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid _attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Call ForEachStubAttachment with invalid attachment; simulates the error scenario.
doc = `{"_attachments": {"image1.jpeg": "", "image2.jpeg": ""}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid _attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Check whether the attachment iteration is getting skipped if revpos < minRevpos
callbackCount = 0
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 0, callbackCount)

// Verify the attachment is getting skipped if digest is in existing set
callbackCount = 0
existingDigests["image.jpg"] = "e1a1"
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":2, "digest":"e1a1"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 0, callbackCount)

// Verify the attachment is not getting skipped if digest doesn't match existing set
callbackCount = 0
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":2, "digest":"e1a2"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 1, callbackCount)

// Check whether the attachment iteration is getting skipped if there is no revpos.
doc = `{"_attachments": {"image.jpg": {"stub":true}}}`
doc = `{"_attachments": {"image.jpg": {"stub":true,"digest":"e1a1"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")

// Should throw invalid attachment error is the digest is not valid string or empty.
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1, "digest":true}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Call ForEachStubAttachment with some bad digest value. Internally it should throw a missing
// document error and invoke the callback function.
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1, "digest":"9304cdd066efa64f78387e9cc9240a70527271bc"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")

// Simulate an error from the callback function; it should return the same error from ForEachStubAttachment.
Expand All @@ -470,7 +462,7 @@ func TestForEachStubAttachmentErrors(t *testing.T) {
callback = func(name string, digest string, knownData []byte, meta map[string]interface{}) ([]byte, error) {
return nil, errors.New("Can't work with this digest value!")
}
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw the actual error")
assert.Contains(t, err.Error(), "Can't work with this digest value!")
}
Expand Down
27 changes: 9 additions & 18 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,22 +1128,18 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
var currentBucketDoc *Document

// Look at attachments with revpos > the last common ancestor's
minRevpos := 1
if len(history) > 0 {
currentDoc, rawDoc, err := bh.collection.GetDocumentWithRaw(bh.loggingCtx, docID, DocUnmarshalSync)
// If we're able to obtain current doc data then we should use the common ancestor generation++ for min revpos
// as we will already have any attachments on the common ancestor so don't need to ask for them.
// Otherwise we'll have to go as far back as we can in the doc history and choose the last entry in there.
if err == nil {
commonAncestor := currentDoc.History.findAncestorFromSet(currentDoc.CurrentRev, history)
minRevpos, _ = ParseRevID(bh.loggingCtx, commonAncestor)
minRevpos++
rawBucketDoc = rawDoc
currentBucketDoc = currentDoc
} else {
minRevpos, _ = ParseRevID(bh.loggingCtx, history[len(history)-1])
}
}
// updatedRevPos is the revpos of the new revision, to be added to attachment metadata if needed for CBL<4.0 compatibility. revpos is no longer used by Sync Gateway.
updatedRevPos, _ := ParseRevID(bh.loggingCtx, revID)

// currentDigests is a map from attachment name to the current bucket doc digest,
// for any attachments on the incoming document that are also on the current bucket doc
Expand All @@ -1159,7 +1155,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if !ok {
// If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise
// update to ensure it's fetched and uploaded
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
continue
}

Expand Down Expand Up @@ -1190,23 +1186,18 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}

incomingAttachmentRevpos, ok := base.ToInt64(incomingAttachmentMeta["revpos"])
if !ok {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}

// Compare the revpos and attachment digest. If incoming revpos is less than or equal to minRevPos and
// digest is different we need to override the revpos and set it to the current revision to ensure
// the attachment is requested and stored
if int(incomingAttachmentRevpos) <= minRevpos && currentAttachmentDigest != incomingAttachmentDigest {
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
// the attachment is requested and stored. revpos provided for SG/CBL<4.0 compatibility but is no longer used by Sync Gateway.
if currentAttachmentDigest != incomingAttachmentDigest {
bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
}
}

body[BodyAttachments] = bodyAtts
}

if err := bh.downloadOrVerifyAttachments(rq.Sender, body, minRevpos, docID, currentDigests); err != nil {
if err := bh.downloadOrVerifyAttachments(rq.Sender, body, docID, currentDigests); err != nil {
base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), revID, err)
return err
}
Expand Down Expand Up @@ -1472,8 +1463,8 @@ func (bh *blipHandler) sendProveAttachment(sender *blip.Sender, docID, name, dig

// For each attachment in the revision, makes sure it's in the database, asking the client to
// upload it if necessary. This method blocks until all the attachments have been processed.
func (bh *blipHandler) downloadOrVerifyAttachments(sender *blip.Sender, body Body, minRevpos int, docID string, currentDigests map[string]string) error {
return bh.collection.ForEachStubAttachment(body, minRevpos, docID, currentDigests,
func (bh *blipHandler) downloadOrVerifyAttachments(sender *blip.Sender, body Body, docID string, currentDigests map[string]string) error {
return bh.collection.ForEachStubAttachment(body, docID, currentDigests,
func(name string, digest string, knownData []byte, meta map[string]interface{}) ([]byte, error) {
// Request attachment if we don't have it
if knownData == nil {
Expand Down
59 changes: 0 additions & 59 deletions rest/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,65 +2344,6 @@ func TestPushUnknownAttachmentAsStub(t *testing.T) {
})
}

func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
DatabaseConfig: &DatabaseConfig{
DbConfig: DbConfig{
AllowConflicts: base.BoolPtr(true),
},
},
}

btcRunner := NewBlipTesterClientRunner(t)
const docID = "doc"

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, rtConfig)
defer rt.Close()

opts := BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &opts)
defer btc.Close()

btcRunner.StartPull(btc.id)

// Write an initial rev with attachment data
initialVersion := btc.rt.PutDoc(docID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)

// Replicate data to client and ensure doc arrives
btc.rt.WaitForPendingChanges()
btcRunner.WaitForVersion(btc.id, docID, initialVersion)

// Create a set of revisions before we start the replicator to ensure there's a significant amount of history to push
version := initialVersion
for i := 0; i < 25; i++ {
version = btcRunner.AddRev(btc.id, docID, &version, []byte(`{"update_count":`+strconv.Itoa(i)+`,"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`))
}

// Note this references revpos 1 and therefore SGW has it - Shouldn't need proveAttachment, even when we replicate it
proveAttachmentBefore := btc.pushReplication.replicationStats.ProveAttachment.Value()
btcRunner.StartPushWithOpts(btc.id, BlipTesterPushOptions{Continuous: false})
rt.WaitForVersion(docID, version)

proveAttachmentAfter := btc.pushReplication.replicationStats.ProveAttachment.Value()
assert.Equal(t, proveAttachmentBefore, proveAttachmentAfter)

// start another push to run in the background from where we last left off
latestSeq := btcRunner.SingleCollection(btc.id).lastSeq()
btcRunner.StartPushWithOpts(btc.id, BlipTesterPushOptions{Continuous: true, Since: strconv.Itoa(int(latestSeq))})

// Push another bunch of history, this time whilst a replicator is actively pushing them
for i := 25; i < 50; i++ {
version = btcRunner.AddRev(btc.id, docID, &version, []byte(`{"update_count":`+strconv.Itoa(i)+`,"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`))
}

rt.WaitForVersion(docID, version)
proveAttachmentAfter = btc.pushReplication.replicationStats.ProveAttachment.Value()
assert.Equal(t, proveAttachmentBefore, proveAttachmentAfter)
})
}

func TestAttachmentWithErroneousRevPos(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
Expand Down
9 changes: 1 addition & 8 deletions rest/blip_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,20 +1560,13 @@ func (btcc *BlipTesterCollectionClient) sendPushMsg(msg *blip.Message) {
btcc.parent.pushReplication.sendMsg(msg)
}

// lastSeq returns the latest sequence number for this collection.
func (btcc *BlipTesterCollectionClient) lastSeq() clientSeq {
btcc.seqLock.RLock()
defer btcc.seqLock.RUnlock()
return btcc._seqLast
}

// _nextSequence returns the next sequence number for this collection.
func (btcc *BlipTesterCollectionClient) _nextSequence() clientSeq {
btcc._seqLast++
return btcc._seqLast
}

// _pruneVersion removes the given version from the specified doc. This is not allowed for the latest version of a document.
// pruneVersion removes the given version from the specified doc. This is not allowed for the latest version of a document.
func (btcc *BlipTesterCollectionClient) pruneVersion(docID string, version DocVersion) {
btcc.seqLock.Lock()
defer btcc.seqLock.Unlock()
Expand Down
Loading