Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elastic documents purge job #209

Merged
merged 8 commits into from
Jan 7, 2025
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/json-iterator/go v1.1.12
github.com/lib/pq v1.10.9
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7
github.com/prataprc/goparsec v0.0.0-20211219142520-daac0e635e7e
github.com/prometheus/client_golang v1.20.2
github.com/robfig/cron/v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6 h1:mRJ5ii2DbPUZLMiWZfTE7TSCkKPSh9FYzk4D/obpqpI=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7 h1:qOQcdWs1bpqZY7+xNgEm8ub4Oc5CawYgQrTGj4u8/Eg=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
Expand Down
59 changes: 59 additions & 0 deletions internals/fact/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package fact
import (
"context"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/deletebyquery"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/conflicts"
"github.com/myrteametrics/myrtea-sdk/v5/elasticsearch"
"strings"
"time"
Expand Down Expand Up @@ -62,6 +65,62 @@ func ExecuteFact(
return widgetData, nil
}

func ExecuteFactDeleteQuery(
ti time.Time,
f engine.Fact,
) (*deletebyquery.Response, error) {

parameters := make(map[string]string, 0)
f.ContextualizeDimensions(ti, parameters)
err := f.ContextualizeCondition(ti, parameters)
if err := f.ContextualizeCondition(ti, parameters); err != nil {
return nil, fmt.Errorf("failed to contextualize condition: %w", err)
}

searchRequest, err := elasticsearch.ConvertFactToSearchRequestV8(f, ti, parameters)
if err != nil {
zap.L().Error("ConvertFactToSearchRequestV8 failed", zap.Error(err))
return nil, fmt.Errorf("failed to convert fact to search request: %w", err)
}

indices := FindIndices(f, ti, false)
if len(indices) == 0 {
return nil, errors.New("no indices found for the fact")
}

zap.L().Debug("Preparing to execute DeleteByQuery",
zap.Strings("indices", indices),
zap.Any("request", searchRequest))

response, err := elasticsearch.C().DeleteByQuery(strings.Join(indices, ",")).
Query(searchRequest.Query).
Conflicts(conflicts.Proceed). // Ignore les conflits (insertion/suppression)
Do(context.Background())

if err != nil {
zap.L().Error("ES DeleteByQuery execution failed", zap.Error(err))
return nil, fmt.Errorf("failed to execute DeleteByQuery: %w", err)
}

// Check for failures in the response
if len(response.Failures) > 0 {
zap.L().Warn("DeleteByQuery encountered shard failures", zap.Any("failures", response.Failures))
return nil, errors.New("one or more shards failed during DeleteByQuery")
}

// Check if the request timed out
if response.TimedOut != nil && *response.TimedOut {
zap.L().Warn("DeleteByQuery timed out")
return nil, errors.New("delete by query operation timed out")
}

zap.L().Info("DeleteByQuery completed successfully",
zap.Int64p("deleted", response.Deleted),
zap.Int64p("version_conflicts", response.VersionConflicts))

return response, nil
}

func FindIndices(f engine.Fact, ti time.Time, update bool) []string {

var indices []string
Expand Down
83 changes: 83 additions & 0 deletions internals/scheduler/elastic_doc_purge_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package scheduler

import (
"errors"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/fact"
"github.com/myrteametrics/myrtea-sdk/v5/engine"
"go.uber.org/zap"
"time"
)

type ElasticDocPurgeJob struct {
FactIds []int64 `json:"factIds"`
ScheduleID int64 `json:"-"`
}

func (job ElasticDocPurgeJob) IsValid() (bool, error) {
if job.FactIds == nil {
return false, errors.New("missing FactIds")
}
if len(job.FactIds) <= 0 {
return false, errors.New("missing FactIds")
}
Ismail731404 marked this conversation as resolved.
Show resolved Hide resolved
return true, nil
}

func (job ElasticDocPurgeJob) Run() {

if S().ExistingRunningJob(job.ScheduleID) {
zap.L().Info("Skipping Elastic Documents Purge Job because last execution is still running", zap.Int64s("ids", job.FactIds))
Ismail731404 marked this conversation as resolved.
Show resolved Hide resolved
return
}
S().AddRunningJob(job.ScheduleID)

zap.L().Info("Delete Elastic Document job started", zap.Int64s("ids", job.FactIds))
Ismail731404 marked this conversation as resolved.
Show resolved Hide resolved

t := time.Now().Truncate(1 * time.Second).UTC()

PurgeElasticDocs(t, job.FactIds)

zap.L().Info("Elastic Doc Purge job Ended", zap.Int64("id Schedule", job.ScheduleID))
Ismail731404 marked this conversation as resolved.
Show resolved Hide resolved
S().RemoveRunningJob(job.ScheduleID)

}

func PurgeElasticDocs(t time.Time, factIds []int64) {
zap.L().Info("Starting Elastic document purge", zap.Time("timestamp", t), zap.Int("number_of_facts", len(factIds)))

for _, factId := range factIds {
zap.L().Debug("Processing fact", zap.Int64("factId", factId))

// Retrieve the Fact
f, found, err := fact.R().Get(factId)
if err != nil {
zap.L().Error("Error retrieving the fact; skipping deletion",
zap.Int64("factId", factId),
zap.Error(err))
continue
}

if !found {
zap.L().Warn("Fact does not exist; skipping deletion", zap.Int64("factId", factId))
continue
}

if f.Intent.Operator != engine.Delete {
zap.L().Warn("Fact is not a delete fact; skipping deletion", zap.Int64("factId", factId))
Ismail731404 marked this conversation as resolved.
Show resolved Hide resolved
}

// Execute deletion
_, err = fact.ExecuteFactDeleteQuery(t, f)
if err != nil {
zap.L().Error("Error during fact deletion",
zap.Int64("factId", f.ID),
zap.Any("fact", f),
zap.Error(err))
continue
}

zap.L().Info("Fact successfully deleted from Elastic", zap.Int64("factId", f.ID))
}

zap.L().Info("Elastic document purge completed", zap.Time("timestamp", t), zap.Int("processed_facts", len(factIds)))
}
16 changes: 11 additions & 5 deletions internals/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@ type InternalJob interface {
}

var jobTypes = map[string]struct{}{
"fact": {},
"baseline": {},
"compact": {},
"purge": {},
"fact": {},
"baseline": {},
"compact": {},
"purge": {},
"elastic_doc_purge": {},
}

// InternalSchedule wrap a schedule
type InternalSchedule struct {
ID int64 `json:"id"`
Name string `json:"name"`
CronExpr string `json:"cronexpr" example:"0 */15 * * *"`
JobType string `json:"jobtype" enums:"fact,baseline,compact,purge"`
JobType string `json:"jobtype" enums:"fact,baseline,compact,purge,elastic_doc_purge"`
Job InternalJob `json:"job"`
Enabled bool `json:"enabled"`
}
Expand Down Expand Up @@ -103,6 +104,11 @@ func UnmarshalInternalJob(t string, b json.RawMessage, scheduleID int64) (Intern
err = json.Unmarshal(b, &tJob)
tJob.ScheduleID = scheduleID
job = tJob
case "elastic_doc_purge":
var tJob ElasticDocPurgeJob
err = json.Unmarshal(b, &tJob)
tJob.ScheduleID = scheduleID
job = tJob

default:
zap.L().Error("unknown internal job type", zap.String("type", t))
Expand Down
Loading