From dfee311b1255f3f0241a088b4e52fcc948000523 Mon Sep 17 00:00:00 2001 From: Ivan Sim Date: Fri, 31 May 2024 13:54:09 -0700 Subject: [PATCH] Remove unnecessary internal gpu allocation states Signed-off-by: Ivan Sim --- cmd/plugin/root.go | 15 +-- go.mod | 2 +- pkg/apis/gpu/v1alpha1/api.go | 1 - pkg/apis/gpu/v1alpha1/types.go | 9 +- pkg/drivers/gpu/driver.go | 121 +++++++++++++++---- pkg/drivers/gpu/kubelet/cdi/cdi.go | 27 +++++ pkg/drivers/gpu/kubelet/nodeserver.go | 58 +++------- pkg/drivers/gpu/plugin.go | 160 -------------------------- 8 files changed, 153 insertions(+), 240 deletions(-) delete mode 100644 pkg/drivers/gpu/plugin.go diff --git a/cmd/plugin/root.go b/cmd/plugin/root.go index ed936c7..4d75928 100644 --- a/cmd/plugin/root.go +++ b/cmd/plugin/root.go @@ -9,7 +9,6 @@ import ( "github.com/ihcsim/k8s-dra/cmd/flags" draclientset "github.com/ihcsim/k8s-dra/pkg/apis/clientset/versioned" gpukubeletplugin "github.com/ihcsim/k8s-dra/pkg/drivers/gpu/kubelet" - "golang.org/x/exp/rand" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -59,12 +58,10 @@ func executeContext(ctx context.Context) error { func run(ctx context.Context) error { var ( - kubeconfig = viper.GetString("kubeconfig") - cdiRoot = viper.GetString("cdi-root") - namespace = viper.GetString("namespace") - nodeName = viper.GetString("node-name") - maxAvailableGPU = viper.GetInt("max-available-gpu") - randAvailableGPU = rand.Intn(maxAvailableGPU) + kubeconfig = viper.GetString("kubeconfig") + cdiRoot = viper.GetString("cdi-root") + namespace = viper.GetString("namespace") + nodeName = viper.GetString("node-name") ) if err := os.MkdirAll(pluginPath, 0750); err != nil { @@ -80,8 +77,8 @@ func run(ctx context.Context) error { return err } - log.Info().Msgf("Starting DRA node server with %d available GPUs", randAvailableGPU) - nodeServer, err := gpukubeletplugin.NewNodeServer(ctx, draClientSets, cdiRoot, namespace, nodeName, randAvailableGPU, log.Logger) + log.Info().Msgf("starting DRA node 
server...") + nodeServer, err := gpukubeletplugin.NewNodeServer(ctx, draClientSets, cdiRoot, namespace, nodeName, log.Logger) if err != nil { return err } diff --git a/go.mod b/go.mod index 724fd0c..b2a05f2 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.18.2 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 k8s.io/api v0.30.1 k8s.io/apimachinery v0.30.1 k8s.io/client-go v0.30.1 @@ -69,6 +68,7 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/mod v0.15.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect diff --git a/pkg/apis/gpu/v1alpha1/api.go b/pkg/apis/gpu/v1alpha1/api.go index 3770d15..0bdd298 100644 --- a/pkg/apis/gpu/v1alpha1/api.go +++ b/pkg/apis/gpu/v1alpha1/api.go @@ -22,7 +22,6 @@ import ( ) const ( - DeviceTypeGPU = "gpu" GPUClaimParametersKind = "GPUClaimParameters" Version = "v1alpha1" ) diff --git a/pkg/apis/gpu/v1alpha1/types.go b/pkg/apis/gpu/v1alpha1/types.go index b4418cf..37319ed 100644 --- a/pkg/apis/gpu/v1alpha1/types.go +++ b/pkg/apis/gpu/v1alpha1/types.go @@ -32,8 +32,8 @@ type NodeDevicesStatus struct { type NodeDevicesAllocationState int const ( - Ready NodeDevicesAllocationState = iota - NotReady + NodeDevicesAllocationStateReady NodeDevicesAllocationState = iota + NodeDevicesAllocationStateNotReady ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -50,6 +50,7 @@ type NodeDevicesList struct { type GPUDevice struct { UUID string `json:"uuid"` ProductName string `json:"productName"` + Vendor string `json:"vendor"` } // +genclient @@ -72,8 +73,8 @@ type GPUClassParametersSpec struct { // DeviceSelector allows one to match on a specific type of Device as part of the class. 
type DeviceSelector struct { - Type string `json:"type"` - Name string `json:"name"` + Name string `json:"name"` + Vendor string `json:"vendor"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/drivers/gpu/driver.go b/pkg/drivers/gpu/driver.go index 0ccf661..d5726b3 100644 --- a/pkg/drivers/gpu/driver.go +++ b/pkg/drivers/gpu/driver.go @@ -27,7 +27,6 @@ var _ dractrl.Driver = &driver{} // allocation and deallocation operations of GPU resources. type driver struct { clientsets draclientset.Interface - gpu *gpuPlugin namespace string log zlog.Logger } @@ -36,7 +35,6 @@ type driver struct { func NewDriver(clientsets draclientset.Interface, namespace string, log zlog.Logger) (*driver, error) { return &driver{ clientsets: clientsets, - gpu: newGPUPlugin(log), namespace: namespace, log: log, }, nil @@ -56,8 +54,8 @@ func (d *driver) GetClassParameters(ctx context.Context, class *resourcev1alpha2 return &gpuv1alpha1.GPUClassParametersSpec{ DeviceSelector: []gpuv1alpha1.DeviceSelector{ { - Type: gpuv1alpha1.DeviceTypeGPU, - Name: "*", + Vendor: "*", + Name: "*", }, }, }, nil @@ -130,7 +128,7 @@ func (d *driver) Allocate(ctx context.Context, claimAllocations []*dractrl.Claim continue } - if err := d.allocateGPU(ctx, ca, selectedNode); err != nil { + if err := d.allocate(ctx, ca, selectedNode); err != nil { ca.Error = err continue } @@ -138,7 +136,7 @@ func (d *driver) Allocate(ctx context.Context, claimAllocations []*dractrl.Claim } } -func (d *driver) allocateGPU( +func (d *driver) allocate( ctx context.Context, claimAllocation *dractrl.ClaimAllocation, selectedNode string) error { @@ -167,22 +165,12 @@ func (d *driver) allocateGPU( return nil } - classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParametersSpec) - if !ok { - return fmt.Errorf("unsupported class parameters kind: %T", claimAllocation.ClassParameters) - } - - claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec) 
- if !ok { - return fmt.Errorf("unsupported claim parameters kind: %T", claimAllocation.ClaimParameters) - } - log.Info().Msg("allocating GPUs...") - allocatedGPUs, commitAllocation, err := d.gpu.allocate(claimUID, selectedNode, claimParams, classParams) + allocatableGPUs, err := d.allocatableGPUs(nodeDevices, claimAllocation, selectedNode) if err != nil { return err } - nodeDevices.Status.AllocatedGPUs[claimUID] = allocatedGPUs + nodeDevices.Status.AllocatedGPUs[claimUID] = allocatableGPUs updateOpts := metav1.UpdateOptions{} if _, err := d.clientsets.GpuV1alpha1().NodeDevices(d.namespace).Update(ctx, nodeDevices, updateOpts); err != nil { @@ -190,7 +178,7 @@ func (d *driver) allocateGPU( } log.Info().Msg("allocation completed") - return commitAllocation() + return nil } // Deallocate gets called when a ResourceClaim is ready to be freed. @@ -227,9 +215,6 @@ func (d *driver) Deallocate(ctx context.Context, claim *resourcev1alpha2.Resourc } log.Info().Msg("deallocating claimed GPUs...") - if err := d.gpu.deallocate(claimUID, selectedNode); err != nil { - return err - } delete(nodeDevices.Status.AllocatedGPUs, claimUID) updateOpts := metav1.UpdateOptions{} @@ -267,11 +252,18 @@ func (d *driver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims [] return nil } + if nodeDevices.Status.State != gpuv1alpha1.NodeDevicesAllocationStateReady { + for _, claim := range claims { + claim.UnsuitableNodes = append(claim.UnsuitableNodes, potentialNode) + } + return nil + } + if nodeDevices.Status.AllocatedGPUs == nil { nodeDevices.Status.AllocatedGPUs = map[string][]*gpuv1alpha1.GPUDevice{} } - if err := d.gpu.unsuitableNode(nodeDevices, pod, claims, potentialNode); err != nil { + if err := d.unsuitableNode(nodeDevices, pod, claims, potentialNode); err != nil { errs = errors.Join(errs, err) } } @@ -308,3 +300,86 @@ func buildAllocationResult(selectedNode string, shareable bool) *resourcev1alpha Shareable: shareable, } } + +func (d *driver) allocatableGPUs( + 
nodeDevices *gpuv1alpha1.NodeDevices, + claimAllocation *dractrl.ClaimAllocation, + selectedNode string) ([]*gpuv1alpha1.GPUDevice, error) { + claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec) + if !ok { + return nil, fmt.Errorf("unsupported claim parameters kind: %T", claimAllocation.ClaimParameters) + } + + availableGPUs := d.availableGPUs(nodeDevices) + if len(availableGPUs) < claimParams.Count { + return nil, fmt.Errorf("insufficient GPUs on node %s for claim %s", selectedNode, claimAllocation.Claim.GetUID()) + } + + classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParametersSpec) + if !ok { + return nil, fmt.Errorf("unsupported class parameters kind: %T", claimAllocation.ClassParameters) + } + + allocatableGPUs := []*gpuv1alpha1.GPUDevice{} + for _, availableGPU := range availableGPUs { + for _, selector := range classParams.DeviceSelector { + if selector.Name == availableGPU.ProductName && selector.Vendor == availableGPU.Vendor { + allocatableGPUs = append(allocatableGPUs, availableGPU) + } + } + } + + return allocatableGPUs, nil +} + +func (d *driver) unsuitableNode( + nodeDevices *gpuv1alpha1.NodeDevices, + pod *corev1.Pod, + claims []*dractrl.ClaimAllocation, + potentialNode string) error { + for _, claim := range claims { + claimParams, ok := claim.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec) + if !ok { + d.log.Info().Msgf("skipping unsupported claim parameters kind: %T", claim.ClaimParameters) + continue + } + + // if the number of allocated GPUs is less than the requested count, mark the node as + // unsuitable + claimUID := string(claim.Claim.GetUID()) + allocatedCount := d.allocatedCount(nodeDevices, claimUID) + if claimParams.Count != allocatedCount { + d.log.Info().Msgf("insufficient GPUs on node %s for claim %s, marking node as unsuitable", potentialNode, claimUID) + claim.UnsuitableNodes = append(claim.UnsuitableNodes, potentialNode) + } + } + + return nil +} + +func (d 
*driver) allocatedCount(nodeDevices *gpuv1alpha1.NodeDevices, claimUID string) int { + // if existing allocated GPUs are found for this claim, return the count + if gpus, exists := nodeDevices.Status.AllocatedGPUs[claimUID]; exists { + return len(gpus) + } + + return len(d.availableGPUs(nodeDevices)) +} + +func (d *driver) availableGPUs( + nodeDevices *gpuv1alpha1.NodeDevices) map[string]*gpuv1alpha1.GPUDevice { + available := map[string]*gpuv1alpha1.GPUDevice{} + for _, gpu := range nodeDevices.Spec.AllocatableGPUs { + available[gpu.UUID] = gpu + } + + // find the GPUs that are already allocated, regardless of their claims + // and remove them from the available list + for _, gpus := range nodeDevices.Status.AllocatedGPUs { + for _, gpu := range gpus { + delete(available, gpu.UUID) + } + } + + return available +} diff --git a/pkg/drivers/gpu/kubelet/cdi/cdi.go b/pkg/drivers/gpu/kubelet/cdi/cdi.go index 5a2650c..4e6c530 100644 --- a/pkg/drivers/gpu/kubelet/cdi/cdi.go +++ b/pkg/drivers/gpu/kubelet/cdi/cdi.go @@ -22,12 +22,39 @@ var ( once sync.Once ) +// GPUDevice is used to encapsulate the CDI information of a GPU device.
+type GPUDevice struct { + UUID string + ProductName string + VendorName string +} + func InitRegistryOnce(cdiRoot string) { once.Do(func() { registry = cdiapi.GetRegistry(cdiapi.WithSpecDirs(cdiRoot)) }) } +func DiscoverFromSpecs() ([]*GPUDevice, error) { + specs, err := Specs() + if err != nil { + return nil, err + } + + var gpuDevices []*GPUDevice + for _, spec := range specs { + for _, device := range spec.Devices { + gpuDevices = append(gpuDevices, &GPUDevice{ + UUID: device.Name, + ProductName: device.ContainerEdits.Env[1], + VendorName: device.ContainerEdits.Env[2], + }) + } + } + + return gpuDevices, nil +} + func DeviceQualifiedName(gpu *gpuv1alpha1.GPUDevice) string { return cdiapi.QualifiedName(cdiVendor, cdiClass, gpu.UUID) } diff --git a/pkg/drivers/gpu/kubelet/nodeserver.go b/pkg/drivers/gpu/kubelet/nodeserver.go index 97c5a0c..1112c36 100644 --- a/pkg/drivers/gpu/kubelet/nodeserver.go +++ b/pkg/drivers/gpu/kubelet/nodeserver.go @@ -20,19 +20,13 @@ const AvailableGPUsCount = 4 var _ kubeletdrav1.NodeServer = &NodeServer{} -type gpuDevice struct { - *gpuv1alpha1.GPUDevice - cdiAnnotations map[string]string -} - // NodeServer provides the API implementation of the node server. // see https://pkg.go.dev/k8s.io/kubelet/pkg/apis/dra/v1alpha3#NodeServer type NodeServer struct { - clientSets draclientset.Interface - log zlog.Logger - namespace string - nodeName string - availableGPUs int + clientSets draclientset.Interface + log zlog.Logger + namespace string + nodeName string } // NewNodeServer returns a new instance of the NodeServer. 
It also initializes @@ -44,16 +38,20 @@ func NewNodeServer( cdiRoot string, namespace string, nodeName string, - availableGPUs int, log zlog.Logger) (*NodeServer, error) { - gpus, err := discoverGPUs(availableGPUs) + cdi.InitRegistryOnce(cdiRoot) + gpus, err := cdi.DiscoverFromSpecs() if err != nil { return nil, err } var gpuDevices []*gpuv1alpha1.GPUDevice for _, gpu := range gpus { - gpuDevices = append(gpuDevices, gpu.GPUDevice) + gpuDevices = append(gpuDevices, &gpuv1alpha1.GPUDevice{ + UUID: gpu.UUID, + ProductName: gpu.ProductName, + Vendor: gpu.VendorName, + }) } if _, err := clientSets.GpuV1alpha1().NodeDevices(namespace).Get(ctx, nodeName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { @@ -66,7 +64,7 @@ func NewNodeServer( AllocatableGPUs: gpuDevices, }, Status: gpuv1alpha1.NodeDevicesStatus{ - State: gpuv1alpha1.Ready, + State: gpuv1alpha1.NodeDevicesAllocationStateReady, }, } if _, err := clientSets.GpuV1alpha1().NodeDevices(namespace).Create(ctx, nodeDevices, metav1.CreateOptions{}); err != nil { @@ -74,15 +72,12 @@ func NewNodeServer( } } - cdi.InitRegistryOnce(cdiRoot) - logger := log.With().Str("namespace", namespace).Logger() return &NodeServer{ - clientSets: clientSets, - log: logger, - namespace: namespace, - nodeName: nodeName, - availableGPUs: availableGPUs, + clientSets: clientSets, + log: logger, + namespace: namespace, + nodeName: nodeName, }, nil } @@ -221,24 +216,3 @@ func (n *NodeServer) NodeListAndWatchResources(req *kubeletdrav1.NodeListAndWatc } return s.Send(res) } - -func discoverGPUs(maxAvailableGPU int) ([]*gpuDevice, error) { - specs, err := cdi.Specs() - if err != nil { - return nil, err - } - - var gpuDevices []*gpuDevice - for _, spec := range specs { - for _, device := range spec.Devices { - gpuDevices = append(gpuDevices, &gpuDevice{ - GPUDevice: &gpuv1alpha1.GPUDevice{ - UUID: device.Name, - ProductName: device.ContainerEdits.Env[1], - }, - }) - } - } - - return gpuDevices, nil -} diff --git 
a/pkg/drivers/gpu/plugin.go b/pkg/drivers/gpu/plugin.go deleted file mode 100644 index 596d6f7..0000000 --- a/pkg/drivers/gpu/plugin.go +++ /dev/null @@ -1,160 +0,0 @@ -package gpu - -import ( - "fmt" - "sync" - - gpuv1alpha1 "github.com/ihcsim/k8s-dra/pkg/apis/gpu/v1alpha1" - zlog "github.com/rs/zerolog" - corev1 "k8s.io/api/core/v1" - dractrl "k8s.io/dynamic-resource-allocation/controller" -) - -type gpuPlugin struct { - // pendingAllocatedGPUs is a map of resource claim UID to a map of node name to allocated - // GPUs. - pendingAllocatedGPUs map[nodeClaim][]*gpuv1alpha1.GPUDevice - - // mux is used to synchronized concurrent read-write accesses to allocations. - mux sync.RWMutex - - log zlog.Logger -} - -type nodeClaim struct { - claimUID string - nodeName string -} - -func (n *nodeClaim) String() string { - return fmt.Sprintf("%s/%s", n.nodeName, n.claimUID) -} - -func newGPUPlugin(log zlog.Logger) *gpuPlugin { - return &gpuPlugin{ - pendingAllocatedGPUs: map[nodeClaim][]*gpuv1alpha1.GPUDevice{}, - mux: sync.RWMutex{}, - log: log, - } -} - -func (p *gpuPlugin) allocate( - claimUID, nodeName string, - claimParams *gpuv1alpha1.GPUClaimParametersSpec, - classParams *gpuv1alpha1.GPUClassParametersSpec) ([]*gpuv1alpha1.GPUDevice, func() error, error) { - p.mux.RLock() - defer p.mux.RUnlock() - - nodeClaim := nodeClaim{ - claimUID: claimUID, - nodeName: nodeName, - } - allocatedGPUs, exists := p.pendingAllocatedGPUs[nodeClaim] - if !exists { - return nil, nil, fmt.Errorf("no allocations generated for node claim %s", nodeClaim) - } - - // once the allocation is committed on K8s side, remove the devices from the - // pending list - commitAllocation := func() error { - return p.deallocate(claimUID, nodeName) - } - - return allocatedGPUs, commitAllocation, nil -} - -func (p *gpuPlugin) deallocate(claimUID, nodeName string) error { - p.mux.Lock() - defer p.mux.Unlock() - - nodeClaim := nodeClaim{ - claimUID: claimUID, - nodeName: nodeName, - } - 
delete(p.pendingAllocatedGPUs, nodeClaim) - return nil -} - -func (p *gpuPlugin) unsuitableNode( - nodeDevices *gpuv1alpha1.NodeDevices, - pod *corev1.Pod, - claims []*dractrl.ClaimAllocation, - potentialNode string) error { - p.mux.Lock() - defer p.mux.Unlock() - - gpuClaims := []*dractrl.ClaimAllocation{} - for _, claim := range claims { - if _, ok := claim.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec); !ok { - p.log.Info().Msgf("skipping unsupported claim parameters kind: %T", claim.ClaimParameters) - continue - } - gpuClaims = append(gpuClaims, claim) - } - - allocatableGPUs := p.allocatableGPUs(nodeDevices) - allocatedGPUs := map[string][]string{} - for _, gpuClaim := range gpuClaims { - nodeClaim := nodeClaim{ - claimUID: string(gpuClaim.Claim.GetUID()), - nodeName: potentialNode, - } - - // if the nodeDevices has already allocated GPUs for the claim, add the - // allocated GPUs to the result map - if gpus, exists := p.pendingAllocatedGPUs[nodeClaim]; exists { - nodeDevices.Status.AllocatedGPUs[nodeClaim.claimUID] = gpus - for _, gpu := range gpus { - allocatedGPUs[nodeClaim.claimUID] = append(allocatedGPUs[nodeClaim.claimUID], gpu.UUID) - } - continue - } - - // otherwise, allocate up to claimParams.Count GPUs from the available pool - claimParams, ok := gpuClaim.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec) - if !ok { - p.log.Info().Msgf("skipping unsupported claim parameters kind: %T", gpuClaim.ClaimParameters) - continue - } - for _, gpu := range allocatableGPUs { - allocatedGPUs[nodeClaim.claimUID] = append(allocatedGPUs[nodeClaim.claimUID], gpu.UUID) - if len(allocatedGPUs[nodeClaim.claimUID]) >= claimParams.Count { - break - } - } - - // if the number of allocated GPUs is less than the requested count, mark the node as - // unsuitable - if claimParams.Count != len(allocatedGPUs[nodeClaim.claimUID]) { - p.log.Info().Msgf("insufficient GPUs on node %s for claim %s, marking node as unsuitable", potentialNode, nodeClaim.claimUID) - 
gpuClaim.UnsuitableNodes = append(gpuClaim.UnsuitableNodes, potentialNode) - continue - } - - // otherwise, potentialNode is a suitable node - if _, exists := p.pendingAllocatedGPUs[nodeClaim]; !exists { - p.pendingAllocatedGPUs[nodeClaim] = []*gpuv1alpha1.GPUDevice{} - } - for _, gpu := range allocatedGPUs[nodeClaim.claimUID] { - allocatedGPU := &gpuv1alpha1.GPUDevice{UUID: gpu} - p.pendingAllocatedGPUs[nodeClaim] = append(p.pendingAllocatedGPUs[nodeClaim], allocatedGPU) - } - } - - return nil -} - -func (p *gpuPlugin) allocatableGPUs(nodeDevices *gpuv1alpha1.NodeDevices) map[string]*gpuv1alpha1.GPUDevice { - available := map[string]*gpuv1alpha1.GPUDevice{} - for _, gpu := range nodeDevices.Spec.AllocatableGPUs { - available[gpu.UUID] = gpu - } - - for _, gpus := range nodeDevices.Status.AllocatedGPUs { - for _, gpu := range gpus { - delete(available, gpu.UUID) - } - } - - return available -}