Move destination resource collection to migration controller
 - use NewInstance() instead of changing singleton k8s instance in
   resource collector
 - collect old resources to purge inside migration controller instead
   of resourcecollector pkg
 - change migration cleanup status to migration purge
 - collect resources from the destination cluster using the new resource
   collector (sketched below)

Signed-off-by: Ram <[email protected]>
ram-infrac committed Nov 27, 2019
1 parent ef30674 commit 8026c7c
Showing 12 changed files with 139 additions and 101 deletions.
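For context, the last three commit-message bullets come together in a new purge path inside the migration controller (full diff further down). The following is a condensed, illustrative sketch of that flow rather than code from the commit: the method name is made up, event recording and status bookkeeping are trimmed, and it assumes the controller file's existing imports (resourcecollector, dynamic, stork_api).

```go
// Condensed sketch of the purge flow added in pkg/migration/controllers/migration.go.
// purgeSketch is an illustrative name; error events and status updates are omitted.
func (m *MigrationController) purgeSketch(migration *stork_api.Migration) error {
	remoteConfig, err := getClusterPairSchedulerConfig(migration.Spec.ClusterPair, migration.Namespace)
	if err != nil {
		return err
	}

	// A dedicated collector is initialized against the destination cluster
	// instead of repointing the shared singleton k8s instance.
	rc := resourcecollector.ResourceCollector{Driver: m.Driver}
	if err := rc.Init(remoteConfig); err != nil {
		return err
	}

	destObjects, err := rc.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
	if err != nil {
		return err
	}
	srcObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
	if err != nil {
		return err
	}

	// Anything still present on the destination but gone from the source is
	// deleted there and reported in the Migration status as Purged.
	dynamicInterface, err := dynamic.NewForConfig(remoteConfig)
	if err != nil {
		return err
	}
	toBeDeleted := objectTobeDeleted(srcObjects, destObjects)
	return m.ResourceCollector.DeleteResources(dynamicInterface, toBeDeleted)
}
```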
4 changes: 2 additions & 2 deletions Gopkg.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion cmd/stork/stork.go
@@ -311,7 +311,7 @@ func runStork(d volume.Driver, recorder record.EventRecorder, c *cli.Context) {
resourceCollector := resourcecollector.ResourceCollector{
Driver: d,
}
if err := resourceCollector.Init(); err != nil {
if err := resourceCollector.Init(nil); err != nil {
log.Fatalf("Error initializing ResourceCollector: %v", err)
}

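The Init signature change above is what lets callers choose which cluster a collector talks to: passing nil keeps the previous in-cluster behaviour, while passing a remote rest.Config (as the migration controller does below with the config returned by getClusterPairSchedulerConfig) targets the destination cluster. A minimal sketch, not code from the commit; the import paths are assumed from this repository's layout.

```go
package main

import (
	"log"

	"github.com/libopenstorage/stork/drivers/volume"
	"github.com/libopenstorage/stork/pkg/resourcecollector"
	"k8s.io/client-go/rest"
)

// buildCollectors shows the two ways Init can now be called.
func buildCollectors(d volume.Driver, remoteConfig *rest.Config) (local, remote resourcecollector.ResourceCollector) {
	// nil config: same behaviour as before this change (local cluster).
	local = resourcecollector.ResourceCollector{Driver: d}
	if err := local.Init(nil); err != nil {
		log.Fatalf("Error initializing ResourceCollector: %v", err)
	}

	// Non-nil config: the collector (and its k8sOps handle) points at the
	// destination cluster, as purgeMigratedResources does further down.
	remote = resourcecollector.ResourceCollector{Driver: d}
	if err := remote.Init(remoteConfig); err != nil {
		log.Fatalf("Error initializing remote ResourceCollector: %v", err)
	}
	return local, remote
}

func main() {}
```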
24 changes: 12 additions & 12 deletions pkg/apis/stork/v1alpha1/migration.go
@@ -13,16 +13,16 @@ const (

// MigrationSpec is the spec used to migrate apps between clusterpairs
type MigrationSpec struct {
ClusterPair string `json:"clusterPair"`
AdminClusterPair string `json:"adminClusterPair"`
Namespaces []string `json:"namespaces"`
IncludeResources *bool `json:"includeResources"`
IncludeVolumes *bool `json:"includeVolumes"`
StartApplications *bool `json:"startApplications"`
AllowCleaningResources *bool `json:"allowCleaningResources"`
Selectors map[string]string `json:"selectors"`
PreExecRule string `json:"preExecRule"`
PostExecRule string `json:"postExecRule"`
ClusterPair string `json:"clusterPair"`
AdminClusterPair string `json:"adminClusterPair"`
Namespaces []string `json:"namespaces"`
IncludeResources *bool `json:"includeResources"`
IncludeVolumes *bool `json:"includeVolumes"`
StartApplications *bool `json:"startApplications"`
PurgeDeletedResources *bool `json:"purgeDeletedResources"`
Selectors map[string]string `json:"selectors"`
PreExecRule string `json:"preExecRule"`
PostExecRule string `json:"postExecRule"`
}

// MigrationStatus is the status of a migration operation
@@ -81,8 +81,8 @@ const (
MigrationStatusPartialSuccess MigrationStatusType = "PartialSuccess"
// MigrationStatusSuccessful for when migration has completed successfully
MigrationStatusSuccessful MigrationStatusType = "Successful"
// MigrationStatusCleaned for when migration objects have been deleted
MigrationStatusCleaned MigrationStatusType = "Cleaned"
// MigrationStatusPurged for when migration objects have been deleted
MigrationStatusPurged MigrationStatusType = "Purged"
)

// MigrationStageType is the stage of the migration
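For reference, a hypothetical Migration object using the renamed field; the names ("nightly-migration", "demo", "remote-cluster-pair") are placeholders and the import paths are assumed from this repository's layout. Resources deleted on the destination during the purge step are now reported in the status with MigrationStatusPurged rather than MigrationStatusCleaned.

```go
package main

import (
	stork_api "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleMigration builds a Migration spec that opts in to purging resources
// that no longer exist on the source cluster.
func exampleMigration() *stork_api.Migration {
	purge := true
	startApps := false
	return &stork_api.Migration{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nightly-migration", // placeholder
			Namespace: "demo",              // placeholder
		},
		Spec: stork_api.MigrationSpec{
			ClusterPair:           "remote-cluster-pair", // placeholder
			Namespaces:            []string{"demo"},
			StartApplications:     &startApps,
			PurgeDeletedResources: &purge, // replaces the old AllowCleaningResources field
		},
	}
}

func main() {}
```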
4 changes: 2 additions & 2 deletions pkg/apis/stork/v1alpha1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion pkg/applicationmanager/controllers/applicationbackup.go
@@ -597,7 +597,7 @@ func (a *ApplicationBackupController) preparePVResource(
func (a *ApplicationBackupController) backupResources(
backup *stork_api.ApplicationBackup,
) error {
allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors, nil, false)
allObjects, err := a.ResourceCollector.GetResources(backup.Spec.Namespaces, backup.Spec.Selectors)
if err != nil {
log.ApplicationBackupLog(backup).Errorf("Error getting resources: %v", err)
return err
2 changes: 1 addition & 1 deletion pkg/applicationmanager/controllers/applicationclone.go
@@ -632,7 +632,7 @@ func (a *ApplicationCloneController) applyResources(
func (a *ApplicationCloneController) cloneResources(
clone *stork_api.ApplicationClone,
) error {
allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors, nil, false)
allObjects, err := a.ResourceCollector.GetResources([]string{clone.Spec.SourceNamespace}, clone.Spec.Selectors)
if err != nil {
log.ApplicationCloneLog(clone).Errorf("Error getting resources: %v", err)
return err
123 changes: 87 additions & 36 deletions pkg/migration/controllers/migration.go
@@ -40,7 +40,12 @@ const (
StorkMigrationReplicasAnnotation = "stork.libopenstorage.org/migrationReplicas"
// StorkMigrationAnnotation is the annotation used to keep track of resources
// migrated by stork
StorkMigrationAnnotation = "stork.libopenstorage.org/storkMigration"
StorkMigrationAnnotation = "stork.libopenstorage.org/migrated"
// StorkMigrationName is the annotation used to record the name of the
// Migration that migrated the resource
StorkMigrationName = "stork.libopenstorage.org/migrationName"
// StorkMigrationTime is the annotation used to specify time of migration
StorkMigrationTime = "stork.libopenstorage.org/migrationTime"
// Max number of times to retry applying resources on the destination
maxApplyRetries = 10
)
@@ -119,9 +124,9 @@ func setDefaults(migration *stork_api.Migration) *stork_api.Migration {
defaultBool := false
migration.Spec.StartApplications = &defaultBool
}
if migration.Spec.AllowCleaningResources == nil {
if migration.Spec.PurgeDeletedResources == nil {
defaultBool := false
migration.Spec.AllowCleaningResources = &defaultBool
migration.Spec.PurgeDeletedResources = &defaultBool
}
return migration
}
@@ -298,82 +303,124 @@ func (m *MigrationController) Handle(ctx context.Context, event sdk.Event) error
return nil
}

func (m *MigrationController) cleanupMigratedResources(migration *stork_api.Migration) error {
func (m *MigrationController) purgeMigratedResources(migration *stork_api.Migration) error {
remoteConfig, err := getClusterPairSchedulerConfig(migration.Spec.ClusterPair, migration.Namespace)
if err != nil {
return err
}

destObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, remoteConfig, true)
log.MigrationLog(migration).Infof("Purging old unused resources ...")
// use a separate resource collector for collecting resources
// from the destination cluster
rc := resourcecollector.ResourceCollector{
Driver: m.Driver,
}
err = rc.Init(remoteConfig)
if err != nil {
log.MigrationLog(migration).Errorf("Error initializing resource collector: %v", err)
return err
}
destObjects, err := rc.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
string(stork_api.MigrationStatusFailed),
fmt.Sprintf("Error getting resource: %v", err))
fmt.Sprintf("Error getting resources from destination: %v", err))
log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
return err
}
srcObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
srcObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
string(stork_api.MigrationStatusFailed),
fmt.Sprintf("Error getting resource: %v", err))
fmt.Sprintf("Error getting resources from source: %v", err))
log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
return err
}
obj, err := objectToCollect(destObjects)
if err != nil {
return err
}
toBeDeleted := objectTobeDeleted(srcObjects, obj)
dynamicInterface, err := dynamic.NewForConfig(remoteConfig)
if err != nil {
return err
}

toBeDeleted := objectTobeDeleted(srcObjects, destObjects)
err = m.ResourceCollector.DeleteResources(dynamicInterface, toBeDeleted)
if err != nil {
return err
}

// update the migration status with info about the purged objects
for _, r := range toBeDeleted {
nm, ns, kind := getObjectDetails(r)
nm, ns, kind, err := getObjectDetails(r)
if err != nil {
// log error and skip adding object to status
log.MigrationLog(migration).Errorf("Unable to get object details: %v", err)
continue
}
resourceInfo := &stork_api.MigrationResourceInfo{
Name: nm,
Namespace: ns,
Status: stork_api.MigrationStatusCleaned,
Status: stork_api.MigrationStatusPurged,
}
resourceInfo.Kind = kind
migration.Status.Resources = append(migration.Status.Resources, resourceInfo)
}

err = sdk.Update(migration)
if err != nil {
return err
}
return nil
}

func getObjectDetails(o interface{}) (name, namespace, kind string) {
func getObjectDetails(o interface{}) (name, namespace, kind string, err error) {
metadata, err := meta.Accessor(o)
if err != nil {
return "", "", ""
return "", "", "", err
}
objType, err := meta.TypeAccessor(o)
if err != nil {
return "", "", ""
return "", "", "", err
}
return metadata.GetName(), metadata.GetNamespace(), objType.GetKind()
return metadata.GetName(), metadata.GetNamespace(), objType.GetKind(), nil
}

func objectToCollect(destObject []runtime.Unstructured) ([]runtime.Unstructured, error) {
var objects []runtime.Unstructured
for _, obj := range destObject {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
if metadata.GetNamespace() != "" {
if val, ok := metadata.GetAnnotations()[StorkMigrationAnnotation]; ok {
if skip, err := strconv.ParseBool(val); err == nil && skip {
objects = append(objects, obj)
}
}
}
}
return objects, nil
}

func objectTobeDeleted(srcObjects, destObjects []runtime.Unstructured) []runtime.Unstructured {
var deleteObjects []runtime.Unstructured
for _, o := range destObjects {
name, namespace, kind := getObjectDetails(o)
name, namespace, kind, err := getObjectDetails(o)
if err != nil {
// skip purging if we are not able to get object details
logrus.Errorf("Unable to get object details %v", err)
continue
}
isPresent := false
logrus.Debugf("Checking if destObject(%v:%v:%v) present on source cluster", name, namespace, kind)
for _, s := range srcObjects {
sname, snamespace, skind := getObjectDetails(s)
sname, snamespace, skind, err := getObjectDetails(s)
if err != nil {
// skip purging if we are not able to get object details
continue
}
if skind == kind && snamespace == namespace && sname == name {
isPresent = true
break
}
}
if !isPresent {
@@ -633,7 +680,7 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) error
}
}

allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors, nil, false)
allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
if err != nil {
m.Recorder.Event(migration,
v1.EventTypeWarning,
@@ -701,15 +748,8 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) error
break
}
}
err = sdk.Update(migration)
if err != nil {
return err
}
logrus.Infof("Cleaning up migrated resources")
// delete resources on destination which are not present on source
// TODO: what should be idle location for this
if *migration.Spec.AllowCleaningResources {
if err := m.cleanupMigratedResources(migration); err != nil {
if *migration.Spec.PurgeDeletedResources {
if err := m.purgeMigratedResources(migration); err != nil {
message := fmt.Sprintf("Error cleaning up resources: %v", err)
log.MigrationLog(migration).Errorf(message)
m.Recorder.Event(migration,
@@ -719,6 +759,11 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) error
return nil
}
}

err = sdk.Update(migration)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -990,9 +1035,15 @@ func (m *MigrationController) applyResources(
return fmt.Errorf("unable to cast object to unstructured: %v", o)
}

unstructured.SetAnnotations(map[string]string{StorkMigrationAnnotation: "true"})
log.MigrationLog(migration).Infof("Applied stork migration annotation %v %v", metadata.GetName(), unstructured.GetAnnotations())

// set migration annotations
migrAnnot := metadata.GetAnnotations()
if migrAnnot == nil {
migrAnnot = make(map[string]string)
}
migrAnnot[StorkMigrationAnnotation] = "true"
migrAnnot[StorkMigrationName] = migration.GetName()
migrAnnot[StorkMigrationTime] = time.Now().Format(nameTimeSuffixFormat)
unstructured.SetAnnotations(migrAnnot)
retries := 0
for {
_, err = dynamicClient.Create(unstructured)
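Two things are worth noting from the controller diff above. First, the purge now runs only when Spec.PurgeDeletedResources is set, and the final sdk.Update happens after the purge so the Purged entries land in the status. Second, applyResources stamps three annotations on every migrated object instead of one. Roughly, the keys merged into each object's existing annotations look like this; the values are illustrative only, and the exact timestamp layout comes from nameTimeSuffixFormat, which is defined elsewhere in this controller and not shown in the diff.

```go
// Illustrative only: the annotations a migrated object carries on the
// destination after applyResources runs (values are examples).
annotations := map[string]string{
	"stork.libopenstorage.org/migrated":      "true",                // StorkMigrationAnnotation
	"stork.libopenstorage.org/migrationName": "nightly-migration",   // StorkMigrationName: Migration CR that applied it
	"stork.libopenstorage.org/migrationTime": "<migration timestamp>", // StorkMigrationTime, formatted with nameTimeSuffixFormat
}
_ = annotations
```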
7 changes: 3 additions & 4 deletions pkg/resourcecollector/clusterrole.go
@@ -3,7 +3,6 @@ package resourcecollector
import (
"strings"

"github.com/portworx/sched-ops/k8s"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -177,10 +176,10 @@ func (r *ResourceCollector) mergeAndUpdateClusterRoleBinding(
return err
}

currentCRB, err := k8s.Instance().GetClusterRoleBinding(newCRB.Name)
currentCRB, err := r.k8sOps.GetClusterRoleBinding(newCRB.Name)
if err != nil {
if apierrors.IsNotFound(err) {
_, err = k8s.Instance().CreateClusterRoleBinding(&newCRB)
_, err = r.k8sOps.CreateClusterRoleBinding(&newCRB)
}
return err
}
@@ -204,6 +203,6 @@ func (r *ResourceCollector) mergeAndUpdateClusterRoleBinding(
currentCRB.Subjects = append(currentCRB.Subjects, subject)
}

_, err = k8s.Instance().UpdateClusterRoleBinding(currentCRB)
_, err = r.k8sOps.UpdateClusterRoleBinding(currentCRB)
return err
}
5 changes: 2 additions & 3 deletions pkg/resourcecollector/persistentvolume.go
@@ -3,8 +3,7 @@ package resourcecollector
import (
"fmt"

"github.com/portworx/sched-ops/k8s"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -37,7 +36,7 @@ func (r *ResourceCollector) pvToBeCollected(
return false, nil
}

pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, pvcNamespace)
pvc, err := r.k8sOps.GetPersistentVolumeClaim(pvcName, pvcNamespace)
if err != nil {
return false, err
}
5 changes: 2 additions & 3 deletions pkg/resourcecollector/persistentvolumeclaim.go
@@ -3,8 +3,7 @@ package resourcecollector
import (
"fmt"

"github.com/portworx/sched-ops/k8s"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -19,7 +18,7 @@ func (r *ResourceCollector) pvcToBeCollected(
}

pvcName := metadata.GetName()
pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace)
pvc, err := r.k8sOps.GetPersistentVolumeClaim(pvcName, namespace)
if err != nil {
return false, err
}
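The three resourcecollector files above all make the same substitution: the package-level k8s.Instance() singleton is replaced by a per-collector k8sOps handle that Init(config) wires up (Init's body is not part of the rendered diff). That is what lets a source collector and a destination collector exist side by side without repointing global state. A schematic sketch, not code from this commit, assuming the core v1 import already used in these files:

```go
// Schematic only: how a collector helper reads after the change.
func (r *ResourceCollector) getPVCSketch(pvcName, namespace string) (*v1.PersistentVolumeClaim, error) {
	// Before: pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace)
	// which shared (and could repoint) one global client across every collector.

	// After: each ResourceCollector uses its own handle, configured by Init(config).
	return r.k8sOps.GetPersistentVolumeClaim(pvcName, namespace)
}
```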
(Diffs for the remaining two changed files were not loaded.)
