diff --git a/go.mod b/go.mod index 3216a79..5123e82 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.10.9 - github.com/myrteametrics/myrtea-sdk/v5 v5.1.6 + github.com/myrteametrics/myrtea-sdk/v5 v5.1.7 github.com/prataprc/goparsec v0.0.0-20211219142520-daac0e635e7e github.com/prometheus/client_golang v1.20.2 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index 2ba677e..85a78c3 100644 --- a/go.sum +++ b/go.sum @@ -213,8 +213,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/myrteametrics/myrtea-sdk/v5 v5.1.6 h1:mRJ5ii2DbPUZLMiWZfTE7TSCkKPSh9FYzk4D/obpqpI= -github.com/myrteametrics/myrtea-sdk/v5 v5.1.6/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164= +github.com/myrteametrics/myrtea-sdk/v5 v5.1.7 h1:qOQcdWs1bpqZY7+xNgEm8ub4Oc5CawYgQrTGj4u8/Eg= +github.com/myrteametrics/myrtea-sdk/v5 v5.1.7/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= diff --git a/internals/fact/service.go b/internals/fact/service.go index d868a31..caa5aa3 100644 --- a/internals/fact/service.go +++ b/internals/fact/service.go @@ -3,6 +3,9 @@ package fact import ( "context" "errors" + "fmt" + "github.com/elastic/go-elasticsearch/v8/typedapi/core/deletebyquery" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/conflicts" "github.com/myrteametrics/myrtea-sdk/v5/elasticsearch" "strings" "time" @@ -62,6 +65,62 @@ func ExecuteFact( return widgetData, nil } +func ExecuteFactDeleteQuery( + ti time.Time, + f engine.Fact, +) (*deletebyquery.Response, error) { + + parameters := make(map[string]string, 0) + f.ContextualizeDimensions(ti, parameters) + err := f.ContextualizeCondition(ti, parameters) + if err := f.ContextualizeCondition(ti, parameters); err != nil { + return nil, fmt.Errorf("failed to contextualize condition: %w", err) + } + + searchRequest, err := elasticsearch.ConvertFactToSearchRequestV8(f, ti, parameters) + if err != nil { + zap.L().Error("ConvertFactToSearchRequestV8 failed", zap.Error(err)) + return nil, fmt.Errorf("failed to convert fact to search request: %w", err) + } + + indices := FindIndices(f, ti, false) + if len(indices) == 0 { + return nil, errors.New("no indices found for the fact") + } + + zap.L().Debug("Preparing to execute DeleteByQuery", + zap.Strings("indices", indices), + zap.Any("request", searchRequest)) + + response, err := elasticsearch.C().DeleteByQuery(strings.Join(indices, ",")). + Query(searchRequest.Query). + Conflicts(conflicts.Proceed). // Ignore les conflits (insertion/suppression) + Do(context.Background()) + + if err != nil { + zap.L().Error("ES DeleteByQuery execution failed", zap.Error(err)) + return nil, fmt.Errorf("failed to execute DeleteByQuery: %w", err) + } + + // Check for failures in the response + if len(response.Failures) > 0 { + zap.L().Warn("DeleteByQuery encountered shard failures", zap.Any("failures", response.Failures)) + return nil, errors.New("one or more shards failed during DeleteByQuery") + } + + // Check if the request timed out + if response.TimedOut != nil && *response.TimedOut { + zap.L().Warn("DeleteByQuery timed out") + return nil, errors.New("delete by query operation timed out") + } + + zap.L().Info("DeleteByQuery completed successfully", + zap.Int64p("deleted", response.Deleted), + zap.Int64p("version_conflicts", response.VersionConflicts)) + + return response, nil +} + func FindIndices(f engine.Fact, ti time.Time, update bool) []string { var indices []string diff --git a/internals/scheduler/elastic_doc_purge_job.go b/internals/scheduler/elastic_doc_purge_job.go new file mode 100644 index 0000000..6009b6a --- /dev/null +++ b/internals/scheduler/elastic_doc_purge_job.go @@ -0,0 +1,81 @@ +package scheduler + +import ( + "errors" + "github.com/myrteametrics/myrtea-engine-api/v5/internals/fact" + "github.com/myrteametrics/myrtea-sdk/v5/engine" + "go.uber.org/zap" + "time" +) + +type ElasticDocPurgeJob struct { + FactIds []int64 `json:"factIds"` + ScheduleID int64 `json:"-"` +} + +func (job ElasticDocPurgeJob) IsValid() (bool, error) { + if job.FactIds == nil || len(job.FactIds) <= 0 { + return false, errors.New("missing FactIds") + } + return true, nil +} + +func (job ElasticDocPurgeJob) Run() { + + if S().ExistingRunningJob(job.ScheduleID) { + zap.L().Info("Skipping Elastic document purge job because last execution is still running", zap.Int64s("ids", job.FactIds)) + return + } + S().AddRunningJob(job.ScheduleID) + + zap.L().Info("Delete Elastic document job started", zap.Int64s("ids", job.FactIds)) + + t := time.Now().Truncate(1 * time.Second).UTC() + + PurgeElasticDocs(t, job.FactIds) + + zap.L().Info("Elastic document purge job ended", zap.Int64("id Schedule", job.ScheduleID)) + S().RemoveRunningJob(job.ScheduleID) + +} + +func PurgeElasticDocs(t time.Time, factIds []int64) { + zap.L().Info("Starting Elastic document purge", zap.Time("timestamp", t), zap.Int("number_of_facts", len(factIds))) + + for _, factId := range factIds { + zap.L().Debug("Processing fact", zap.Int64("factId", factId)) + + // Retrieve the Fact + f, found, err := fact.R().Get(factId) + if err != nil { + zap.L().Error("Error retrieving the fact; skipping deletion", + zap.Int64("factId", factId), + zap.Error(err)) + continue + } + + if !found { + zap.L().Warn("Fact does not exist; skipping deletion", zap.Int64("factId", factId)) + continue + } + + if f.Intent.Operator != engine.Delete { + zap.L().Warn("Fact is not a delete fact; skipping deletion", zap.Int64("factId", factId)) + continue + } + + // Execute deletion + _, err = fact.ExecuteFactDeleteQuery(t, f) + if err != nil { + zap.L().Error("Error during fact deletion", + zap.Int64("factId", f.ID), + zap.Any("fact", f), + zap.Error(err)) + continue + } + + zap.L().Info("Fact successfully deleted from Elastic", zap.Int64("factId", f.ID)) + } + + zap.L().Info("Elastic document purge completed", zap.Time("timestamp", t), zap.Int("processed_facts", len(factIds))) +} diff --git a/internals/scheduler/job.go b/internals/scheduler/job.go index 643eac0..418bf45 100644 --- a/internals/scheduler/job.go +++ b/internals/scheduler/job.go @@ -15,10 +15,11 @@ type InternalJob interface { } var jobTypes = map[string]struct{}{ - "fact": {}, - "baseline": {}, - "compact": {}, - "purge": {}, + "fact": {}, + "baseline": {}, + "compact": {}, + "purge": {}, + "elastic_doc_purge": {}, } // InternalSchedule wrap a schedule @@ -26,7 +27,7 @@ type InternalSchedule struct { ID int64 `json:"id"` Name string `json:"name"` CronExpr string `json:"cronexpr" example:"0 */15 * * *"` - JobType string `json:"jobtype" enums:"fact,baseline,compact,purge"` + JobType string `json:"jobtype" enums:"fact,baseline,compact,purge,elastic_doc_purge"` Job InternalJob `json:"job"` Enabled bool `json:"enabled"` } @@ -103,6 +104,11 @@ func UnmarshalInternalJob(t string, b json.RawMessage, scheduleID int64) (Intern err = json.Unmarshal(b, &tJob) tJob.ScheduleID = scheduleID job = tJob + case "elastic_doc_purge": + var tJob ElasticDocPurgeJob + err = json.Unmarshal(b, &tJob) + tJob.ScheduleID = scheduleID + job = tJob default: zap.L().Error("unknown internal job type", zap.String("type", t))