Skip to content

Commit

Permalink
backport of commit c8c67da
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored May 16, 2024
1 parent f308c54 commit d2cbba1
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 78 deletions.
3 changes: 3 additions & 0 deletions .changelog/20555.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where plugins would not be deleted on GC if their job was updated to use a different plugin ID
```
43 changes: 32 additions & 11 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2221,38 +2221,59 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {

// Create a core scheduler
snap, err := store.Snapshot()
require.NoError(t, err)
must.NoError(t, err)
core := NewCoreScheduler(srv, snap)

// Attempt the GC
index++
gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
must.NoError(t, core.Process(gc))

// Should not be gone (plugin in use)
ws := memdb.NewWatchSet()
plug, err := store.CSIPluginByID(ws, "foo")
require.NotNil(t, plug)
require.NoError(t, err)
must.NotNil(t, plug)
must.NoError(t, err)

// Empty the plugin
// Empty the plugin but add a job
plug = plug.Copy()
plug.Controllers = map[string]*structs.CSIInfo{}
plug.Nodes = map[string]*structs.CSIInfo{}

job := mock.CSIPluginJob(structs.CSIPluginTypeController, plug.ID)
index++
err = store.UpsertCSIPlugin(index, plug)
require.NoError(t, err)
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
plug.ControllerJobs.Add(job, 1)

index++
must.NoError(t, store.UpsertCSIPlugin(index, plug))

// Retry
index++
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
must.NoError(t, core.Process(gc))

// Should be gone
// Should not be gone (plugin in use)
ws = memdb.NewWatchSet()
plug, err = store.CSIPluginByID(ws, "foo")
require.Nil(t, plug)
require.NoError(t, err)
must.NotNil(t, plug)
must.NoError(t, err)

// Update the job with a different pluginID
job = job.Copy()
job.TaskGroups[0].Tasks[0].CSIPluginConfig.ID = "another-plugin-id"
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))

// Retry
index++
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
must.NoError(t, core.Process(gc))

// Should now be gone
plug, err = store.CSIPluginByID(ws, "foo")
must.Nil(t, plug)
must.NoError(t, err)
}

func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1869,7 +1869,7 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
}

_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
if err != nil && !errors.Is(err, structs.ErrCSIPluginInUse) {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}
Expand Down
30 changes: 12 additions & 18 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2161,9 +2161,8 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
},
}
respGet := &structs.CSIPluginGetResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.NotNil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.NotNil(t, respGet.Plugin)

// Delete plugin
reqDel := &structs.CSIPluginDeleteRequest{
Expand All @@ -2176,18 +2175,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
respDel := &structs.CSIPluginDeleteResponse{}

// Improper permissions
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.EqualError(t, err, structs.ErrPermissionDenied.Error())
err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
must.EqError(t, err, structs.ErrPermissionDenied.Error())

// Retry with management permissions
reqDel.AuthToken = srv.getLeaderAcl()
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.EqualError(t, err, "plugin in use")
must.NoError(t, err) // plugin is in use but this does not return an error

// Plugin was not deleted
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.NotNil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.NotNil(t, respGet.Plugin)

// Empty the plugin
plugin := respGet.Plugin.Copy()
Expand All @@ -2196,21 +2194,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {

index, _ := state.LatestIndex()
index++
err = state.UpsertCSIPlugin(index, plugin)
require.NoError(t, err)
must.NoError(t, state.UpsertCSIPlugin(index, plugin))

// Retry now that it's empty
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.NoError(t, err)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel))

// Plugin is deleted
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.Nil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.Nil(t, respGet.Plugin)

// Safe to call on already-deleted plugins
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.NoError(t, err)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel))
}

func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
Expand Down
34 changes: 33 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3152,8 +3152,40 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
if err != nil {
return err
}

jobIDs := set.New[structs.NamespacedID](1)
for _, alloc := range plug.Allocations {
jobIDs.Insert(structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID})
}

// after denormalization of allocs, remove any ControllerJobs or NodeJobs
// that no longer have allocations and have been either purged or updated to
// no longer include the plugin
removeInvalidJobs := func(jobDescs structs.JobDescriptions) {
for ns, namespacedJobDescs := range jobDescs {
for jobID := range namespacedJobDescs {
if !jobIDs.Contains(structs.NamespacedID{Namespace: ns, ID: jobID}) {
job, err := s.JobByID(nil, ns, jobID)
if err != nil { // programmer error in JobByID only
s.logger.Error("could not query JobByID", "error", err)
continue
}
if job == nil { // job was purged
jobDescs.Delete(&structs.Job{ID: jobID, Namespace: ns})
} else if !job.HasPlugin(plug.ID) {
// job was updated to a different plugin ID
jobDescs.Delete(job)
}
}
}
}
}

removeInvalidJobs(plug.ControllerJobs)
removeInvalidJobs(plug.NodeJobs)

if !plug.IsEmpty() {
return fmt.Errorf("plugin in use")
return structs.ErrCSIPluginInUse
}

err = txn.Delete("csi_plugins", plug)
Expand Down
Loading

0 comments on commit d2cbba1

Please sign in to comment.