Merge pull request volcano-sh#1672 from eggiter/feat-unschedulable-info
feat: expose detailed scheduling reason of pending tasks based on las…
volcano-sh-bot authored Aug 20, 2021
2 parents 8b3c1af + a3c7954 commit 6f5bb36
Showing 7 changed files with 279 additions and 24 deletions.
46 changes: 46 additions & 0 deletions docs/design/scheduling-reason.md
@@ -0,0 +1,46 @@
# Scheduling Reason

[@eggiter](https://github.com/eggiter); Aug 13, 2021

## Table of Contents

* [Table of Contents](#table-of-contents)
* [Problem](#problem)
* [Solution](#solution)

## Problem

- Currently, the reason and message in `pod.status.conditions["PodScheduled"]` and `podgroup.status.conditions["Unschedulable"]` are **NOT** informative, for example `x/x tasks in gang unschedulable: pod group is not ready, x Pending, x minAvailable`.
- Users want to know which task breaks the scheduling cycle and why.

## Solution

- Introduce two extra scheduling reasons in the PodScheduled PodCondition:
  + `Schedulable`: means that the scheduler can schedule the pod right now, but has not bound it yet.
  + `Undetermined`: means that the scheduler skipped scheduling the pod and left it `Undetermined`, for example because an unschedulable pod had already been encountered in the same cycle.

- Case:
+ 3 nodes: node1, node2, node3;
+ 6 tasks in the PodGroup(`minAvailable = 6`);
  + only 1 task (Task6) is unschedulable while the other 5 tasks (Task1 ~ 5) can be scheduled; thus the whole job is unschedulable.

- Current information:
  + |Tasks|Reason|Message|
    |-|-|-|
    |PodGroup|NotEnoughResources|6/6 tasks in gang unschedulable: pod group is not ready, 6 Pending, 6 minAvailable|
    |Task1 ~ 5|Unschedulable|6/6 tasks in gang unschedulable: pod group is not ready, 6 Pending, 6 minAvailable|
    |Task6|Unschedulable|all nodes are unavailable: 1 plugin InterPodAffinity predicates failed node(s) didn't match pod affinity/anti-affinity, node(s) didn't match pod affinity rules, 2 node(s) resource fit failed.|

- Improved information:
  + |Tasks|Reason|Message|
    |-|-|-|
    |PodGroup|(same)|**3/6** tasks in gang unschedulable: pod group is not ready, 6 Pending, 6 minAvailable; **Pending: 1 Unschedulable, 2 Undetermined, 3 Schedulable**|
    |Task1 ~ 2|**Undetermined**|**3/6** tasks in gang unschedulable: pod group is not ready, 6 Pending, 6 minAvailable; **Pending: 1 Unschedulable, 2 Undetermined, 3 Schedulable**|
    |Task3|**Schedulable**|**Pod ns1/task-3 can possibly be assigned to node1**|
    |Task4|**Schedulable**|**Pod ns1/task-4 can possibly be assigned to node2**|
    |Task5|**Schedulable**|**Pod ns1/task-5 can possibly be assigned to node3**|
    |Task6|Unschedulable|all nodes are unavailable: 1 plugin InterPodAffinity predicates failed node(s) didn't match pod affinity/anti-affinity, node(s) didn't match pod affinity rules, 2 node(s) resource fit failed.|

- Note: Task1 & 2 are `Undetermined`, possibly because these two tasks are ordered after Task6 by `TaskOrderFn`.

- With the improved information, we can easily find the task that breaks the whole scheduling cycle and why that happens. Additionally, we get a histogram of the reasons why some tasks are still pending.
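
Below is a minimal, illustrative sketch (not part of this change) of how a consumer could build the same kind of reason histogram directly from the pods of a job. The `summarizeReasons` helper is hypothetical; only the `v1.PodScheduled` condition type and the three reason strings (`Unschedulable`, `Schedulable`, `Undetermined`) come from this design.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// summarizeReasons counts pending pods by the reason recorded in their
// PodScheduled condition, mirroring the "Pending: 1 Unschedulable,
// 2 Undetermined, 3 Schedulable" summary shown in the table above.
func summarizeReasons(pods []*v1.Pod) map[string]int {
	histogram := map[string]int{}
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodPending {
			continue
		}
		for _, cond := range pod.Status.Conditions {
			if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse {
				// Reason is expected to be one of:
				// "Unschedulable", "Schedulable", "Undetermined".
				histogram[cond.Reason]++
			}
		}
	}
	return histogram
}

func main() {
	pods := []*v1.Pod{ /* pending pods of the job, e.g. listed via client-go */ }
	fmt.Println(summarizeReasons(pods))
}
```
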
114 changes: 100 additions & 14 deletions pkg/scheduler/api/job_info.go
@@ -65,6 +65,21 @@ const JobWaitingTime = "sla-waiting-time"
// TaskID is UID type for Task
type TaskID types.UID

// TransactionContext holds all the fields needed by a scheduling transaction
type TransactionContext struct {
NodeName string
Status TaskStatus
}

// Clone returns a clone of the TransactionContext
func (ctx *TransactionContext) Clone() *TransactionContext {
if ctx == nil {
return nil
}
clone := *ctx
return &clone
}

// TaskInfo will have all infos about the task
type TaskInfo struct {
UID TaskID
@@ -78,8 +93,10 @@ type TaskInfo struct {
// InitResreq is the resource that used to launch a task.
InitResreq *Resource

NodeName string
Status TaskStatus
TransactionContext
// LastTransaction holds the context of last scheduling transaction
LastTransaction *TransactionContext

Priority int32
VolumeReady bool
Preemptable bool
@@ -128,15 +145,18 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
Job: jobID,
Name: pod.Name,
Namespace: pod.Namespace,
NodeName: pod.Spec.NodeName,
Status: getTaskStatus(pod),
Priority: 1,
Pod: pod,
Resreq: resReq,
InitResreq: initResReq,
Preemptable: preemptable,
RevocableZone: revocableZone,
TopologyPolicy: topologyPolicy,

TransactionContext: TransactionContext{
NodeName: pod.Spec.NodeName,
Status: getTaskStatus(pod),
},
}

if pod.Spec.Priority != nil {
@@ -146,15 +166,29 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
return ti
}

// GetTransactionContext returns the transaction context of a task
func (ti *TaskInfo) GetTransactionContext() TransactionContext {
return ti.TransactionContext
}

// GenerateLastTxContext generates and sets the context of the last transaction for a task
func (ti *TaskInfo) GenerateLastTxContext() {
ctx := ti.GetTransactionContext()
ti.LastTransaction = &ctx
}

// ClearLastTxContext clears the context of the last transaction for a task
func (ti *TaskInfo) ClearLastTxContext() {
ti.LastTransaction = nil
}

// Clone is used for cloning a task
func (ti *TaskInfo) Clone() *TaskInfo {
return &TaskInfo{
UID: ti.UID,
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
NodeName: ti.NodeName,
Status: ti.Status,
Priority: ti.Priority,
Pod: ti.Pod,
Resreq: ti.Resreq.Clone(),
@@ -163,6 +197,12 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Preemptable: ti.Preemptable,
RevocableZone: ti.RevocableZone,
TopologyPolicy: ti.TopologyPolicy,

TransactionContext: TransactionContext{
NodeName: ti.NodeName,
Status: ti.Status,
},
LastTransaction: ti.LastTransaction.Clone(),
}
}

@@ -485,24 +525,70 @@ func (ji JobInfo) String() string {
// FitError returns detailed information on why a job's task failed to fit on
// each available node
func (ji *JobInfo) FitError() string {
reasons := make(map[string]int)
for status, taskMap := range ji.TaskStatusIndex {
reasons[status.String()] += len(taskMap)
}
reasons["minAvailable"] = int(ji.MinAvailable)

sortReasonsHistogram := func() []string {
sortReasonsHistogram := func(reasons map[string]int) []string {
reasonStrings := []string{}
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf("%v, %v.", scheduling.PodGroupNotReady, strings.Join(sortReasonsHistogram(), ", "))

// Stat histogram for all tasks of the job
reasons := make(map[string]int)
for status, taskMap := range ji.TaskStatusIndex {
reasons[status.String()] += len(taskMap)
}
reasons["minAvailable"] = int(ji.MinAvailable)
reasonMsg := fmt.Sprintf("%v, %v", scheduling.PodGroupNotReady, strings.Join(sortReasonsHistogram(reasons), ", "))

// Stat histogram for pending tasks only
reasons = make(map[string]int)
for uid := range ji.TaskStatusIndex[Pending] {
reason, _ := ji.TaskSchedulingReason(uid)
reasons[reason]++
}
if len(reasons) > 0 {
reasonMsg += "; " + fmt.Sprintf("%s: %s", Pending.String(), strings.Join(sortReasonsHistogram(reasons), ", "))
}
return reasonMsg
}

// TaskSchedulingReason returns the detailed reason and message of the given task,
// based on its last scheduling transaction.
func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) {
taskInfo, exists := ji.Tasks[tid]
if !exists {
return "", ""
}

// Get detailed scheduling reason based on LastTransaction
ctx := taskInfo.GetTransactionContext()
if taskInfo.LastTransaction != nil {
ctx = *taskInfo.LastTransaction
}

msg = ji.JobFitErrors
switch status := ctx.Status; status {
case Allocated, Pipelined:
// Pod is schedulable
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
if status == Pipelined {
msg += " once resource is released"
}
return PodReasonSchedulable, msg
case Pending:
if fe := ji.NodesFitErrors[tid]; fe != nil {
// Pod is not schedulable
return PodReasonUnschedulable, fe.Error()
}
// Pod is not scheduled yet
return PodReasonUndetermined, msg
default:
return status.String(), msg
}
}

// ReadyTaskNum returns the number of tasks that are ready or that is best-effort.
func (ji *JobInfo) ReadyTaskNum() int32 {
var occupied int32
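
For context, here is a minimal sketch of how the new transaction-context helpers fit together. It assumes this repository's `volcano.sh/volcano/pkg/scheduler/api` import path; the pod, job, and node names are illustrative only.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	api "volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "task-1", UID: "uid-1"},
		Status:     v1.PodStatus{Phase: v1.PodPending},
	}

	// NewTaskInfo seeds the embedded TransactionContext from the pod spec/status.
	ti := api.NewTaskInfo(pod)

	// GenerateLastTxContext copies the task's current TransactionContext
	// (NodeName/Status) into LastTransaction so it survives a rollback.
	ti.GenerateLastTxContext()

	// For illustration, pretend the last transaction had allocated the task to node1.
	ti.LastTransaction = &api.TransactionContext{NodeName: "node1", Status: api.Allocated}

	job := api.NewJobInfo(api.JobID("job-1"))
	job.AddTaskInfo(ti)

	// Reason and message are derived from LastTransaction when it is present.
	reason, msg := job.TaskSchedulingReason(ti.UID)
	fmt.Println(reason, msg) // expected: Schedulable Pod ns1/task-1 can possibly be assigned to node1

	// The snapshot is cleared once it has been reported.
	ti.ClearLastTxContext()
}
```
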
97 changes: 97 additions & 0 deletions pkg/scheduler/api/job_info_test.go
@@ -22,6 +22,10 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"volcano.sh/apis/pkg/apis/scheduling"
schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func jobInfoEqual(l, r *JobInfo) bool {
@@ -191,3 +195,96 @@ func TestDeleteTaskInfo(t *testing.T) {
}
}
}

func TestTaskSchedulingReason(t *testing.T) {
t1 := buildPod("ns1", "task-1", "", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))
t2 := buildPod("ns1", "task-2", "", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))
t3 := buildPod("ns1", "task-3", "node1", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))
t4 := buildPod("ns1", "task-4", "node2", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))
t5 := buildPod("ns1", "task-5", "node3", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))
t6 := buildPod("ns1", "task-6", "", v1.PodPending, buildResourceList("1", "1G"), nil, make(map[string]string))

tests := []struct {
desc string
pods []*v1.Pod
jobid JobID
nodefes map[TaskID]*FitErrors
expected map[types.UID]string
}{
{
desc: "task3 ~ 5 are schedulable",
pods: []*v1.Pod{t1, t2, t3, t4, t5, t6},
jobid: JobID("case1"),
nodefes: map[TaskID]*FitErrors{
TaskID(t6.UID): {
nodes: map[string]*FitError{
"node1": {Reasons: []string{NodePodNumberExceeded}},
"node2": {Reasons: []string{NodeResourceFitFailed}},
"node3": {Reasons: []string{NodeResourceFitFailed}},
},
},
},
expected: map[types.UID]string{
"pg": "pod group is not ready, 6 Pending, 6 minAvailable; Pending: 1 Unschedulable, 2 Undetermined, 3 Schedulable",
t1.UID: "pod group is not ready, 6 Pending, 6 minAvailable; Pending: 1 Unschedulable, 2 Undetermined, 3 Schedulable",
t2.UID: "pod group is not ready, 6 Pending, 6 minAvailable; Pending: 1 Unschedulable, 2 Undetermined, 3 Schedulable",
t3.UID: "Pod ns1/task-3 can possibly be assigned to node1",
t4.UID: "Pod ns1/task-4 can possibly be assigned to node2",
t5.UID: "Pod ns1/task-5 can possibly be assigned to node3",
t6.UID: "all nodes are unavailable: 1 node(s) pod number exceeded, 2 node(s) resource fit failed.",
},
},
}

for i, test := range tests {
job := NewJobInfo(test.jobid)
pg := scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "pg1",
},
Spec: scheduling.PodGroupSpec{
MinMember: int32(len(test.pods)),
},
}
for _, pod := range test.pods {
// set pod group
pod.Annotations = map[string]string{
schedulingv2.KubeGroupNameAnnotationKey: pg.Name,
}

// add TaskInfo
ti := NewTaskInfo(pod)
job.AddTaskInfo(ti)

// pod is schedulable
if len(pod.Spec.NodeName) > 0 {
ti.LastTransaction = &TransactionContext{
NodeName: pod.Spec.NodeName,
Status: Allocated,
}
}
}
// complete job
job.SetPodGroup(&PodGroup{PodGroup: pg})
job.NodesFitErrors = test.nodefes
job.TaskStatusIndex = map[TaskStatus]tasksMap{Pending: {}}
for _, task := range job.Tasks {
task.Status = Pending
job.TaskStatusIndex[Pending][task.UID] = task
}
job.JobFitErrors = job.FitError()

// assert
for uid, exp := range test.expected {
msg := job.JobFitErrors
if uid != "pg" {
_, msg = job.TaskSchedulingReason(TaskID(uid))
}
t.Logf("case #%d, task %v, result: %s", i, uid, msg)
if msg != exp {
t.Errorf("[x] case #%d, task %v, expected: %s, got: %s", i, uid, exp, msg)
}
}
}
}
13 changes: 13 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
@@ -16,6 +16,19 @@ const (
AllNodeUnavailableMsg = "all nodes are unavailable"
)

// These are reasons for a pod's transition to a condition.
const (
// PodReasonUnschedulable reason in PodScheduled PodCondition means that the scheduler
// can't schedule the pod right now, for example due to insufficient resources in the cluster.
PodReasonUnschedulable = "Unschedulable"
// PodReasonSchedulable reason in PodScheduled PodCondition means that the scheduler
// can schedule the pod right now, but has not bound it yet
PodReasonSchedulable = "Schedulable"
// PodReasonUndetermined reason in PodScheduled PodCondition means that the scheduler
// skipped scheduling the pod and left it `Undetermined`, for example because an unschedulable pod had already been encountered.
PodReasonUndetermined = "Undetermined"
)

// FitErrors is set of FitError on many nodes
type FitErrors struct {
nodes map[string]*FitError
15 changes: 7 additions & 8 deletions pkg/scheduler/cache/cache.go
@@ -706,13 +706,13 @@ func (sc *SchedulerCache) UpdateSchedulerNumaInfo(AllocatedSets map[string]sched
}

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, message string) error {
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message string) error {
pod := task.Pod

condition := &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Reason: reason, // Use more specific reasons to distinguish the scenarios of pending tasks
Message: message,
}

@@ -721,7 +721,7 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, messag

// The reason field in 'Events' should be "FailedScheduling"; there are no constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition should be "Unschedulable"
// The reason field in PodCondition can be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil {
return err
@@ -950,12 +950,11 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo) {
// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} {
for _, taskInfo := range job.TaskStatusIndex[status] {
msg := baseErrorMessage
fitError := job.NodesFitErrors[taskInfo.UID]
if fitError != nil {
msg = fitError.Error()
reason, msg := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
}
if err := sc.taskUnschedulable(taskInfo, msg); err != nil {
if err := sc.taskUnschedulable(taskInfo, reason, msg); err != nil {
klog.Errorf("Failed to update unschedulable task status <%s/%s>: %v",
taskInfo.Namespace, taskInfo.Name, err)
}