
Merge pull request #213 from replicatedhq/laverya/ensure-all-pvcs-migrate-on-correct-nodes

handle PV node selections with an annotation, not in memory

laverya authored Oct 16, 2023
2 parents 5a9d9c7 + 9b69a4f · commit fb461f0
Showing 2 changed files with 500 additions and 433 deletions.
pkg/migrate/migrate.go (141 changes: 85 additions & 56 deletions)
@@ -27,6 +27,7 @@ const (
     sourcePVCAnnotation         = baseAnnotation + "-sourcepvc"
     desiredReclaimAnnotation    = baseAnnotation + "-reclaim"
     DesiredAccessModeAnnotation = baseAnnotation + "-destinationaccessmode"
+    sourceNodeAnnotation        = baseAnnotation + "-sourcenode"
 )

 // IsDefaultStorageClassAnnotation - this is also exported by https://github.com/kubernetes/kubernetes/blob/v1.21.3/pkg/apis/storage/v1/util/helpers.go#L25
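
Note: the new constant composes its key from baseAnnotation, which is defined a few lines above this hunk and is not visible in the diff. A minimal sketch of the key shape, with the base value assumed for illustration:

```go
package sketch

const (
    baseAnnotation       = "kurl.sh/pvcmigrate" // assumed value; the real constant is defined earlier in migrate.go
    sourceNodeAnnotation = baseAnnotation + "-sourcenode"
)

// sourceNodeAnnotation yields a key like "kurl.sh/pvcmigrate-sourcenode",
// read and written on PersistentVolume.ObjectMeta.Annotations alongside the
// existing -sourcepvc and -reclaim keys shown in the hunk above.
```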
@@ -77,9 +78,9 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,

     for ns, nsPVCs := range matchingPVCs {
         for _, nsPVC := range nsPVCs {
-            err = swapPVs(ctx, w, clientset, ns, nsPVC.claim.Name)
+            err = swapPVs(ctx, w, clientset, ns, nsPVC.Name)
             if err != nil {
-                return fmt.Errorf("failed to swap PVs for PVC %s in %s: %w", nsPVC.claim.Name, ns, err)
+                return fmt.Errorf("failed to swap PVs for PVC %s in %s: %w", nsPVC.Name, ns, err)
             }
         }
     }
@@ -101,25 +102,6 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
     return nil
 }

-type pvcCtx struct {
-    claim     *corev1.PersistentVolumeClaim
-    usedByPod *corev1.Pod
-}
-
-func (p *pvcCtx) Copy() *pvcCtx {
-    return &pvcCtx{
-        p.claim.DeepCopy(),
-        p.usedByPod.DeepCopy(),
-    }
-}
-
-func (pvc pvcCtx) getNodeNameRef() string {
-    if pvc.usedByPod == nil {
-        return ""
-    }
-    return pvc.usedByPod.Spec.NodeName
-}
-
 // swapDefaultStorageClasses attempts to set newDefaultSC as the default StorageClass
 // if oldDefaultSC was set as the default, then it will be unset first
 // if another StorageClass besides these two is currently the default, it will return an error
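
This deletion is the core of the change: pvcCtx carried the source pod, and therefore the node name, only in process memory, and getNodeNameRef() silently returned an empty string whenever no running pod had been recorded for a claim. The node selection now lives on the PV itself. A sketch of the two data flows, reusing the assumed annotation literal from the note above:

```go
package sketch

import corev1 "k8s.io/api/core/v1"

const sourceNodeAnnotation = "kurl.sh/pvcmigrate-sourcenode" // assumed literal for this sketch

// Before: the node was only known while the process that saw the pod was alive.
func nodeFromMemory(usedByPod *corev1.Pod) string {
    if usedByPod == nil {
        return "" // no pod recorded: the copy pod could land on any node
    }
    return usedByPod.Spec.NodeName
}

// After: the node is written onto the PV during scale-down...
func recordNode(pv *corev1.PersistentVolume, pod *corev1.Pod) {
    if pv.Annotations == nil {
        pv.Annotations = map[string]string{}
    }
    pv.Annotations[sourceNodeAnnotation] = pod.Spec.NodeName
}

// ...and read back at copy time, so it survives between the two phases.
func nodeFromPV(pv *corev1.PersistentVolume) string {
    return pv.Annotations[sourceNodeAnnotation]
}
```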
@@ -186,24 +168,30 @@ func swapDefaultStorageClasses(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
     return nil
 }

-func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]pvcCtx, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
+func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
     // create a pod for each PVC migration, and wait for it to finish
     w.Printf("\nCopying data from %s PVCs to %s PVCs\n", sourceSCName, destSCName)
     for ns, nsPvcs := range matchingPVCs {
         for _, nsPvc := range nsPvcs {
-            sourcePvcName, destPvcName := nsPvc.claim.Name, newPvcName(nsPvc.claim.Name)
-            w.Printf("Copying data from %s (%s) to %s in %s\n", sourcePvcName, nsPvc.claim.Spec.VolumeName, destPvcName, ns)
+            sourcePvcName, destPvcName := nsPvc.Name, newPvcName(nsPvc.Name)
+            w.Printf("Copying data from %s (%s) to %s in %s\n", sourcePvcName, nsPvc.Spec.VolumeName, destPvcName, ns)

-            err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, rsyncImage, verboseCopy, waitTime, nsPvc.getNodeNameRef(), rsyncFlags)
+            err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, rsyncImage, verboseCopy, waitTime, rsyncFlags)
             if err != nil {
-                return fmt.Errorf("failed to copy PVC %s in %s: %w", nsPvc.claim.Name, ns, err)
+                return fmt.Errorf("failed to copy PVC %s in %s: %w", nsPvc.Name, ns, err)
             }
         }
     }
     return nil
 }

-func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, rsyncImage string, verboseCopy bool, waitTime time.Duration, nodeName string, rsyncFlags []string) error {
+func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, rsyncImage string, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
+    w.Printf("Determining the node to migrate PVC %s on\n", sourcePvcName)
+    nodeName, err := getDesiredNode(ctx, clientset, ns, sourcePvcName)
+    if err != nil {
+        return fmt.Errorf("failed to get node for PVC %s in %s: %w", sourcePvcName, ns, err)
+    }
+
     w.Printf("Creating pvc migrator pod on node %s\n", nodeName)
     createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, rsyncImage, nodeName, rsyncFlags)
     if err != nil {
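
copyOnePVC now resolves the node itself (via getDesiredNode, added at the bottom of this file) instead of having a nodeName parameter threaded through every caller. createMigrationPod is outside this diff; a minimal sketch of pinning the rsync pod to the resolved node, assuming plain Spec.NodeName pinning rather than node affinity:

```go
package sketch

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// migrationPodSkeleton is a hypothetical helper, not the project's
// createMigrationPod; it shows only the node-pinning part.
func migrationPodSkeleton(ns, sourcePvcName, nodeName, rsyncImage string) *corev1.Pod {
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: "migrate-" + sourcePvcName + "-",
            Namespace:    ns,
        },
        Spec: corev1.PodSpec{
            NodeName:      nodeName, // bypasses the scheduler; "" would let any node be chosen
            RestartPolicy: corev1.RestartPolicyNever,
            Containers: []corev1.Container{
                {Name: "rsync", Image: rsyncImage},
            },
        },
    }
}
```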
@@ -419,7 +407,7 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns string,
 // a map of namespaces to arrays of original PVCs
 // an array of namespaces that the PVCs were found within
 // an error, if one was encountered
-func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]pvcCtx, []string, error) {
+func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
     // get PVs using the specified storage provider
     pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
     if err != nil {
@@ -437,18 +425,16 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
     }

     // get PVCs using specified PVs
-    matchingPVCs := map[string][]pvcCtx{}
-    var pvcInfo pvcCtx
+    matchingPVCs := map[string][]*corev1.PersistentVolumeClaim{}
     for _, pv := range matchingPVs {
         if pv.Spec.ClaimRef != nil {
             pvc, err := clientset.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(ctx, pv.Spec.ClaimRef.Name, metav1.GetOptions{})
             if err != nil {
                 return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err)
             }
-            pvcInfo.claim = pvc

             if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" {
-                matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvcInfo)
+                matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)
             }

         } else {
@@ -466,8 +452,8 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
     _, _ = fmt.Fprintf(tw, "namespace:\tpvc:\tpv:\tsize:\t\n")
     for ns, nsPvcs := range matchingPVCs {
         for _, nsPvc := range nsPvcs {
-            pvCap := pvsByName[nsPvc.claim.Spec.VolumeName].Spec.Capacity
-            _, _ = fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t\n", ns, nsPvc.claim.Name, nsPvc.claim.Spec.VolumeName, pvCap.Storage().String())
+            pvCap := pvsByName[nsPvc.Spec.VolumeName].Spec.Capacity
+            _, _ = fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t\n", ns, nsPvc.Name, nsPvc.Spec.VolumeName, pvCap.Storage().String())
         }
     }
     err = tw.Flush()
@@ -479,16 +465,16 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
     w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName)
     for ns, nsPvcs := range matchingPVCs {
         for _, nsPvc := range nsPvcs {
-            newName := newPvcName(nsPvc.claim.Name)
+            newName := newPvcName(nsPvc.Name)

-            desiredPV, ok := pvsByName[nsPvc.claim.Spec.VolumeName]
+            desiredPV, ok := pvsByName[nsPvc.Spec.VolumeName]
             if !ok {
-                return nil, nil, fmt.Errorf("failed to find existing PV %s for PVC %s in %s", nsPvc.claim.Spec.VolumeName, nsPvc.claim.Name, ns)
+                return nil, nil, fmt.Errorf("failed to find existing PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
             }

             desiredPvStorage, ok := desiredPV.Spec.Capacity[corev1.ResourceStorage]
             if !ok {
-                return nil, nil, fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.claim.Spec.VolumeName, nsPvc.claim.Name, ns)
+                return nil, nil, fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
             }

             // check to see if the desired PVC name already exists (and is appropriate)
@@ -505,27 +491,27 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
                     w.Printf("found existing PVC with name %s, not creating new one\n", newName)
                     continue
                 } else {
-                    return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.claim.Name)
+                    return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.Name)
                 }
             } else {
-                return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.claim.Name)
+                return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.Name)
             }
         }

         // Set destination access mode based on annotations
-        destAccessModes, err := GetDestAccessModes(*nsPvc.claim)
+        destAccessModes, err := GetDestAccessModes(*nsPvc)
         if err != nil {
-            return nil, nil, fmt.Errorf("failed to get destination access mode for PVC %s in %s: %w", nsPvc.claim.Name, ns, err)
+            return nil, nil, fmt.Errorf("failed to get destination access mode for PVC %s in %s: %w", nsPvc.Name, ns, err)
         }

         // if it doesn't already exist, create it
         newPVC, err := clientset.CoreV1().PersistentVolumeClaims(ns).Create(ctx, &corev1.PersistentVolumeClaim{
-            TypeMeta: nsPvc.claim.TypeMeta,
+            TypeMeta: nsPvc.TypeMeta,
             ObjectMeta: metav1.ObjectMeta{
                 Name:      newName,
                 Namespace: ns,
                 Labels: map[string]string{
-                    baseAnnotation: nsPvc.claim.Name,
+                    baseAnnotation: nsPvc.Name,
                     kindAnnotation: "dest",
                 },
             },
@@ -698,7 +684,7 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 // if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
 // if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted.
 // It returns a map of namespace to PVCs and any errors encountered.
-func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]pvcCtx, checkInterval time.Duration) (map[string][]pvcCtx, error) {
+func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) (map[string][]*corev1.PersistentVolumeClaim, error) {
     // build new map with complete pvcCtx
     updatedPVCs := matchingPVCs
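
The context lines above are worth a second look after this change: the "build new map with complete pvcCtx" comment is now stale (pvcCtx no longer exists), and `updatedPVCs := matchingPVCs` never built a new map in any case, since assigning a map in Go copies only the map header:

```go
package sketch

// Both names share the same underlying entries, so writes through one are
// visible through the other.
func mapAlias() int {
    matching := map[string][]int{"ns-a": {1}}
    updated := matching
    updated["ns-b"] = []int{2}
    return len(matching) // 2, not 1
}
```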

@@ -711,19 +697,13 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
             return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
         }
         for _, nsPod := range nsPods.Items {
-            nsPod := nsPod // need a new var that is per-iteration, NOT per-loop
-
         perPodLoop:
             for _, podVol := range nsPod.Spec.Volumes {
                 if podVol.PersistentVolumeClaim != nil {
-                    for idx, nsPvClaim := range nsPvcs {
-                        nsPvClaim := nsPvClaim // need a new var that is per-iteration, NOT per-loop
-                        if podVol.PersistentVolumeClaim.ClaimName == nsPvClaim.claim.Name {
+                    for _, nsPvClaim := range nsPvcs {
+                        if podVol.PersistentVolumeClaim.ClaimName == nsPvClaim.Name {
                             matchingPods[ns] = append(matchingPods[ns], nsPod)
                             matchingPodsCount++
-                            // when migrating the pvc data we'll use the nodeName in the podSpec to create the volume
-                            // on the node where the pod was originally scheduled on
-                            updatedPVCs[ns][idx] = pvcCtx{nsPvClaim.claim, &nsPod}
                             break perPodLoop // exit the for _, podVol := range nsPod.Spec.Volumes loop, as we've already determined that this pod matches
                         }
                     }
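
The deleted `nsPod := nsPod` and `nsPvClaim := nsPvClaim` copies were only needed while the old code stored `&nsPod` into a pvcCtx; once nothing escapes the loop, they can go. For reference, the pre-Go 1.22 pitfall they guarded against:

```go
package sketch

// Before Go 1.22 a range variable is one storage location reused across
// iterations, so taking its address captures the same cell every time.
func loopCapture() []*string {
    names := []string{"a", "b", "c"}
    var refs []*string
    for _, n := range names {
        n := n // per-iteration copy; without it, all three refs point at "c"
        refs = append(refs, &n)
    }
    return refs
}
```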
@@ -732,6 +712,40 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
         }
     }

+    for ns, nsPvcs := range matchingPVCs {
+        nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
+        if err != nil {
+            return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
+        }
+        for _, nsPod := range nsPods.Items {
+            for _, podVol := range nsPod.Spec.Volumes {
+                if podVol.PersistentVolumeClaim != nil {
+                    for _, nsPvClaim := range nsPvcs {
+                        if podVol.PersistentVolumeClaim.ClaimName == nsPvClaim.Name {
+                            // when migrating the pvc data we'll use the nodeName to create the volume
+                            // on the node where the pod was originally scheduled on
+
+                            // TODO this is the only place in this function that we mutate the existing cluster, is there a better way?
+                            err = mutatePV(ctx, w, clientset, nsPvClaim.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
+                                // add annotations describing what node this data came from to help migrate
+                                if volume.Annotations == nil {
+                                    volume.Annotations = map[string]string{}
+                                }
+                                volume.Annotations[sourceNodeAnnotation] = nsPod.Spec.NodeName
+                                return volume, nil
+                            }, func(volume *corev1.PersistentVolume) bool {
+                                return volume.Annotations[sourceNodeAnnotation] == nsPod.Spec.NodeName
+                            })
+                            if err != nil {
+                                return nil, fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     w.Printf("\nFound %d matching pods to migrate across %d namespaces:\n", matchingPodsCount, len(matchingPods))
     tw := tabwriter.NewWriter(w.Writer(), 2, 2, 1, ' ', 0)
     _, _ = fmt.Fprintf(tw, "namespace:\tpod:\t\n")
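
mutatePV's body is not part of this diff; from the call site it takes a mutator plus a predicate that reports whether the desired state is already in place, which makes the new annotation loop idempotent across reruns. A sketch of that read-check-update shape, under the assumption that it retries on write conflicts (this is a stand-in, not the project's actual implementation):

```go
package sketch

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    k8serrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    k8sclient "k8s.io/client-go/kubernetes"
)

// mutatePVSketch is a hypothetical stand-in for the project's mutatePV.
func mutatePVSketch(ctx context.Context, clientset k8sclient.Interface, name string,
    mutate func(*corev1.PersistentVolume) (*corev1.PersistentVolume, error),
    reached func(*corev1.PersistentVolume) bool,
) error {
    for {
        pv, err := clientset.CoreV1().PersistentVolumes().Get(ctx, name, metav1.GetOptions{})
        if err != nil {
            return err
        }
        if reached(pv) {
            return nil // already annotated; nothing to do on a rerun
        }
        if pv, err = mutate(pv); err != nil {
            return err
        }
        if _, err = clientset.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}); err == nil {
            return nil
        } else if !k8serrors.IsConflict(err) {
            return err // only a stale read (conflict) is worth retrying
        }
    }
}
```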
@@ -868,12 +882,12 @@ checkPvcPodLoop:
             for _, podVol := range nsPod.Spec.Volumes {
                 if podVol.PersistentVolumeClaim != nil {
                     for _, nsClaim := range nsPvcs {
-                        if podVol.PersistentVolumeClaim.ClaimName == nsClaim.claim.Name {
+                        if podVol.PersistentVolumeClaim.ClaimName == nsClaim.Name {
                             if nsPod.CreationTimestamp.After(migrationStartTime) {
-                                return nil, fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.claim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
+                                return nil, fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
                             }

-                            w.Printf("Found pod %s in %s mounting to-be-migrated PVC %s, waiting\n", nsPod.Name, ns, nsClaim.claim.Name)
+                            w.Printf("Found pod %s in %s mounting to-be-migrated PVC %s, waiting\n", nsPod.Name, ns, nsClaim.Name)
                             time.Sleep(checkInterval) // don't check too often, as this loop is relatively expensive
                             continue checkPvcPodLoop // as soon as we find a matching pod, we know we need to wait another 30s
                         }
@@ -1191,3 +1205,18 @@ func GetDestAccessModes(srcPVC corev1.PersistentVolumeClaim) ([]corev1.PersistentVolumeAccessMode, error) {
     }
     return destAccessMode, nil
 }
+
+func getDesiredNode(ctx context.Context, clientset k8sclient.Interface, ns, sourcePvcName string) (string, error) {
+    sourceClaim, err := clientset.CoreV1().PersistentVolumeClaims(ns).Get(ctx, sourcePvcName, metav1.GetOptions{})
+    if err != nil {
+        return "", fmt.Errorf("failed to get PVC %s in %s: %w", sourcePvcName, ns, err)
+    }
+
+    sourcePV, err := clientset.CoreV1().PersistentVolumes().Get(ctx, sourceClaim.Spec.VolumeName, metav1.GetOptions{})
+    if err != nil {
+        return "", fmt.Errorf("failed to get PV %s in %s: %w", sourceClaim.Spec.VolumeName, ns, err)
+    }
+
+    nodeName := sourcePV.Annotations[sourceNodeAnnotation]
+    return nodeName, nil
+}
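
The annotation round trip is easy to exercise end to end with client-go's fake clientset. A test-style sketch of the same two lookups getDesiredNode performs, PVC to bound PV to annotation (the annotation literal is assumed, as above):

```go
package sketch

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
)

func ExampleGetDesiredNode() {
    clientset := fake.NewSimpleClientset(
        &corev1.PersistentVolumeClaim{
            ObjectMeta: metav1.ObjectMeta{Name: "data", Namespace: "default"},
            Spec:       corev1.PersistentVolumeClaimSpec{VolumeName: "pv-data"},
        },
        &corev1.PersistentVolume{
            ObjectMeta: metav1.ObjectMeta{
                Name:        "pv-data",
                Annotations: map[string]string{"kurl.sh/pvcmigrate-sourcenode": "node-a"},
            },
        },
    )

    ctx := context.Background()
    pvc, _ := clientset.CoreV1().PersistentVolumeClaims("default").Get(ctx, "data", metav1.GetOptions{})
    pv, _ := clientset.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
    fmt.Println(pv.Annotations["kurl.sh/pvcmigrate-sourcenode"])
    // Output: node-a
}
```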