Skip to content

Commit

Permalink
Backport of CSI: track node claim before staging to prevent interleaved unstage into release/1.7.x (#20610)
Browse files Browse the repository at this point in the history

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and tgross authored May 16, 2024
1 parent a68f4b1 commit f308c54
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/20550.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where concurrent mount and unmount operations could unstage volumes needed by another allocation
```
9 changes: 5 additions & 4 deletions client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
ctx = hclog.WithContext(ctx, logger)

// Claim before we stage/publish to prevent interleaved Unmount for another
// alloc from unstaging between stage/publish steps below
v.usageTracker.Claim(alloc.ID, vol.ID, vol.Namespace, usage)

if v.requiresStaging {
err = v.stageVolume(ctx, vol, usage, publishContext)
}
Expand All @@ -261,10 +265,6 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
mountInfo, err = v.publishVolume(ctx, vol, alloc, usage, publishContext)
}

if err == nil {
v.usageTracker.Claim(alloc.ID, vol.ID, vol.Namespace, usage)
}

event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Mount volume").
Expand All @@ -274,6 +274,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
} else {
event.AddDetail("success", "false")
event.AddDetail("error", err.Error())
v.usageTracker.Free(alloc.ID, vol.ID, vol.Namespace, usage)
}

v.eventer(event)
Expand Down
71 changes: 71 additions & 0 deletions client/pluginmanager/csimanager/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/mount"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
csifake "github.com/hashicorp/nomad/plugins/csi/fake"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -526,3 +529,71 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
require.Equal(t, "vol", e.Details["volume_id"])
require.Equal(t, "true", e.Details["success"])
}

// TestVolumeManager_InterleavedStaging tests that a volume cannot be unstaged
// if another alloc has staged but not yet published.
func TestVolumeManager_InterleavedStaging(t *testing.T) {
	ci.Parallel(t)

	tmpPath := t.TempDir()
	csiFake := &csifake.Client{}

	logger := testlog.HCLogger(t)
	ctx := hclog.WithContext(context.Background(), logger)

	manager := newVolumeManager(logger,
		func(e *structs.NodeEvent) {}, csiFake,
		tmpPath, tmpPath, true, "i-example")

	alloc0, alloc1 := mock.Alloc(), mock.Alloc()
	vol := &structs.CSIVolume{ID: "vol", Namespace: "ns"}
	usage := &UsageOptions{
		AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
	}
	pubCtx := map[string]string{}

	// first alloc has previously claimed the volume
	manager.usageTracker.Claim(alloc0.ID, vol.ID, vol.Namespace, usage)

	alloc0WaitCh := make(chan struct{})
	alloc1WaitCh := make(chan struct{})

	// this goroutine simulates MountVolume, but with control over interleaving
	// by waiting for the other alloc to check if should unstage before trying
	// to publish
	manager.usageTracker.Claim(alloc1.ID, vol.ID, vol.Namespace, usage)
	must.NoError(t, manager.stageVolume(ctx, vol, usage, pubCtx))

	// capture the publish result so it can be asserted from the test
	// goroutine: t.Fatal (which must.NoError invokes on failure) must not be
	// called from any goroutine other than the one running the test
	var publishErr error
	go func() {
		defer close(alloc1WaitCh)
		<-alloc0WaitCh
		_, publishErr = manager.publishVolume(ctx, vol, alloc1, usage, pubCtx)
	}()

	must.NoError(t, manager.UnmountVolume(ctx, vol.Namespace, vol.ID, "foo", alloc0.ID, usage))
	close(alloc0WaitCh)

	testTimeoutCtx, cancel := context.WithTimeout(context.TODO(), time.Second)
	t.Cleanup(cancel)

	select {
	case <-alloc1WaitCh:
	case <-testTimeoutCtx.Done():
		t.Fatal("test timed out")
	}

	// the goroutine's close of alloc1WaitCh happens-before the receive above,
	// so reading publishErr here is race-free
	must.NoError(t, publishErr)

	key := volumeUsageKey{
		id:        vol.ID,
		ns:        vol.Namespace,
		usageOpts: *usage,
	}

	manager.usageTracker.stateMu.Lock()
	t.Cleanup(manager.usageTracker.stateMu.Unlock)
	must.Eq(t, []string{alloc1.ID}, manager.usageTracker.state[key])

	must.Eq(t, 1, csiFake.NodeUnpublishVolumeCallCount, must.Sprint("expected 1 unpublish call"))
	must.Eq(t, 0, csiFake.NodeUnstageVolumeCallCount, must.Sprint("expected no unstage call"))
}

0 comments on commit f308c54

Please sign in to comment.