Skip to content

Commit

Permalink
Add logging to driver
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Sim <[email protected]>
  • Loading branch information
ihcsim committed May 24, 2024
1 parent f520518 commit 10a1c1b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 43 deletions.
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (

func init() {
log = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339})
log = log.With().Caller().Logger()

rootCmd.PersistentFlags().AddFlagSet(flags.NewK8sFlags())
rootCmd.PersistentFlags().AddFlagSet(flags.NewControllerFlags())
Expand Down Expand Up @@ -104,7 +105,8 @@ func run(ctx context.Context) error {
)
informerFactory.Start(ctx.Done())

driver, err := gpu.NewDriver(draClientSets, namespace, log)
dlog := log.With().Str("namespace", namespace).Logger()
driver, err := gpu.NewDriver(draClientSets, namespace, dlog)
if err != nil {
return err
}
Expand Down
99 changes: 60 additions & 39 deletions pkg/drivers/gpu/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (d *driver) GetName() string {
// parameters.
// see https://pkg.go.dev/k8s.io/dynamic-resource-allocation/controller#Driver
func (d *driver) GetClassParameters(ctx context.Context, class *resourcev1alpha2.ResourceClass) (interface{}, error) {
log := d.log.With().Str("resourceClass", class.GetName()).Logger()
log.Info().Msg("trying to get class parameters")

if class.ParametersRef == nil {
log.Info().Msg("no class parameters reference found, so using default values")
return &gpuv1alpha1.GPUClassParametersSpec{
DeviceSelector: []gpuv1alpha1.DeviceSelector{
{
Expand All @@ -75,6 +79,7 @@ func (d *driver) GetClassParameters(ctx context.Context, class *resourcev1alpha2
if err != nil {
return nil, fmt.Errorf("error getting DeviceClassParameters called '%s': %w", class.ParametersRef.Name, err)
}
log.Info().Msg(fmt.Sprintf("successfully retrieved class parameters: %s", class.ParametersRef.Name))

return &dc, nil
}
Expand All @@ -87,7 +92,11 @@ func (d *driver) GetClaimParameters(
claim *resourcev1alpha2.ResourceClaim,
class *resourcev1alpha2.ResourceClass,
classParameters interface{}) (interface{}, error) {
log := d.log.With().Str("resourceClaim", claim.GetName()).Logger()
log.Info().Msg("trying to get claim parameters")

if claim.Spec.ParametersRef == nil {
log.Info().Msg("no claim parameters reference found, so using default values")
return gpuv1alpha1.GPUClaimParametersSpec{
Count: 1,
}, nil
Expand All @@ -113,15 +122,18 @@ func (d *driver) GetClaimParameters(
if err := d.validateClaimParameters(&rc.Spec); err != nil {
return nil, fmt.Errorf("error validating GPUClaimParameters called '%v' in namespace '%v': %w", claim.Spec.ParametersRef.Name, claim.Namespace, err)
}
log.Info().Msg(fmt.Sprintf("successfully retrieved claim parameters: %s", claim.Spec.ParametersRef.Name))

return &rc, nil
}

// Allocate is called when all same-driver ResourceClaims for Pod are ready to be
// allocated.
// see https://pkg.go.dev/k8s.io/dynamic-resource-allocation/controller#Driver
func (d *driver) Allocate(ctx context.Context, claimAllocations []*dractrl.ClaimAllocation, selectedNode string) {
for _, ca := range claimAllocations {
if selectedNode == "" {
ca.Error = fmt.Errorf("failed to allocate device: immediate allocation is not supported.")
ca.Error = fmt.Errorf("immediate allocation is not supported.")
continue
}

Expand All @@ -133,36 +145,13 @@ func (d *driver) Allocate(ctx context.Context, claimAllocations []*dractrl.Claim
}
}

// nodeDeviceAllocation looks up the NodeDeviceAllocation for selectedNode in
// namespace by listing the objects labelled
// kubernetes.io/hostname=<selectedNode>.
//
// It returns an error when the list call fails, when the selector does not
// match exactly one object, or when the matched object's state is not Ready.
// On success the result is a deep copy whose Status.AllocatedClaims map is
// never nil, so callers can insert into it without a nil check.
func (d *driver) nodeDeviceAllocation(ctx context.Context, namespace, selectedNode string) (*allocationv1alpha1.NodeDeviceAllocation, error) {
	listOpts := metav1.ListOptions{
		LabelSelector: "kubernetes.io/hostname=" + selectedNode,
	}

	deviceAllocations, err := d.clientsets.AllocationV1alpha1().NodeDeviceAllocations(namespace).List(ctx, listOpts)
	if err != nil {
		return nil, err
	}

	// Exactly one object must match. Zero matches would make the index below
	// panic, and several matches would make the choice ambiguous.
	if len(deviceAllocations.Items) != 1 {
		return nil, fmt.Errorf("expected exactly one matching node for device allocation for node %s, got %d", selectedNode, len(deviceAllocations.Items))
	}
	deviceAllocation := deviceAllocations.Items[0]

	if deviceAllocation.Status.State != allocationv1alpha1.Ready {
		return nil, fmt.Errorf("failed to allocate device: device allocation not ready")
	}

	// Initialise the map before copying so the returned object is writable.
	if deviceAllocation.Status.AllocatedClaims == nil {
		deviceAllocation.Status.AllocatedClaims = make(map[string]allocationv1alpha1.AllocatedDevices)
	}

	return deviceAllocation.DeepCopy(), nil
}

func (d *driver) allocateGPU(
ctx context.Context,
claimAllocation *dractrl.ClaimAllocation,
selectedNode string) error {
log := d.log.With().Str("podClaimName", claimAllocation.PodClaimName).Str("selectedNode", selectedNode).Logger()
log.Info().Msg("allocating GPU...")

var (
claim = claimAllocation.Claim
claimUID = string(claim.GetUID())
Expand All @@ -173,45 +162,68 @@ func (d *driver) allocateGPU(
if err != nil {
return err
}
log = log.With().Str("deviceAllocation", deviceAllocation.GetName()).Str("claimUID", claimUID).Logger()

// if there is an on-going allocation, let it finish
if _, exists := deviceAllocation.Status.AllocatedClaims[claimUID]; exists {
log.Info().Msg("on-going allocation already exists, let it finish")
return nil
}

claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParameters)
claimParams, ok := claimAllocation.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec)
if !ok {
return fmt.Errorf("unsupported claim parameters kind: %T", claimAllocation.ClaimParameters)
}

classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParameters)
classParams, ok := claimAllocation.ClassParameters.(*gpuv1alpha1.GPUClassParametersSpec)
if !ok {
return fmt.Errorf("unsupported class parameters kind: %T", claimAllocation.ClassParameters)
}

allocatedClaims, err := d.gpu.pendingAllocatedClaims(
claimUID,
selectedNode,
claimParams.DeepCopy(),
classParams.DeepCopy())
claimParams,
classParams)
if err != nil {
return err
}

deviceAllocation.Status.AllocatedClaims[claimUID] = allocatedClaims

updateOpts := metav1.UpdateOptions{}
if _, err := d.clientsets.AllocationV1alpha1().NodeDeviceAllocations(claimNamespace).Update(ctx, deviceAllocation, updateOpts); err != nil {
return err
}
log.Info().Msg("GPU allocation successful")

return d.gpu.removeAllocatedClaim(claimUID)
}

// nodeDeviceAllocation fetches the NodeDeviceAllocation named after
// selectedNode from namespace.
//
// The lookup error is returned unchanged. An object whose state is not Ready
// is rejected with an error. Otherwise a deep copy is returned, with
// Status.AllocatedClaims initialised to an empty map if it was nil.
func (d *driver) nodeDeviceAllocation(ctx context.Context, namespace, selectedNode string) (*allocationv1alpha1.NodeDeviceAllocation, error) {
	client := d.clientsets.AllocationV1alpha1().NodeDeviceAllocations(namespace)

	nda, err := client.Get(ctx, selectedNode, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if state := nda.Status.State; state != allocationv1alpha1.Ready {
		return nil, fmt.Errorf("failed to allocate device: device allocation not ready")
	}

	// Guarantee a writable map on the object handed back to the caller.
	if nda.Status.AllocatedClaims == nil {
		nda.Status.AllocatedClaims = map[string]allocationv1alpha1.AllocatedDevices{}
	}

	return nda.DeepCopy(), nil
}

// Deallocate gets called when a ResourceClaim is ready to be freed.
// see https://pkg.go.dev/k8s.io/dynamic-resource-allocation/controller#Driver
func (d *driver) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error {
d.log.Info().Msg("deallocating GPU...")

selectedNode := getSelectedNode(claim)
if selectedNode == "" {
d.log.Info().Msg("no selected node found, skipping deallocation")
return nil
}

Expand All @@ -225,15 +237,19 @@ func (d *driver) Deallocate(ctx context.Context, claim *resourcev1alpha2.Resourc
}

claimUID := string(claim.GetUID())
log := d.log.With().Str("selectedNode", selectedNode).Str("deviceAllocation", deviceAllocation.GetName()).Str("claimUID", claimUID).Logger()

allocatedDevices, exists := deviceAllocation.Status.AllocatedClaims[claimUID]
if !exists {
log.Info().Msg("no allocated claims found, skipping deallocation")
return nil
}

var errs error
if gpus := allocatedDevices.GPUs; gpus != nil {
for _, gpu := range gpus.Devices {
if err := d.deallocateGPU(ctx, claimUID, gpu); err != nil {
log.Info().Msg(fmt.Sprintf("deallocating GPU %s...", gpu.UUID))
if err := d.gpu.removeAllocatedClaim(gpu.UUID); err != nil {
errs = errors.Join(errs, err)
}
}
Expand All @@ -247,6 +263,7 @@ func (d *driver) Deallocate(ctx context.Context, claim *resourcev1alpha2.Resourc
if _, err := d.clientsets.AllocationV1alpha1().NodeDeviceAllocations(claimNamespace).Update(ctx, deviceAllocation, updateOpts); err != nil {
return err
}
log.Info().Msg("deallocation completed successfully")

delete(deviceAllocation.Status.AllocatedClaims, claimUID)
return nil
Expand All @@ -264,12 +281,11 @@ func getSelectedNode(claim *resourcev1alpha2.ResourceClaim) string {
return claim.Status.Allocation.AvailableOnNodes.NodeSelectorTerms[0].MatchFields[0].Values[0]
}

// deallocateGPU asks the GPU plugin to drop its record for claimUID and
// returns whatever error the plugin reports. The ctx and gpu arguments are
// accepted but not used.
func (d *driver) deallocateGPU(ctx context.Context, claimUID string, gpu allocationv1alpha1.AllocatedGPU) error {
	if err := d.gpu.removeAllocatedClaim(claimUID); err != nil {
		return err
	}
	return nil
}

// UnsuitableNodes checks all pending claims with delayed allocation for a pod.
// see https://pkg.go.dev/k8s.io/dynamic-resource-allocation/controller#Driver
func (d *driver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims []*dractrl.ClaimAllocation, potentialNodes []string) error {
d.log.Info().Msg("checking unsuitable nodes...")

var (
errs error
gpuClaims = []*dractrl.ClaimAllocation{}
Expand All @@ -284,7 +300,7 @@ func (d *driver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims []
}

for _, claim := range claims {
if _, ok := claim.ClaimParameters.(*gpuv1alpha1.GPUClaimParameters); !ok {
if _, ok := claim.ClaimParameters.(*gpuv1alpha1.GPUClaimParametersSpec); !ok {
errs = errors.Join(errs, fmt.Errorf("unsupported claim parameters kind: %T", claim.ClaimParameters))
continue
}
Expand All @@ -296,6 +312,11 @@ func (d *driver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims []
}
}

for _, claim := range claims {
d.log.Info().Str("podClaimName", claim.PodClaimName).Msg(fmt.Sprintf("unsuitable nodes: %v", claim.UnsuitableNodes))
}
d.log.Info().Msg("unsuitable nodes check completed")

return errs
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/drivers/gpu/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ type gpuPlugin struct{}

// pendingAllocatedClaims is currently a stub: it ignores all of its arguments
// and always returns an empty AllocatedDevices value with a nil error.
//
// NOTE(review): this diff view shows two pairs of parameter lines. The first
// pair (GPUClaimParameters / GPUClassParameters) looks like the pre-change
// signature and the second pair (...Spec) the post-change one — confirm
// against the checked-in file.
func (p *gpuPlugin) pendingAllocatedClaims(
claimUID, selectedNode string,
claimParams *gpuv1alpha1.GPUClaimParameters,
classParams *gpuv1alpha1.GPUClassParameters) (allocationapiv1alpha1.AllocatedDevices, error) {
claimParams *gpuv1alpha1.GPUClaimParametersSpec,
classParams *gpuv1alpha1.GPUClassParametersSpec) (allocationapiv1alpha1.AllocatedDevices, error) {
return allocationapiv1alpha1.AllocatedDevices{}, nil
}

// removeAllocatedClaim is currently a no-op stub: it ignores its argument and
// always returns nil.
//
// NOTE(review): two signature lines appear in this diff view. The parameter
// seems to have been renamed from claimUID to gpuUUID — confirm which
// identifier callers are expected to pass.
func (p *gpuPlugin) removeAllocatedClaim(claimUID string) error {
func (p *gpuPlugin) removeAllocatedClaim(gpuUUID string) error {
return nil
}

Expand Down

0 comments on commit 10a1c1b

Please sign in to comment.