Remove unnecessary internal gpu allocation states
Signed-off-by: Ivan Sim <[email protected]>
ihcsim committed May 31, 2024
1 parent af72839 commit dfee311
Showing 8 changed files with 153 additions and 240 deletions.
15 changes: 6 additions & 9 deletions cmd/plugin/root.go
@@ -9,7 +9,6 @@ import (
"github.com/ihcsim/k8s-dra/cmd/flags"
draclientset "github.com/ihcsim/k8s-dra/pkg/apis/clientset/versioned"
gpukubeletplugin "github.com/ihcsim/k8s-dra/pkg/drivers/gpu/kubelet"
"golang.org/x/exp/rand"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
@@ -59,12 +58,10 @@ func executeContext(ctx context.Context) error {

func run(ctx context.Context) error {
var (
kubeconfig = viper.GetString("kubeconfig")
cdiRoot = viper.GetString("cdi-root")
namespace = viper.GetString("namespace")
nodeName = viper.GetString("node-name")
maxAvailableGPU = viper.GetInt("max-available-gpu")
randAvailableGPU = rand.Intn(maxAvailableGPU)
kubeconfig = viper.GetString("kubeconfig")
cdiRoot = viper.GetString("cdi-root")
namespace = viper.GetString("namespace")
nodeName = viper.GetString("node-name")
)

if err := os.MkdirAll(pluginPath, 0750); err != nil {
@@ -80,8 +77,8 @@ func run(ctx context.Context) error {
return err
}

log.Info().Msgf("Starting DRA node server with %d available GPUs", randAvailableGPU)
nodeServer, err := gpukubeletplugin.NewNodeServer(ctx, draClientSets, cdiRoot, namespace, nodeName, randAvailableGPU, log.Logger)
log.Info().Msgf("starting DRA node server...")
nodeServer, err := gpukubeletplugin.NewNodeServer(ctx, draClientSets, cdiRoot, namespace, nodeName, log.Logger)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion go.mod
@@ -11,7 +11,6 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/api v0.30.1
k8s.io/apimachinery v0.30.1
k8s.io/client-go v0.30.1
@@ -69,6 +68,7 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
1 change: 0 additions & 1 deletion pkg/apis/gpu/v1alpha1/api.go
@@ -22,7 +22,6 @@ import (
)

const (
DeviceTypeGPU = "gpu"
GPUClaimParametersKind = "GPUClaimParameters"
Version = "v1alpha1"
)
9 changes: 5 additions & 4 deletions pkg/apis/gpu/v1alpha1/types.go
@@ -32,8 +32,8 @@ type NodeDevicesStatus struct {
type NodeDevicesAllocationState int

const (
Ready NodeDevicesAllocationState = iota
NotReady
NodeDevicesAllocationStateReady NodeDevicesAllocationState = iota
NodeDevicesAllocationStateNotReady
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -50,6 +50,7 @@ type NodeDevicesList struct {
type GPUDevice struct {
UUID string `json:"uuid"`
ProductName string `json:"productName"`
Vendor string `json:"vendor"`
}

// +genclient
@@ -72,8 +73,8 @@ type GPUClassParametersSpec struct {

// DeviceSelector allows one to match on a specific type of Device as part of the class.
type DeviceSelector struct {
Type string `json:"type"`
Name string `json:"name"`
Name string `json:"name"`
Vendor string `json:"vendor"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
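
A note on the types.go change above: DeviceSelector now keys on a device's name and vendor rather than a device type, and GPUDevice carries the new Vendor field. The sketch below is illustrative only and not part of this commit; the matches helper, its wildcard handling, and the package layout are assumptions. The driver.go hunk further down compares Name and Vendor with plain equality.

// Hypothetical helper, not part of this commit: shows how a class
// DeviceSelector could be evaluated against the new GPUDevice.Vendor field.
package gpu

import gpuv1alpha1 "github.com/ihcsim/k8s-dra/pkg/apis/gpu/v1alpha1"

// matches treats "*" as a wildcard for either field; the commit itself
// only uses exact equality.
func matches(sel gpuv1alpha1.DeviceSelector, gpu *gpuv1alpha1.GPUDevice) bool {
	nameOK := sel.Name == "*" || sel.Name == gpu.ProductName
	vendorOK := sel.Vendor == "*" || sel.Vendor == gpu.Vendor
	return nameOK && vendorOK
}
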
121 changes: 98 additions & 23 deletions pkg/drivers/gpu/driver.go
@@ -27,7 +27,6 @@ var _ dractrl.Driver = &driver{}
// allocation and deallocation operations of GPU resources.
type driver struct {
clientsets draclientset.Interface
gpu *gpuPlugin
namespace string
log zlog.Logger
}
@@ -36,7 +35,6 @@ type driver struct {
func NewDriver(clientsets draclientset.Interface, namespace string, log zlog.Logger) (*driver, error) {
return &driver{
clientsets: clientsets,
gpu: newGPUPlugin(log),
namespace: namespace,
log: log,
}, nil
@@ -56,8 +54,8 @@ func (d *driver) GetClassParameters(ctx context.Context, class *resourcev1alpha2
return &gpuv1alpha1.GPUClassParametersSpec{
DeviceSelector: []gpuv1alpha1.DeviceSelector{
{
Type: gpuv1alpha1.DeviceTypeGPU,
Name: "*",
Vendor: "*",
Name: "*",
},
},
}, nil
@@ -130,15 +128,15 @@ func (d *driver) Allocate(ctx context.Context, claimAllocations []*dractrl.Claim
continue
}

if err := d.allocateGPU(ctx, ca, selectedNode); err != nil {
if err := d.allocate(ctx, ca, selectedNode); err != nil {
ca.Error = err
continue
}
ca.Allocation = buildAllocationResult(selectedNode, true)
}
}

func (d *driver) allocateGPU(
func (d *driver) allocate(
ctx context.Context,
claimAllocation *dractrl.ClaimAllocation,
selectedNode string) error {
@@ -167,30 +165,20 @@ func (d *driver) allocateGPU(
return nil
}

classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParametersSpec)
if !ok {
return fmt.Errorf("unsupported class parameters kind: %T", claimAllocation.ClassParameters)
}

claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec)
if !ok {
return fmt.Errorf("unsupported claim parameters kind: %T", claimAllocation.ClaimParameters)
}

log.Info().Msg("allocating GPUs...")
allocatedGPUs, commitAllocation, err := d.gpu.allocate(claimUID, selectedNode, claimParams, classParams)
allocatableGPUs, err := d.allocatableGPUs(nodeDevices, claimAllocation, selectedNode)
if err != nil {
return err
}
nodeDevices.Status.AllocatedGPUs[claimUID] = allocatedGPUs
nodeDevices.Status.AllocatedGPUs[claimUID] = allocatableGPUs

updateOpts := metav1.UpdateOptions{}
if _, err := d.clientsets.GpuV1alpha1().NodeDevices(d.namespace).Update(ctx, nodeDevices, updateOpts); err != nil {
return err
}

log.Info().Msg("allocation completed")
return commitAllocation()
return nil
}

// Deallocate gets called when a ResourceClaim is ready to be freed.
@@ -227,9 +215,6 @@ func (d *driver) Deallocate(ctx context.Context, claim *resourcev1alpha2.Resourc
}

log.Info().Msg("deallocating claimed GPUs...")
if err := d.gpu.deallocate(claimUID, selectedNode); err != nil {
return err
}
delete(nodeDevices.Status.AllocatedGPUs, claimUID)

updateOpts := metav1.UpdateOptions{}
@@ -267,11 +252,18 @@ func (d *driver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims []
return nil
}

if nodeDevices.Status.State != gpuv1alpha1.NodeDevicesAllocationStateReady {
for _, claim := range claims {
claim.UnsuitableNodes = append(claim.UnsuitableNodes, potentialNode)
}
return nil
}

if nodeDevices.Status.AllocatedGPUs == nil {
nodeDevices.Status.AllocatedGPUs = map[string][]*gpuv1alpha1.GPUDevice{}
}

if err := d.gpu.unsuitableNode(nodeDevices, pod, claims, potentialNode); err != nil {
if err := d.unsuitableNode(nodeDevices, pod, claims, potentialNode); err != nil {
errs = errors.Join(errs, err)
}
}
@@ -308,3 +300,86 @@ func buildAllocationResult(selectedNode string, shareable bool) *resourcev1alpha
Shareable: shareable,
}
}

func (d *driver) allocatableGPUs(
nodeDevices *gpuv1alpha1.NodeDevices,
claimAllocation *dractrl.ClaimAllocation,
selectedNode string) ([]*gpuv1alpha1.GPUDevice, error) {
claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec)
if !ok {
return nil, fmt.Errorf("unsupported claim parameters kind: %T", claimAllocation.ClaimParameters)
}

availableGPUs := d.availableGPUs(nodeDevices)
if len(availableGPUs) < claimParams.Count {
return nil, fmt.Errorf("insufficient GPUs on node %s for claim %s", selectedNode, claimAllocation.Claim.GetUID())
}

classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParametersSpec)
if !ok {
return nil, fmt.Errorf("unsupported class parameters kind: %T", claimAllocation.ClassParameters)
}

allocatableGPUs := []*gpuv1alpha1.GPUDevice{}
for _, availableGPU := range availableGPUs {
for _, selector := range classParams.DeviceSelector {
if selector.Name == availableGPU.ProductName && selector.Vendor == availableGPU.Vendor {
allocatableGPUs = append(allocatableGPUs, availableGPU)
}
}
}

return allocatableGPUs, nil
}

func (d *driver) unsuitableNode(
nodeDevices *gpuv1alpha1.NodeDevices,
pod *corev1.Pod,
claims []*dractrl.ClaimAllocation,
potentialNode string) error {
for _, claim := range claims {
claimParams, ok := claim.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec)
if !ok {
d.log.Info().Msgf("skipping unsupported claim parameters kind: %T", claim.ClaimParameters)
continue
}

// if the number of GPUs allocated to (or still available for) this claim does not
// match the requested count, mark the node as unsuitable
claimUID := string(claim.Claim.GetUID())
allocatedCount := d.allocatedCount(nodeDevices, claimUID)
if claimParams.Count != allocatedCount {
d.log.Info().Msgf("insufficient GPUs on node %s for claim %s, marking node as unsuitable", potentialNode, claimUID)
claim.UnsuitableNodes = append(claim.UnsuitableNodes, potentialNode)
}
}

return nil
}

func (d *driver) allocatedCount(nodeDevices *gpuv1alpha1.NodeDevices, claimUID string) int {
// if existing allocated GPUs are found for this claim, return the count
if gpus, exists := nodeDevices.Status.AllocatedGPUs[claimUID]; exists {
return len(gpus)
}

return len(d.availableGPUs(nodeDevices))
}

func (d *driver) availableGPUs(
nodeDevices *gpuv1alpha1.NodeDevices) map[string]*gpuv1alpha1.GPUDevice {
available := map[string]*gpuv1alpha1.GPUDevice{}
for _, gpu := range nodeDevices.Spec.AllocatableGPUs {
available[gpu.UUID] = gpu
}

// find the GPUs that are already allocated, regardless of their claims
// and remove them from the available list
for _, gpus := range nodeDevices.Status.AllocatedGPUs {
for _, gpu := range gpus {
delete(available, gpu.UUID)
}
}

return available
}
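
The helpers above replace the in-memory bookkeeping of the removed gpuPlugin: availability is now derived entirely from the NodeDevices object by subtracting Status.AllocatedGPUs from Spec.AllocatableGPUs. The sketch below only illustrates that bookkeeping; the NodeDevicesSpec type name, the slice layout of AllocatableGPUs, and the literal values are assumptions, while the field names follow the diff above.

// Illustrative only, not part of this commit.
nodeDevices := &gpuv1alpha1.NodeDevices{
	Spec: gpuv1alpha1.NodeDevicesSpec{ // spec type name assumed
		AllocatableGPUs: []*gpuv1alpha1.GPUDevice{
			{UUID: "gpu-0", ProductName: "model-a", Vendor: "acme"},
			{UUID: "gpu-1", ProductName: "model-a", Vendor: "acme"},
		},
	},
	Status: gpuv1alpha1.NodeDevicesStatus{
		AllocatedGPUs: map[string][]*gpuv1alpha1.GPUDevice{
			"claim-uid-1": {{UUID: "gpu-0", ProductName: "model-a", Vendor: "acme"}},
		},
	},
}
// With this state, availableGPUs would return only gpu-1, because gpu-0 is
// already recorded under a claim, and allocatedCount for "claim-uid-1" would
// return 1.
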
27 changes: 27 additions & 0 deletions pkg/drivers/gpu/kubelet/cdi/cdi.go
@@ -22,12 +22,39 @@ var (
once sync.Once
)

// GPUDevice is used to encapsulate the CDI information of a GPU device.
type GPUDevice struct {
UUID string
ProductName string
VendorName string
}

func InitRegistryOnce(cdiRoot string) {
once.Do(func() {
registry = cdiapi.GetRegistry(cdiapi.WithSpecDirs(cdiRoot))
})
}

func DiscoverFromSpecs() ([]*GPUDevice, error) {
specs, err := Specs()
if err != nil {
return nil, err
}

var gpuDevices []*GPUDevice
for _, spec := range specs {
for _, device := range spec.Devices {
gpuDevices = append(gpuDevices, &GPUDevice{
UUID: device.Name,
ProductName: device.ContainerEdits.Env[1],
VendorName: device.ContainerEdits.Env[2],
})
}
}

return gpuDevices, nil
}

func DeviceQualifiedName(gpu *gpuv1alpha1.GPUDevice) string {
return cdiapi.QualifiedName(cdiVendor, cdiClass, gpu.UUID)
}
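
One caveat about DiscoverFromSpecs above: it reads the product and vendor names positionally from each device's ContainerEdits.Env, so it relies on the generated CDI spec always emitting those entries in a fixed order. The sketch below only illustrates that assumption; the variable names and values are invented and do not come from this commit.

// Hypothetical env layout assumed by the positional indexing above; only the
// ordering matters to the code in this commit.
env := []string{
	"GPU_UUID=gpu-0",           // Env[0]
	"GPU_PRODUCT_NAME=model-a", // Env[1] -> GPUDevice.ProductName
	"GPU_VENDOR_NAME=acme",     // Env[2] -> GPUDevice.VendorName
}
productName, vendorName := env[1], env[2]
_, _ = productName, vendorName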