diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3fed6771468..8c6a49132d6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,6 +166,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] - Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] - Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] +- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179] *Heartbeat* diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go index ab1a37cbced..c66004807b4 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go @@ -130,6 +130,10 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli p.cfg.UserAttrs = withMandatory(p.cfg.UserAttrs, "distinguishedName", "whenChanged") p.cfg.GrpAttrs = withMandatory(p.cfg.GrpAttrs, "distinguishedName", "whenChanged") + var ( + last time.Time + err error + ) for { select { case <-inputCtx.Cancelation.Done(): @@ -137,9 +141,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli return inputCtx.Cancelation.Err() } return nil - case <-syncTimer.C: - start := time.Now() - if err := p.runFullSync(inputCtx, store, client); err != nil { + case start := <-syncTimer.C: + last, err = p.runFullSync(inputCtx, store, client) + if err != nil { p.logger.Errorw("Error running full sync", "error", err) p.metrics.syncError.Inc() } @@ -157,9 +161,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli } updateTimer.Reset(p.cfg.UpdateInterval) p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) - case <-updateTimer.C: - start := time.Now() - if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil { + case start := <-updateTimer.C: + last, err = p.runIncrementalUpdate(inputCtx, store, last, client) + if err != nil { p.logger.Errorw("Error running incremental update", "error", err) p.metrics.updateError.Inc() } @@ -192,13 +196,13 @@ outer: // identities from Azure Active Directory, enrich users with group memberships, // and publishes all known users (regardless if they have been modified) to the // given beat.Client. -func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (time.Time, error) { p.logger.Debugf("Running full sync...") p.logger.Debugf("Opening new transaction...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return time.Time{}, fmt.Errorf("unable to begin transaction: %w", err) } p.logger.Debugf("Transaction opened") defer func() { // If commit is successful, call to this close will be no-op. @@ -210,48 +214,89 @@ func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client ctx := ctxtool.FromCanceller(inputCtx.Cancelation) p.logger.Debugf("Starting fetch...") - _, err = p.doFetchUsers(ctx, state, true) + users, err := p.doFetchUsers(ctx, state, true) if err != nil { - return err + return time.Time{}, err } - if len(state.users) != 0 { - tracker := kvstore.NewTxTracker(ctx) - - start := time.Now() - p.publishMarker(start, start, inputCtx.ID, true, client, tracker) - for _, u := range state.users { - p.publishUser(u, state, inputCtx.ID, client, tracker) + if len(users) != 0 || state.len() != 0 { + // Active Directory does not have a notion of deleted users + // beyond absence from the directory, so compare found users + // with users already known by the state store and if any + // are in the store but not returned in the previous fetch, + // mark them as deleted and publish the deletion. We do not + // have the time of the deletion, so use now. + if state.len() != 0 { + found := make(map[string]bool) + for _, u := range users { + found[u.ID] = true + } + deleted := make(map[string]*User) + now := time.Now() + state.forEach(func(u *User) { + if u.State == Deleted { + // We have already seen that this is deleted + // so we do not need to publish again. The + // user will be deleted from the store when + // the state is closed. + return + } + if found[u.ID] { + // We have the user, so we do not need to + // mark it as deleted. + return + } + // This modifies the state store's copy since u + // is a pointer held by the state store map. + u.State = Deleted + u.WhenChanged = now + deleted[u.ID] = u + }) + for _, u := range deleted { + users = append(users, u) + } + } + if len(users) != 0 { + var tracker *kvstore.TxTracker + start := time.Now() + p.publishMarker(start, start, inputCtx.ID, true, client, tracker) + tracker = kvstore.NewTxTracker(ctx) + for _, u := range users { + p.publishUser(u, state, inputCtx.ID, client, tracker) + } + end := time.Now() + p.publishMarker(end, end, inputCtx.ID, false, client, tracker) + tracker.Wait() } - - end := time.Now() - p.publishMarker(end, end, inputCtx.ID, false, client, tracker) - - tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return time.Time{}, ctx.Err() } - state.lastSync = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. This will not include any of the deleted + // users since they were not collected. + latest := state.whenChanged + state.lastSync = latest err = state.close(true) if err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return time.Time{}, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // runIncrementalUpdate will run an incremental update. The process is similar // to full synchronization, except only users which have changed (newly // discovered, modified, or deleted) will be published. -func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) (time.Time, error) { p.logger.Debugf("Running incremental update...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return last, fmt.Errorf("unable to begin transaction: %w", err) } defer func() { // If commit is successful, call to this close will be no-op. closeErr := state.close(false) @@ -263,62 +308,37 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store ctx := ctxtool.FromCanceller(inputCtx.Cancelation) updatedUsers, err := p.doFetchUsers(ctx, state, false) if err != nil { - return err + return last, err } - var tracker *kvstore.TxTracker - if len(updatedUsers) != 0 || state.len() != 0 { - // Active Directory does not have a notion of deleted users - // beyond absence from the directory, so compare found users - // with users already known by the state store and if any - // are in the store but not returned in the previous fetch, - // mark them as deleted and publish the deletion. We do not - // have the time of the deletion, so use now. - if state.len() != 0 { - found := make(map[string]bool) - for _, u := range updatedUsers { - found[u.ID] = true - } - deleted := make(map[string]*User) - now := time.Now() - state.forEach(func(u *User) { - if u.State == Deleted || found[u.ID] { - return - } - // This modifies the state store's copy since u - // is a pointer held by the state store map. - u.State = Deleted - u.WhenChanged = now - deleted[u.ID] = u - }) - for _, u := range deleted { - updatedUsers = append(updatedUsers, u) - } - } - if len(updatedUsers) != 0 { - tracker = kvstore.NewTxTracker(ctx) - for _, u := range updatedUsers { - p.publishUser(u, state, inputCtx.ID, client, tracker) - } - tracker.Wait() + if len(updatedUsers) != 0 { + tracker := kvstore.NewTxTracker(ctx) + for _, u := range updatedUsers { + p.publishUser(u, state, inputCtx.ID, client, tracker) } + tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return last, ctx.Err() } - state.lastUpdate = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. + latest := state.whenChanged + state.lastUpdate = latest if err = state.close(true); err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return last, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // doFetchUsers handles fetching user identities from Active Directory. If // fullSync is true, then any existing whenChanged will be ignored, forcing a -// full synchronization from Active Directory. +// full synchronization from Active Directory. The whenChanged time of state +// is modified to be the time stamp of the latest User.WhenChanged value. // Returns a set of modified users by ID. func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) { var since time.Time @@ -332,31 +352,14 @@ func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync return nil, err } - var ( - users []*User - whenChanged time.Time - ) - if fullSync { - for _, u := range entries { - state.storeUser(u) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } + users := make([]*User, 0, len(entries)) + for _, u := range entries { + users = append(users, state.storeUser(u)) + if u.WhenChanged.After(state.whenChanged) { + state.whenChanged = u.WhenChanged } - } else { - users = make([]*User, 0, len(entries)) - for _, u := range entries { - users = append(users, state.storeUser(u)) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } - } - p.logger.Debugf("processed %d users from API", len(users)) } - if whenChanged.After(state.whenChanged) { - state.whenChanged = whenChanged - } - + p.logger.Debugf("processed %d users from API", len(users)) return users, nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go index 74486ebaac6..c81ece21a30 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go @@ -170,6 +170,13 @@ func (s *stateStore) close(commit bool) (err error) { } for key, value := range s.users { + if value.State == Deleted { + err = s.tx.Delete(usersBucket, []byte(key)) + if err != nil { + return fmt.Errorf("unable to delete user %q from state: %w", key, err) + } + continue + } err = s.tx.Set(usersBucket, []byte(key), value) if err != nil { return fmt.Errorf("unable to save user %q to state: %w", key, err)