Skip to content

Commit

Permalink
CBG-4101 add AuditIDChangesFeedStarted from blip (#7010)
Browse files Browse the repository at this point in the history
* CBG-4101 add AuditIDChangesFeedStarted from blip

* write tests for blip subchanges

* Only support a single filter in audit messages
  • Loading branch information
torcolvin authored Jul 26, 2024
1 parent 36d1a81 commit 91fac39
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 115 deletions.
2 changes: 1 addition & 1 deletion base/audit_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,9 @@ var AuditEvents = events{
AuditFieldSince: "since",
},
mandatoryFieldGroups: []fieldGroup{
fieldGroupAuthenticated,
fieldGroupKeyspace,
fieldGroupRequest,
fieldGroupAuthenticated,
},
OptionalFields: AuditFields{
AuditFieldFilter: "filter",
Expand Down
20 changes: 20 additions & 0 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,25 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
}()

auditFields := base.AuditFields{
base.AuditFieldSince: subChangesParams.Since().String(),
}
if subChangesParams.filter() != "" {
auditFields[base.AuditFieldFilter] = subChangesParams.filter()
}
if len(subChangesParams.docIDs()) > 0 {
auditFields[base.AuditFieldDocIDs] = subChangesParams.docIDs()
auditFields[base.AuditFieldFilter] = base.DocIDsFilter
}
if continuous {
auditFields[base.AuditFieldFeedType] = "continuous"
} else {
auditFields[base.AuditFieldFeedType] = "normal"
}
if len(channels) > 0 {
auditFields[base.AuditFieldChannels] = channels
}
base.Audit(bh.loggingCtx, base.AuditIDChangesFeedStarted, auditFields)
return nil
}

Expand Down Expand Up @@ -457,6 +476,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
return false

}

_, forceClose := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
base.DebugfCtx(bh.loggingCtx, base.KeySync, " Sending %d changes", len(changes))
for _, change := range changes {
Expand Down
294 changes: 203 additions & 91 deletions rest/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ func TestAuditDocumentRead(t *testing.T) {

RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated)

docVersion := rt.CreateTestDoc("doc1")
const docID = "doc1"
docVersion := rt.CreateTestDoc(docID)

type testCase struct {
name string
Expand Down Expand Up @@ -1061,108 +1062,207 @@ func TestAuditDocumentCreateUpdateEvents(t *testing.T) {
}

func TestAuditChangesFeedStart(t *testing.T) {
rt := createAuditLoggingRestTester(t)
defer rt.Close()
btcRunner := NewBlipTesterClientRunner(t)
btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {

RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated)
rt := createAuditLoggingRestTester(t)
defer rt.Close()

_ = rt.CreateTestDoc("doc1")
const (
requestUser = "alice"
)
RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated)

rt.CreateUser(requestUser, []string{"*"})
opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
defer btc.Close()

type testCase struct {
name string
method string
path string
requestBody string
adminAPI bool
expectedFields map[string]any // expected fields on changes audit event
}
testCases := []testCase{
{
name: "get changes",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes",
expectedFields: map[string]any{
base.AuditFieldSince: "0",
const (
requestUser = "alice"
)

rt.CreateUser(requestUser, []string{"*"})

type testCase struct {
name string
auditableCode func(t testing.TB, docID string, docVersion DocVersion)
expectedFields map[string]any // expected fields on changes audit event
}
testCases := []testCase{
{
name: "get changes",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendUserRequest(http.MethodGet, "/{{.keyspace}}/_changes", "", requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldSince: "0",
},
},
},
{
name: "get changes since longpoll, feed type",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes?since=10&feed=normal",
expectedFields: map[string]any{
base.AuditFieldSince: "10",
base.AuditFieldFeedType: "normal",
{
name: "get changes since normal, feed type",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendUserRequest(http.MethodGet, "/{{.keyspace}}/_changes?since=10&feed=normal", "", requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldSince: "10",
},
},
},
{
name: "get changes compound since, include_docs",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes?since=5:10&include_docs=true",
expectedFields: map[string]any{
base.AuditFieldSince: "5:10",
base.AuditFieldIncludeDocs: true,
{
name: "get changes compound since, include_docs",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendUserRequest(http.MethodGet, "/{{.keyspace}}/_changes?since=5:10&include_docs=true", "", requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldIncludeDocs: true,
base.AuditFieldSince: "5:10",
},
},
},
{
name: "get changes channel filters",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes?filter=" + base.ByChannelFilter + "&channels=A,B",
expectedFields: map[string]any{
base.AuditFieldSince: "0",
base.AuditFieldFilter: base.ByChannelFilter,
base.AuditFieldChannels: []any{"A", "B"},
{
name: "get changes channel filters",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendUserRequest(http.MethodGet, "/{{.keyspace}}/_changes?filter="+base.ByChannelFilter+"&channels=A,B", "", requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldChannels: []any{"A", "B"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.ByChannelFilter,
base.AuditFieldSince: "0",
},
},
},
{
name: "get changes docid filters",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes?filter=" + base.DocIDsFilter + "&doc_ids=doc1,doc2",
expectedFields: map[string]any{
base.AuditFieldSince: "0",
base.AuditFieldFilter: base.DocIDsFilter,
base.AuditFieldDocIDs: []any{"doc1", "doc2"},
{
name: "get changes docid filters",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendUserRequest(http.MethodGet, "/{{.keyspace}}/_changes?filter="+base.DocIDsFilter+"&doc_ids=doc1,doc2", "", requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldDocIDs: []any{"doc1", "doc2"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.DocIDsFilter,
base.AuditFieldSince: "0",
},
},
},
{
name: "post changes",
method: http.MethodPost,
path: "/{{.keyspace}}/_changes",
requestBody: `{"feed":"normal", "since":10,"filter":"sync_gateway/bychannel","channels":"A,B"}`,
expectedFields: map[string]any{
base.AuditFieldSince: "10",
base.AuditFieldFilter: base.ByChannelFilter,
base.AuditFieldChannels: []any{"A", "B"},
base.AuditFieldFeedType: "normal",
{
name: "post changes",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
requestBody := `{"feed":"normal", "since":10,"filter":"sync_gateway/bychannel","channels":"A,B"}`
RequireStatus(t, rt.SendUserRequest(http.MethodPost, "/{{.keyspace}}/_changes", requestBody, requestUser), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldChannels: []any{"A", "B"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.ByChannelFilter,
base.AuditFieldSince: "10",
},
},
},
{
name: "get changes admin",
method: http.MethodGet,
path: "/{{.keyspace}}/_changes?since=10",
expectedFields: map[string]any{
base.AuditFieldSince: "10",
{
name: "get changes admin",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
RequireStatus(t, rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?since=10", ""), http.StatusOK)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldSince: "10",
},
},
adminAPI: true,
},
}
for _, testCase := range testCases {
rt.Run(testCase.name, func(t *testing.T) {
output := base.AuditLogContents(t, func(t testing.TB) {
if testCase.adminAPI {
RequireStatus(t, rt.SendAdminRequestWithAuth(testCase.method, testCase.path, testCase.requestBody, base.TestClusterUsername(), base.TestClusterPassword()), http.StatusOK)
} else {
RequireStatus(t, rt.SendUserRequest(testCase.method, testCase.path, testCase.requestBody, requestUser), http.StatusOK)
}
})
{
name: "blip changes continuous",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartPull(btc.id))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "continuous",
base.AuditFieldSince: "0",
},
},
{
name: "blip changes one shot",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartOneshotPull(btc.id))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldSince: "0",
},
},
{
name: "blip changes with channels",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Since: "0", Channels: "A,B"}))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldChannels: []any{"A", "B"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.ByChannelFilter,
base.AuditFieldSince: "0",
},
},
{
name: "blip changes with docids",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Since: "0", DocIDs: []string{docID, "non_existent"}}))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldDocIDs: []any{"blip_changes_with_docids", "non_existent"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.DocIDsFilter,
base.AuditFieldSince: "0",
},
},
{
// invalid specification, this would ignore channel filter and only use docids filter
name: "blip changes with docids and channels",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Since: "0", DocIDs: []string{docID, "non_existent"}, Channels: "A,B"}))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldDocIDs: []any{"blip_changes_with_docids_and_channels", "non_existent"},
base.AuditFieldChannels: []any{"A", "B"},
base.AuditFieldFeedType: "normal",
base.AuditFieldFilter: base.DocIDsFilter,
base.AuditFieldSince: "0",
},
},
{
name: "blip changes with compound since",
auditableCode: func(t testing.TB, docID string, docVersion DocVersion) {
require.NoError(t, btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Since: "1:10"}))
btcRunner.WaitForVersion(btc.id, docID, docVersion)
_, err := btcRunner.UnsubPullChanges(btc.id)
require.NoError(t, err)
},
expectedFields: map[string]any{
base.AuditFieldFeedType: "normal",
base.AuditFieldSince: "1:10",
},
},
}
for _, testCase := range testCases {
rt.Run(testCase.name, func(t *testing.T) {
docID := strings.ReplaceAll(testCase.name, " ", "_")
docVersion := rt.PutDoc(docID, `{"channels": "A"}`)
output := base.AuditLogContents(t, func(t testing.TB) {
testCase.auditableCode(t, docID, docVersion)
})

requireChangesStartEvent(rt.TB(), output, testCase.expectedFields)
})
}
requireChangesStartEvent(rt.TB(), output, testCase.expectedFields)
})
}
})
}

// requireDocumentMetadataReadEvents validates that there read events for each doc version specified. There should be only audit events for a given docid.
Expand Down Expand Up @@ -1246,6 +1346,18 @@ func requireChangesStartEvent(t testing.TB, output []byte, expectedFields map[st
require.True(t, ok, fmt.Sprintf("Expected field %v not present", fieldID))
require.Equal(t, expectedValue, value)
}
for _, fieldName := range []string{
base.AuditFieldChannels,
base.AuditFieldDocIDs,
base.AuditFieldFeedType,
base.AuditFieldFilter,
base.AuditFieldIncludeDocs,
base.AuditFieldSince,
} {
if _, ok := expectedFields[fieldName]; !ok {
require.NotContains(t, event, fieldName, fmt.Sprintf("Unexpected field %v present", fieldName))
}
}
}
require.True(t, found, "Did not receive expected changeFeedStart audit event")
}
Expand Down
4 changes: 2 additions & 2 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ func TestActiveOnlyContinuous(t *testing.T) {
version := rt.PutDoc(docID, `{"test":true}`)

// start an initial pull
require.NoError(t, btcRunner.StartPullSince(btc.id, "true", "0", "true"))
require.NoError(t, btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Continuous: true, Since: "0", ActiveOnly: true}))
rev := btcRunner.WaitForVersion(btc.id, docID, version)
assert.Equal(t, `{"test":true}`, string(rev))

Expand Down Expand Up @@ -3009,7 +3009,7 @@ func TestBlipRefreshUser(t *testing.T) {
version := rt.PutDoc(docID, `{"channels":["chan1"]}`)

// Start a regular one-shot pull
err := btcRunner.StartPullSince(btc.id, "true", "0", "false")
err := btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Continuous: true, Since: "0"})
require.NoError(t, err)

_ = btcRunner.WaitForDoc(btc.id, docID)
Expand Down
Loading

0 comments on commit 91fac39

Please sign in to comment.