From 9cf613c43a5277e63c9b5c18aff03565b7d0569c Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 23 Oct 2024 10:05:43 +1030 Subject: [PATCH] x-pack/filebeat/input/entityanalytics/provider/activedirectory: improve modification time and deletion logic (#41179) This improves the update time stamps of modified events by using the documents' whenChanged fields in the case of returned documents, and the current time when a document is identified as having been deleted. The latest of these is used to determine the time filter for the next Active Directory query. Documents are marked as deleted only when they are found to not exist in full sync collection, and are removed from the state store when they are identified as deleted. The change in behaviour to not use updates to identify corrects behaviour that would cause older but not deleted entities to be deleted from the index. (cherry picked from commit 6b540745cb3c990b9320bf2f1380bb1b4579a31f) # Conflicts: # x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go --- CHANGELOG.next.asciidoc | 30 +++ .../activedirectory/activedirectory.go | 193 +++++++++--------- .../provider/activedirectory/statestore.go | 7 + 3 files changed, 138 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c0b9a16b384..b5bf701733a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -99,6 +99,36 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - Updated Websocket input title to align with existing inputs {pull}39006[39006] - Log bad handshake details when websocket connection fails {pull}41300[41300] +- Restore netflow input on Windows {pull}39024[39024] +- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861] +- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] +- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] +- Fix request trace filename handling in http_endpoint input. {pull}39410[39410] +- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653] +- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036] +- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859] +- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115] +- Prevent panic in CEL and salesforce inputs when github.com/hashicorp/go-retryablehttp exceeds maximum retries. {pull}40144[40144] +- Fix bug in CEL input rate limit logic. {issue}40106[40106] {pull}40270[40270] +- Relax requirements in Okta entity analytics provider user and device profile data shape. {pull}40359[40359] +- Fix bug in Okta entity analytics rate limit logic. {issue}40106[40106] {pull}40267[40267] +- Fix crashes in the journald input. {pull}40061[40061] +- Fix order of configuration for EntraID entity analytics provider. {pull}40487[40487] +- Ensure Entra ID request bodies are not truncated and trace logs are rotated before 100MB. {pull}40494[40494] +- The Elasticsearch output now correctly logs the event fields to the event log file {issue}40509[40509] {pull}40512[40512] +- Fix the "No such input type exist: 'azure-eventhub'" error on the Windows platform {issue}40608[40608] {pull}40609[40609] +- awss3 input: Fix handling of SQS notifications that don't contain a region. {pull}40628[40628] +- Fix credential handling when workload identity is being used in GCS input. {issue}39977[39977] {pull}40663[40663] +- Fix publication of group data from the Okta entity analytics provider. {pull}40681[40681] +- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] +- Fix replace processor handling of zero string replacement validation. {pull}40751[40751] +- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] +- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071] +- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] +- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] +- Log bad handshake details when websocket connection fails {pull}41300[41300] +- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179] *Heartbeat* diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go index fed77b48d67..bb9e604e708 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go @@ -128,6 +128,16 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli syncTimer := time.NewTimer(syncWaitTime) updateTimer := time.NewTimer(updateWaitTime) +<<<<<<< HEAD +======= + p.cfg.UserAttrs = withMandatory(p.cfg.UserAttrs, "distinguishedName", "whenChanged") + p.cfg.GrpAttrs = withMandatory(p.cfg.GrpAttrs, "distinguishedName", "whenChanged") + + var ( + last time.Time + err error + ) +>>>>>>> 6b540745cb (x-pack/filebeat/input/entityanalytics/provider/activedirectory: improve modification time and deletion logic (#41179)) for { select { case <-inputCtx.Cancelation.Done(): @@ -135,9 +145,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli return inputCtx.Cancelation.Err() } return nil - case <-syncTimer.C: - start := time.Now() - if err := p.runFullSync(inputCtx, store, client); err != nil { + case start := <-syncTimer.C: + last, err = p.runFullSync(inputCtx, store, client) + if err != nil { p.logger.Errorw("Error running full sync", "error", err) p.metrics.syncError.Inc() } @@ -155,9 +165,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli } updateTimer.Reset(p.cfg.UpdateInterval) p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) - case <-updateTimer.C: - start := time.Now() - if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil { + case start := <-updateTimer.C: + last, err = p.runIncrementalUpdate(inputCtx, store, last, client) + if err != nil { p.logger.Errorw("Error running incremental update", "error", err) p.metrics.updateError.Inc() } @@ -182,13 +192,13 @@ func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.Tran // identities from Azure Active Directory, enrich users with group memberships, // and publishes all known users (regardless if they have been modified) to the // given beat.Client. -func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (time.Time, error) { p.logger.Debugf("Running full sync...") p.logger.Debugf("Opening new transaction...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return time.Time{}, fmt.Errorf("unable to begin transaction: %w", err) } p.logger.Debugf("Transaction opened") defer func() { // If commit is successful, call to this close will be no-op. @@ -200,48 +210,89 @@ func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client ctx := ctxtool.FromCanceller(inputCtx.Cancelation) p.logger.Debugf("Starting fetch...") - _, err = p.doFetchUsers(ctx, state, true) + users, err := p.doFetchUsers(ctx, state, true) if err != nil { - return err + return time.Time{}, err } - if len(state.users) != 0 { - tracker := kvstore.NewTxTracker(ctx) - - start := time.Now() - p.publishMarker(start, start, inputCtx.ID, true, client, tracker) - for _, u := range state.users { - p.publishUser(u, state, inputCtx.ID, client, tracker) + if len(users) != 0 || state.len() != 0 { + // Active Directory does not have a notion of deleted users + // beyond absence from the directory, so compare found users + // with users already known by the state store and if any + // are in the store but not returned in the previous fetch, + // mark them as deleted and publish the deletion. We do not + // have the time of the deletion, so use now. + if state.len() != 0 { + found := make(map[string]bool) + for _, u := range users { + found[u.ID] = true + } + deleted := make(map[string]*User) + now := time.Now() + state.forEach(func(u *User) { + if u.State == Deleted { + // We have already seen that this is deleted + // so we do not need to publish again. The + // user will be deleted from the store when + // the state is closed. + return + } + if found[u.ID] { + // We have the user, so we do not need to + // mark it as deleted. + return + } + // This modifies the state store's copy since u + // is a pointer held by the state store map. + u.State = Deleted + u.WhenChanged = now + deleted[u.ID] = u + }) + for _, u := range deleted { + users = append(users, u) + } + } + if len(users) != 0 { + var tracker *kvstore.TxTracker + start := time.Now() + p.publishMarker(start, start, inputCtx.ID, true, client, tracker) + tracker = kvstore.NewTxTracker(ctx) + for _, u := range users { + p.publishUser(u, state, inputCtx.ID, client, tracker) + } + end := time.Now() + p.publishMarker(end, end, inputCtx.ID, false, client, tracker) + tracker.Wait() } - - end := time.Now() - p.publishMarker(end, end, inputCtx.ID, false, client, tracker) - - tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return time.Time{}, ctx.Err() } - state.lastSync = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. This will not include any of the deleted + // users since they were not collected. + latest := state.whenChanged + state.lastSync = latest err = state.close(true) if err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return time.Time{}, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // runIncrementalUpdate will run an incremental update. The process is similar // to full synchronization, except only users which have changed (newly // discovered, modified, or deleted) will be published. -func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) (time.Time, error) { p.logger.Debugf("Running incremental update...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return last, fmt.Errorf("unable to begin transaction: %w", err) } defer func() { // If commit is successful, call to this close will be no-op. closeErr := state.close(false) @@ -253,62 +304,37 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store ctx := ctxtool.FromCanceller(inputCtx.Cancelation) updatedUsers, err := p.doFetchUsers(ctx, state, false) if err != nil { - return err + return last, err } - var tracker *kvstore.TxTracker - if len(updatedUsers) != 0 || state.len() != 0 { - // Active Directory does not have a notion of deleted users - // beyond absence from the directory, so compare found users - // with users already known by the state store and if any - // are in the store but not returned in the previous fetch, - // mark them as deleted and publish the deletion. We do not - // have the time of the deletion, so use now. - if state.len() != 0 { - found := make(map[string]bool) - for _, u := range updatedUsers { - found[u.ID] = true - } - deleted := make(map[string]*User) - now := time.Now() - state.forEach(func(u *User) { - if u.State == Deleted || found[u.ID] { - return - } - // This modifies the state store's copy since u - // is a pointer held by the state store map. - u.State = Deleted - u.WhenChanged = now - deleted[u.ID] = u - }) - for _, u := range deleted { - updatedUsers = append(updatedUsers, u) - } - } - if len(updatedUsers) != 0 { - tracker = kvstore.NewTxTracker(ctx) - for _, u := range updatedUsers { - p.publishUser(u, state, inputCtx.ID, client, tracker) - } - tracker.Wait() + if len(updatedUsers) != 0 { + tracker := kvstore.NewTxTracker(ctx) + for _, u := range updatedUsers { + p.publishUser(u, state, inputCtx.ID, client, tracker) } + tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return last, ctx.Err() } - state.lastUpdate = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. + latest := state.whenChanged + state.lastUpdate = latest if err = state.close(true); err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return last, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // doFetchUsers handles fetching user identities from Active Directory. If // fullSync is true, then any existing whenChanged will be ignored, forcing a -// full synchronization from Active Directory. +// full synchronization from Active Directory. The whenChanged time of state +// is modified to be the time stamp of the latest User.WhenChanged value. // Returns a set of modified users by ID. func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) { var since time.Time @@ -322,31 +348,14 @@ func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync return nil, err } - var ( - users []*User - whenChanged time.Time - ) - if fullSync { - for _, u := range entries { - state.storeUser(u) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } + users := make([]*User, 0, len(entries)) + for _, u := range entries { + users = append(users, state.storeUser(u)) + if u.WhenChanged.After(state.whenChanged) { + state.whenChanged = u.WhenChanged } - } else { - users = make([]*User, 0, len(entries)) - for _, u := range entries { - users = append(users, state.storeUser(u)) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } - } - p.logger.Debugf("processed %d users from API", len(users)) } - if whenChanged.After(state.whenChanged) { - state.whenChanged = whenChanged - } - + p.logger.Debugf("processed %d users from API", len(users)) return users, nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go index 74486ebaac6..c81ece21a30 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go @@ -170,6 +170,13 @@ func (s *stateStore) close(commit bool) (err error) { } for key, value := range s.users { + if value.State == Deleted { + err = s.tx.Delete(usersBucket, []byte(key)) + if err != nil { + return fmt.Errorf("unable to delete user %q from state: %w", key, err) + } + continue + } err = s.tx.Set(usersBucket, []byte(key), value) if err != nil { return fmt.Errorf("unable to save user %q to state: %w", key, err)