Skip to content

Commit

Permalink
refactor: remove isCLIAPIVersionOne related codes
Browse files Browse the repository at this point in the history
Longhorn 7191

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit authored and innobead committed Aug 18, 2024
1 parent 0b8faf9 commit 5c07281
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 213 deletions.
84 changes: 1 addition & 83 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,6 @@ func (ec *EngineController) syncEngine(key string) (err error) {
}
}()

isCLIAPIVersionOne := false
if types.IsDataEngineV1(engine.Spec.DataEngine) {
if engine.Status.CurrentImage != "" {
isCLIAPIVersionOne, err = ec.ds.IsEngineImageCLIAPIVersionOne(engine.Status.CurrentImage)
if err != nil {
return err
}
}
}

syncReplicaAddressMap := false
if len(engine.Spec.UpgradedReplicaAddressMap) != 0 && engine.Status.CurrentImage != engine.Spec.Image {
if err := ec.Upgrade(engine, log); err != nil {
Expand All @@ -359,16 +349,6 @@ func (ec *EngineController) syncEngine(key string) (err error) {
return err
}

// For incompatible engine, skip starting engine monitor and clean up fields when the engine is not running
if isCLIAPIVersionOne {
if engine.Status.CurrentState != longhorn.InstanceStateRunning {
engine.Status.Endpoint = ""
engine.Status.ReplicaModeMap = nil
engine.Status.ReplicaTransitionTimeMap = nil
}
return nil
}

if engine.Status.CurrentState == longhorn.InstanceStateRunning {
// we allow across monitoring temporarily due to migration case
if !ec.isMonitoring(engine) {
Expand Down Expand Up @@ -521,15 +501,8 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
if !ok {
return fmt.Errorf("invalid object for engine process deletion: %v", obj)
}
log := getLoggerForEngine(ec.logger, e)

if types.IsDataEngineV1(e.Spec.DataEngine) {
err = ec.deleteInstanceWithCLIAPIVersionOne(e)
if err != nil {
return err
}
}

log := getLoggerForEngine(ec.logger, e)
var im *longhorn.InstanceManager

// Not assigned or not updated, try best to delete
Expand Down Expand Up @@ -643,61 +616,6 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
return nil
}

func (ec *EngineController) deleteInstanceWithCLIAPIVersionOne(e *longhorn.Engine) (err error) {
isCLIAPIVersionOne := false
if e.Status.CurrentImage != "" {
isCLIAPIVersionOne, err = ec.ds.IsEngineImageCLIAPIVersionOne(e.Status.CurrentImage)
if err != nil {
return err
}
}

if isCLIAPIVersionOne {
pod, err := ec.kubeClient.CoreV1().Pods(ec.namespace).Get(context.TODO(), e.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get pod for old engine %v", e.Name)
}
if apierrors.IsNotFound(err) {
pod = nil
}

ec.logger.WithField("engine", e.Name).Info("Deleting engine pod because of outdated version")
ec.deleteOldEnginePod(pod, e)
}
return nil
}

func (ec *EngineController) deleteOldEnginePod(pod *corev1.Pod, e *longhorn.Engine) {
// pod already stopped
if pod == nil {
return
}

log := ec.logger.WithField("pod", pod.Name)
if pod.DeletionTimestamp != nil {
if pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds != 0 {
// force deletion in the case of node lost
deletionDeadline := pod.DeletionTimestamp.Add(time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second)
now := time.Now().UTC()
if now.After(deletionDeadline) {
log.Warnf("Engine pod still exists after grace period %v passed, force deletion: now %v, deadline %v", pod.DeletionGracePeriodSeconds, now, deletionDeadline)
gracePeriod := int64(0)
if err := ec.kubeClient.CoreV1().Pods(ec.namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil {
log.WithError(err).Warn("Failed to force delete engine pod")
return
}
}
}
return
}

if err := ec.kubeClient.CoreV1().Pods(ec.namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
ec.eventRecorder.Eventf(e, corev1.EventTypeWarning, constant.EventReasonFailedStopping, "Error stopping pod for old engine %v: %v", pod.Name, err)
return
}
ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonStop, "Stops pod for old engine %v", pod.Name)
}

func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
e, ok := obj.(*longhorn.Engine)
if !ok {
Expand Down
72 changes: 21 additions & 51 deletions controller/instance_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,45 +256,33 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn

log := logrus.WithField("instance", instanceName)

isCLIAPIVersionOne := false
if types.IsDataEngineV1(spec.DataEngine) {
if status.CurrentImage != "" {
isCLIAPIVersionOne, err = h.ds.IsEngineImageCLIAPIVersionOne(status.CurrentImage)
if err != nil {
var im *longhorn.InstanceManager
if status.InstanceManagerName != "" {
im, err = h.ds.GetInstanceManagerRO(status.InstanceManagerName)
if err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}
}
}

var im *longhorn.InstanceManager
if !isCLIAPIVersionOne {
if status.InstanceManagerName != "" {
im, err = h.ds.GetInstanceManagerRO(status.InstanceManagerName)
if err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}
}
// There should be an available instance manager for a scheduled instance when its related engine image is compatible
if im == nil && spec.Image != "" && spec.NodeID != "" {
dataEngineEnabled, err := h.ds.IsDataEngineEnabled(spec.DataEngine)
if err != nil {
return err
}
// There should be an available instance manager for a scheduled instance when its related engine image is compatible
if im == nil && spec.Image != "" && spec.NodeID != "" {
dataEngineEnabled, err := h.ds.IsDataEngineEnabled(spec.DataEngine)
if err != nil {
return err
}
if !dataEngineEnabled {
return nil
}
// The related node maybe cleaned up then there is no available instance manager for this instance (typically it's replica).
isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeletedOrDelinquent(spec.NodeID, spec.VolumeName)
if !dataEngineEnabled {
return nil
}
// The related node maybe cleaned up then there is no available instance manager for this instance (typically it's replica).
isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeletedOrDelinquent(spec.NodeID, spec.VolumeName)
if err != nil {
return err
}
if !isNodeDownOrDeleted {
im, err = h.ds.GetInstanceManagerByInstanceRO(obj)
if err != nil {
return err
}
if !isNodeDownOrDeleted {
im, err = h.ds.GetInstanceManagerByInstanceRO(obj)
if err != nil {
return errors.Wrapf(err, "failed to get instance manager for instance %v", instanceName)
}
return errors.Wrapf(err, "failed to get instance manager for instance %v", instanceName)
}
}
}
Expand Down Expand Up @@ -334,10 +322,6 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
// do nothing for incompatible instance except for deleting
switch spec.DesireState {
case longhorn.InstanceStateRunning:
if isCLIAPIVersionOne {
return nil
}

if im == nil {
break
}
Expand All @@ -364,20 +348,6 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
}

case longhorn.InstanceStateStopped:
if isCLIAPIVersionOne {
if err := h.deleteInstance(instanceName, runtimeObj); err != nil {
return err
}
status.Started = false
status.CurrentState = longhorn.InstanceStateStopped
status.CurrentImage = ""
status.InstanceManagerName = ""
status.IP = ""
status.StorageIP = ""
status.Port = 0
return nil
}

if im != nil && im.DeletionTimestamp == nil {
// there is a delay between deleteInstance() invocation and state/InstanceManager update,
// deleteInstance() may be called multiple times.
Expand Down
65 changes: 0 additions & 65 deletions controller/replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -30,7 +29,6 @@ import (

lhns "github.com/longhorn/go-common-libs/ns"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/datastore"
"github.com/longhorn/longhorn-manager/engineapi"
"github.com/longhorn/longhorn-manager/types"
Expand Down Expand Up @@ -520,12 +518,6 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) {
}
log := getLoggerForReplica(rc.logger, r)

if types.IsDataEngineV1(r.Spec.DataEngine) {
if err := rc.deleteInstanceWithCLIAPIVersionOne(r); err != nil {
return err
}
}

var im *longhorn.InstanceManager
// Not assigned or not updated, try best to delete
if r.Status.InstanceManagerName == "" {
Expand Down Expand Up @@ -614,63 +606,6 @@ func deleteUnixSocketFile(volumeName string) error {
return os.RemoveAll(filepath.Join(types.UnixDomainSocketDirectoryOnHost, volumeName+filepath.Ext(".sock")))
}

func (rc *ReplicaController) deleteInstanceWithCLIAPIVersionOne(r *longhorn.Replica) (err error) {
isCLIAPIVersionOne := false
if r.Status.CurrentImage != "" {
isCLIAPIVersionOne, err = rc.ds.IsEngineImageCLIAPIVersionOne(r.Status.CurrentImage)
if err != nil {
return err
}
}

if isCLIAPIVersionOne {
pod, err := rc.kubeClient.CoreV1().Pods(rc.namespace).Get(context.TODO(), r.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get pod for old replica %v", r.Name)
}
if apierrors.IsNotFound(err) {
pod = nil
}

log := getLoggerForReplica(rc.logger, r)
log.Info("Deleting old version replica with running pod")
rc.deleteOldReplicaPod(pod, r)
}
return nil
}

func (rc *ReplicaController) deleteOldReplicaPod(pod *corev1.Pod, r *longhorn.Replica) {
// pod already stopped
if pod == nil {
return
}

if pod.DeletionTimestamp != nil {
if pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds != 0 {
// force deletion in the case of node lost
deletionDeadline := pod.DeletionTimestamp.Add(time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second)
now := time.Now().UTC()
if now.After(deletionDeadline) {
log := rc.logger.WithField("pod", pod.Name)
log.Warnf("Replica pod still exists after grace period %v passed, force deletion: now %v, deadline %v",
pod.DeletionGracePeriodSeconds, now, deletionDeadline)
gracePeriod := int64(0)
if err := rc.kubeClient.CoreV1().Pods(rc.namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil {
log.WithError(err).Warn("Failed to force deleting replica pod")
return
}
}
}
return
}

if err := rc.kubeClient.CoreV1().Pods(rc.namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
rc.eventRecorder.Eventf(r, corev1.EventTypeWarning, constant.EventReasonFailedStopping, "Error stopping pod for old replica %v: %v", pod.Name, err)
return
}
rc.eventRecorder.Eventf(r, corev1.EventTypeNormal, constant.EventReasonStop, "Stops pod for old replica %v", pod.Name)
}

func (rc *ReplicaController) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
r, ok := obj.(*longhorn.Replica)
if !ok {
Expand Down
14 changes: 0 additions & 14 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3721,20 +3721,6 @@ func resourceVersionAtLeast(curr, min string) bool {
return currVersion >= minVersion
}

// IsEngineImageCLIAPIVersionOne get engine image CLIAPIVersion for the given name.
// Returns true if CLIAPIVersion is 1
func (s *DataStore) IsEngineImageCLIAPIVersionOne(imageName string) (bool, error) {
version, err := s.GetEngineImageCLIAPIVersion(imageName)
if err != nil {
return false, err
}

if version == 1 {
return true, nil
}
return false, nil
}

// GetEngineImageCLIAPIVersion get engine image for the given name and returns the
// CLIAPIVersion
func (s *DataStore) GetEngineImageCLIAPIVersion(imageName string) (int, error) {
Expand Down

0 comments on commit 5c07281

Please sign in to comment.