Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of CSI: allow plugin GC to detect jobs with updated plugin IDs into release/1.6.x #20612

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20555.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where plugins would not be deleted on GC if their job updated the plugin ID
```
43 changes: 32 additions & 11 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2201,38 +2201,59 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {

// Create a core scheduler
snap, err := store.Snapshot()
require.NoError(t, err)
must.NoError(t, err)
core := NewCoreScheduler(srv, snap)

// Attempt the GC
index++
gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
must.NoError(t, core.Process(gc))

// Should not be gone (plugin in use)
ws := memdb.NewWatchSet()
plug, err := store.CSIPluginByID(ws, "foo")
require.NotNil(t, plug)
require.NoError(t, err)
must.NotNil(t, plug)
must.NoError(t, err)

// Empty the plugin
// Empty the plugin but add a job
plug = plug.Copy()
plug.Controllers = map[string]*structs.CSIInfo{}
plug.Nodes = map[string]*structs.CSIInfo{}

job := mock.CSIPluginJob(structs.CSIPluginTypeController, plug.ID)
index++
err = store.UpsertCSIPlugin(index, plug)
require.NoError(t, err)
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
plug.ControllerJobs.Add(job, 1)

index++
must.NoError(t, store.UpsertCSIPlugin(index, plug))

// Retry
index++
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
must.NoError(t, core.Process(gc))

// Should be gone
// Should not be gone (plugin in use)
ws = memdb.NewWatchSet()
plug, err = store.CSIPluginByID(ws, "foo")
require.Nil(t, plug)
require.NoError(t, err)
must.NotNil(t, plug)
must.NoError(t, err)

// Update the job with a different pluginID
job = job.Copy()
job.TaskGroups[0].Tasks[0].CSIPluginConfig.ID = "another-plugin-id"
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))

// Retry
index++
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
must.NoError(t, core.Process(gc))

// Should now be gone
plug, err = store.CSIPluginByID(ws, "foo")
must.Nil(t, plug)
must.NoError(t, err)
}

func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,7 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
}

_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
if err != nil && !errors.Is(err, structs.ErrCSIPluginInUse) {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}
Expand Down
30 changes: 12 additions & 18 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2171,9 +2171,8 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
},
}
respGet := &structs.CSIPluginGetResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.NotNil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.NotNil(t, respGet.Plugin)

// Delete plugin
reqDel := &structs.CSIPluginDeleteRequest{
Expand All @@ -2186,18 +2185,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
respDel := &structs.CSIPluginDeleteResponse{}

// Improper permissions
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.EqualError(t, err, structs.ErrPermissionDenied.Error())
err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
must.EqError(t, err, structs.ErrPermissionDenied.Error())

// Retry with management permissions
reqDel.AuthToken = srv.getLeaderAcl()
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.EqualError(t, err, "plugin in use")
must.NoError(t, err) // plugin is in use but this does not return an error

// Plugin was not deleted
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.NotNil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.NotNil(t, respGet.Plugin)

// Empty the plugin
plugin := respGet.Plugin.Copy()
Expand All @@ -2206,21 +2204,17 @@ func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {

index, _ := state.LatestIndex()
index++
err = state.UpsertCSIPlugin(index, plugin)
require.NoError(t, err)
must.NoError(t, state.UpsertCSIPlugin(index, plugin))

// Retry now that it's empty
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.NoError(t, err)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel))

// Plugin is deleted
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
require.NoError(t, err)
require.Nil(t, respGet.Plugin)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet))
must.Nil(t, respGet.Plugin)

// Safe to call on already-deleted plugnis
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
require.NoError(t, err)
must.NoError(t, msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel))
}

func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
Expand Down
34 changes: 33 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3152,8 +3152,40 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
if err != nil {
return err
}

jobIDs := set.New[structs.NamespacedID](1)
for _, alloc := range plug.Allocations {
jobIDs.Insert(structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID})
}

// after denormalization of allocs, remove any ControllerJobs or NodeJobs
// that no longer have allocations and have been either purged or updated to
// no longer include the plugin
removeInvalidJobs := func(jobDescs structs.JobDescriptions) {
for ns, namespacedJobDescs := range jobDescs {
for jobID := range namespacedJobDescs {
if !jobIDs.Contains(structs.NamespacedID{Namespace: ns, ID: jobID}) {
job, err := s.JobByID(nil, ns, jobID)
if err != nil { // programmer error in JobByID only
s.logger.Error("could not query JobByID", "error", err)
continue
}
if job == nil { // job was purged
jobDescs.Delete(&structs.Job{ID: jobID, Namespace: ns})
} else if !job.HasPlugin(plug.ID) {
// job was updated to a different plugin ID
jobDescs.Delete(job)
}
}
}
}
}

removeInvalidJobs(plug.ControllerJobs)
removeInvalidJobs(plug.NodeJobs)

if !plug.IsEmpty() {
return fmt.Errorf("plugin in use")
return structs.ErrCSIPluginInUse
}

err = txn.Delete("csi_plugins", plug)
Expand Down
Loading
Loading