From d2cbba15d779ece7f8e4ba02dd0203e2f626d69b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 16 May 2024 14:29:07 +0000 Subject: [PATCH] backport of commit c8c67da52d7b44e754fd75b86eb7117f57fe98a8 --- .changelog/20555.txt | 3 ++ nomad/core_sched_test.go | 43 +++++++++++---- nomad/csi_endpoint.go | 2 +- nomad/csi_endpoint_test.go | 30 +++++------ nomad/state/state_store.go | 34 +++++++++++- nomad/state/state_store_test.go | 93 +++++++++++++++++---------------- nomad/structs/errors.go | 1 + nomad/structs/structs.go | 14 ++++- 8 files changed, 142 insertions(+), 78 deletions(-) create mode 100644 .changelog/20555.txt diff --git a/.changelog/20555.txt b/.changelog/20555.txt new file mode 100644 index 00000000000..a0e5d269e48 --- /dev/null +++ b/.changelog/20555.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where plugins would not be deleted on GC if their job updated the plugin ID +``` diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index d056ed1ec8c..8bd47ebf386 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2221,38 +2221,59 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) { // Create a core scheduler snap, err := store.Snapshot() - require.NoError(t, err) + must.NoError(t, err) core := NewCoreScheduler(srv, snap) // Attempt the GC index++ gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index) - require.NoError(t, core.Process(gc)) + must.NoError(t, core.Process(gc)) // Should not be gone (plugin in use) ws := memdb.NewWatchSet() plug, err := store.CSIPluginByID(ws, "foo") - require.NotNil(t, plug) - require.NoError(t, err) + must.NotNil(t, plug) + must.NoError(t, err) - // Empty the plugin + // Empty the plugin but add a job plug = plug.Copy() plug.Controllers = map[string]*structs.CSIInfo{} plug.Nodes = map[string]*structs.CSIInfo{} + job := mock.CSIPluginJob(structs.CSIPluginTypeController, plug.ID) index++ - err = store.UpsertCSIPlugin(index, plug) - require.NoError(t, err) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + plug.ControllerJobs.Add(job, 1) + + index++ + must.NoError(t, store.UpsertCSIPlugin(index, plug)) // Retry index++ gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index) - require.NoError(t, core.Process(gc)) + must.NoError(t, core.Process(gc)) - // Should be gone + // Should not be gone (plugin in use) + ws = memdb.NewWatchSet() plug, err = store.CSIPluginByID(ws, "foo") - require.Nil(t, plug) - require.NoError(t, err) + must.NotNil(t, plug) + must.NoError(t, err) + + // Update the job with a different pluginID + job = job.Copy() + job.TaskGroups[0].Tasks[0].CSIPluginConfig.ID = "another-plugin-id" + index++ + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + + // Retry + index++ + gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index) + must.NoError(t, core.Process(gc)) + + // Should now be gone + plug, err = store.CSIPluginByID(ws, "foo") + must.Nil(t, plug) + must.NoError(t, err) } func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index afa7aaa0cc9..0cbb9a6abd9 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -1869,7 +1869,7 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs. } _, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args) - if err != nil { + if err != nil && !errors.Is(err, structs.ErrCSIPluginInUse) { v.logger.Error("csi raft apply failed", "error", err, "method", "delete") return err } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index cfb02b02bde..97fd2c3f8e3 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -2161,9 +2161,8 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) { }, } respGet := &structs.CSIPluginGetResponse{} - err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet) - require.NoError(t, err) - require.NotNil(t, respGet.Plugin) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)) + must.NotNil(t, respGet.Plugin) // Delete plugin reqDel := &structs.CSIPluginDeleteRequest{ @@ -2176,18 +2175,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) { respDel := &structs.CSIPluginDeleteResponse{} // Improper permissions - err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel) - require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel) + must.EqError(t, err, structs.ErrPermissionDenied.Error()) // Retry with management permissions reqDel.AuthToken = srv.getLeaderAcl() err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel) - require.EqualError(t, err, "plugin in use") + must.NoError(t, err) // plugin is in use but this does not return an error // Plugin was not deleted - err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet) - require.NoError(t, err) - require.NotNil(t, respGet.Plugin) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)) + must.NotNil(t, respGet.Plugin) // Empty the plugin plugin := respGet.Plugin.Copy() @@ -2196,21 +2194,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) { index, _ := state.LatestIndex() index++ - err = state.UpsertCSIPlugin(index, plugin) - require.NoError(t, err) + must.NoError(t, state.UpsertCSIPlugin(index, plugin)) // Retry now that it's empty - err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel) - require.NoError(t, err) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)) // Plugin is deleted - err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet) - require.NoError(t, err) - require.Nil(t, respGet.Plugin) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)) + must.Nil(t, respGet.Plugin) // Safe to call on already-deleted plugnis - err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel) - require.NoError(t, err) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)) } func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 44356883ee2..34d59434af9 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3152,8 +3152,40 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { if err != nil { return err } + + jobIDs := set.New[structs.NamespacedID](1) + for _, alloc := range plug.Allocations { + jobIDs.Insert(structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}) + } + + // after denormalization of allocs, remove any ControllerJobs or NodeJobs + // that no longer have allocations and have been either purged or updated to + // no longer include the plugin + removeInvalidJobs := func(jobDescs structs.JobDescriptions) { + for ns, namespacedJobDescs := range jobDescs { + for jobID := range namespacedJobDescs { + if !jobIDs.Contains(structs.NamespacedID{Namespace: ns, ID: jobID}) { + job, err := s.JobByID(nil, ns, jobID) + if err != nil { // programmer error in JobByID only + s.logger.Error("could not query JobByID", "error", err) + continue + } + if job == nil { // job was purged + jobDescs.Delete(&structs.Job{ID: jobID, Namespace: ns}) + } else if !job.HasPlugin(plug.ID) { + // job was updated to a different plugin ID + jobDescs.Delete(job) + } + } + } + } + } + + removeInvalidJobs(plug.ControllerJobs) + removeInvalidJobs(plug.NodeJobs) + if !plug.IsEmpty() { - return fmt.Errorf("plugin in use") + return structs.ErrCSIPluginInUse } err = txn.Delete("csi_plugins", plug) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index c14160e22b3..04e0a0d4c1f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3983,16 +3983,17 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { } // helper function for test assertions - checkPlugin := func(counts pluginCounts) *structs.CSIPlugin { + checkPlugin := func(t *testing.T, plugID string, counts pluginCounts) *structs.CSIPlugin { + t.Helper() plug, err := store.CSIPluginByID(memdb.NewWatchSet(), plugID) - require.NotNil(t, plug, "plugin was nil") - require.NoError(t, err) - require.Equal(t, counts.controllerFingerprints, len(plug.Controllers), "controllers fingerprinted") - require.Equal(t, counts.nodeFingerprints, len(plug.Nodes), "nodes fingerprinted") - require.Equal(t, counts.controllersHealthy, plug.ControllersHealthy, "controllers healthy") - require.Equal(t, counts.nodesHealthy, plug.NodesHealthy, "nodes healthy") - require.Equal(t, counts.controllersExpected, plug.ControllersExpected, "controllers expected") - require.Equal(t, counts.nodesExpected, plug.NodesExpected, "nodes expected") + must.NotNil(t, plug, must.Sprint("plugin was nil")) + must.NoError(t, err) + must.MapLen(t, counts.controllerFingerprints, plug.Controllers, must.Sprint("controllers fingerprinted")) + must.MapLen(t, counts.nodeFingerprints, plug.Nodes, must.Sprint("nodes fingerprinted")) + must.Eq(t, counts.controllersHealthy, plug.ControllersHealthy, must.Sprint("controllers healthy")) + must.Eq(t, counts.nodesHealthy, plug.NodesHealthy, must.Sprint("nodes healthy")) + must.Eq(t, counts.controllersExpected, plug.ControllersExpected, must.Sprint("controllers expected")) + must.Eq(t, counts.nodesExpected, plug.NodesExpected, must.Sprint("nodes expected")) return plug.Copy() } @@ -4011,7 +4012,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { ws := memdb.NewWatchSet() for _, id := range allocIDs { alloc, err := store.AllocByID(ws, id) - require.NoError(t, err) + must.NoError(t, err) alloc = alloc.Copy() transform(alloc) allocs = append(allocs, alloc) @@ -4026,7 +4027,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { // transaction setup bugs err = store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIndex(store), allocs) } - require.NoError(t, err) + must.NoError(t, err) return allocs } @@ -4037,13 +4038,13 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { node = node.Copy() transform(node) err = store.UpsertNode(structs.MsgTypeTestSetup, nextIndex(store), node) - require.NoError(t, err) + must.NoError(t, err) } nodes := []*structs.Node{mock.Node(), mock.Node(), mock.Node()} for _, node := range nodes { err = store.UpsertNode(structs.MsgTypeTestSetup, nextIndex(store), node) - require.NoError(t, err) + must.NoError(t, err) } // Note: these are all subtests for clarity but are expected to be @@ -4066,7 +4067,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { // // TODO: that's the current code but we really should be able // to figure out the system jobs too - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 0, nodeFingerprints: 0, controllersHealthy: 0, @@ -4074,7 +4075,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, nodesExpected: 0, }) - require.False(t, plug.ControllerRequired) + must.False(t, plug.ControllerRequired) }) t.Run("plan apply upserts allocations", func(t *testing.T) { @@ -4109,10 +4110,10 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { allocs = append(allocs, nodeAlloc) } err = store.UpsertAllocs(structs.MsgTypeTestSetup, nextIndex(store), allocs) - require.NoError(t, err) + must.NoError(t, err) // node plugin now has expected counts too - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 0, nodeFingerprints: 0, controllersHealthy: 0, @@ -4120,7 +4121,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, nodesExpected: 3, }) - require.False(t, plug.ControllerRequired) + must.False(t, plug.ControllerRequired) }) t.Run("client upserts alloc status", func(t *testing.T) { @@ -4130,7 +4131,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { }) // plugin still has allocs but no fingerprints - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 0, nodeFingerprints: 0, controllersHealthy: 0, @@ -4138,7 +4139,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, nodesExpected: 3, }) - require.False(t, plug.ControllerRequired) + must.False(t, plug.ControllerRequired) }) t.Run("client upserts node fingerprints", func(t *testing.T) { @@ -4179,7 +4180,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { } // plugins have been fingerprinted so we have healthy counts - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 2, nodeFingerprints: 3, controllersHealthy: 2, @@ -4187,21 +4188,21 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, nodesExpected: 3, }) - require.True(t, plug.ControllerRequired) + must.True(t, plug.ControllerRequired) }) t.Run("node marked for drain", func(t *testing.T) { ws := memdb.NewWatchSet() nodeAllocs, err := store.AllocsByNode(ws, nodes[0].ID) - require.NoError(t, err) - require.Len(t, nodeAllocs, 2) + must.NoError(t, err) + must.Len(t, 2, nodeAllocs) updateAllocsFn([]string{nodeAllocs[0].ID, nodeAllocs[1].ID}, SERVER, func(alloc *structs.Allocation) { alloc.DesiredStatus = structs.AllocDesiredStatusStop }) - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 2, nodeFingerprints: 3, controllersHealthy: 2, @@ -4209,7 +4210,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, // job summary hasn't changed nodesExpected: 3, // job summary hasn't changed }) - require.True(t, plug.ControllerRequired) + must.True(t, plug.ControllerRequired) }) t.Run("client removes fingerprints after node drain", func(t *testing.T) { @@ -4218,7 +4219,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { node.CSINodePlugins = nil }) - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 1, nodeFingerprints: 2, controllersHealthy: 1, @@ -4226,20 +4227,20 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, nodesExpected: 3, }) - require.True(t, plug.ControllerRequired) + must.True(t, plug.ControllerRequired) }) t.Run("client updates alloc status to stopped after node drain", func(t *testing.T) { nodeAllocs, err := store.AllocsByNode(memdb.NewWatchSet(), nodes[0].ID) - require.NoError(t, err) - require.Len(t, nodeAllocs, 2) + must.NoError(t, err) + must.Len(t, 2, nodeAllocs) updateAllocsFn([]string{nodeAllocs[0].ID, nodeAllocs[1].ID}, CLIENT, func(alloc *structs.Allocation) { alloc.ClientStatus = structs.AllocClientStatusComplete }) - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 1, nodeFingerprints: 2, controllersHealthy: 1, @@ -4247,7 +4248,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 2, // still 2 because count=2 nodesExpected: 2, // has to use nodes we're actually placed on }) - require.True(t, plug.ControllerRequired) + must.True(t, plug.ControllerRequired) }) t.Run("job stop with purge", func(t *testing.T) { @@ -4258,15 +4259,15 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { PluginID: plugID, } err = store.UpsertCSIVolume(nextIndex(store), []*structs.CSIVolume{vol}) - require.NoError(t, err) + must.NoError(t, err) err = store.DeleteJob(nextIndex(store), structs.DefaultNamespace, controllerJobID) - require.NoError(t, err) + must.NoError(t, err) err = store.DeleteJob(nextIndex(store), structs.DefaultNamespace, nodeJobID) - require.NoError(t, err) + must.NoError(t, err) - plug := checkPlugin(pluginCounts{ + plug := checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 1, // no changes till we get fingerprint nodeFingerprints: 2, controllersHealthy: 1, @@ -4274,8 +4275,8 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 0, nodesExpected: 0, }) - require.True(t, plug.ControllerRequired) - require.False(t, plug.IsEmpty()) + must.True(t, plug.ControllerRequired) + must.False(t, plug.IsEmpty()) for _, node := range nodes { updateNodeFn(node.ID, func(node *structs.Node) { @@ -4283,7 +4284,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { }) } - plug = checkPlugin(pluginCounts{ + plug = checkPlugin(t, plugID, pluginCounts{ controllerFingerprints: 0, nodeFingerprints: 2, // haven't removed fingerprints yet controllersHealthy: 0, @@ -4291,8 +4292,8 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { controllersExpected: 0, nodesExpected: 0, }) - require.True(t, plug.ControllerRequired) - require.False(t, plug.IsEmpty()) + must.True(t, plug.ControllerRequired) + must.False(t, plug.IsEmpty()) for _, node := range nodes { updateNodeFn(node.ID, func(node *structs.Node) { @@ -4302,13 +4303,13 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { ws := memdb.NewWatchSet() plug, err := store.CSIPluginByID(ws, plugID) - require.NoError(t, err) - require.Nil(t, plug, "plugin was not deleted") + must.NoError(t, err) + must.Nil(t, plug, must.Sprint("plugin was not deleted")) vol, err = store.CSIVolumeByID(ws, vol.Namespace, vol.ID) - require.NoError(t, err) - require.NotNil(t, vol, "volume should be queryable even if plugin is deleted") - require.False(t, vol.Schedulable) + must.NoError(t, err) + must.NotNil(t, vol, must.Sprint("volume should be queryable even if plugin is deleted")) + must.False(t, vol.Schedulable) }) } diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 89680a57cea..b137bc6b145 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -81,6 +81,7 @@ var ( ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)") ErrCSIVolumeMaxClaims = errors.New("volume max claims reached") ErrCSIVolumeUnschedulable = errors.New("volume is currently unschedulable") + ErrCSIPluginInUse = errors.New("plugin in use") ) // IsErrNoLeader returns whether the error is due to there being no leader. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ed426c77d5f..04b5cbefa8c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4906,7 +4906,7 @@ func (j *Job) IsMultiregion() bool { return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 } -// IsPlugin returns whether a job is implements a plugin (currently just CSI) +// IsPlugin returns whether a job implements a plugin (currently just CSI) func (j *Job) IsPlugin() bool { for _, tg := range j.TaskGroups { for _, task := range tg.Tasks { @@ -4918,6 +4918,18 @@ func (j *Job) IsPlugin() bool { return false } +// HasPlugin returns whether a job implements a specific plugin ID +func (j *Job) HasPlugin(id string) bool { + for _, tg := range j.TaskGroups { + for _, task := range tg.Tasks { + if task.CSIPluginConfig != nil && task.CSIPluginConfig.ID == id { + return true + } + } + } + return false +} + // Vault returns the set of Vault blocks per task group, per task func (j *Job) Vault() map[string]map[string]*Vault { blocks := make(map[string]map[string]*Vault, len(j.TaskGroups))