Skip to content

Commit

Permalink
Cleanup migrated resources from destination cluster
Browse files Browse the repository at this point in the history
   - modified GetResources() to accept cluster config to fetch resource from
   - only fetch resources which has migration annotation
   - find and delete cleaned up resources from dest cluster

Signed-off-by: Ram <[email protected]>
  • Loading branch information
ram-infrac committed Nov 27, 2019
1 parent df796e6 commit ef30674
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 23 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/stork/v1alpha1/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
MigrationStatusPartialSuccess MigrationStatusType = "PartialSuccess"
// MigrationStatusSuccessful for when migration has completed successfully
MigrationStatusSuccessful MigrationStatusType = "Successful"
// MigrationStatusCleaned for when migration objects has been deleted
MigrationStatusCleaned MigrationStatusType = "Cleaned"
)

// MigrationStageType is the stage of the migration
Expand Down
4 changes: 2 additions & 2 deletions pkg/applicationmanager/controllers/applicationbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/portworx/sched-ops/k8s"
"github.com/sirupsen/logrus"
"gocloud.dev/gcerrors"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -597,7 +597,7 @@ func (a *ApplicationBackupController) preparePVResource(
func (a *ApplicationBackupController) backupResources(
backup *stork_api.ApplicationBackup,
) error {
allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors)
allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors, nil, false)
if err != nil {
log.ApplicationBackupLog(backup).Errorf("Error getting resources: %v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/applicationmanager/controllers/applicationclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/operator-framework/operator-sdk/pkg/sdk"
"github.com/portworx/sched-ops/k8s"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -632,7 +632,7 @@ func (a *ApplicationCloneController) applyResources(
func (a *ApplicationCloneController) cloneResources(
clone *stork_api.ApplicationClone,
) error {
allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors)
allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors, nil, false)
if err != nil {
log.ApplicationCloneLog(clone).Errorf("Error getting resources: %v", err)
return err
Expand Down
108 changes: 101 additions & 7 deletions pkg/migration/controllers/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func setDefaults(migration *stork_api.Migration) *stork_api.Migration {
defaultBool := false
migration.Spec.StartApplications = &defaultBool
}
if migration.Spec.AllowCleaningResources == nil {
defaultBool := false
migration.Spec.AllowCleaningResources = &defaultBool
}
return migration
}

Expand Down Expand Up @@ -285,13 +289,7 @@ func (m *MigrationController) Handle(ctx context.Context, event sdk.Event) error
message)
return nil
}

case stork_api.MigrationStageFinal:
// delete resources on destination which are not present on source
// TODO: what should be idle location for this
if *migration.Spec.AllowCleaningResources {
return m.cleanupMigratedResources(migration)
}
return nil
default:
log.MigrationLog(migration).Errorf("Invalid stage for migration: %v", migration.Status.Stage)
Expand All @@ -301,9 +299,91 @@ func (m *MigrationController) Handle(ctx context.Context, event sdk.Event) error
}

func (m *MigrationController) cleanupMigratedResources(migration *stork_api.Migration) error {
remoteConfig, err := getClusterPairSchedulerConfig(migration.Spec.ClusterPair, migration.Namespace)
if err != nil {
return err
}

destObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, remoteConfig, true)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
string(stork_api.MigrationStatusFailed),
fmt.Sprintf("Error getting resource: %v", err))
log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
return err
}
srcObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
string(stork_api.MigrationStatusFailed),
fmt.Sprintf("Error getting resource: %v", err))
log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
return err
}
dynamicInterface, err := dynamic.NewForConfig(remoteConfig)
if err != nil {
return err
}

toBeDeleted := objectTobeDeleted(srcObjects, destObjects)
err = m.ResourceCollector.DeleteResources(dynamicInterface, toBeDeleted)
if err != nil {
return err
}

// update status of cleaned up objects migration info
for _, r := range toBeDeleted {
nm, ns, kind := getObjectDetails(r)
resourceInfo := &stork_api.MigrationResourceInfo{
Name: nm,
Namespace: ns,
Status: stork_api.MigrationStatusCleaned,
}
resourceInfo.Kind = kind
migration.Status.Resources = append(migration.Status.Resources, resourceInfo)
}

err = sdk.Update(migration)
if err != nil {
return err
}
return nil
}

func getObjectDetails(o interface{}) (name, namespace, kind string) {
metadata, err := meta.Accessor(o)
if err != nil {
return "", "", ""
}
objType, err := meta.TypeAccessor(o)
if err != nil {
return "", "", ""
}
return metadata.GetName(), metadata.GetNamespace(), objType.GetKind()
}

func objectTobeDeleted(srcObjects, destObjects []runtime.Unstructured) []runtime.Unstructured {
var deleteObjects []runtime.Unstructured
for _, o := range destObjects {
name, namespace, kind := getObjectDetails(o)
isPresent := false
logrus.Debugf("Checking if destObject(%v:%v:%v) present on source cluster", name, namespace, kind)
for _, s := range srcObjects {
sname, snamespace, skind := getObjectDetails(s)
if skind == kind && snamespace == namespace && sname == name {
isPresent = true
}
}
if !isPresent {
logrus.Infof("Deleting object from destination(%v:%v:%v)", name, namespace, kind)
deleteObjects = append(deleteObjects, o)
}
}
return deleteObjects
}

func (m *MigrationController) namespaceMigrationAllowed(migration *stork_api.Migration) bool {
// Restrict migration to only the namespace that the object belongs
// except for the namespace designated by the admin
Expand Down Expand Up @@ -553,7 +633,7 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e
}
}

allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
Expand Down Expand Up @@ -625,6 +705,20 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e
if err != nil {
return err
}
logrus.Infof("Cleaning up migrated resources")
// delete resources on destination which are not present on source
// TODO: what should be idle location for this
if *migration.Spec.AllowCleaningResources {
if err := m.cleanupMigratedResources(migration); err != nil {
message := fmt.Sprintf("Error cleaning up resources: %v", err)
log.MigrationLog(migration).Errorf(message)
m.Recorder.Event(migration,
v1.EventTypeWarning,
string(stork_api.MigrationStatusPartialSuccess),
message)
return nil
}
}
return nil
}

Expand Down
54 changes: 42 additions & 12 deletions pkg/resourcecollector/resourcecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)

const (
// Annotation to use when the resource shouldn't be collected
skipResourceAnnotation = "stork.libopenstorage.org/skipresource"

deletedMaxRetries = 12
deletedRetryInterval = 10 * time.Second
skipResourceAnnotation = "stork.libopenstorage.org/skipresource"
storkMigrationAnnotation = "stork.libopenstorage.org/storkMigration"
deletedMaxRetries = 12
deletedRetryInterval = 10 * time.Second
)

// ResourceCollector is used to collect and process unstructured objects in namespaces and using label selectors
Expand All @@ -42,9 +42,16 @@ type ResourceCollector struct {

// Init initializes the resource collector
func (r *ResourceCollector) Init() error {
config, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("error getting cluster config: %v", err)
return r.initClusterConfig(nil)
}

func (r *ResourceCollector) initClusterConfig(config *restclient.Config) error {
var err error
if config == nil {
config, err = restclient.InClusterConfig()
if err != nil {
return fmt.Errorf("error getting cluster config: %v", err)
}
}

aeclient, err := apiextensionsclient.NewForConfig(config)
Expand All @@ -62,6 +69,9 @@ func (r *ResourceCollector) Init() error {
if err != nil {
return err
}

// reset k8s instance to given cluster config
k8s.Instance().SetConfig(config)
return nil
}

Expand Down Expand Up @@ -92,12 +102,16 @@ func resourceToBeCollected(resource metav1.APIResource) bool {
}

// GetResources gets all the resources in the given list of namespaces which match the labelSelectors
func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string) ([]runtime.Unstructured, error) {
err := r.discoveryHelper.Refresh()
func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string, remoteConfig *restclient.Config, destCluster bool) ([]runtime.Unstructured, error) {
err := r.initClusterConfig(remoteConfig)
if err != nil {
return nil, err
}

err = r.discoveryHelper.Refresh()
if err != nil {
return nil, err
}
allObjects := make([]runtime.Unstructured, 0)

// Map to prevent collection of duplicate objects
Expand All @@ -118,7 +132,11 @@ func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map
if !resourceToBeCollected(resource) {
continue
}

// skip collecting non-namespaced resources for
// destination cluster
if destCluster && !resource.Namespaced {
continue
}
for _, ns := range namespaces {
var dynamicClient dynamic.ResourceInterface
if !resource.Namespaced {
Expand Down Expand Up @@ -154,7 +172,7 @@ func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map
return nil, fmt.Errorf("error casting object: %v", o)
}

collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, crbs, ns)
collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, crbs, ns, destCluster)
if err != nil {
return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err)
}
Expand Down Expand Up @@ -187,12 +205,24 @@ func (r *ResourceCollector) objectToBeCollected(
object runtime.Unstructured,
crbs *rbacv1.ClusterRoleBindingList,
namespace string,
destCluster bool,
) (bool, error) {
metadata, err := meta.Accessor(object)
if err != nil {
return false, err
}

// check migration resources
if destCluster {
if val, ok := metadata.GetAnnotations()[storkMigrationAnnotation]; ok {
if skip, err := strconv.ParseBool(val); err == nil && !skip {
return true, err
}
} else {
return false, nil
}
}

if value, present := metadata.GetAnnotations()[skipResourceAnnotation]; present {
if skip, err := strconv.ParseBool(value); err == nil && skip {
return false, err
Expand Down

0 comments on commit ef30674

Please sign in to comment.