diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5b24a90e5b00..6df71121122e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -98,6 +98,36 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - Updated Websocket input title to align with existing inputs {pull}39006[39006] +- Restore netflow input on Windows {pull}39024[39024] +- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861] +- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] +- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] +- Fix request trace filename handling in http_endpoint input. {pull}39410[39410] +- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653] +- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036] +- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859] +- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115] +- Prevent panic in CEL and salesforce inputs when github.com/hashicorp/go-retryablehttp exceeds maximum retries. {pull}40144[40144] +- Fix bug in CEL input rate limit logic. {issue}40106[40106] {pull}40270[40270] +- Relax requirements in Okta entity analytics provider user and device profile data shape. {pull}40359[40359] +- Fix bug in Okta entity analytics rate limit logic. {issue}40106[40106] {pull}40267[40267] +- Fix crashes in the journald input. 
{pull}40061[40061] +- Fix order of configuration for EntraID entity analytics provider. {pull}40487[40487] +- Ensure Entra ID request bodies are not truncated and trace logs are rotated before 100MB. {pull}40494[40494] +- The Elasticsearch output now correctly logs the event fields to the event log file {issue}40509[40509] {pull}40512[40512] +- Fix the "No such input type exist: 'azure-eventhub'" error on the Windows platform {issue}40608[40608] {pull}40609[40609] +- awss3 input: Fix handling of SQS notifications that don't contain a region. {pull}40628[40628] +- Fix credential handling when workload identity is being used in GCS input. {issue}39977[39977] {pull}40663[40663] +- Fix publication of group data from the Okta entity analytics provider. {pull}40681[40681] +- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] +- Fix replace processor handling of zero string replacement validation. {pull}40751[40751] +- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] +- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071] +- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] +- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] +- Log bad handshake details when websocket connection fails {pull}41300[41300] +- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. 
{pull}41179[41179] *Heartbeat* diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go index fed77b48d679..bb9e604e7089 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go @@ -128,6 +128,13 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli syncTimer := time.NewTimer(syncWaitTime) updateTimer := time.NewTimer(updateWaitTime) + p.cfg.UserAttrs = withMandatory(p.cfg.UserAttrs, "distinguishedName", "whenChanged") + p.cfg.GrpAttrs = withMandatory(p.cfg.GrpAttrs, "distinguishedName", "whenChanged") + + var ( + last time.Time + err error + ) for { select { case <-inputCtx.Cancelation.Done(): @@ -135,9 +145,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli return inputCtx.Cancelation.Err() } return nil - case <-syncTimer.C: - start := time.Now() - if err := p.runFullSync(inputCtx, store, client); err != nil { + case start := <-syncTimer.C: + last, err = p.runFullSync(inputCtx, store, client) + if err != nil { p.logger.Errorw("Error running full sync", "error", err) p.metrics.syncError.Inc() } @@ -155,9 +165,9 @@ func (p *adInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Cli } updateTimer.Reset(p.cfg.UpdateInterval) p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) - case <-updateTimer.C: - start := time.Now() - if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil { + case start := <-updateTimer.C: + last, err = p.runIncrementalUpdate(inputCtx, store, last, client) + if err != nil { p.logger.Errorw("Error running incremental 
update", "error", err) p.metrics.updateError.Inc() } @@ -182,13 +192,13 @@ func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.Tran // identities from Azure Active Directory, enrich users with group memberships, // and publishes all known users (regardless if they have been modified) to the // given beat.Client. -func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (time.Time, error) { p.logger.Debugf("Running full sync...") p.logger.Debugf("Opening new transaction...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return time.Time{}, fmt.Errorf("unable to begin transaction: %w", err) } p.logger.Debugf("Transaction opened") defer func() { // If commit is successful, call to this close will be no-op. @@ -200,48 +210,89 @@ func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client ctx := ctxtool.FromCanceller(inputCtx.Cancelation) p.logger.Debugf("Starting fetch...") - _, err = p.doFetchUsers(ctx, state, true) + users, err := p.doFetchUsers(ctx, state, true) if err != nil { - return err + return time.Time{}, err } - if len(state.users) != 0 { - tracker := kvstore.NewTxTracker(ctx) - - start := time.Now() - p.publishMarker(start, start, inputCtx.ID, true, client, tracker) - for _, u := range state.users { - p.publishUser(u, state, inputCtx.ID, client, tracker) + if len(users) != 0 || state.len() != 0 { + // Active Directory does not have a notion of deleted users + // beyond absence from the directory, so compare found users + // with users already known by the state store and if any + // are in the store but not returned in the previous fetch, + // mark them as deleted and publish the deletion. We do not + // have the time of the deletion, so use now. 
+ if state.len() != 0 { + found := make(map[string]bool) + for _, u := range users { + found[u.ID] = true + } + deleted := make(map[string]*User) + now := time.Now() + state.forEach(func(u *User) { + if u.State == Deleted { + // We have already seen that this is deleted + // so we do not need to publish again. The + // user will be deleted from the store when + // the state is closed. + return + } + if found[u.ID] { + // We have the user, so we do not need to + // mark it as deleted. + return + } + // This modifies the state store's copy since u + // is a pointer held by the state store map. + u.State = Deleted + u.WhenChanged = now + deleted[u.ID] = u + }) + for _, u := range deleted { + users = append(users, u) + } + } + if len(users) != 0 { + var tracker *kvstore.TxTracker + start := time.Now() + p.publishMarker(start, start, inputCtx.ID, true, client, tracker) + tracker = kvstore.NewTxTracker(ctx) + for _, u := range users { + p.publishUser(u, state, inputCtx.ID, client, tracker) + } + end := time.Now() + p.publishMarker(end, end, inputCtx.ID, false, client, tracker) + tracker.Wait() } - - end := time.Now() - p.publishMarker(end, end, inputCtx.ID, false, client, tracker) - - tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return time.Time{}, ctx.Err() } - state.lastSync = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. This will not include any of the deleted + // users since they were not collected. + latest := state.whenChanged + state.lastSync = latest err = state.close(true) if err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return time.Time{}, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // runIncrementalUpdate will run an incremental update. 
The process is similar // to full synchronization, except only users which have changed (newly // discovered, modified, or deleted) will be published. -func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { +func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) (time.Time, error) { p.logger.Debugf("Running incremental update...") state, err := newStateStore(store) if err != nil { - return fmt.Errorf("unable to begin transaction: %w", err) + return last, fmt.Errorf("unable to begin transaction: %w", err) } defer func() { // If commit is successful, call to this close will be no-op. closeErr := state.close(false) @@ -253,62 +304,37 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store ctx := ctxtool.FromCanceller(inputCtx.Cancelation) updatedUsers, err := p.doFetchUsers(ctx, state, false) if err != nil { - return err + return last, err } - var tracker *kvstore.TxTracker - if len(updatedUsers) != 0 || state.len() != 0 { - // Active Directory does not have a notion of deleted users - // beyond absence from the directory, so compare found users - // with users already known by the state store and if any - // are in the store but not returned in the previous fetch, - // mark them as deleted and publish the deletion. We do not - // have the time of the deletion, so use now. - if state.len() != 0 { - found := make(map[string]bool) - for _, u := range updatedUsers { - found[u.ID] = true - } - deleted := make(map[string]*User) - now := time.Now() - state.forEach(func(u *User) { - if u.State == Deleted || found[u.ID] { - return - } - // This modifies the state store's copy since u - // is a pointer held by the state store map. 
- u.State = Deleted - u.WhenChanged = now - deleted[u.ID] = u - }) - for _, u := range deleted { - updatedUsers = append(updatedUsers, u) - } - } - if len(updatedUsers) != 0 { - tracker = kvstore.NewTxTracker(ctx) - for _, u := range updatedUsers { - p.publishUser(u, state, inputCtx.ID, client, tracker) - } - tracker.Wait() + if len(updatedUsers) != 0 { + tracker := kvstore.NewTxTracker(ctx) + for _, u := range updatedUsers { + p.publishUser(u, state, inputCtx.ID, client, tracker) } + tracker.Wait() } if ctx.Err() != nil { - return ctx.Err() + return last, ctx.Err() } - state.lastUpdate = time.Now() + // state.whenChanged is modified by the call to doFetchUsers to be + // the latest modification time for all of the users that have been + // collected in that call. + latest := state.whenChanged + state.lastUpdate = latest if err = state.close(true); err != nil { - return fmt.Errorf("unable to commit state: %w", err) + return last, fmt.Errorf("unable to commit state: %w", err) } - return nil + return latest, nil } // doFetchUsers handles fetching user identities from Active Directory. If // fullSync is true, then any existing whenChanged will be ignored, forcing a -// full synchronization from Active Directory. +// full synchronization from Active Directory. The whenChanged time of state +// is modified to be the time stamp of the latest User.WhenChanged value. // Returns a set of modified users by ID. 
func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) { var since time.Time @@ -322,31 +348,14 @@ func (p *adInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync return nil, err } - var ( - users []*User - whenChanged time.Time - ) - if fullSync { - for _, u := range entries { - state.storeUser(u) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } + users := make([]*User, 0, len(entries)) + for _, u := range entries { + users = append(users, state.storeUser(u)) + if u.WhenChanged.After(state.whenChanged) { + state.whenChanged = u.WhenChanged } - } else { - users = make([]*User, 0, len(entries)) - for _, u := range entries { - users = append(users, state.storeUser(u)) - if u.WhenChanged.After(whenChanged) { - whenChanged = u.WhenChanged - } - } - p.logger.Debugf("processed %d users from API", len(users)) } - if whenChanged.After(state.whenChanged) { - state.whenChanged = whenChanged - } - + p.logger.Debugf("processed %d users from API", len(users)) return users, nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go index 74486ebaac69..c81ece21a30a 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go @@ -170,6 +170,13 @@ func (s *stateStore) close(commit bool) (err error) { } for key, value := range s.users { + if value.State == Deleted { + err = s.tx.Delete(usersBucket, []byte(key)) + if err != nil { + return fmt.Errorf("unable to delete user %q from state: %w", key, err) + } + continue + } err = s.tx.Set(usersBucket, []byte(key), value) if err != nil { return fmt.Errorf("unable to save user %q to state: %w", key, err)