Skip to content

Commit

Permalink
fix(indexer): extract Apps from Promotion directives (#2620)
Browse files Browse the repository at this point in the history
Signed-off-by: Hidde Beydals <[email protected]>
  • Loading branch information
hiddeco authored Oct 1, 2024
1 parent 0e79db7 commit 806fa9f
Show file tree
Hide file tree
Showing 18 changed files with 183 additions and 74 deletions.
8 changes: 4 additions & 4 deletions cmd/controlplane/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/api/kubernetes"
"github.com/akuity/kargo/internal/garbage"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/logging"
"github.com/akuity/kargo/internal/os"
versionpkg "github.com/akuity/kargo/internal/version"
Expand Down Expand Up @@ -112,15 +112,15 @@ func (o *garbageCollectorOptions) setupManager(ctx context.Context) (manager.Man
}

// Index Promotions by Stage
if err = kubeclient.IndexPromotionsByStage(ctx, mgr); err != nil {
if err = indexer.IndexPromotionsByStage(ctx, mgr); err != nil {
return nil, fmt.Errorf("error indexing Promotions by Stage: %w", err)
}
// Index Freight by Warehouse
if err = kubeclient.IndexFreightByWarehouse(ctx, mgr); err != nil {
if err = indexer.IndexFreightByWarehouse(ctx, mgr); err != nil {
return nil, fmt.Errorf("error indexing Freight by Warehouse: %w", err)
}
// Index Stages by Freight
if err = kubeclient.IndexStagesByFreight(ctx, mgr); err != nil {
if err = indexer.IndexStagesByFreight(ctx, mgr); err != nil {
return nil, fmt.Errorf("error indexing Stages by Freight: %w", err)
}
return mgr, nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/controlplane/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/api/kubernetes"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/logging"
"github.com/akuity/kargo/internal/os"
versionpkg "github.com/akuity/kargo/internal/version"
Expand Down Expand Up @@ -107,7 +107,7 @@ func (o *webhooksServerOptions) run(ctx context.Context) error {
}

// Index Stages by Freight
if err = kubeclient.IndexStagesByFreight(ctx, mgr); err != nil {
if err = indexer.IndexStagesByFreight(ctx, mgr); err != nil {
return fmt.Errorf("index Stages by Freight: %w", err)
}

Expand Down
14 changes: 7 additions & 7 deletions internal/api/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/api/user"
rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/logging"
)

Expand Down Expand Up @@ -233,22 +233,22 @@ func newDefaultInternalClient(
}

// Add all indices required by the API server
if err = kubeclient.IndexPromotionsByStage(ctx, cluster); err != nil {
if err = indexer.IndexPromotionsByStage(ctx, cluster); err != nil {
return nil, fmt.Errorf("error indexing Promotions by Stage: %w", err)
}
if err = kubeclient.IndexFreightByWarehouse(ctx, cluster); err != nil {
if err = indexer.IndexFreightByWarehouse(ctx, cluster); err != nil {
return nil, fmt.Errorf("error indexing Freight by Warehouse: %w", err)
}
if err = kubeclient.IndexFreightByVerifiedStages(ctx, cluster); err != nil {
if err = indexer.IndexFreightByVerifiedStages(ctx, cluster); err != nil {
return nil, fmt.Errorf("error indexing Freight by Stages in which it has been verified: %w", err)
}
if err = kubeclient.IndexFreightByApprovedStages(ctx, cluster); err != nil {
if err = indexer.IndexFreightByApprovedStages(ctx, cluster); err != nil {
return nil, fmt.Errorf("error indexing Freight by Stages for which it has been approved: %w", err)
}
if err = kubeclient.IndexServiceAccountsByOIDCClaims(ctx, cluster); err != nil {
if err = indexer.IndexServiceAccountsByOIDCClaims(ctx, cluster); err != nil {
return nil, fmt.Errorf("index ServiceAccounts by OIDC claims: %w", err)
}
if err = kubeclient.IndexEventsByInvolvedObjectAPIGroup(ctx, cluster); err != nil {
if err = indexer.IndexEventsByInvolvedObjectAPIGroup(ctx, cluster); err != nil {
return nil, fmt.Errorf("error indexing Events by InvolvedObject's API group: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/api/list_project_events_v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1"
)

Expand All @@ -36,7 +36,7 @@ func (s *server) ListProjectEvents(
// List Kargo related events only
client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(
kubeclient.EventsByInvolvedObjectAPIGroupIndexField,
indexer.EventsByInvolvedObjectAPIGroupIndexField,
kargoapi.GroupVersion.Group,
),
},
Expand Down
4 changes: 2 additions & 2 deletions internal/api/list_promotions_v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1"
)

Expand All @@ -33,7 +33,7 @@ func (s *server) ListPromotions(
client.InNamespace(project),
}
if stage != "" {
opts = append(opts, client.MatchingFields{kubeclient.PromotionsByStageIndexField: stage})
opts = append(opts, client.MatchingFields{indexer.PromotionsByStageIndexField: stage})
}
if err := s.client.List(ctx, &list, opts...); err != nil {
return nil, fmt.Errorf("list promotions: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions internal/api/option/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/api/config"
"github.com/akuity/kargo/internal/api/user"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
)

const authHeaderKey = "Authorization"
Expand Down Expand Up @@ -254,14 +254,14 @@ func (a *authInterceptor) listServiceAccounts(
for claimName, claimValue := range c {
if claimValuesString, ok := claimValue.(string); ok {
queries = append(queries, libClient.MatchingFields{
kubeclient.ServiceAccountsByOIDCClaimsIndexField: kubeclient.FormatClaim(claimName, claimValuesString),
indexer.ServiceAccountsByOIDCClaimsIndexField: indexer.FormatClaim(claimName, claimValuesString),
})
}
if claimValueSlice, ok := claimValue.([]any); ok {
for _, claimValueSliceItem := range claimValueSlice {
if claimValueSliceItemString, ok := claimValueSliceItem.(string); ok {
queries = append(queries, libClient.MatchingFields{
kubeclient.ServiceAccountsByOIDCClaimsIndexField: kubeclient.FormatClaim(
indexer.ServiceAccountsByOIDCClaimsIndexField: indexer.FormatClaim(
claimName, claimValueSliceItemString,
),
})
Expand Down
8 changes: 4 additions & 4 deletions internal/api/query_freights_v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1"
)

Expand Down Expand Up @@ -169,7 +169,7 @@ func (s *server) getAvailableFreightForStage(
&client.ListOptions{
Namespace: project,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightApprovedForStagesIndexField,
indexer.FreightApprovedForStagesIndexField,
stage,
),
},
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *server) getFreightFromWarehouses(
&client.ListOptions{
Namespace: project,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByWarehouseIndexField,
indexer.FreightByWarehouseIndexField,
warehouse,
),
},
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *server) getVerifiedFreight(
&client.ListOptions{
Namespace: project,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByVerifiedStagesIndexField,
indexer.FreightByVerifiedStagesIndexField,
upstream,
),
},
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/promotions/promotions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/akuity/kargo/internal/controller/promotion"
"github.com/akuity/kargo/internal/controller/runtime"
"github.com/akuity/kargo/internal/directives"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/kargo"
"github.com/akuity/kargo/internal/kubeclient"
libEvent "github.com/akuity/kargo/internal/kubernetes/event"
Expand Down Expand Up @@ -94,7 +95,7 @@ func SetupReconcilerWithManager(
cfg ReconcilerConfig,
) error {
// Index running Promotions by Argo CD Applications
if err := kubeclient.IndexRunningPromotionsByArgoCDApplications(ctx, kargoMgr, cfg.ShardName); err != nil {
if err := indexer.IndexRunningPromotionsByArgoCDApplications(ctx, kargoMgr, cfg.ShardName); err != nil {
return fmt.Errorf("index running Promotions by Argo CD Applications: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/controller/promotions/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

kargoapi "github.com/akuity/kargo/api/v1alpha1"
argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/logging"
)

Expand Down Expand Up @@ -208,7 +208,7 @@ func (u *UpdatedArgoCDAppHandler[T]) Update(
promotions,
&client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(
kubeclient.RunningPromotionsByArgoCDApplicationsIndexField,
indexer.RunningPromotionsByArgoCDApplicationsIndexField,
fmt.Sprintf("%s:%s", newApp.Namespace, newApp.Name),
),
LabelSelector: u.shardSelector,
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/promotions/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

kargoapi "github.com/akuity/kargo/api/v1alpha1"
argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/indexer"
)

func TestUpdatedArgoCDAppHandler_Update(t *testing.T) {
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestUpdatedArgoCDAppHandler_Update(t *testing.T) {
WithObjects(tt.applications...).
WithIndex(
&kargoapi.Promotion{},
kubeclient.RunningPromotionsByArgoCDApplicationsIndexField,
indexer.RunningPromotionsByArgoCDApplicationsIndexField,
tt.indexer,
).
WithInterceptorFuncs(tt.interceptor)
Expand Down
43 changes: 22 additions & 21 deletions internal/controller/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1"
rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1"
"github.com/akuity/kargo/internal/directives"
"github.com/akuity/kargo/internal/indexer"
"github.com/akuity/kargo/internal/kargo"
"github.com/akuity/kargo/internal/kubeclient"
libEvent "github.com/akuity/kargo/internal/kubernetes/event"
Expand Down Expand Up @@ -232,42 +233,42 @@ func SetupReconcilerWithManager(
cfg ReconcilerConfig,
) error {
// Index Promotions by Stage
if err := kubeclient.IndexPromotionsByStage(ctx, kargoMgr); err != nil {
if err := indexer.IndexPromotionsByStage(ctx, kargoMgr); err != nil {
return fmt.Errorf("index non-terminal Promotions by Stage: %w", err)
}

// Index Promotions by Stage + Freight
if err := kubeclient.IndexPromotionsByStageAndFreight(ctx, kargoMgr); err != nil {
if err := indexer.IndexPromotionsByStageAndFreight(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Promotions by Stage and Freight: %w", err)
}

// Index Freight by Warehouse
if err := kubeclient.IndexFreightByWarehouse(ctx, kargoMgr); err != nil {
if err := indexer.IndexFreightByWarehouse(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Freight by Warehouse: %w", err)
}

// Index Freight by Stages in which it has been verified
if err := kubeclient.IndexFreightByVerifiedStages(ctx, kargoMgr); err != nil {
if err := indexer.IndexFreightByVerifiedStages(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Freight by Stages in which it has been verified: %w", err)
}

// Index Freight by Stages for which it has been approved
if err := kubeclient.IndexFreightByApprovedStages(ctx, kargoMgr); err != nil {
if err := indexer.IndexFreightByApprovedStages(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Freight by Stages for which it has been approved: %w", err)
}

// Index Stages by upstream Stages
if err := kubeclient.IndexStagesByUpstreamStages(ctx, kargoMgr); err != nil {
if err := indexer.IndexStagesByUpstreamStages(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Stages by upstream Stages: %w", err)
}

// Index Stages by Warehouse
if err := kubeclient.IndexStagesByWarehouse(ctx, kargoMgr); err != nil {
if err := indexer.IndexStagesByWarehouse(ctx, kargoMgr); err != nil {
return fmt.Errorf("index Stages by Warehouse: %w", err)
}

// Index Stages by AnalysisRun
if err := kubeclient.IndexStagesByAnalysisRun(ctx, kargoMgr, cfg.ShardName); err != nil {
if err := indexer.IndexStagesByAnalysisRun(ctx, kargoMgr, cfg.ShardName); err != nil {
return fmt.Errorf("index Stages by Argo Rollouts AnalysisRun: %w", err)
}

Expand Down Expand Up @@ -947,8 +948,8 @@ func (r *reconciler) syncNormalStage(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.PromotionsByStageAndFreightIndexField,
kubeclient.StageAndFreightKey(stage.Name, latestFreight.Name),
indexer.PromotionsByStageAndFreightIndexField,
indexer.StageAndFreightKey(stage.Name, latestFreight.Name),
),
Limit: 1,
},
Expand Down Expand Up @@ -1147,7 +1148,7 @@ func (r *reconciler) clearVerifications(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByVerifiedStagesIndexField,
indexer.FreightByVerifiedStagesIndexField,
stage.Name,
),
},
Expand Down Expand Up @@ -1189,7 +1190,7 @@ func (r *reconciler) clearApprovals(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightApprovedForStagesIndexField,
indexer.FreightApprovedForStagesIndexField,
stage.Name,
),
},
Expand Down Expand Up @@ -1365,7 +1366,7 @@ func (r *reconciler) getPromotionsForStage(
&client.ListOptions{
Namespace: stageNamespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.PromotionsByStageIndexField,
indexer.PromotionsByStageIndexField,
stageName,
),
},
Expand Down Expand Up @@ -1396,7 +1397,7 @@ func (r *reconciler) getAvailableFreight(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByWarehouseIndexField,
indexer.FreightByWarehouseIndexField,
req.Origin.Name,
),
},
Expand All @@ -1419,7 +1420,7 @@ func (r *reconciler) getAvailableFreight(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByVerifiedStagesIndexField,
indexer.FreightByVerifiedStagesIndexField,
upstream,
),
},
Expand All @@ -1443,7 +1444,7 @@ func (r *reconciler) getAvailableFreight(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightApprovedForStagesIndexField,
indexer.FreightApprovedForStagesIndexField,
stage.Name,
),
},
Expand Down Expand Up @@ -1490,7 +1491,7 @@ func (r *reconciler) getAvailableFreightByOrigin(
&client.ListOptions{
Namespace: stage.Namespace,
FieldSelector: fields.OneTermEqualSelector(
kubeclient.FreightByWarehouseIndexField,
indexer.FreightByWarehouseIndexField,
req.Origin.Name,
),
},
Expand Down Expand Up @@ -1522,11 +1523,11 @@ func (r *reconciler) getAvailableFreightByOrigin(
// TODO(hidde): once we support more Freight origin
// kinds, we need to adjust this.
fields.OneTermEqualSelector(
kubeclient.FreightByWarehouseIndexField,
indexer.FreightByWarehouseIndexField,
req.Origin.Name,
),
fields.OneTermEqualSelector(
kubeclient.FreightByVerifiedStagesIndexField,
indexer.FreightByVerifiedStagesIndexField,
upstream,
),
),
Expand Down Expand Up @@ -1554,11 +1555,11 @@ func (r *reconciler) getAvailableFreightByOrigin(
// TODO(hidde): once we support more Freight origin
// kinds, we need to adjust this.
fields.OneTermEqualSelector(
kubeclient.FreightByWarehouseIndexField,
indexer.FreightByWarehouseIndexField,
req.Origin.Name,
),
fields.OneTermEqualSelector(
kubeclient.FreightApprovedForStagesIndexField,
indexer.FreightApprovedForStagesIndexField,
stage.Name,
),
),
Expand Down
Loading

0 comments on commit 806fa9f

Please sign in to comment.