Skip to content

Commit

Permalink
feat(issue): refactor issue aggregations (#296)
Browse files Browse the repository at this point in the history
* feat(issue): refactor issue aggregations

* feat(issue): add e2e test for aggregation
  • Loading branch information
MR2011 authored Oct 22, 2024
1 parent d18b516 commit f423cf9
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 89 deletions.
2 changes: 1 addition & 1 deletion internal/api/graphql/graph/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func NewIssueWithAggregations(issue *entity.IssueResult) Issue {
if issue.IssueAggregations != nil {
metadata = IssueMetadata{
ServiceCount: int(issue.IssueAggregations.AffectedServices),
ActivityCount: int(issue.IssueAggregations.Activites),
ActivityCount: int(issue.IssueAggregations.Activities),
IssueMatchCount: int(issue.IssueAggregations.IssueMatches),
ComponentInstanceCount: int(issue.IssueAggregations.AffectedComponentInstances),
ComponentVersionCount: int(issue.IssueAggregations.ComponentVersions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,23 @@ query ($filter: IssueFilter, $first: Int, $after: String) {
earliestDiscoveryDate
earliestTargetRemediationDate
}
issueMatches {
totalCount
edges {
node {
componentInstance {
count
service {
id
}
}
}
}
}
activities {
totalCount
}
}
cursor
}
}
}
}
12 changes: 6 additions & 6 deletions internal/database/mariadb/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type GetIssuesByRow struct {
}

type IssueAggregationsRow struct {
Activites sql.NullInt64 `db:"agg_activities"`
Activities sql.NullInt64 `db:"agg_activities"`
IssueMatches sql.NullInt64 `db:"agg_issue_matches"`
AffectedServices sql.NullInt64 `db:"agg_affected_services"`
ComponentVersions sql.NullInt64 `db:"agg_component_versions"`
Expand All @@ -124,11 +124,11 @@ type IssueAggregationsRow struct {
func (ibr *GetIssuesByRow) AsIssueWithAggregations() entity.IssueWithAggregations {
return entity.IssueWithAggregations{
IssueAggregations: entity.IssueAggregations{
Activites: GetInt64Value(ibr.IssueAggregationsRow.Activites),
IssueMatches: GetInt64Value(ibr.IssueAggregationsRow.IssueMatches),
AffectedServices: GetInt64Value(ibr.IssueAggregationsRow.AffectedServices),
ComponentVersions: GetInt64Value(ibr.IssueAggregationsRow.ComponentVersions),
AffectedComponentInstances: GetInt64Value(ibr.IssueAggregationsRow.AffectedComponentInstances),
Activities: lo.Max([]int64{0, GetInt64Value(ibr.IssueAggregationsRow.Activities)}),
IssueMatches: lo.Max([]int64{0, GetInt64Value(ibr.IssueAggregationsRow.IssueMatches)}),
AffectedServices: lo.Max([]int64{0, GetInt64Value(ibr.IssueAggregationsRow.AffectedServices)}),
ComponentVersions: lo.Max([]int64{0, GetInt64Value(ibr.IssueAggregationsRow.ComponentVersions)}),
AffectedComponentInstances: lo.Max([]int64{0, GetInt64Value(ibr.IssueAggregationsRow.AffectedComponentInstances)}),
EarliestTargetRemediationDate: GetTimeValue(ibr.IssueAggregationsRow.EarliestTargetRemediationDate),
EarliestDiscoveryDate: GetTimeValue(ibr.IssueAggregationsRow.EarliestDiscoveryDate),
},
Expand Down
141 changes: 92 additions & 49 deletions internal/database/mariadb/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ const (
wildCardFilterParamCount = 2
)

func (s *SqlDatabase) buildIssueFilterParameters(filter *entity.IssueFilter, withCursor bool, cursor entity.Cursor) []interface{} {
var filterParameters []interface{}
filterParameters = buildQueryParameters(filterParameters, filter.ServiceName)
filterParameters = buildQueryParameters(filterParameters, filter.Id)
filterParameters = buildQueryParameters(filterParameters, filter.IssueMatchStatus)
filterParameters = buildQueryParameters(filterParameters, filter.ActivityId)
filterParameters = buildQueryParameters(filterParameters, filter.IssueMatchId)
filterParameters = buildQueryParameters(filterParameters, filter.ComponentVersionId)
filterParameters = buildQueryParameters(filterParameters, filter.IssueVariantId)
filterParameters = buildQueryParameters(filterParameters, filter.Type)
filterParameters = buildQueryParameters(filterParameters, filter.PrimaryName)
filterParameters = buildQueryParametersCount(filterParameters, filter.Search, wildCardFilterParamCount)
if withCursor {
filterParameters = append(filterParameters, cursor.Value)
filterParameters = append(filterParameters, cursor.Limit)
}

return filterParameters
}

func (s *SqlDatabase) getIssueFilterString(filter *entity.IssueFilter) string {
var fl []string
fl = append(fl, buildFilterQuery(filter.ServiceName, "S.service_name = ?", OP_OR))
Expand All @@ -36,28 +56,28 @@ func (s *SqlDatabase) getIssueFilterString(filter *entity.IssueFilter) string {
return combineFilterQueries(fl, OP_AND)
}

func (s *SqlDatabase) getIssueJoins(filter *entity.IssueFilter, withAggregations bool) string {
func (s *SqlDatabase) getIssueJoins(filter *entity.IssueFilter) string {
joins := ""
if len(filter.ActivityId) > 0 || withAggregations {
if len(filter.ActivityId) > 0 {
joins = fmt.Sprintf("%s\n%s", joins, `
LEFT JOIN ActivityHasIssue AHI on I.issue_id = AHI.activityhasissue_issue_id
LEFT JOIN Activity A on AHI.activityhasissue_activity_id = A.activity_id
`)
}
if len(filter.IssueMatchStatus) > 0 || len(filter.ServiceName) > 0 || len(filter.IssueMatchId) > 0 || withAggregations {
if len(filter.IssueMatchStatus) > 0 || len(filter.ServiceName) > 0 || len(filter.IssueMatchId) > 0 {
joins = fmt.Sprintf("%s\n%s", joins, `
LEFT JOIN IssueMatch IM ON I.issue_id = IM.issuematch_issue_id
`)
}
if len(filter.ServiceName) > 0 || withAggregations {
if len(filter.ServiceName) > 0 {
joins = fmt.Sprintf("%s\n%s", joins, `
LEFT JOIN ComponentInstance CI ON CI.componentinstance_id = IM.issuematch_component_instance_id
LEFT JOIN ComponentVersion CV ON CI.componentinstance_component_version_id = CV.componentversion_id
LEFT JOIN Service S ON S.service_id = CI.componentinstance_service_id
`)
}

if len(filter.ComponentVersionId) > 0 || withAggregations {
if len(filter.ComponentVersionId) > 0 {
joins = fmt.Sprintf("%s\n%s", joins, `
LEFT JOIN ComponentVersionIssue CVI ON I.issue_id = CVI.componentversionissue_issue_id
`)
Expand Down Expand Up @@ -117,27 +137,20 @@ func (s *SqlDatabase) getIssueUpdateFields(issue *entity.Issue) string {
return strings.Join(fl, ", ")
}

func (s *SqlDatabase) buildIssueStatement(baseQuery string, filter *entity.IssueFilter, aggregations []string, withCursor bool, l *logrus.Entry) (*sqlx.Stmt, []interface{}, error) {
func (s *SqlDatabase) buildIssueStatement(baseQuery string, filter *entity.IssueFilter, withCursor bool, l *logrus.Entry) (*sqlx.Stmt, []interface{}, error) {
var query string
filter = s.ensureIssueFilter(filter)
l.WithFields(logrus.Fields{"filter": filter})

filterStr := s.getIssueFilterString(filter)
withAggreations := len(aggregations) > 0
joins := s.getIssueJoins(filter, withAggreations)
joins := s.getIssueJoins(filter)
cursor := getCursor(filter.Paginated, filterStr, "I.issue_id > ?")

whereClause := ""
if filterStr != "" || withCursor {
whereClause = fmt.Sprintf("WHERE %s", filterStr)
}

ags := ""
if len(aggregations) > 0 {
ags = fmt.Sprintf(", %s", strings.Join(aggregations, ", "))
baseQuery = fmt.Sprintf(baseQuery, ags, "%s", "%s", "%s")
}

// construct final query
if withCursor {
query = fmt.Sprintf(baseQuery, joins, whereClause, cursor.Statement)
Expand All @@ -162,21 +175,7 @@ func (s *SqlDatabase) buildIssueStatement(baseQuery string, filter *entity.Issue
}

//adding parameters
var filterParameters []interface{}
filterParameters = buildQueryParameters(filterParameters, filter.ServiceName)
filterParameters = buildQueryParameters(filterParameters, filter.Id)
filterParameters = buildQueryParameters(filterParameters, filter.IssueMatchStatus)
filterParameters = buildQueryParameters(filterParameters, filter.ActivityId)
filterParameters = buildQueryParameters(filterParameters, filter.IssueMatchId)
filterParameters = buildQueryParameters(filterParameters, filter.ComponentVersionId)
filterParameters = buildQueryParameters(filterParameters, filter.IssueVariantId)
filterParameters = buildQueryParameters(filterParameters, filter.Type)
filterParameters = buildQueryParameters(filterParameters, filter.PrimaryName)
filterParameters = buildQueryParametersCount(filterParameters, filter.Search, wildCardFilterParamCount)
if withCursor {
filterParameters = append(filterParameters, cursor.Value)
filterParameters = append(filterParameters, cursor.Limit)
}
filterParameters := s.buildIssueFilterParameters(filter, withCursor, cursor)

return stmt, filterParameters, nil
}
Expand All @@ -188,34 +187,78 @@ func (s *SqlDatabase) GetIssuesWithAggregations(filter *entity.IssueFilter) ([]e
"event": "database.GetIssuesWithAggregations",
})

baseQuery := `
SELECT I.* %s FROM Issue I
baseCiQuery := `
SELECT I.*, SUM(CI.componentinstance_count) AS agg_affected_component_instances FROM Issue I
LEFT JOIN IssueMatch IM on I.issue_id = IM.issuematch_issue_id
LEFT JOIN ComponentInstance CI on IM.issuematch_component_instance_id = CI.componentinstance_id
%s
%s
%s GROUP BY I.issue_id ORDER BY I.issue_id LIMIT ?
`

baseAggQuery := `
SELECT I.*,
count(distinct issuematch_id) as agg_issue_matches,
count(distinct activity_id) as agg_activities,
count(distinct service_name) as agg_affected_services,
count(distinct componentversionissue_component_version_id) as agg_component_versions,
min(issuematch_target_remediation_date) as agg_earliest_target_remediation_date,
min(issuematch_created_at) agg_earliest_discovery_date
FROM Issue I
LEFT JOIN ActivityHasIssue AHI on I.issue_id = AHI.activityhasissue_issue_id
LEFT JOIN Activity A on AHI.activityhasissue_activity_id = A.activity_id
LEFT JOIN IssueMatch IM on I.issue_id = IM.issuematch_issue_id
LEFT JOIN ComponentInstance CI ON CI.componentinstance_id = IM.issuematch_component_instance_id
LEFT JOIN ComponentVersion CV ON CI.componentinstance_component_version_id = CV.componentversion_id
LEFT JOIN Service S ON S.service_id = CI.componentinstance_service_id
LEFT JOIN ComponentVersionIssue CVI ON I.issue_id = CVI.componentversionissue_issue_id
%s
%s
%s GROUP BY I.issue_id ORDER BY I.issue_id LIMIT ?
`
`

aggregations := []string{
"count(distinct issuematch_id) as agg_issue_matches",
"count(distinct activity_id) as agg_activities",
"count(distinct service_name) as agg_affected_services",
"count(distinct componentversionissue_component_version_id) as agg_component_versions",
"sum(componentinstance_count) as agg_affected_component_instances",
"min(issuematch_target_remediation_date) as agg_earliest_target_remediation_date",
"min(issuematch_created_at) agg_earliest_discovery_date",
}
baseQuery := `
With ComponentInstanceCounts AS (
%s
),
Aggs AS (
%s
)
SELECT A.*, CIC.*
FROM ComponentInstanceCounts CIC
JOIN Aggs A ON CIC.issue_id = A.issue_id;
`

stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, aggregations, true, l)
filter = s.ensureIssueFilter(filter)
filterStr := s.getIssueFilterString(filter)
joins := s.getIssueJoins(filter)
cursor := getCursor(filter.Paginated, filterStr, "I.issue_id > ?")
whereClause := fmt.Sprintf("WHERE %s", filterStr)

ciQuery := fmt.Sprintf(baseCiQuery, joins, whereClause, cursor.Statement)
aggQuery := fmt.Sprintf(baseAggQuery, joins, whereClause, cursor.Statement)
query := fmt.Sprintf(baseQuery, ciQuery, aggQuery)

var stmt *sqlx.Stmt
var err error

stmt, err = s.db.Preparex(query)
if err != nil {
msg := ERROR_MSG_PREPARED_STMT
l.WithFields(
logrus.Fields{
"error": err,
"aggregations": aggregations,
"error": err,
"query": query,
"stmt": stmt,
}).Error(msg)
return nil, fmt.Errorf("%s", msg)
}

// parameters for component instance query
filterParameters := s.buildIssueFilterParameters(filter, true, cursor)
// parameters for agg query
filterParameters = append(filterParameters, s.buildIssueFilterParameters(filter, true, cursor)...)

defer stmt.Close()

return performListScan(
Expand All @@ -238,7 +281,7 @@ func (s *SqlDatabase) CountIssues(filter *entity.IssueFilter) (int64, error) {
%s
%s
`
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, []string{}, false, l)
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, false, l)

if err != nil {
return -1, err
Expand All @@ -261,7 +304,7 @@ func (s *SqlDatabase) CountIssueTypes(filter *entity.IssueFilter) (*entity.Issue
GROUP BY I.issue_type
`

stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, []string{}, false, l)
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, false, l)

if err != nil {
return nil, err
Expand Down Expand Up @@ -308,7 +351,7 @@ func (s *SqlDatabase) GetAllIssueIds(filter *entity.IssueFilter) ([]int64, error
%s GROUP BY I.issue_id ORDER BY I.issue_id
`

stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, []string{}, false, l)
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, false, l)

if err != nil {
return nil, err
Expand All @@ -333,7 +376,7 @@ func (s *SqlDatabase) GetIssues(filter *entity.IssueFilter) ([]entity.Issue, err

filter = s.ensureIssueFilter(filter)

stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, []string{}, true, l)
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, true, l)

if err != nil {
return nil, err
Expand Down Expand Up @@ -502,7 +545,7 @@ func (s *SqlDatabase) GetIssueNames(filter *entity.IssueFilter) ([]string, error
filter = s.ensureIssueFilter(filter)

// Builds full statement with possible joins and filters
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, []string{}, false, l)
stmt, filterParameters, err := s.buildIssueStatement(baseQuery, filter, false, l)
if err != nil {
l.Error("Error preparing statement: ", err)
return nil, err
Expand Down
36 changes: 34 additions & 2 deletions internal/database/mariadb/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,39 @@ var _ = Describe("Issue", Label("database", "Issue"), func() {
})
})
When("Getting Issues with Aggregations", Label("GetIssuesWithAggregations"), func() {
BeforeEach(func() {
_ = seeder.SeedDbWithNFakeData(10)
Context("and the database contains service without aggregations", func() {
BeforeEach(func() {
newIssueRow := test.NewFakeIssue()
newIssue := newIssueRow.AsIssue()
db.CreateIssue(&newIssue)
})
It("returns the issues with aggregations", func() {
entriesWithAggregations, err := db.GetIssuesWithAggregations(nil)

By("throwing no error", func() {
Expect(err).To(BeNil())
})

By("returning some aggregations", func() {
for _, entryWithAggregations := range entriesWithAggregations {
Expect(entryWithAggregations).NotTo(
BeEquivalentTo(entity.IssueAggregations{}))
Expect(entryWithAggregations.IssueAggregations.Activities).To(BeEquivalentTo(0))
Expect(entryWithAggregations.IssueAggregations.IssueMatches).To(BeEquivalentTo(0))
Expect(entryWithAggregations.IssueAggregations.AffectedServices).To(BeEquivalentTo(0))
Expect(entryWithAggregations.IssueAggregations.AffectedComponentInstances).To(BeEquivalentTo(0))
Expect(entryWithAggregations.IssueAggregations.ComponentVersions).To(BeEquivalentTo(0))
}
})
By("returning all issues", func() {
Expect(len(entriesWithAggregations)).To(BeEquivalentTo(1))
})
})
})
Context("and and we have 10 elements in the database", func() {
BeforeEach(func() {
_ = seeder.SeedDbWithNFakeData(10)
})
It("returns the issues with aggregations", func() {
entriesWithAggregations, err := db.GetIssuesWithAggregations(nil)

Expand All @@ -433,6 +462,9 @@ var _ = Describe("Issue", Label("database", "Issue"), func() {
BeEquivalentTo(entity.IssueAggregations{}))
}
})
By("returning all ld constraints exclude all Go files inservices", func() {
Expect(len(entriesWithAggregations)).To(BeEquivalentTo(10))
})
})
It("returns correct aggregation values", func() {
//Should be filled with a check for each aggregation value,
Expand Down
Loading

0 comments on commit f423cf9

Please sign in to comment.