diff --git a/pkg/apis/stork/v1alpha1/migration.go b/pkg/apis/stork/v1alpha1/migration.go
index 68a722dbb7..bccb368e4e 100644
--- a/pkg/apis/stork/v1alpha1/migration.go
+++ b/pkg/apis/stork/v1alpha1/migration.go
@@ -81,6 +81,8 @@ const (
 	MigrationStatusPartialSuccess MigrationStatusType = "PartialSuccess"
 	// MigrationStatusSuccessful for when migration has completed successfully
 	MigrationStatusSuccessful MigrationStatusType = "Successful"
+	// MigrationStatusCleaned for when migrated objects have been deleted
+	MigrationStatusCleaned MigrationStatusType = "Cleaned"
 )
 
 // MigrationStageType is the stage of the migration
diff --git a/pkg/applicationmanager/controllers/applicationbackup.go b/pkg/applicationmanager/controllers/applicationbackup.go
index 757f02405c..4c81b399b6 100644
--- a/pkg/applicationmanager/controllers/applicationbackup.go
+++ b/pkg/applicationmanager/controllers/applicationbackup.go
@@ -22,7 +22,7 @@ import (
 	"github.com/portworx/sched-ops/k8s"
 	"github.com/sirupsen/logrus"
 	"gocloud.dev/gcerrors"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -597,7 +597,7 @@ func (a *ApplicationBackupController) preparePVResource(
 func (a *ApplicationBackupController) backupResources(
 	backup *stork_api.ApplicationBackup,
 ) error {
-	allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors)
+	allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors, nil, false)
 	if err != nil {
 		log.ApplicationBackupLog(backup).Errorf("Error getting resources: %v", err)
 		return err
diff --git a/pkg/applicationmanager/controllers/applicationclone.go b/pkg/applicationmanager/controllers/applicationclone.go
index aef26f4eaa..8eba3fdf8f 100644
--- a/pkg/applicationmanager/controllers/applicationclone.go
+++ b/pkg/applicationmanager/controllers/applicationclone.go
@@ -15,7 +15,7 @@ import (
 	"github.com/operator-framework/operator-sdk/pkg/sdk"
 	"github.com/portworx/sched-ops/k8s"
 	"github.com/sirupsen/logrus"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -632,7 +632,7 @@ func (a *ApplicationCloneController) applyResources(
 func (a *ApplicationCloneController) cloneResources(
 	clone *stork_api.ApplicationClone,
 ) error {
-	allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors)
+	allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors, nil, false)
 	if err != nil {
 		log.ApplicationCloneLog(clone).Errorf("Error getting resources: %v", err)
 		return err
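The backup and clone call sites above only change at the `GetResources` call: the collector now also takes a `*rest.Config` selecting the cluster to collect from (`nil` means the local in-cluster config) and a `destCluster` flag. A minimal sketch of the new calling convention; `collector`, `namespaces`, `selectors`, and `clusterPairConfig` are hypothetical stand-ins for the surrounding controller state:

```go
// Local cluster (backup, clone, migrate): nil config, destCluster=false.
srcObjects, err := collector.GetResources(namespaces, selectors, nil, false)
if err != nil {
	return err
}

// Remote cluster (cleanup only): pass the ClusterPair config, destCluster=true,
// which restricts collection to namespaced, stork-migrated objects.
destObjects, err := collector.GetResources(namespaces, selectors, clusterPairConfig, true)
if err != nil {
	return err
}
```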
diff --git a/pkg/migration/controllers/migration.go b/pkg/migration/controllers/migration.go
index ffea191db9..8703c9f4b5 100644
--- a/pkg/migration/controllers/migration.go
+++ b/pkg/migration/controllers/migration.go
@@ -119,6 +119,10 @@ func setDefaults(migration *stork_api.Migration) *stork_api.Migration {
 		defaultBool := false
 		migration.Spec.StartApplications = &defaultBool
 	}
+	if migration.Spec.AllowCleaningResources == nil {
+		defaultBool := false
+		migration.Spec.AllowCleaningResources = &defaultBool
+	}
 	return migration
 }
 
@@ -285,13 +289,7 @@ func (m *MigrationController) Handle(ctx context.Context, event sdk.Event) error
 				message)
 			return nil
 		}
-	case stork_api.MigrationStageFinal:
-		// delete resources on destination which are not present on source
-		// TODO: what should be idle location for this
-		if *migration.Spec.AllowCleaningResources {
-			return m.cleanupMigratedResources(migration)
-		}
 		return nil
 	default:
 		log.MigrationLog(migration).Errorf("Invalid stage for migration: %v", migration.Status.Stage)
@@ -301,9 +299,92 @@ func (m *MigrationController) Handle(ctx context.Context, event sdk.Event) error
 }
 
 func (m *MigrationController) cleanupMigratedResources(migration *stork_api.Migration) error {
+	remoteConfig, err := getClusterPairSchedulerConfig(migration.Spec.ClusterPair, migration.Namespace)
+	if err != nil {
+		return err
+	}
+
+	destObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, remoteConfig, true)
+	if err != nil {
+		m.Recorder.Event(migration,
+			v1.EventTypeWarning,
+			string(stork_api.MigrationStatusFailed),
+			fmt.Sprintf("Error getting resources: %v", err))
+		log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
+		return err
+	}
+	srcObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
+	if err != nil {
+		m.Recorder.Event(migration,
+			v1.EventTypeWarning,
+			string(stork_api.MigrationStatusFailed),
+			fmt.Sprintf("Error getting resources: %v", err))
+		log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
+		return err
+	}
+	dynamicInterface, err := dynamic.NewForConfig(remoteConfig)
+	if err != nil {
+		return err
+	}
+
+	toBeDeleted := objectsToBeDeleted(srcObjects, destObjects)
+	err = m.ResourceCollector.DeleteResources(dynamicInterface, toBeDeleted)
+	if err != nil {
+		return err
+	}
+
+	// Update the migration status with the objects that were cleaned up
+	for _, r := range toBeDeleted {
+		nm, ns, kind := getObjectDetails(r)
+		resourceInfo := &stork_api.MigrationResourceInfo{
+			Name:      nm,
+			Namespace: ns,
+			Status:    stork_api.MigrationStatusCleaned,
+		}
+		resourceInfo.Kind = kind
+		migration.Status.Resources = append(migration.Status.Resources, resourceInfo)
+	}
+
+	err = sdk.Update(migration)
+	if err != nil {
+		return err
+	}
 	return nil
 }
 
+func getObjectDetails(o interface{}) (name, namespace, kind string) {
+	metadata, err := meta.Accessor(o)
+	if err != nil {
+		return "", "", ""
+	}
+	objType, err := meta.TypeAccessor(o)
+	if err != nil {
+		return "", "", ""
+	}
+	return metadata.GetName(), metadata.GetNamespace(), objType.GetKind()
+}
+
+func objectsToBeDeleted(srcObjects, destObjects []runtime.Unstructured) []runtime.Unstructured {
+	var deleteObjects []runtime.Unstructured
+	for _, o := range destObjects {
+		name, namespace, kind := getObjectDetails(o)
+		isPresent := false
+		logrus.Debugf("Checking if destination object (%v:%v:%v) is present on the source cluster", name, namespace, kind)
+		for _, s := range srcObjects {
+			sname, snamespace, skind := getObjectDetails(s)
+			if skind == kind && snamespace == namespace && sname == name {
+				isPresent = true
+				break
+			}
+		}
+		if !isPresent {
+			logrus.Infof("Deleting object (%v:%v:%v) from the destination cluster", name, namespace, kind)
+			deleteObjects = append(deleteObjects, o)
+		}
+	}
+	return deleteObjects
+}
+
 func (m *MigrationController) namespaceMigrationAllowed(migration *stork_api.Migration) bool {
 	// Restrict migration to only the namespace that the object belongs
 	// except for the namespace designated by the admin
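`objectsToBeDeleted` above compares every destination object against every source object, which is O(n*m). For large namespaces the same set difference can be computed in linear time by indexing the source objects first. A sketch only, not part of this change; it reuses `getObjectDetails` and the `runtime.Unstructured` type from this diff:

```go
import "k8s.io/apimachinery/pkg/runtime"

// objectKey identifies an object by kind, namespace, and name.
type objectKey struct {
	kind, namespace, name string
}

// objectsToBeDeletedFast computes the same set difference as
// objectsToBeDeleted, but with a single pass over each slice.
func objectsToBeDeletedFast(srcObjects, destObjects []runtime.Unstructured) []runtime.Unstructured {
	// Index the source objects once.
	srcKeys := make(map[objectKey]struct{}, len(srcObjects))
	for _, s := range srcObjects {
		name, namespace, kind := getObjectDetails(s)
		srcKeys[objectKey{kind, namespace, name}] = struct{}{}
	}
	// Anything on the destination without a source counterpart is stale.
	var deleteObjects []runtime.Unstructured
	for _, o := range destObjects {
		name, namespace, kind := getObjectDetails(o)
		if _, ok := srcKeys[objectKey{kind, namespace, name}]; !ok {
			deleteObjects = append(deleteObjects, o)
		}
	}
	return deleteObjects
}
```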
@@ -553,7 +633,7 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e
 		}
 	}
 
-	allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
+	allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
 	if err != nil {
 		m.Recorder.Event(migration,
 			v1.EventTypeWarning,
@@ -625,6 +705,20 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e
 	if err != nil {
 		return err
 	}
+	// delete resources on destination which are not present on source
+	// TODO: what would be the ideal location for this
+	if *migration.Spec.AllowCleaningResources {
+		logrus.Infof("Cleaning up migrated resources")
+		if err := m.cleanupMigratedResources(migration); err != nil {
+			message := fmt.Sprintf("Error cleaning up resources: %v", err)
+			log.MigrationLog(migration).Errorf(message)
+			m.Recorder.Event(migration,
+				v1.EventTypeWarning,
+				string(stork_api.MigrationStatusPartialSuccess),
+				message)
+			return nil
+		}
+	}
 	return nil
 }
 
diff --git a/pkg/resourcecollector/resourcecollector.go b/pkg/resourcecollector/resourcecollector.go
index 7299c19155..e223e82ab8 100644
--- a/pkg/resourcecollector/resourcecollector.go
+++ b/pkg/resourcecollector/resourcecollector.go
@@ -21,16 +21,16 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/rest"
+	restclient "k8s.io/client-go/rest"
 	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
 )
 
 const (
 	// Annotation to use when the resource shouldn't be collected
-	skipResourceAnnotation = "stork.libopenstorage.org/skipresource"
-
-	deletedMaxRetries    = 12
-	deletedRetryInterval = 10 * time.Second
+	skipResourceAnnotation   = "stork.libopenstorage.org/skipresource"
+	storkMigrationAnnotation = "stork.libopenstorage.org/storkMigration"
+	deletedMaxRetries        = 12
+	deletedRetryInterval     = 10 * time.Second
 )
 
 // ResourceCollector is used to collect and process unstructured objects in namespaces and using label selectors
@@ -42,9 +42,16 @@ type ResourceCollector struct {
 
 // Init initializes the resource collector
 func (r *ResourceCollector) Init() error {
-	config, err := rest.InClusterConfig()
-	if err != nil {
-		return fmt.Errorf("error getting cluster config: %v", err)
+	return r.initClusterConfig(nil)
+}
+
+func (r *ResourceCollector) initClusterConfig(config *restclient.Config) error {
+	var err error
+	if config == nil {
+		config, err = restclient.InClusterConfig()
+		if err != nil {
+			return fmt.Errorf("error getting cluster config: %v", err)
+		}
 	}
 
 	aeclient, err := apiextensionsclient.NewForConfig(config)
@@ -62,6 +69,9 @@ func (r *ResourceCollector) Init() error {
 	if err != nil {
 		return err
 	}
+
+	// Point the shared k8s instance at the given cluster config
+	k8s.Instance().SetConfig(config)
 	return nil
 }
 
@@ -92,12 +102,16 @@ func resourceToBeCollected(resource metav1.APIResource) bool {
 }
 
 // GetResources gets all the resources in the given list of namespaces which match the labelSelectors
-func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string) ([]runtime.Unstructured, error) {
-	err := r.discoveryHelper.Refresh()
+func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string, remoteConfig *restclient.Config, destCluster bool) ([]runtime.Unstructured, error) {
+	err := r.initClusterConfig(remoteConfig)
 	if err != nil {
 		return nil, err
 	}
 
+	err = r.discoveryHelper.Refresh()
+	if err != nil {
+		return nil, err
+	}
 	allObjects := make([]runtime.Unstructured, 0)
 
 	// Map to prevent collection of duplicate objects
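Note that `GetResources` now re-runs `initClusterConfig` on every call, which also rewires the shared `k8s.Instance()` client. That makes the collector stateful: after collecting from the remote cluster it stays pointed there until the next call with a `nil` config, so calls targeting different clusters must not interleave. In this diff the remote config comes from `getClusterPairSchedulerConfig`; outside stork an equivalent config could be built from a kubeconfig, as in this sketch (the kubeconfig path is a hypothetical stand-in):

```go
import (
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// remoteClusterConfig builds a client config for another cluster from a
// kubeconfig file. An empty master URL means all connection details are
// taken from the kubeconfig itself.
func remoteClusterConfig(kubeconfigPath string) (*restclient.Config, error) {
	return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}
```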
@@ -118,7 +132,11 @@ func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map
 		if !resourceToBeCollected(resource) {
 			continue
 		}
-
+		// Skip collecting non-namespaced resources for the
+		// destination cluster
+		if destCluster && !resource.Namespaced {
+			continue
+		}
 		for _, ns := range namespaces {
 			var dynamicClient dynamic.ResourceInterface
 			if !resource.Namespaced {
@@ -154,7 +172,7 @@ func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map
 					return nil, fmt.Errorf("error casting object: %v", o)
 				}
 
-				collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, crbs, ns)
+				collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, crbs, ns, destCluster)
 				if err != nil {
 					return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err)
 				}
@@ -187,12 +205,24 @@ func (r *ResourceCollector) objectToBeCollected(
 	object runtime.Unstructured,
 	crbs *rbacv1.ClusterRoleBindingList,
 	namespace string,
+	destCluster bool,
 ) (bool, error) {
 	metadata, err := meta.Accessor(object)
 	if err != nil {
 		return false, err
 	}
 
+	// On the destination cluster only collect stork-migrated resources
+	if destCluster {
+		if val, ok := metadata.GetAnnotations()[storkMigrationAnnotation]; ok {
+			if skip, err := strconv.ParseBool(val); err == nil && !skip {
+				return true, nil
+			}
+		} else {
+			return false, nil
+		}
+	}
+
 	if value, present := metadata.GetAnnotations()[skipResourceAnnotation]; present {
 		if skip, err := strconv.ParseBool(value); err == nil && skip {
 			return false, err
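The `destCluster` branch added to `objectToBeCollected` is effectively a tri-state gate keyed on the `storkMigrationAnnotation` that stork stamps on migrated objects (the stamping side is not in this excerpt). A standalone sketch of that decision, mirroring the diff's logic as written:

```go
import "strconv"

// destClusterGate mirrors the destCluster check in objectToBeCollected:
// no annotation  -> decided, never collect (object was not migrated by stork)
// value "false"  -> decided, collect immediately, bypassing the later checks
// anything else  -> undecided, fall through to the remaining annotation checks
func destClusterGate(annotations map[string]string) (decided, collect bool) {
	val, ok := annotations["stork.libopenstorage.org/storkMigration"]
	if !ok {
		return true, false
	}
	if b, err := strconv.ParseBool(val); err == nil && !b {
		return true, true
	}
	return false, false
}
```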