
Commit

Merge branch 'v1.6.x' into renovate/v1.6.x-longhorn-branch-repo-dependencies
innobead authored Sep 24, 2024
2 parents 7f1071a + 442d68a commit a25f67e
Showing 16 changed files with 1,001 additions and 446 deletions.
89 changes: 58 additions & 31 deletions controller/engine_controller.go
@@ -461,7 +461,7 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
return nil, err
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, err
}
@@ -550,39 +550,20 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
isRWXVolume = true
}

// For a RWX volume, the node down, for example, caused by kubelet restart, leads to share-manager pod deletion/recreation
// and volume detachment/attachment.
// Then, the newly created share-manager pod blindly mounts the longhorn volume inside /dev/longhorn/<pvc-name> and exports it.
// To avoid mounting a dead and orphaned volume, try to clean up the engine instance as well as the orphaned iscsi device
// regardless of the instance-manager status.
if !isRWXVolume {
if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
log.Infof("Skipping deleting engine %v since instance manager is in %v state", e.Name, im.Status.CurrentState)
return nil
}
if shouldSkip, skipReason := shouldSkipEngineDeletion(im.Status.CurrentState, isRWXVolume); shouldSkip {
log.Infof("Skipping deleting engine %v since %s", e.Name, skipReason)
return nil
}

log.Info("Deleting engine instance")

defer func() {
if err != nil {
log.WithError(err).Warnf("Failed to delete engine %v", e.Name)
}
if isRWXVolume && im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
// Try the best to delete engine instance.
// To prevent that the volume is stuck at detaching state, ignore the error when volume is
// a RWX volume and the instance manager is not running.
//
// If the engine instance of a RWX volume is not deleted successfully:
// If a RWX volume is on node A and the network of this node is partitioned,
// the owner of the share manager (SM) is transferred to node B. The engine instance and
// the block device (/dev/longhorn/pvc-xxx) on the node A become orphaned.
// If the network of the node A gets back to normal, the SM can be shifted back to node A.
// After shifting to node A, the first reattachment fail due to the IO error resulting from the
// orphaned engine instance and block device. Then, the detachment will trigger the teardown of the
// problematic engine instance and block device. The next reattachment then will succeed.
log.Warnf("Ignored the failure of deleting engine %v", e.Name)
err = nil
if canIgnore, ignoreReason := canIgnoreEngineDeletionFailure(im.Status.CurrentState, isRWXVolume); canIgnore {
log.Warnf("Ignored the failure to delete engine %v because %s", e.Name, ignoreReason)
err = nil
}
}
}()

@@ -607,7 +588,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
return nil
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, true)
if err != nil {
return err
}
@@ -697,7 +678,7 @@ func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProc
return nil, err
}
}
c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, err
}
@@ -717,7 +698,7 @@ func (ec *EngineController) LogInstance(ctx context.Context, obj interface{}) (*
return nil, nil, err
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, nil, err
}
@@ -2080,7 +2061,7 @@ func (ec *EngineController) UpgradeEngineInstance(e *longhorn.Engine, log *logru
return err
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return err
}
@@ -2312,3 +2293,49 @@ func sizeThreshold(nominalSize int64) int64 {
}
return 100 * util.MiB // Update status for any change > 100 MiB.
}

func shouldSkipEngineDeletion(imState longhorn.InstanceManagerState, isRWXVolume bool) (canSkip bool, reason string) {
// For a RWX volume, a node going down (for example, because of a kubelet restart) leads to share-manager pod
// deletion/recreation and volume detachment/attachment. The newly created share-manager pod then blindly mounts
// the Longhorn volume at /dev/longhorn/<pvc-name> and exports it. To avoid mounting a dead and orphaned volume,
// try to clean up the engine instance as well as the orphaned iSCSI device regardless of the instance-manager
// status.
if isRWXVolume {
return false, ""
}

// If the instance manager is in an unknown state, we should at least attempt instance deletion.
if imState == longhorn.InstanceManagerStateRunning || imState == longhorn.InstanceManagerStateUnknown {
return false, ""
}

return true, fmt.Sprintf("instance manager is in %v state", imState)
}

func canIgnoreEngineDeletionFailure(imState longhorn.InstanceManagerState, isRWXVolume bool) (canIgnore bool, reason string) {
// Instance deletion is always best effort for an unknown instance manager.
if imState == longhorn.InstanceManagerStateUnknown {
return true, fmt.Sprintf("instance manager is in %v state", imState)
}

// The remaining reasons apply only to RWX volumes.
if !isRWXVolume {
return false, ""
}

// Try our best to delete the engine instance.
// To prevent the volume from being stuck in the detaching state, ignore the error when the volume is a RWX volume
// and the instance manager is not running or the RWX volume is currently delinquent.
//
// What happens if the engine instance of a RWX volume is not deleted successfully: if a RWX volume is on node A
// and the network of this node is partitioned, ownership of the share manager (SM) is transferred to node B. The
// engine instance and the block device (/dev/longhorn/pvc-xxx) on node A become orphaned. If the network of node A
// comes back to normal, the SM can be shifted back to node A. After shifting to node A, the first reattachment
// fails due to the IO error resulting from the orphaned engine instance and block device. The detachment then
// triggers the teardown of the problematic engine instance and block device, and the next reattachment succeeds.
if imState != longhorn.InstanceManagerStateRunning {
return true, fmt.Sprintf("instance manager is in %v state for the RWX volume", imState)
}

return false, ""
}
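For reference (not part of the commit), the skip/ignore decision matrix implemented by the two helpers above can be exercised with a minimal table-driven test. This is a sketch written as if it lived in the controller package (e.g. a hypothetical engine_controller_helpers_test.go); the import path and the InstanceManagerStateRunning/Unknown/Error constants are assumed to match the longhorn-manager v1beta2 API used elsewhere in this diff.

package controller

import (
	"testing"

	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

// Exercises the deletion decision matrix of shouldSkipEngineDeletion and
// canIgnoreEngineDeletionFailure across instance-manager states and volume access modes.
func TestEngineDeletionDecisionMatrix(t *testing.T) {
	cases := []struct {
		imState    longhorn.InstanceManagerState
		isRWX      bool
		wantSkip   bool
		wantIgnore bool
	}{
		// Running instance manager: always attempt deletion and never swallow a failure.
		{longhorn.InstanceManagerStateRunning, false, false, false},
		{longhorn.InstanceManagerStateRunning, true, false, false},
		// Unknown instance manager: attempt deletion, but treat it as best effort.
		{longhorn.InstanceManagerStateUnknown, false, false, true},
		{longhorn.InstanceManagerStateUnknown, true, false, true},
		// Any other state (e.g. error): skip for non-RWX volumes, attempt and forgive for RWX volumes.
		{longhorn.InstanceManagerStateError, false, true, false},
		{longhorn.InstanceManagerStateError, true, false, true},
	}

	for _, tc := range cases {
		gotSkip, _ := shouldSkipEngineDeletion(tc.imState, tc.isRWX)
		gotIgnore, _ := canIgnoreEngineDeletionFailure(tc.imState, tc.isRWX)
		if gotSkip != tc.wantSkip || gotIgnore != tc.wantIgnore {
			t.Errorf("state=%v rwx=%v: got skip=%v ignore=%v, want skip=%v ignore=%v",
				tc.imState, tc.isRWX, gotSkip, gotIgnore, tc.wantSkip, tc.wantIgnore)
		}
	}
}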
6 changes: 5 additions & 1 deletion controller/engine_image_controller.go
@@ -515,7 +515,8 @@ func (ic *EngineImageController) canDoOfflineEngineImageUpgrade(v *longhorn.Volu
// 3. Volume is not a DR volume AND
// 4. Volume is not expanding AND
// 5. Volume is not migrating AND
// 6. The current volume's engine image is compatible with the new engine image
// 6. Volume is not strict-local AND
// 7. The current volume's engine image is compatible with the new engine image
func (ic *EngineImageController) canDoLiveEngineImageUpgrade(v *longhorn.Volume, newEngineImageResource *longhorn.EngineImage) bool {
if v.Status.State != longhorn.VolumeStateAttached {
return false
@@ -532,6 +533,9 @@ func (ic *EngineImageController) canDoLiveEngineImageUpgrade(v *longhorn.Volume,
if util.IsVolumeMigrating(v) {
return false
}
if v.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
return false
}
oldEngineImageResource, err := ic.ds.GetEngineImage(types.GetEngineImageChecksumName(v.Status.CurrentImage))
if err != nil {
return false
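To make the new condition 6 above concrete, here is a hypothetical snippet (not from the commit) showing the kind of volume the added guard now rejects for live upgrades; the import path, field names, and constants (DataLocality, DataLocalityStrictLocal, VolumeStateAttached) are assumed to match the longhorn-manager v1beta2 API.

package main

import (
	"fmt"

	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

func main() {
	// An attached volume with strict-local data locality: after this change,
	// canDoLiveEngineImageUpgrade returns false for it, so its engine image can
	// only be upgraded offline (while the volume is detached).
	v := &longhorn.Volume{
		Spec:   longhorn.VolumeSpec{DataLocality: longhorn.DataLocalityStrictLocal},
		Status: longhorn.VolumeStatus{State: longhorn.VolumeStateAttached},
	}
	fmt.Println(v.Spec.DataLocality == longhorn.DataLocalityStrictLocal) // true => no live upgrade
}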
4 changes: 2 additions & 2 deletions controller/instance_manager_controller.go
@@ -83,7 +83,7 @@ type InstanceManagerMonitor struct {
}

func updateInstanceManagerVersion(im *longhorn.InstanceManager) error {
cli, err := engineapi.NewInstanceManagerClient(im)
cli, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return err
}
@@ -1426,7 +1426,7 @@ func (imc *InstanceManagerController) startMonitoring(im *longhorn.InstanceManag
}

// TODO: #2441 refactor this when we do the resource monitoring refactor
client, err := engineapi.NewInstanceManagerClient(im)
client, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
log.WithError(err).Errorf("Failed to initialize im client to %v before monitoring", im.Name)
return
12 changes: 11 additions & 1 deletion controller/node_controller.go
@@ -1044,7 +1044,17 @@ func (nc *NodeController) syncInstanceManagers(node *longhorn.Node) error {
}

log.Infof("Creating default instance manager %v, image: %v, dataEngine: %v", imName, defaultInstanceManagerImage, dataEngine)
if _, err := nc.createInstanceManager(node, imName, defaultInstanceManagerImage, imType, dataEngine); err != nil {
_, err = nc.createInstanceManager(node, imName, defaultInstanceManagerImage, imType, dataEngine)
if err != nil {
if apierrors.IsAlreadyExists(err) {
log.WithError(err).Warnf("Deleting instance manager %v because it cannot be obtained by selector labels", imName)
if err := nc.ds.DeleteInstanceManager(imName); err != nil {
return err
}

nc.enqueueNode(node)
return nil
}
return err
}
}
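The new branch above keys off the Kubernetes AlreadyExists error: the create call reports that an instance manager object exists even though the label selector did not return it, which indicates the existing object has missing or stale labels; deleting it and requeueing the node lets the next sync recreate it with the expected labels. A small standalone illustration of that error check follows (the group/resource and object name are made up for the sketch):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Build the same kind of error the API server returns when a create collides
	// with an existing object, then apply the check used by syncInstanceManagers.
	gr := schema.GroupResource{Group: "longhorn.io", Resource: "instancemanagers"}
	err := apierrors.NewAlreadyExists(gr, "instance-manager-example") // hypothetical name
	fmt.Println(apierrors.IsAlreadyExists(err))                       // true -> delete stale IM and requeue the node
}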
98 changes: 98 additions & 0 deletions controller/node_controller_test.go
@@ -2,6 +2,7 @@ package controller

import (
"context"
"fmt"
"strings"

monitor "github.com/longhorn/longhorn-manager/controller/monitor"
@@ -13,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
fake "k8s.io/client-go/kubernetes/fake"
@@ -1756,6 +1758,102 @@ func (s *NodeControllerSuite) TestNoEventOnUnknownTrueNodeCondition(c *C) {
s.checkEvents(c, expectation)
}

func (s *NodeControllerSuite) TestSyncInstanceManagers(c *C) {
// Skip the Lister check that occurs on creation of an Instance Manager.
datastore.SkipListerCheck = true

var err error

fixture := &NodeControllerFixture{
lhNodes: map[string]*longhorn.Node{
TestNode1: newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusUnknown, ""),
},
lhSettings: map[string]*longhorn.Setting{
string(types.SettingNameDefaultInstanceManagerImage): newDefaultInstanceManagerImageSetting(),
},
}

s.initTest(c, fixture)

defaultInstanceManager := DefaultInstanceManagerTestNode1.DeepCopy()
defaultInstanceManagerName, err := types.GetInstanceManagerName(
defaultInstanceManager.Spec.Type,
defaultInstanceManager.Spec.NodeID,
defaultInstanceManager.Spec.Image,
string(defaultInstanceManager.Spec.DataEngine))
c.Assert(err, IsNil)

defaultInstanceManager.Name = defaultInstanceManagerName

type instanceManagerCases struct {
existingInstanceManagers map[string]*longhorn.InstanceManager
isLabeled bool
isExpectError bool
}
testCases := map[string]instanceManagerCases{
"instance manager should be deleted and requeue node when instance manager labels are missing": {
existingInstanceManagers: map[string]*longhorn.InstanceManager{
defaultInstanceManagerName: defaultInstanceManager,
},
isLabeled: false,
isExpectError: false,
},
}

for testName, testCase := range testCases {
fmt.Printf("testing %v", testName)

node := fixture.lhNodes[TestNode1]

if !testCase.isLabeled {
existingInstanceManagers := testCase.existingInstanceManagers
for name := range existingInstanceManagers {
existingInstanceManagers[name].Labels = nil
}
fixture.lhInstanceManagers = existingInstanceManagers
}

for _, instanceManager := range testCase.existingInstanceManagers {
im, err := s.lhClient.LonghornV1beta2().InstanceManagers(TestNamespace).Get(context.TODO(), instanceManager.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
c.Assert(err, IsNil)
}
if im != nil {
err := s.lhClient.LonghornV1beta2().InstanceManagers(TestNamespace).Delete(context.TODO(), instanceManager.Name, metav1.DeleteOptions{})
c.Assert(err, IsNil)
}

im, err = s.lhClient.LonghornV1beta2().InstanceManagers(TestNamespace).Create(context.TODO(), instanceManager, metav1.CreateOptions{})
c.Assert(err, IsNil)
c.Assert(im, NotNil)

err = s.lhInstanceManagerIndexer.Add(im)
c.Assert(err, IsNil)
}

err = s.controller.syncInstanceManagers(node)
if len(testCase.existingInstanceManagers) > 0 {
instanceManagers, err := s.lhClient.LonghornV1beta2().InstanceManagers(TestNamespace).List(context.TODO(), metav1.ListOptions{})
c.Assert(err, IsNil)
c.Assert(len(instanceManagers.Items), Equals, 0)
} else {
c.Assert(err, IsNil)
}

// Simulate the enqueue.
err = s.controller.syncInstanceManagers(node)
if testCase.isExpectError {
c.Assert(err, NotNil)
continue
}
c.Assert(err, IsNil)

instanceManagers, err := s.lhClient.LonghornV1beta2().InstanceManagers(TestNamespace).List(context.TODO(), metav1.ListOptions{})
c.Assert(err, IsNil)
c.Assert(len(instanceManagers.Items), Equals, len(testCase.existingInstanceManagers))
}
}

// -- Helpers --

func (s *NodeControllerSuite) checkNodeConditions(c *C, expectation *NodeControllerExpectation, node *longhorn.Node) {
39 changes: 34 additions & 5 deletions controller/replica_controller.go
@@ -355,7 +355,7 @@ func (rc *ReplicaController) CreateInstance(obj interface{}) (*longhorn.Instance
return nil, err
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, err
}
@@ -547,11 +547,22 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error {
}
}

if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
if shouldSkip, skipReason := shouldSkipReplicaDeletion(im.Status.CurrentState); shouldSkip {
log.Infof("Skipping deleting replica %v since %s", r.Name, skipReason)
return nil
}

c, err := engineapi.NewInstanceManagerClient(im)
defer func() {
if err != nil {
log.WithError(err).Warnf("Failed to delete replica process %v", r.Name)
if canIgnore, ignoreReason := canIgnoreReplicaDeletionFailure(im.Status.CurrentState); canIgnore {
log.Warnf("Ignored the failure to delete replica process %v because %s", r.Name, ignoreReason)
err = nil
}
}
}()

c, err := engineapi.NewInstanceManagerClient(im, true)
if err != nil {
return err
}
@@ -674,7 +685,7 @@ func (rc *ReplicaController) GetInstance(obj interface{}) (*longhorn.InstancePro
}
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, err
}
@@ -705,7 +716,7 @@ func (rc *ReplicaController) LogInstance(ctx context.Context, obj interface{}) (
return nil, nil, err
}

c, err := engineapi.NewInstanceManagerClient(im)
c, err := engineapi.NewInstanceManagerClient(im, false)
if err != nil {
return nil, nil, err
}
@@ -914,3 +925,21 @@ func hasMatchingReplica(replica *longhorn.Replica, replicas map[string]*longhorn
}
return false
}

func shouldSkipReplicaDeletion(imState longhorn.InstanceManagerState) (canSkip bool, reason string) {
// If the instance manager is in an unknown state, we should at least attempt instance deletion.
if imState == longhorn.InstanceManagerStateRunning || imState == longhorn.InstanceManagerStateUnknown {
return false, ""
}

return true, fmt.Sprintf("instance manager is in %v state", imState)
}

func canIgnoreReplicaDeletionFailure(imState longhorn.InstanceManagerState) (canIgnore bool, reason string) {
// Instance deletion is always best effort for an unknown instance manager.
if imState == longhorn.InstanceManagerStateUnknown {
return true, fmt.Sprintf("instance manager is in %v state", imState)
}

return false, ""
}
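The replica helpers above mirror the engine ones minus the RWX special case, and both controllers now downgrade deletion failures inside a defer by reassigning the named error return value. A minimal standalone sketch of that Go idiom (illustrative only, not Longhorn code):

package main

import (
	"errors"
	"fmt"
)

// deleteInstance mirrors the pattern used in the controllers: the named return
// value err lets the deferred function inspect the failure and, when it is deemed
// ignorable, clear it so the caller sees a successful, best-effort deletion.
func deleteInstance(ignorable bool) (err error) {
	defer func() {
		if err != nil && ignorable {
			fmt.Printf("ignored deletion failure: %v\n", err)
			err = nil // downgrade the failure
		}
	}()
	return errors.New("instance manager unreachable")
}

func main() {
	fmt.Println(deleteInstance(true))  // <nil> (failure swallowed)
	fmt.Println(deleteInstance(false)) // instance manager unreachable
}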
