[release-2.12] ✨ Add clusterInfo updating/deleting for inventory API (#1151)

* adjust the inventory client

Signed-off-by: myan <[email protected]>

* add the managed cluster info controller

Signed-off-by: myan <[email protected]>

* finished the framework

Signed-off-by: myan <[email protected]>

* add delete and update

Signed-off-by: myan <[email protected]>

* fix the test

Signed-off-by: myan <[email protected]>

* fix the controller test

Signed-off-by: myan <[email protected]>

* modify the delete

Signed-off-by: myan <[email protected]>

* remove the test

Signed-off-by: myan <[email protected]>

---------

Signed-off-by: myan <[email protected]>
Co-authored-by: myan <[email protected]>
openshift-cherrypick-robot and yanmxa authored Sep 30, 2024
1 parent 0246442 commit b5bfd09
Showing 23 changed files with 721 additions and 727 deletions.
4 changes: 2 additions & 2 deletions agent/cmd/agent/main.go
@@ -255,14 +255,14 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
 // if the transport consumer and producer is ready then the func will be invoked by the transport controller
 func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig,
 ) controller.TransportCallback {
-	return func(producer transport.Producer, consumer transport.Consumer) error {
+	return func(transportClient transport.TransportClient) error {
 		// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
 		// we use the value to decide whether install the ACM or not
 		if err := controllers.AddHubClusterClaimController(mgr); err != nil {
 			return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
 		}
 
-		if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig, producer, consumer); err != nil {
+		if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig, transportClient); err != nil {
 			return fmt.Errorf("failed to add crd controller: %w", err)
 		}
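
Note: the callback now receives a single transport.TransportClient instead of a (producer, consumer) pair. A minimal sketch of that interface, inferred only from the call sites visible in this commit (GetProducer, GetConsumer, GetRequester); the authoritative definition lives in the repository's transport package and may carry more methods:

// Sketch only: shape inferred from this diff's call sites, not copied from
// the repository's actual declaration.
type TransportClient interface {
	GetProducer() Producer   // event producer, used by the Kafka transport path
	GetConsumer() Consumer   // event consumer, used by the spec syncers
	GetRequester() Requester // request/response client, used by the REST/inventory path
}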
38 changes: 18 additions & 20 deletions agent/pkg/controllers/crd_controller.go
@@ -30,30 +30,29 @@ const (
 var crdCtrlStarted = false
 
 type crdController struct {
-	mgr         ctrl.Manager
-	log         logr.Logger
-	restConfig  *rest.Config
-	agentConfig *config.AgentConfig
-	producer    transport.Producer
-	consumer    transport.Consumer
+	mgr             ctrl.Manager
+	log             logr.Logger
+	restConfig      *rest.Config
+	agentConfig     *config.AgentConfig
+	transportClient transport.TransportClient
 }
 
 func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
 	switch {
 	case request.Name == clusterManagersCRDName:
-		return c.reconcileClusterManagers(ctx, request)
+		return c.addACMController(ctx, request)
 	case request.Name == stackRoxCentralCRDName && c.agentConfig.EnableStackroxIntegration:
-		return c.reconcileStackRoxCentrals()
+		return c.addStackRoxCentrals()
 	default:
 		return ctrl.Result{}, nil
 	}
 }
 
-func (c *crdController) reconcileClusterManagers(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
+func (c *crdController) addACMController(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
 	reqLogger := c.log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
 	reqLogger.V(2).Info("crd controller", "NamespacedName:", request.NamespacedName)
 
-	if err := statusController.AddControllers(ctx, c.mgr, c.producer, c.agentConfig); err != nil {
+	if err := statusController.AddControllers(ctx, c.mgr, c.transportClient, c.agentConfig); err != nil {
 		return ctrl.Result{}, fmt.Errorf("failed to add status syncer: %w", err)
 	}
 
@@ -63,7 +62,7 @@ func (c *crdController) reconcileClusterManagers(ctx context.Context, request ct
 	}
 
 	// add spec controllers
-	if err := specController.AddToManager(ctx, c.mgr, c.consumer, c.agentConfig); err != nil {
+	if err := specController.AddToManager(ctx, c.mgr, c.transportClient.GetConsumer(), c.agentConfig); err != nil {
 		return ctrl.Result{}, fmt.Errorf("failed to add spec syncer: %w", err)
 	}
 	reqLogger.V(2).Info("add spec controllers to manager")
@@ -81,15 +80,15 @@ func (c *crdController) reconcileClusterManagers(ctx context.Context, request ct
 	return ctrl.Result{}, nil
 }
 
-func (c *crdController) reconcileStackRoxCentrals() (result ctrl.Result, err error) {
+func (c *crdController) addStackRoxCentrals() (result ctrl.Result, err error) {
 	c.log.Info("Detected the presence of the StackRox central CRD")
 
 	// Create the object that polls the StackRox API and publishes the message, then add it to the controller
 	// manager so that it will be started automatically.
 	syncer, err := security.NewStackRoxSyncer().
 		SetLogger(c.log.WithName("stackrox-syncer")).
 		SetTopic(c.agentConfig.TransportConfig.KafkaCredential.StatusTopic).
-		SetProducer(c.producer).
+		SetProducer(c.transportClient.GetProducer()).
 		SetKubernetesClient(c.mgr.GetClient()).
 		SetPollInterval(c.agentConfig.StackroxPollInterval).
 		Build()
@@ -115,7 +114,7 @@ func (c *crdController) reconcileStackRoxCentrals() (result ctrl.Result, err err
 // this controller is used to watch the multiclusterhub crd or clustermanager crd
 // if the crd exists, then add controllers to the manager dynamically
 func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig,
-	producer transport.Producer, consumer transport.Consumer,
+	transportClient transport.TransportClient,
 ) error {
 	if crdCtrlStarted {
 		return nil
@@ -141,12 +140,11 @@ func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *co
 		}),
 	).
 	Complete(&crdController{
-		mgr:         mgr,
-		restConfig:  restConfig,
-		agentConfig: agentConfig,
-		producer:    producer,
-		consumer:    consumer,
-		log:         ctrl.Log.WithName("crd-controller"),
+		mgr:             mgr,
+		restConfig:      restConfig,
+		agentConfig:     agentConfig,
+		transportClient: transportClient,
+		log:             ctrl.Log.WithName("crd-controller"),
 	}); err != nil {
 		return err
 	}
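
For readers unfamiliar with the pattern AddCRDController relies on, here is a compact sketch of a name-filtered CRD watch in controller-runtime: controllers are added to the manager lazily, once the relevant CRD actually exists on the cluster. The predicate and builder calls are standard sigs.k8s.io/controller-runtime APIs; addCRDWatch and its reconciler argument are illustrative stand-ins, while the CRD-name constants are this repo's own.

package controllers

import (
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// addCRDWatch registers a watch on CRD objects but reconciles only the two
// CRDs this agent cares about, so the heavier controllers are wired up
// dynamically when those CRDs appear.
func addCRDWatch(mgr ctrl.Manager, r reconcile.Reconciler) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&apiextensionsv1.CustomResourceDefinition{}).
		WithEventFilter(predicate.NewPredicateFuncs(func(obj client.Object) bool {
			// Filter events down to the CRDs handled in Reconcile above.
			return obj.GetName() == clusterManagersCRDName ||
				obj.GetName() == stackRoxCentralCRDName
		})).
		Complete(r)
}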
37 changes: 26 additions & 11 deletions agent/pkg/status/controller/controller.go
@@ -24,27 +24,43 @@ import (
 var statusCtrlStarted = false
 
 // AddControllers adds all the controllers to the Manager.
-func AddControllers(ctx context.Context, mgr ctrl.Manager, producer transport.Producer,
+func AddControllers(ctx context.Context, mgr ctrl.Manager, transportClient transport.TransportClient,
 	agentConfig *config.AgentConfig,
 ) error {
 	if statusCtrlStarted {
 		return nil
 	}
-	// managed cluster info
-	if err := managedclusters.LaunchManagedClusterInfoSyncer(ctx, mgr, agentConfig, producer); err != nil {
-		return fmt.Errorf("failed to launch managedclusterinfo syncer: %w", err)
+	if err := agentstatusconfig.AddConfigController(mgr, agentConfig); err != nil {
+		return fmt.Errorf("failed to add ConfigMap controller: %w", err)
 	}
 
-	// if it's rest transport, skip the following controllers
-	if agentConfig.TransportConfig.TransportType == string(transport.Rest) {
-		statusCtrlStarted = true
-		return nil
+	var err error
+	switch agentConfig.TransportConfig.TransportType {
+	case string(transport.Kafka):
+		err = addKafkaSyncer(ctx, mgr, transportClient.GetProducer(), agentConfig)
+	case string(transport.Rest):
+		err = addInventorySyncer(ctx, mgr, transportClient.GetRequester(), agentConfig)
 	}
+	if err != nil {
+		return fmt.Errorf("failed to add the syncer: %w", err)
+	}
 
-	if err := agentstatusconfig.AddConfigController(mgr, agentConfig); err != nil {
-		return fmt.Errorf("failed to add ConfigMap controller: %w", err)
+	statusCtrlStarted = true
+	return nil
+}
+
+func addInventorySyncer(ctx context.Context, mgr ctrl.Manager, inventoryRequester transport.Requester,
+	agentConfig *config.AgentConfig,
+) error {
+	if err := managedclusters.AddManagedClusterInfoCtrl(mgr, inventoryRequester); err != nil {
+		return err
+	}
+	return nil
+}
+
+func addKafkaSyncer(ctx context.Context, mgr ctrl.Manager, producer transport.Producer,
+	agentConfig *config.AgentConfig,
+) error {
 	// managed cluster
 	if err := managedclusters.LaunchManagedClusterSyncer(ctx, mgr, agentConfig, producer); err != nil {
 		return fmt.Errorf("failed to launch managedcluster syncer: %w", err)
@@ -95,6 +111,5 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, producer transport.Pr
 		agentConfig.TransportConfig.KafkaCredential.StatusTopic); err != nil {
 		return fmt.Errorf("failed to launch time filter: %w", err)
 	}
-
 	return nil
 }
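
The new addInventorySyncer path hands the inventory requester to AddManagedClusterInfoCtrl, the controller this commit adds (its source is among the 23 changed files but not shown here). As a rough sketch of what the commit title ("clusterInfo updating/deleting") implies such a reconcile loop does: convert each ManagedClusterInfo into the kessel payload via GetK8SCluster (exercised by the tests below) and issue an update or delete against the inventory API. The inventoryAPI interface, its method names, the reconciler struct, and GetK8SCluster's return type are assumptions for illustration, not this repo's actual transport.Requester surface.

package managedclusters

import (
	"context"

	kessel "github.com/project-kessel/inventory-api/api/kessel/inventory/v1beta1/resources"
	clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// inventoryAPI is a hypothetical stand-in for the subset of the requester
// used here; the real method names live in this repo's transport package.
type inventoryAPI interface {
	UpdateK8SCluster(ctx context.Context, cluster *kessel.K8SCluster) error
	DeleteK8SCluster(ctx context.Context, cluster *kessel.K8SCluster) error
}

type managedClusterInfoReconciler struct {
	client    client.Client
	inventory inventoryAPI
	reporter  string // reporter instance passed to GetK8SCluster ("guest" in the tests)
}

func (r *managedClusterInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	info := &clusterinfov1beta1.ManagedClusterInfo{}
	if err := r.client.Get(ctx, req.NamespacedName, info); err != nil {
		if apierrors.IsNotFound(err) {
			return ctrl.Result{}, nil // already gone; nothing to sync
		}
		return ctrl.Result{}, err
	}

	// Build the kessel payload (return type assumed to be *kessel.K8SCluster,
	// matching the field accesses in the tests below).
	k8sCluster := GetK8SCluster(info, r.reporter)

	if !info.DeletionTimestamp.IsZero() {
		// The "deleting" half of this commit: remove the record from inventory.
		return ctrl.Result{}, r.inventory.DeleteK8SCluster(ctx, k8sCluster)
	}
	// The "updating" half: create or update the inventory record.
	return ctrl.Result{}, r.inventory.UpdateK8SCluster(ctx, k8sCluster)
}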
@@ -1,34 +1,35 @@
-package transfer
+package managedclusters
 
 import (
 	"testing"
 
 	kessel "github.com/project-kessel/inventory-api/api/kessel/inventory/v1beta1/resources"
 	clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clusterv1 "open-cluster-management.io/api/cluster/v1"
 )
 
-func TestGetK8SCluster(t *testing.T) {
+func TestGetK8SClusterInfo(t *testing.T) {
 	clusterInfo := createMockClusterInfo("test-cluster", clusterinfov1beta1.KubeVendorOpenShift, "4.10.0",
 		clusterinfov1beta1.CloudVendorAWS)
 
 	// Call the function
-	result := GetK8SCluster(clusterInfo, "guest")
+	k8sCluster := GetK8SCluster(clusterInfo, "guest")
 
 	// Assert the results
-	assert.NotNil(t, result)
-	assert.Equal(t, "k8s-cluster", result.K8SCluster.Metadata.ResourceType)
-	assert.Equal(t, kessel.ReporterData_ACM, result.K8SCluster.ReporterData.ReporterType)
-	assert.Equal(t, "https://api.test-cluster.example.com", result.K8SCluster.ReporterData.ApiHref)
-	assert.Equal(t, "https://console.test-cluster.example.com", result.K8SCluster.ReporterData.ConsoleHref)
-	assert.Equal(t, "test-cluster-id", result.K8SCluster.ResourceData.ExternalClusterId)
-	assert.Equal(t, "1.23.0", result.K8SCluster.ResourceData.KubeVersion)
-	assert.Equal(t, kessel.K8SClusterDetail_READY, result.K8SCluster.ResourceData.ClusterStatus)
-	assert.Equal(t, kessel.K8SClusterDetail_AWS_UPI, result.K8SCluster.ResourceData.CloudPlatform)
-	assert.Equal(t, kessel.K8SClusterDetail_OPENSHIFT, result.K8SCluster.ResourceData.KubeVendor)
-	assert.Equal(t, "4.10.0", result.K8SCluster.ResourceData.VendorVersion)
+	assert.NotNil(t, k8sCluster)
+	assert.Equal(t, "k8s-cluster", k8sCluster.Metadata.ResourceType)
+	assert.Equal(t, kessel.ReporterData_ACM, k8sCluster.ReporterData.ReporterType)
+	assert.Equal(t, "https://api.test-cluster.example.com", k8sCluster.ReporterData.ApiHref)
+	assert.Equal(t, "https://console.test-cluster.example.com", k8sCluster.ReporterData.ConsoleHref)
+	assert.Equal(t, "test-cluster-id", k8sCluster.ResourceData.ExternalClusterId)
+	assert.Equal(t, "1.23.0", k8sCluster.ResourceData.KubeVersion)
+	assert.Equal(t, kessel.K8SClusterDetail_READY, k8sCluster.ResourceData.ClusterStatus)
+	assert.Equal(t, kessel.K8SClusterDetail_AWS_UPI, k8sCluster.ResourceData.CloudPlatform)
+	assert.Equal(t, kessel.K8SClusterDetail_OPENSHIFT, k8sCluster.ResourceData.KubeVendor)
+	assert.Equal(t, "4.10.0", k8sCluster.ResourceData.VendorVersion)
 }
 
 func TestKubeVendorK8SCluster(t *testing.T) {
@@ -70,19 +71,19 @@ func TestKubeVendorK8SCluster(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			result := GetK8SCluster(tc.clusterInfo, "guest")
+			k8sCluster := GetK8SCluster(tc.clusterInfo, "guest")
 
-			assert.NotNil(t, result)
-			assert.Equal(t, tc.expectedVendor, result.K8SCluster.ResourceData.KubeVendor)
-			assert.Equal(t, tc.expectedVersion, result.K8SCluster.ResourceData.VendorVersion)
+			assert.NotNil(t, k8sCluster)
+			assert.Equal(t, tc.expectedVendor, k8sCluster.ResourceData.KubeVendor)
+			assert.Equal(t, tc.expectedVersion, k8sCluster.ResourceData.VendorVersion)
 			// Add more assertions for common fields
-			assert.Equal(t, "k8s-cluster", result.K8SCluster.Metadata.ResourceType)
-			assert.Equal(t, kessel.ReporterData_ACM, result.K8SCluster.ReporterData.ReporterType)
-			assert.Equal(t, "https://api.test-cluster.example.com", result.K8SCluster.ReporterData.ApiHref)
-			assert.Equal(t, "https://console.test-cluster.example.com", result.K8SCluster.ReporterData.ConsoleHref)
-			assert.Equal(t, "test-cluster-id", result.K8SCluster.ResourceData.ExternalClusterId)
-			assert.Equal(t, "1.23.0", result.K8SCluster.ResourceData.KubeVersion)
-			assert.Equal(t, kessel.K8SClusterDetail_READY, result.K8SCluster.ResourceData.ClusterStatus)
+			assert.Equal(t, "k8s-cluster", k8sCluster.Metadata.ResourceType)
+			assert.Equal(t, kessel.ReporterData_ACM, k8sCluster.ReporterData.ReporterType)
+			assert.Equal(t, "https://api.test-cluster.example.com", k8sCluster.ReporterData.ApiHref)
+			assert.Equal(t, "https://console.test-cluster.example.com", k8sCluster.ReporterData.ConsoleHref)
+			assert.Equal(t, "test-cluster-id", k8sCluster.ResourceData.ExternalClusterId)
+			assert.Equal(t, "1.23.0", k8sCluster.ResourceData.KubeVersion)
+			assert.Equal(t, kessel.K8SClusterDetail_READY, k8sCluster.ResourceData.ClusterStatus)
 		})
	}
 }
@@ -109,6 +110,18 @@ func createMockClusterInfo(name string, kubeVendor clusterinfov1beta1.KubeVendor
 				Status: metav1.ConditionTrue,
 			},
 		},
+		NodeList: []clusterinfov1beta1.NodeStatus{
+			{
+				Name: "ip-10-0-14-217.ec2.internal",
+				Capacity: clusterinfov1beta1.ResourceList{
+					clusterv1.ResourceCPU:    resource.MustParse("16"),
+					clusterv1.ResourceMemory: resource.MustParse("64453796Ki"),
+				},
+				Labels: map[string]string{
+					"node.kubernetes.io/instance-type": "m6a.4xlarge",
+				},
+			},
+		},
 	},
 }

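With the test package renamed from transfer to managedclusters, these tests should run with something like go test ./agent/pkg/status/... -run 'TestGetK8SClusterInfo|TestKubeVendorK8SCluster' (the exact package path is not shown in this diff and is assumed).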