From 504d3a883a2406e5f84f5d8195ee6621672af0bb Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Tue, 1 Oct 2024 21:54:40 +0200 Subject: [PATCH] fix(indexer): extract Apps from Promotion directives (#2620) Signed-off-by: Hidde Beydals (cherry picked from commit 806fa9fc2326ac5ae5c1e2a6e2bb76294e35feab) --- cmd/controlplane/garbage_collector.go | 8 +-- cmd/controlplane/webhooks.go | 4 +- internal/api/kubernetes/client.go | 14 ++--- internal/api/list_project_events_v1alpha1.go | 4 +- internal/api/list_promotions_v1alpha1.go | 4 +- internal/api/option/auth.go | 6 +- internal/api/query_freights_v1alpha1.go | 8 +-- internal/controller/promotions/promotions.go | 3 +- internal/controller/promotions/watches.go | 4 +- .../controller/promotions/watches_test.go | 4 +- internal/controller/stages/stages.go | 43 ++++++------- internal/controller/stages/stages_test.go | 22 +++---- internal/controller/stages/watches.go | 8 +-- internal/garbage/freight.go | 6 +- internal/garbage/promotions.go | 4 +- internal/{kubeclient => indexer}/indexer.go | 50 ++++++++++++++- .../{kubeclient => indexer}/indexer_test.go | 61 ++++++++++++++++++- internal/webhook/freight/webhook.go | 4 +- 18 files changed, 183 insertions(+), 74 deletions(-) rename internal/{kubeclient => indexer}/indexer.go (91%) rename internal/{kubeclient => indexer}/indexer_test.go (92%) diff --git a/cmd/controlplane/garbage_collector.go b/cmd/controlplane/garbage_collector.go index a8b17c6ed..c4cc78cae 100644 --- a/cmd/controlplane/garbage_collector.go +++ b/cmd/controlplane/garbage_collector.go @@ -15,7 +15,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/api/kubernetes" "github.com/akuity/kargo/internal/garbage" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" "github.com/akuity/kargo/internal/os" versionpkg "github.com/akuity/kargo/internal/version" @@ -112,15 +112,15 @@ func (o *garbageCollectorOptions) setupManager(ctx context.Context) (manager.Man } // Index Promotions by Stage - if err = kubeclient.IndexPromotionsByStage(ctx, mgr); err != nil { + if err = indexer.IndexPromotionsByStage(ctx, mgr); err != nil { return nil, fmt.Errorf("error indexing Promotions by Stage: %w", err) } // Index Freight by Warehouse - if err = kubeclient.IndexFreightByWarehouse(ctx, mgr); err != nil { + if err = indexer.IndexFreightByWarehouse(ctx, mgr); err != nil { return nil, fmt.Errorf("error indexing Freight by Warehouse: %w", err) } // Index Stages by Freight - if err = kubeclient.IndexStagesByFreight(ctx, mgr); err != nil { + if err = indexer.IndexStagesByFreight(ctx, mgr); err != nil { return nil, fmt.Errorf("error indexing Stages by Freight: %w", err) } return mgr, nil diff --git a/cmd/controlplane/webhooks.go b/cmd/controlplane/webhooks.go index d0b10fde6..2668f5d52 100644 --- a/cmd/controlplane/webhooks.go +++ b/cmd/controlplane/webhooks.go @@ -15,7 +15,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/api/kubernetes" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" "github.com/akuity/kargo/internal/os" versionpkg "github.com/akuity/kargo/internal/version" @@ -107,7 +107,7 @@ func (o *webhooksServerOptions) run(ctx context.Context) error { } // Index Stages by Freight - if err = kubeclient.IndexStagesByFreight(ctx, mgr); err != nil { + if err = indexer.IndexStagesByFreight(ctx, mgr); err != nil { return fmt.Errorf("index Stages by Freight: %w", err) } diff --git a/internal/api/kubernetes/client.go b/internal/api/kubernetes/client.go index 219bc9fb0..ab3f0a3f9 100644 --- a/internal/api/kubernetes/client.go +++ b/internal/api/kubernetes/client.go @@ -28,7 +28,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/api/user" rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" ) @@ -233,22 +233,22 @@ func newDefaultInternalClient( } // Add all indices required by the API server - if err = kubeclient.IndexPromotionsByStage(ctx, cluster); err != nil { + if err = indexer.IndexPromotionsByStage(ctx, cluster); err != nil { return nil, fmt.Errorf("error indexing Promotions by Stage: %w", err) } - if err = kubeclient.IndexFreightByWarehouse(ctx, cluster); err != nil { + if err = indexer.IndexFreightByWarehouse(ctx, cluster); err != nil { return nil, fmt.Errorf("error indexing Freight by Warehouse: %w", err) } - if err = kubeclient.IndexFreightByVerifiedStages(ctx, cluster); err != nil { + if err = indexer.IndexFreightByVerifiedStages(ctx, cluster); err != nil { return nil, fmt.Errorf("error indexing Freight by Stages in which it has been verified: %w", err) } - if err = kubeclient.IndexFreightByApprovedStages(ctx, cluster); err != nil { + if err = indexer.IndexFreightByApprovedStages(ctx, cluster); err != nil { return nil, fmt.Errorf("error indexing Freight by Stages for which it has been approved: %w", err) } - if err = kubeclient.IndexServiceAccountsByOIDCClaims(ctx, cluster); err != nil { + if err = indexer.IndexServiceAccountsByOIDCClaims(ctx, cluster); err != nil { return nil, fmt.Errorf("index ServiceAccounts by OIDC claims: %w", err) } - if err = kubeclient.IndexEventsByInvolvedObjectAPIGroup(ctx, cluster); err != nil { + if err = indexer.IndexEventsByInvolvedObjectAPIGroup(ctx, cluster); err != nil { return nil, fmt.Errorf("error indexing Events by InvolvedObject's API group: %w", err) } diff --git a/internal/api/list_project_events_v1alpha1.go b/internal/api/list_project_events_v1alpha1.go index 38f0d4f1c..0b414bac8 100644 --- a/internal/api/list_project_events_v1alpha1.go +++ b/internal/api/list_project_events_v1alpha1.go @@ -11,7 +11,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1" ) @@ -36,7 +36,7 @@ func (s *server) ListProjectEvents( // List Kargo related events only client.MatchingFieldsSelector{ Selector: fields.OneTermEqualSelector( - kubeclient.EventsByInvolvedObjectAPIGroupIndexField, + indexer.EventsByInvolvedObjectAPIGroupIndexField, kargoapi.GroupVersion.Group, ), }, diff --git a/internal/api/list_promotions_v1alpha1.go b/internal/api/list_promotions_v1alpha1.go index 3d43e725b..b9338ae65 100644 --- a/internal/api/list_promotions_v1alpha1.go +++ b/internal/api/list_promotions_v1alpha1.go @@ -9,7 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1" ) @@ -33,7 +33,7 @@ func (s *server) ListPromotions( client.InNamespace(project), } if stage != "" { - opts = append(opts, client.MatchingFields{kubeclient.PromotionsByStageIndexField: stage}) + opts = append(opts, client.MatchingFields{indexer.PromotionsByStageIndexField: stage}) } if err := s.client.List(ctx, &list, opts...); err != nil { return nil, fmt.Errorf("list promotions: %w", err) diff --git a/internal/api/option/auth.go b/internal/api/option/auth.go index fdc0734b4..4b1335c36 100644 --- a/internal/api/option/auth.go +++ b/internal/api/option/auth.go @@ -22,7 +22,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/api/config" "github.com/akuity/kargo/internal/api/user" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" ) const authHeaderKey = "Authorization" @@ -254,14 +254,14 @@ func (a *authInterceptor) listServiceAccounts( for claimName, claimValue := range c { if claimValuesString, ok := claimValue.(string); ok { queries = append(queries, libClient.MatchingFields{ - kubeclient.ServiceAccountsByOIDCClaimsIndexField: kubeclient.FormatClaim(claimName, claimValuesString), + indexer.ServiceAccountsByOIDCClaimsIndexField: indexer.FormatClaim(claimName, claimValuesString), }) } if claimValueSlice, ok := claimValue.([]any); ok { for _, claimValueSliceItem := range claimValueSlice { if claimValueSliceItemString, ok := claimValueSliceItem.(string); ok { queries = append(queries, libClient.MatchingFields{ - kubeclient.ServiceAccountsByOIDCClaimsIndexField: kubeclient.FormatClaim( + indexer.ServiceAccountsByOIDCClaimsIndexField: indexer.FormatClaim( claimName, claimValueSliceItemString, ), }) diff --git a/internal/api/query_freights_v1alpha1.go b/internal/api/query_freights_v1alpha1.go index 066273b0e..73619298f 100644 --- a/internal/api/query_freights_v1alpha1.go +++ b/internal/api/query_freights_v1alpha1.go @@ -14,7 +14,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1" ) @@ -169,7 +169,7 @@ func (s *server) getAvailableFreightForStage( &client.ListOptions{ Namespace: project, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightApprovedForStagesIndexField, + indexer.FreightApprovedForStagesIndexField, stage, ), }, @@ -216,7 +216,7 @@ func (s *server) getFreightFromWarehouses( &client.ListOptions{ Namespace: project, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexField, warehouse, ), }, @@ -247,7 +247,7 @@ func (s *server) getVerifiedFreight( &client.ListOptions{ Namespace: project, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByVerifiedStagesIndexField, + indexer.FreightByVerifiedStagesIndexField, upstream, ), }, diff --git a/internal/controller/promotions/promotions.go b/internal/controller/promotions/promotions.go index 60f507e9f..54d2e99dd 100644 --- a/internal/controller/promotions/promotions.go +++ b/internal/controller/promotions/promotions.go @@ -29,6 +29,7 @@ import ( "github.com/akuity/kargo/internal/controller/promotion" "github.com/akuity/kargo/internal/controller/runtime" "github.com/akuity/kargo/internal/directives" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/kargo" "github.com/akuity/kargo/internal/kubeclient" libEvent "github.com/akuity/kargo/internal/kubernetes/event" @@ -94,7 +95,7 @@ func SetupReconcilerWithManager( cfg ReconcilerConfig, ) error { // Index running Promotions by Argo CD Applications - if err := kubeclient.IndexRunningPromotionsByArgoCDApplications(ctx, kargoMgr, cfg.ShardName); err != nil { + if err := indexer.IndexRunningPromotionsByArgoCDApplications(ctx, kargoMgr, cfg.ShardName); err != nil { return fmt.Errorf("index running Promotions by Argo CD Applications: %w", err) } diff --git a/internal/controller/promotions/watches.go b/internal/controller/promotions/watches.go index 668b72228..88131cb85 100644 --- a/internal/controller/promotions/watches.go +++ b/internal/controller/promotions/watches.go @@ -14,7 +14,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" ) @@ -208,7 +208,7 @@ func (u *UpdatedArgoCDAppHandler[T]) Update( promotions, &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector( - kubeclient.RunningPromotionsByArgoCDApplicationsIndexField, + indexer.RunningPromotionsByArgoCDApplicationsIndexField, fmt.Sprintf("%s:%s", newApp.Namespace, newApp.Name), ), LabelSelector: u.shardSelector, diff --git a/internal/controller/promotions/watches_test.go b/internal/controller/promotions/watches_test.go index baf98c390..2b39037c2 100644 --- a/internal/controller/promotions/watches_test.go +++ b/internal/controller/promotions/watches_test.go @@ -21,7 +21,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" ) func TestUpdatedArgoCDAppHandler_Update(t *testing.T) { @@ -287,7 +287,7 @@ func TestUpdatedArgoCDAppHandler_Update(t *testing.T) { WithObjects(tt.applications...). WithIndex( &kargoapi.Promotion{}, - kubeclient.RunningPromotionsByArgoCDApplicationsIndexField, + indexer.RunningPromotionsByArgoCDApplicationsIndexField, tt.indexer, ). WithInterceptorFuncs(tt.interceptor) diff --git a/internal/controller/stages/stages.go b/internal/controller/stages/stages.go index 2318f079a..4c4b9607d 100644 --- a/internal/controller/stages/stages.go +++ b/internal/controller/stages/stages.go @@ -30,6 +30,7 @@ import ( argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1" rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1" "github.com/akuity/kargo/internal/directives" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/kargo" "github.com/akuity/kargo/internal/kubeclient" libEvent "github.com/akuity/kargo/internal/kubernetes/event" @@ -232,42 +233,42 @@ func SetupReconcilerWithManager( cfg ReconcilerConfig, ) error { // Index Promotions by Stage - if err := kubeclient.IndexPromotionsByStage(ctx, kargoMgr); err != nil { + if err := indexer.IndexPromotionsByStage(ctx, kargoMgr); err != nil { return fmt.Errorf("index non-terminal Promotions by Stage: %w", err) } // Index Promotions by Stage + Freight - if err := kubeclient.IndexPromotionsByStageAndFreight(ctx, kargoMgr); err != nil { + if err := indexer.IndexPromotionsByStageAndFreight(ctx, kargoMgr); err != nil { return fmt.Errorf("index Promotions by Stage and Freight: %w", err) } // Index Freight by Warehouse - if err := kubeclient.IndexFreightByWarehouse(ctx, kargoMgr); err != nil { + if err := indexer.IndexFreightByWarehouse(ctx, kargoMgr); err != nil { return fmt.Errorf("index Freight by Warehouse: %w", err) } // Index Freight by Stages in which it has been verified - if err := kubeclient.IndexFreightByVerifiedStages(ctx, kargoMgr); err != nil { + if err := indexer.IndexFreightByVerifiedStages(ctx, kargoMgr); err != nil { return fmt.Errorf("index Freight by Stages in which it has been verified: %w", err) } // Index Freight by Stages for which it has been approved - if err := kubeclient.IndexFreightByApprovedStages(ctx, kargoMgr); err != nil { + if err := indexer.IndexFreightByApprovedStages(ctx, kargoMgr); err != nil { return fmt.Errorf("index Freight by Stages for which it has been approved: %w", err) } // Index Stages by upstream Stages - if err := kubeclient.IndexStagesByUpstreamStages(ctx, kargoMgr); err != nil { + if err := indexer.IndexStagesByUpstreamStages(ctx, kargoMgr); err != nil { return fmt.Errorf("index Stages by upstream Stages: %w", err) } // Index Stages by Warehouse - if err := kubeclient.IndexStagesByWarehouse(ctx, kargoMgr); err != nil { + if err := indexer.IndexStagesByWarehouse(ctx, kargoMgr); err != nil { return fmt.Errorf("index Stages by Warehouse: %w", err) } // Index Stages by AnalysisRun - if err := kubeclient.IndexStagesByAnalysisRun(ctx, kargoMgr, cfg.ShardName); err != nil { + if err := indexer.IndexStagesByAnalysisRun(ctx, kargoMgr, cfg.ShardName); err != nil { return fmt.Errorf("index Stages by Argo Rollouts AnalysisRun: %w", err) } @@ -947,8 +948,8 @@ func (r *reconciler) syncNormalStage( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.PromotionsByStageAndFreightIndexField, - kubeclient.StageAndFreightKey(stage.Name, latestFreight.Name), + indexer.PromotionsByStageAndFreightIndexField, + indexer.StageAndFreightKey(stage.Name, latestFreight.Name), ), Limit: 1, }, @@ -1147,7 +1148,7 @@ func (r *reconciler) clearVerifications( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByVerifiedStagesIndexField, + indexer.FreightByVerifiedStagesIndexField, stage.Name, ), }, @@ -1189,7 +1190,7 @@ func (r *reconciler) clearApprovals( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightApprovedForStagesIndexField, + indexer.FreightApprovedForStagesIndexField, stage.Name, ), }, @@ -1365,7 +1366,7 @@ func (r *reconciler) getPromotionsForStage( &client.ListOptions{ Namespace: stageNamespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.PromotionsByStageIndexField, + indexer.PromotionsByStageIndexField, stageName, ), }, @@ -1396,7 +1397,7 @@ func (r *reconciler) getAvailableFreight( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexField, req.Origin.Name, ), }, @@ -1419,7 +1420,7 @@ func (r *reconciler) getAvailableFreight( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByVerifiedStagesIndexField, + indexer.FreightByVerifiedStagesIndexField, upstream, ), }, @@ -1443,7 +1444,7 @@ func (r *reconciler) getAvailableFreight( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightApprovedForStagesIndexField, + indexer.FreightApprovedForStagesIndexField, stage.Name, ), }, @@ -1490,7 +1491,7 @@ func (r *reconciler) getAvailableFreightByOrigin( &client.ListOptions{ Namespace: stage.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexField, req.Origin.Name, ), }, @@ -1522,11 +1523,11 @@ func (r *reconciler) getAvailableFreightByOrigin( // TODO(hidde): once we support more Freight origin // kinds, we need to adjust this. fields.OneTermEqualSelector( - kubeclient.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexField, req.Origin.Name, ), fields.OneTermEqualSelector( - kubeclient.FreightByVerifiedStagesIndexField, + indexer.FreightByVerifiedStagesIndexField, upstream, ), ), @@ -1554,11 +1555,11 @@ func (r *reconciler) getAvailableFreightByOrigin( // TODO(hidde): once we support more Freight origin // kinds, we need to adjust this. fields.OneTermEqualSelector( - kubeclient.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexField, req.Origin.Name, ), fields.OneTermEqualSelector( - kubeclient.FreightApprovedForStagesIndexField, + indexer.FreightApprovedForStagesIndexField, stage.Name, ), ), diff --git a/internal/controller/stages/stages_test.go b/internal/controller/stages/stages_test.go index 1b8f2a33a..468110df0 100644 --- a/internal/controller/stages/stages_test.go +++ b/internal/controller/stages/stages_test.go @@ -23,7 +23,7 @@ import ( "github.com/akuity/kargo/internal/controller" rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1" "github.com/akuity/kargo/internal/directives" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" fakeevent "github.com/akuity/kargo/internal/kubernetes/event/fake" ) @@ -3524,7 +3524,7 @@ func TestGetAvailableFreightByOrigin(t *testing.T) { if strings.Contains( lo.FieldSelector.String(), - fmt.Sprintf("%s=%s", kubeclient.FreightByVerifiedStagesIndexField, "fake-upstream-stage"), + fmt.Sprintf("%s=%s", indexer.FreightByVerifiedStagesIndexField, "fake-upstream-stage"), ) { return fmt.Errorf("something went wrong") } @@ -3570,7 +3570,7 @@ func TestGetAvailableFreightByOrigin(t *testing.T) { if strings.Contains( lo.FieldSelector.String(), - fmt.Sprintf("%s=%s", kubeclient.FreightApprovedForStagesIndexField, "fake-stage"), + fmt.Sprintf("%s=%s", indexer.FreightApprovedForStagesIndexField, "fake-stage"), ) { return fmt.Errorf("something went wrong") } @@ -3618,11 +3618,11 @@ func TestGetAvailableFreightByOrigin(t *testing.T) { lo := &client.ListOptions{} lo.ApplyOptions(opts) - if strings.Contains(lo.FieldSelector.String(), kubeclient.FreightApprovedForStagesIndexField) { + if strings.Contains(lo.FieldSelector.String(), indexer.FreightApprovedForStagesIndexField) { return fmt.Errorf("something went wrong") } - if strings.Contains(lo.FieldSelector.String(), kubeclient.FreightByVerifiedStagesIndexField) { + if strings.Contains(lo.FieldSelector.String(), indexer.FreightByVerifiedStagesIndexField) { return fmt.Errorf("something went wrong") } @@ -3662,18 +3662,18 @@ func TestGetAvailableFreightByOrigin(t *testing.T) { WithScheme(s). WithIndex( &kargoapi.Freight{}, - kubeclient.FreightByWarehouseIndexField, - kubeclient.FreightByWarehouseIndexer, + indexer.FreightByWarehouseIndexField, + indexer.FreightByWarehouseIndexer, ). WithIndex( &kargoapi.Freight{}, - kubeclient.FreightByVerifiedStagesIndexField, - kubeclient.FreightByVerifiedStagesIndexer, + indexer.FreightByVerifiedStagesIndexField, + indexer.FreightByVerifiedStagesIndexer, ). WithIndex( &kargoapi.Freight{}, - kubeclient.FreightApprovedForStagesIndexField, - kubeclient.FreightApprovedForStagesIndexer, + indexer.FreightApprovedForStagesIndexField, + indexer.FreightApprovedForStagesIndexer, ). WithInterceptorFuncs(tc.interceptor). WithObjects(tc.objects...). diff --git a/internal/controller/stages/watches.go b/internal/controller/stages/watches.go index bbd4aaeeb..bc9318dea 100644 --- a/internal/controller/stages/watches.go +++ b/internal/controller/stages/watches.go @@ -18,7 +18,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1" rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" ) @@ -84,7 +84,7 @@ func (v *verifiedFreightEventHandler[T]) Update( &client.ListOptions{ Namespace: newFreight.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.StagesByUpstreamStagesIndexField, + indexer.StagesByUpstreamStagesIndexField, newlyVerifiedStage, ), LabelSelector: v.shardSelector, @@ -230,7 +230,7 @@ func (c *createdFreightEventHandler[T]) Create( &client.ListOptions{ Namespace: freight.Namespace, FieldSelector: fields.OneTermEqualSelector( - kubeclient.StagesByWarehouseIndexField, + indexer.StagesByWarehouseIndexField, freight.Origin.Name, ), LabelSelector: c.shardSelector, @@ -475,7 +475,7 @@ func (p *phaseChangedAnalysisRunHandler[T]) Update( stages, &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector( - kubeclient.StagesByAnalysisRunIndexField, + indexer.StagesByAnalysisRunIndexField, fmt.Sprintf("%s:%s", analysisRun.Namespace, analysisRun.Name), ), LabelSelector: p.shardSelector, diff --git a/internal/garbage/freight.go b/internal/garbage/freight.go index 96462ebf5..72209d09c 100644 --- a/internal/garbage/freight.go +++ b/internal/garbage/freight.go @@ -9,7 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" ) @@ -72,7 +72,7 @@ func (c *collector) cleanWarehouseFreight( &freight, client.InNamespace(project), client.MatchingFields{ - kubeclient.FreightByWarehouseIndexField: warehouse, + indexer.FreightByWarehouseIndexField: warehouse, }, ); err != nil { return fmt.Errorf( @@ -101,7 +101,7 @@ func (c *collector) cleanWarehouseFreight( &stages, client.InNamespace(project), client.MatchingFields{ - kubeclient.StagesByFreightIndexField: f.Name, + indexer.StagesByFreightIndexField: f.Name, }, ); err != nil { logger.Error( diff --git a/internal/garbage/promotions.go b/internal/garbage/promotions.go index c1181a080..d11aab0c3 100644 --- a/internal/garbage/promotions.go +++ b/internal/garbage/promotions.go @@ -9,7 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/logging" ) @@ -67,7 +67,7 @@ func (c *collector) cleanStagePromotions( &promos, client.InNamespace(project), client.MatchingFields{ - kubeclient.PromotionsByStageIndexField: stage, + indexer.PromotionsByStageIndexField: stage, }, ); err != nil { return fmt.Errorf( diff --git a/internal/kubeclient/indexer.go b/internal/indexer/indexer.go similarity index 91% rename from internal/kubeclient/indexer.go rename to internal/indexer/indexer.go index edc13576d..f92000936 100644 --- a/internal/kubeclient/indexer.go +++ b/internal/indexer/indexer.go @@ -1,7 +1,8 @@ -package kubeclient +package indexer import ( "context" + "encoding/json" "fmt" "slices" "strings" @@ -14,6 +15,7 @@ import ( rbacapi "github.com/akuity/kargo/api/rbac/v1alpha1" kargoapi "github.com/akuity/kargo/api/v1alpha1" libargocd "github.com/akuity/kargo/internal/argocd" + "github.com/akuity/kargo/internal/directives" "github.com/akuity/kargo/internal/logging" ) @@ -207,6 +209,52 @@ func indexRunningPromotionsByArgoCDApplications( return nil } + // If the Promotion has directive steps, then we should extract the + // Argo CD Applications from those steps. + // + // TODO(hidde): While this is arguably already better than the "legacy" + // approach further down, which had to query the Stage to get the + // Applications, it is still not ideal as it requires parsing the + // directives and treating some of them as special cases. We should + // consider a more general approach in the future. + if len(promo.Spec.Steps) > 0 { + var res []string + for i, step := range promo.Spec.Steps { + if step.Uses != "argocd-update" || step.Config == nil { + continue + } + + config := directives.ArgoCDUpdateConfig{} + if err := json.Unmarshal(step.Config.Raw, &config); err != nil { + logger.Error( + err, + fmt.Sprintf( + "failed to extract config from Promotion step %d:"+ + "ignoring any Argo CD Applications from this step", + i, + ), + "promo", promo.Name, + "namespace", promo.Namespace, + ) + continue + } + + for _, app := range config.Apps { + namespace := app.Namespace + if namespace == "" { + namespace = libargocd.Namespace() + } + res = append(res, fmt.Sprintf("%s:%s", namespace, app.Name)) + } + } + return res + } + + // If there are no directive steps, then we should query the Stage to get + // the Argo CD Applications. + // + // TODO(hidde): Remove this once we fully transition to directive-based + // Promotions. stage := kargoapi.Stage{} if err := c.Get( ctx, diff --git a/internal/kubeclient/indexer_test.go b/internal/indexer/indexer_test.go similarity index 92% rename from internal/kubeclient/indexer_test.go rename to internal/indexer/indexer_test.go index 3af58b4ac..6022b20bf 100644 --- a/internal/kubeclient/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -1,4 +1,4 @@ -package kubeclient +package indexer import ( "context" @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -352,6 +353,64 @@ func TestIndexRunningPromotionsByArgoCDApplications(t *testing.T) { shardName: testShardName, expected: nil, }, + { + name: "Promotion has directive steps", + obj: &kargoapi.Promotion{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + }, + Spec: kargoapi.PromotionSpec{ + Stage: "fake-stage", + Steps: []kargoapi.PromotionStep{ + { + Uses: "argocd-update", + Config: &apiextensionsv1.JSON{ + Raw: []byte(`{"apps":[{"namespace":"fake-namespace","name":"fake-app"}]}`), + }, + }, + { + Uses: "fake-directive", + }, + { + Uses: "argocd-update", + Config: &apiextensionsv1.JSON{ + Raw: []byte(`{"apps":[{"name":"fake-app-2"}]}`), + }, + }, + }, + }, + Status: kargoapi.PromotionStatus{ + Phase: kargoapi.PromotionPhaseRunning, + }, + }, + expected: []string{ + "fake-namespace:fake-app", + fmt.Sprintf("%s:%s", argocd.Namespace(), "fake-app-2"), + }, + }, + { + name: "Promotion has directive steps without Applications", + obj: &kargoapi.Promotion{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + }, + Spec: kargoapi.PromotionSpec{ + Stage: "fake-stage", + Steps: []kargoapi.PromotionStep{ + { + Uses: "fake-directive", + }, + { + Uses: "fake-directive", + }, + }, + }, + Status: kargoapi.PromotionStatus{ + Phase: kargoapi.PromotionPhaseRunning, + }, + }, + expected: nil, + }, { name: "Related Promotion Stage does not have Argo CD Application mechanisms", obj: &kargoapi.Promotion{ diff --git a/internal/webhook/freight/webhook.go b/internal/webhook/freight/webhook.go index 1f1f5bcfd..9ed45cc2f 100644 --- a/internal/webhook/freight/webhook.go +++ b/internal/webhook/freight/webhook.go @@ -22,7 +22,7 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/git" "github.com/akuity/kargo/internal/helm" - "github.com/akuity/kargo/internal/kubeclient" + "github.com/akuity/kargo/internal/indexer" libEvent "github.com/akuity/kargo/internal/kubernetes/event" libWebhook "github.com/akuity/kargo/internal/webhook" ) @@ -298,7 +298,7 @@ func (w *webhook) ValidateDelete( &list, client.InNamespace(freight.GetNamespace()), client.MatchingFields{ - kubeclient.StagesByFreightIndexField: freight.Name, + indexer.StagesByFreightIndexField: freight.Name, }, ); err != nil { return nil, fmt.Errorf("list stages: %w", err)