Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #41179) x-pack/filebeat/input/entityanalytics/provider/activedirectory: don't publish unmodified updates #41386

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,20 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli
p.cfg.UserAttrs = withMandatory(p.cfg.UserAttrs, "distinguishedName", "whenChanged")
p.cfg.GrpAttrs = withMandatory(p.cfg.GrpAttrs, "distinguishedName", "whenChanged")

var (
last time.Time
err error
)
for {
select {
case <-inputCtx.Cancelation.Done():
if !errors.Is(inputCtx.Cancelation.Err(), context.Canceled) {
return inputCtx.Cancelation.Err()
}
return nil
case <-syncTimer.C:
start := time.Now()
if err := p.runFullSync(inputCtx, store, client); err != nil {
case start := <-syncTimer.C:
last, err = p.runFullSync(inputCtx, store, client)
if err != nil {
p.logger.Errorw("Error running full sync", "error", err)
p.metrics.syncError.Inc()
}
Expand All @@ -157,9 +161,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli
}
updateTimer.Reset(p.cfg.UpdateInterval)
p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval))
case <-updateTimer.C:
start := time.Now()
if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil {
case start := <-updateTimer.C:
last, err = p.runIncrementalUpdate(inputCtx, store, last, client)
if err != nil {
p.logger.Errorw("Error running incremental update", "error", err)
p.metrics.updateError.Inc()
}
Expand Down Expand Up @@ -192,13 +196,13 @@ outer:
// identities from Azure Active Directory, enrich users with group memberships,
// and publishes all known users (regardless if they have been modified) to the
// given beat.Client.
func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (time.Time, error) {
p.logger.Debugf("Running full sync...")

p.logger.Debugf("Opening new transaction...")
state, err := newStateStore(store)
if err != nil {
return fmt.Errorf("unable to begin transaction: %w", err)
return time.Time{}, fmt.Errorf("unable to begin transaction: %w", err)
}
p.logger.Debugf("Transaction opened")
defer func() { // If commit is successful, call to this close will be no-op.
Expand All @@ -210,48 +214,89 @@ func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
_, err = p.doFetchUsers(ctx, state, true)
users, err := p.doFetchUsers(ctx, state, true)
if err != nil {
return err
return time.Time{}, err
}

if len(state.users) != 0 {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
if len(users) != 0 || state.len() != 0 {
// Active Directory does not have a notion of deleted users
// beyond absence from the directory, so compare found users
// with users already known by the state store and if any
// are in the store but not returned in the previous fetch,
// mark them as deleted and publish the deletion. We do not
// have the time of the deletion, so use now.
if state.len() != 0 {
found := make(map[string]bool)
for _, u := range users {
found[u.ID] = true
}
deleted := make(map[string]*User)
now := time.Now()
state.forEach(func(u *User) {
if u.State == Deleted {
// We have already seen that this is deleted
// so we do not need to publish again. The
// user will be deleted from the store when
// the state is closed.
return
}
if found[u.ID] {
// We have the user, so we do not need to
// mark it as deleted.
return
}
// This modifies the state store's copy since u
// is a pointer held by the state store map.
u.State = Deleted
u.WhenChanged = now
deleted[u.ID] = u
})
for _, u := range deleted {
users = append(users, u)
}
}
if len(users) != 0 {
var tracker *kvstore.TxTracker
start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
tracker = kvstore.NewTxTracker(ctx)
for _, u := range users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}
end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)
tracker.Wait()
}

end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)

tracker.Wait()
}

if ctx.Err() != nil {
return ctx.Err()
return time.Time{}, ctx.Err()
}

state.lastSync = time.Now()
// state.whenChanged is modified by the call to doFetchUsers to be
// the latest modification time for all of the users that have been
// collected in that call. This will not include any of the deleted
// users since they were not collected.
latest := state.whenChanged
state.lastSync = latest
err = state.close(true)
if err != nil {
return fmt.Errorf("unable to commit state: %w", err)
return time.Time{}, fmt.Errorf("unable to commit state: %w", err)
}

return nil
return latest, nil
}

// runIncrementalUpdate will run an incremental update. The process is similar
// to full synchronization, except only users which have changed (newly
// discovered, modified, or deleted) will be published.
func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) (time.Time, error) {
p.logger.Debugf("Running incremental update...")

state, err := newStateStore(store)
if err != nil {
return fmt.Errorf("unable to begin transaction: %w", err)
return last, fmt.Errorf("unable to begin transaction: %w", err)
}
defer func() { // If commit is successful, call to this close will be no-op.
closeErr := state.close(false)
Expand All @@ -263,62 +308,37 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store
ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
updatedUsers, err := p.doFetchUsers(ctx, state, false)
if err != nil {
return err
return last, err
}

var tracker *kvstore.TxTracker
if len(updatedUsers) != 0 || state.len() != 0 {
// Active Directory does not have a notion of deleted users
// beyond absence from the directory, so compare found users
// with users already known by the state store and if any
// are in the store but not returned in the previous fetch,
// mark them as deleted and publish the deletion. We do not
// have the time of the deletion, so use now.
if state.len() != 0 {
found := make(map[string]bool)
for _, u := range updatedUsers {
found[u.ID] = true
}
deleted := make(map[string]*User)
now := time.Now()
state.forEach(func(u *User) {
if u.State == Deleted || found[u.ID] {
return
}
// This modifies the state store's copy since u
// is a pointer held by the state store map.
u.State = Deleted
u.WhenChanged = now
deleted[u.ID] = u
})
for _, u := range deleted {
updatedUsers = append(updatedUsers, u)
}
}
if len(updatedUsers) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, u := range updatedUsers {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}
tracker.Wait()
if len(updatedUsers) != 0 {
tracker := kvstore.NewTxTracker(ctx)
for _, u := range updatedUsers {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}
tracker.Wait()
}

if ctx.Err() != nil {
return ctx.Err()
return last, ctx.Err()
}

state.lastUpdate = time.Now()
// state.whenChanged is modified by the call to doFetchUsers to be
// the latest modification time for all of the users that have been
// collected in that call.
latest := state.whenChanged
state.lastUpdate = latest
if err = state.close(true); err != nil {
return fmt.Errorf("unable to commit state: %w", err)
return last, fmt.Errorf("unable to commit state: %w", err)
}

return nil
return latest, nil
}

// doFetchUsers handles fetching user identities from Active Directory. If
// fullSync is true, then any existing whenChanged will be ignored, forcing a
// full synchronization from Active Directory.
// full synchronization from Active Directory. The whenChanged time of state
// is modified to be the time stamp of the latest User.WhenChanged value.
// Returns a set of modified users by ID.
func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) {
var since time.Time
Expand All @@ -332,31 +352,14 @@ func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync
return nil, err
}

var (
users []*User
whenChanged time.Time
)
if fullSync {
for _, u := range entries {
state.storeUser(u)
if u.WhenChanged.After(whenChanged) {
whenChanged = u.WhenChanged
}
users := make([]*User, 0, len(entries))
for _, u := range entries {
users = append(users, state.storeUser(u))
if u.WhenChanged.After(state.whenChanged) {
state.whenChanged = u.WhenChanged
}
} else {
users = make([]*User, 0, len(entries))
for _, u := range entries {
users = append(users, state.storeUser(u))
if u.WhenChanged.After(whenChanged) {
whenChanged = u.WhenChanged
}
}
p.logger.Debugf("processed %d users from API", len(users))
}
if whenChanged.After(state.whenChanged) {
state.whenChanged = whenChanged
}

p.logger.Debugf("processed %d users from API", len(users))
return users, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ func (s *stateStore) close(commit bool) (err error) {
}

for key, value := range s.users {
if value.State == Deleted {
err = s.tx.Delete(usersBucket, []byte(key))
if err != nil {
return fmt.Errorf("unable to delete user %q from state: %w", key, err)
}
continue
}
err = s.tx.Set(usersBucket, []byte(key), value)
if err != nil {
return fmt.Errorf("unable to save user %q to state: %w", key, err)
Expand Down
Loading