Skip to content

Commit

Permalink
CBG-4046 implement attachment audit events
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Jul 19, 2024
1 parent 600d052 commit a066f72
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 86 deletions.
9 changes: 4 additions & 5 deletions base/audit_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,8 @@ var AuditEvents = events{
// fieldGroupAuthenticated, FIXME: CBG-3973
fieldGroupDatabase,
fieldGroupKeyspace,
fieldGroupRequest,
// fieldGroupRequest, FIXME: CBG-4092
},

EnabledByDefault: false,
FilteringPermitted: true,
EventType: eventTypeData,
Expand All @@ -989,7 +988,7 @@ var AuditEvents = events{
// fieldGroupAuthenticated, FIXME: CBG-3973
fieldGroupDatabase,
fieldGroupKeyspace,
fieldGroupRequest,
// fieldGroupAuthenticated, FIXME: CBG-3973
},
EnabledByDefault: false,
FilteringPermitted: true,
Expand All @@ -1007,7 +1006,7 @@ var AuditEvents = events{
// fieldGroupAuthenticated, // FIXME: CBG-3973
fieldGroupDatabase,
fieldGroupKeyspace,
fieldGroupRequest,
// fieldGroupAuthenticated, FIXME: CBG-3973
},
EnabledByDefault: false,
FilteringPermitted: true,
Expand All @@ -1025,7 +1024,7 @@ var AuditEvents = events{
// fieldGroupAuthenticated, // FIXME: CBG-3973
fieldGroupDatabase,
fieldGroupKeyspace,
fieldGroupRequest,
// fieldGroupAuthenticated, FIXME: CBG-3973
},
EnabledByDefault: false,
FilteringPermitted: true,
Expand Down
33 changes: 24 additions & 9 deletions db/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ type DocAttachment struct {
Data []byte `json:"-"` // tell json marshal/unmarshal to ignore this field
}

// attachmentID represents the name of the attachment and stored document id
type attachmentID struct {

Check failure on line 59 in db/attachment.go

View workflow job for this annotation

GitHub Actions / lint

type `attachmentID` is unused (unused)
name string // user facing name of the attachment
docID string // stored docID attachment
}

// ErrAttachmentTooLarge is returned when an attempt to attach an oversize attachment is made.
var ErrAttachmentTooLarge = errors.New("attachment too large")

Expand All @@ -63,19 +69,27 @@ const maxAttachmentSizeBytes = 20 * 1024 * 1024
// Given Attachments Meta to be stored in the database, storeAttachments goes through the map, finds attachments with
// inline bodies, copies the bodies into the Couchbase db, and replaces the bodies with the 'digest' attributes which
// are the keys to retrieving them.
func (db *DatabaseCollectionWithUser) storeAttachments(ctx context.Context, doc *Document, newAttachmentsMeta AttachmentsMeta, generation int, parentRev string, docHistory []string) (AttachmentData, error) {
func (db *DatabaseCollectionWithUser) storeAttachments(ctx context.Context, doc *Document, newAttachmentsMeta AttachmentsMeta, generation int, parentRev string, docHistory []string) (*updatedAttachments, error) {
if len(newAttachmentsMeta) == 0 {
return nil, nil
}

var parentAttachments map[string]interface{}
newAttachmentData := make(AttachmentData, 0)
newAttachments := &updatedAttachments{
data: make(AttachmentData, 0),
}
atts := newAttachmentsMeta
for name, value := range atts {
meta, ok := value.(map[string]interface{})
if !ok {
return nil, base.HTTPErrorf(400, "Invalid _attachments")
}
_, ok = doc.SyncData.Attachments[name]
if ok {
newAttachments.updatedNames = append(newAttachments.updatedNames, name)
} else {
newAttachments.createdNames = append(newAttachments.createdNames, name)
}
data := meta["data"]
if data != nil {
// Attachment contains data, so store it in the db:
Expand All @@ -85,7 +99,7 @@ func (db *DatabaseCollectionWithUser) storeAttachments(ctx context.Context, doc
}
digest := Sha1DigestKey(attachment)
key := MakeAttachmentKey(AttVersion2, doc.ID, digest)
newAttachmentData[key] = attachment
newAttachments.data[key] = attachment

newMeta := map[string]interface{}{
"stub": true,
Expand Down Expand Up @@ -130,14 +144,15 @@ func (db *DatabaseCollectionWithUser) storeAttachments(ctx context.Context, doc
}
}
}
return newAttachmentData, nil

return newAttachments, nil
}

// retrieveV2AttachmentKeys returns the list of V2 attachment keys from the attachment metadata that can be used for
// retrieveV2AttachmentIDs returns the list of V2 attachment keys from the attachment metadata that can be used for
// identifying obsolete attachments and triggering subsequent removal of those attachments to reclaim the storage.
func retrieveV2AttachmentKeys(docID string, docAttachments AttachmentsMeta) (attachments map[string]struct{}, err error) {
attachments = make(map[string]struct{})
for _, value := range docAttachments {
func retrieveV2AttachmentKeys(docID string, docAttachments AttachmentsMeta) (attachments map[string]string, err error) {
attachments = make(map[string]string)
for name, value := range docAttachments {
meta, ok := value.(map[string]interface{})
if !ok {
return nil, ErrAttachmentMeta
Expand All @@ -151,7 +166,7 @@ func retrieveV2AttachmentKeys(docID string, docAttachments AttachmentsMeta) (att
continue
}
key := MakeAttachmentKey(version, docID, digest)
attachments[key] = struct{}{}
attachments[key] = name
}
return attachments, nil
}
Expand Down
16 changes: 8 additions & 8 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ func TestAllowedAttachments(t *testing.T) {
}
docID := "doc1"

ctx.addAllowedAttachments(docID, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID, "fakeDocVersion", meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID, meta, tt.inputBlipProtocol)

ctx.removeAllowedAttachments(docID, meta, tt.inputBlipProtocol)
Expand All @@ -1360,7 +1360,7 @@ func TestAllowedAttachments(t *testing.T) {
}
docID := "doc1"

ctx.addAllowedAttachments(docID, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID, "fakeDocVersion", meta, tt.inputBlipProtocol)
key := allowedAttachmentKey(docID, meta[0].digest, tt.inputBlipProtocol)
require.True(t, isAllowedAttachment(ctx, key))

Expand All @@ -1379,11 +1379,11 @@ func TestAllowedAttachments(t *testing.T) {
}

docID1 := "doc1"
ctx.addAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID1, "fakeDocVersion", meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID1, meta, tt.inputBlipProtocol)

docID2 := "doc2"
ctx.addAllowedAttachments(docID2, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID2, "fakeDocVersion", meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID2, meta, tt.inputBlipProtocol)

ctx.removeAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
Expand All @@ -1409,11 +1409,11 @@ func TestAllowedAttachments(t *testing.T) {
}

docID1 := "doc1"
ctx.addAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID1, "fakeDocVersion", meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID1, meta, tt.inputBlipProtocol)

docID2 := "doc2"
ctx.addAllowedAttachments(docID2, meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID2, "fakeDocVersion", meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID2, meta, tt.inputBlipProtocol)

ctx.removeAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
Expand All @@ -1436,12 +1436,12 @@ func TestAllowedAttachments(t *testing.T) {

docID1 := "doc1"
att1Meta := []AttachmentStorageMeta{{digest: "att1", version: tt.inputAttVersion}}
ctx.addAllowedAttachments(docID1, att1Meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID1, "fakeDocVersion", att1Meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID1, att1Meta, tt.inputBlipProtocol)

docID2 := "doc2"
att2Meta := []AttachmentStorageMeta{{digest: "att2", version: tt.inputAttVersion}}
ctx.addAllowedAttachments(docID2, att2Meta, tt.inputBlipProtocol)
ctx.addAllowedAttachments(docID2, "fakeDocVersion", att2Meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID2, att2Meta, tt.inputBlipProtocol)

ctx.removeAllowedAttachments(docID1, att1Meta, tt.inputBlipProtocol)
Expand Down
14 changes: 10 additions & 4 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,11 @@ func (bh *blipHandler) handleGetAttachment(rq *blip.Message) error {
response.SetCompressed(rq.Properties[BlipCompress] == trueProperty)
bh.replicationStats.HandleGetAttachment.Add(1)
bh.replicationStats.HandleGetAttachmentBytes.Add(int64(len(attachment)))
base.Audit(bh.loggingCtx, base.AuditIDAttachmentRead, base.AuditFields{
base.AuditFieldDocID: docID,
base.AuditFieldDocVersion: allowedAttachment.docVersion,
base.AuditFieldAttachmentID: attachmentKey,
})

return nil
}
Expand Down Expand Up @@ -1478,7 +1483,7 @@ func (bsc *BlipSyncContext) incrementSerialNumber() uint64 {
return atomic.AddUint64(&bsc.handlerSerialNumber, 1)
}

func (bsc *BlipSyncContext) addAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
func (bsc *BlipSyncContext) addAllowedAttachments(docID string, docVersion string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
if len(attMeta) == 0 {
return
}
Expand All @@ -1497,9 +1502,10 @@ func (bsc *BlipSyncContext) addAllowedAttachments(docID string, attMeta []Attach
bsc.allowedAttachments[key] = att
} else {
bsc.allowedAttachments[key] = AllowedAttachment{
version: attachment.version,
counter: 1,
docID: docID,
version: attachment.version,
counter: 1,
docID: docID,
docVersion: docVersion,
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ type blipSyncStats struct {
// AllowedAttachment contains the metadata for handling allowed attachments
// while replicating over BLIP protocol.
type AllowedAttachment struct {
version int // Version of the attachment
counter int // Counter to track allowed attachments
docID string // docID, used for BlipCBMobileReplicationV2 retrieval of V2 attachments
version int // Version of the attachment
counter int // Counter to track allowed attachments
docID string // docID, used for BlipCBMobileReplicationV2 retrieval of V2 attachments
docVersion string // docVersion, used for audit logging
}

// SetActiveCBMobileSubprotocol returns the active subprotocol version
Expand Down Expand Up @@ -441,7 +442,7 @@ func (bsc *BlipSyncContext) sendRevisionWithProperties(ctx context.Context, send
activeSubprotocol := bsc.activeCBMobileSubprotocol
if awaitResponse {
// Allow client to download attachments in 'atts', but only while pulling this rev
bsc.addAllowedAttachments(docID, attMeta, activeSubprotocol)
bsc.addAllowedAttachments(docID, revID, attMeta, activeSubprotocol)
} else {
bsc.replicationStats.SendRevCount.Add(1)
outrq.SetNoReply(true)
Expand Down
66 changes: 50 additions & 16 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
}

allowImport := db.UseXattrs()
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, nil, false, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, nil, false, func(doc *Document) (resultDoc *Document, resultAttachmentData *updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
var isSgWrite bool
var crc32Match bool

Expand Down Expand Up @@ -1012,7 +1012,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
}

allowImport := db.UseXattrs()
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData *updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
Expand Down Expand Up @@ -1834,7 +1834,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d

prevCurrentRev := doc.CurrentRev
doc.updateWinningRevAndSetDocFlags(ctx)
newDocHasAttachments := len(newAttachments) > 0
newDocHasAttachments := newAttachments != nil && len(newAttachments.data) > 0
col.storeOldBodyInRevTreeAndUpdateCurrent(ctx, doc, prevCurrentRev, newRevID, newDoc, newDocHasAttachments)

syncExpiry, oldBodyJSON, channelSet, access, roles, err := col.runSyncFn(ctx, doc, mutableBody, metaMap, newRevID)
Expand All @@ -1851,9 +1851,25 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d
doc.History[newRevID].Channels = channelSet
}

err = col.addAttachments(ctx, newAttachments)
if err != nil {
return
if newAttachments != nil {
err = col.addAttachments(ctx, newAttachments.data)
if err != nil {
return
}
for _, name := range newAttachments.createdNames {
base.Audit(ctx, base.AuditIDAttachmentCreate, base.AuditFields{
base.AuditFieldDocID: doc.ID,
base.AuditFieldDocVersion: newRevID,
base.AuditFieldAttachmentID: name,
})
}
for _, name := range newAttachments.updatedNames {
base.Audit(ctx, base.AuditIDAttachmentUpdate, base.AuditFields{
base.AuditFieldDocID: doc.ID,
base.AuditFieldDocVersion: newRevID,
base.AuditFieldAttachmentID: name,
})
}
}

col.backupAncestorRevs(ctx, doc, newDoc)
Expand Down Expand Up @@ -1902,8 +1918,15 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d
return updatedExpiry, newRevID, newDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err
}

// updatedAttachments is returned after processing attachments from an inline body
type updatedAttachments struct {
data AttachmentData // stores the AttachmentData from the changed attachments
createdNames []string // names of the attachments that were created, for logging later
updatedNames []string // names of the attachments that were updated, for logging later
}

// Function type for the callback passed into updateAndReturnDoc
type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error)
type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAttachmentData *updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error)

// Calling updateAndReturnDoc directly allows callers to:
// 1. Receive the updated document body in the response
Expand All @@ -1923,7 +1946,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
var unusedSequences []uint64 // Must be scoped outside callback, used over multiple iterations
var oldBodyJSON string // Stores previous revision body for use by DocumentChangeEvent
var createNewRevIDSkipped bool
var previousAttachments map[string]struct{}
var previousAttachments map[string]string

// Update the document
inConflict := false
Expand Down Expand Up @@ -2161,7 +2184,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
// Now that the document has successfully been stored, we can make other db changes:
base.DebugfCtx(ctx, base.KeyCRUD, "Stored doc %q / %q as #%v", base.UD(docid), newRevID, doc.Sequence)

leafAttachments := make(map[string]struct{})
leafAttachments := make(map[string]string)
if !skipObsoleteAttachmentsRemoval {
leafAttachments, err = getAttachmentIDsForLeafRevisions(ctx, db, doc, newRevID)
if err != nil {
Expand All @@ -2172,13 +2195,23 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

if !skipObsoleteAttachmentsRemoval {
var obsoleteAttachments []string
for previousAttachmentID := range previousAttachments {
for previousAttachmentID, previousAttachmentName := range previousAttachments {
if _, found := leafAttachments[previousAttachmentID]; !found {
err = db.dataStore.Delete(previousAttachmentID)
if err != nil {
base.ErrorfCtx(ctx, "Error deleting obsolete attachment %q of doc %q, Error: %v", previousAttachmentID, base.UD(doc.ID), err)
} else {
obsoleteAttachments = append(obsoleteAttachments, previousAttachmentID)
if !isImport {
_, exists := doc.SyncData.Attachments[previousAttachmentName]
if !exists {
base.Audit(ctx, base.AuditIDAttachmentDelete, base.AuditFields{
base.AuditFieldDocID: doc.ID,
base.AuditFieldDocVersion: newRevID,
base.AuditFieldAttachmentID: previousAttachmentName,
})
}
}
}
}
}
Expand All @@ -2195,16 +2228,17 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
return doc, newRevID, nil
}

func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectionWithUser, doc *Document, newRevID string) (map[string]struct{}, error) {
leafAttachments := make(map[string]struct{})
// getAttachmentIDsForLeafRevisions returns a map of attachment docids with values of attachment names.
func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectionWithUser, doc *Document, newRevID string) (map[string]string, error) {
leafAttachments := make(map[string]string)

currentAttachments, err := retrieveV2AttachmentKeys(doc.ID, doc.Attachments)
if err != nil {
return nil, err
}

for attachmentID, _ := range currentAttachments {
leafAttachments[attachmentID] = struct{}{}
for docid, name := range currentAttachments {
leafAttachments[docid] = name
}

// Grab leaf revisions that have attachments and aren't the currently being added rev
Expand All @@ -2227,8 +2261,8 @@ func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectio
return nil, err
}

for attachmentID, _ := range attachmentKeys {
leafAttachments[attachmentID] = struct{}{}
for attachmentID, attachmentName := range attachmentKeys {
leafAttachments[attachmentID] = attachmentName
}

}
Expand Down
2 changes: 1 addition & 1 deletion db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
existingDoc.Expiry = *expiry
}

docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, existingDoc, true, func(doc *Document) (resultDocument *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, existingDoc, true, func(doc *Document) (resultDocument *Document, resultAttachmentData *updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling.
// If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode.
if doc.Cas != existingDoc.Cas {
Expand Down
Loading

0 comments on commit a066f72

Please sign in to comment.