Skip to content

Commit

Permalink
Remove all non-resourcelist gogo annotations from schedulerobjects (#…
Browse files Browse the repository at this point in the history
…4189)

* wip

* converted to scheduling info

* fix tests

* resourceRequirements

* tolerations

* jobschedulinginfo timestamp

* remove deterministic

* executor update time

* node last_seen

* node taints

* job run state

* typo

* fixes

* fix merge conflict

* deep copy job_scheduling_info

* lint

* fix tests
  • Loading branch information
d80tb7 authored Feb 10, 2025
1 parent 6569e24 commit 723cf64
Show file tree
Hide file tree
Showing 41 changed files with 603 additions and 556 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ issues:
max-issues-per-linter: 0
max-same-issues: 0
exclude-rules:
- path: internal/scheduler/schedulerobjects/podutils_test.go
- path: internal/scheduler/internaltypes/podutils_test.go
linters:
- lll

Expand Down
2 changes: 1 addition & 1 deletion internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j

PodSpec: podSpec,
PodSpecs: podSpecs,
SchedulingResourceRequirements: &schedulingResourceRequirements,
SchedulingResourceRequirements: schedulingResourceRequirements,

Created: protoutil.ToTimestamp(time),
Owner: ownerId,
Expand Down
11 changes: 7 additions & 4 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

protoutil "github.com/armadaproject/armada/internal/common/proto"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/internal/server/configuration"
Expand Down Expand Up @@ -167,7 +168,7 @@ var Leased = &armadaevents.EventSequence_Event{
ScheduledAtPriority: 15,
UpdateSequenceNumber: 1,
PodRequirementsOverlay: &schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
Tolerations: []*v1.Toleration{
{
Key: "whale",
Value: "true",
Expand Down Expand Up @@ -290,11 +291,13 @@ var JobRequeued = &armadaevents.EventSequence_Event{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
NodeSelector: NodeSelector,
Tolerations: Tolerations,
NodeSelector: NodeSelector,
Tolerations: armadaslices.Map(Tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
PreemptionPolicy: "PreemptLowerPriority",
Affinity: Affinity,
ResourceRequirements: v1.ResourceRequirements{
ResourceRequirements: &v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("150m"),
Expand Down
9 changes: 6 additions & 3 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)
Expand All @@ -17,9 +18,11 @@ func PodRequirementsFromPodSpec(podSpec *v1.PodSpec) *schedulerobjects.PodRequir
preemptionPolicy = string(*podSpec.PreemptionPolicy)
}
return &schedulerobjects.PodRequirements{
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: podSpec.Tolerations,
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: armadaslices.Map(podSpec.Tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
PreemptionPolicy: preemptionPolicy,
ResourceRequirements: api.SchedulingResourceRequirementsFromPodSpec(podSpec),
}
Expand Down
10 changes: 7 additions & 3 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

protoutil "github.com/armadaproject/armada/internal/common/proto"

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -296,15 +298,17 @@ func addNodeSelector(podSpec *armadaevents.PodSpecWithAvoidList, key string, val
}
}

func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) {
func addTolerations(job *armadaevents.SubmitJob, tolerations []*v1.Toleration) {
if job == nil || len(tolerations) == 0 {
return
}
if job.MainObject != nil {
switch typed := job.MainObject.Object.(type) {
case *armadaevents.KubernetesMainObject_PodSpec:
if typed.PodSpec != nil && typed.PodSpec.PodSpec != nil {
typed.PodSpec.PodSpec.Tolerations = append(typed.PodSpec.PodSpec.Tolerations, tolerations...)
for _, toleration := range tolerations {
typed.PodSpec.PodSpec.Tolerations = append(typed.PodSpec.PodSpec.Tolerations, *toleration)
}
}
}
}
Expand Down Expand Up @@ -381,7 +385,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
Id: req.ExecutorId,
Pool: req.Pool,
Nodes: nodes,
LastUpdateTime: now,
LastUpdateTime: protoutil.ToTimestamp(now),
UnassignedJobRuns: req.UnassignedJobRunIds,
}
}
Expand Down
21 changes: 12 additions & 9 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/auth/permission"
"github.com/armadaproject/armada/internal/common/compress"
mocks "github.com/armadaproject/armada/internal/common/mocks"
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/slices"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
Expand All @@ -29,7 +30,7 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/internal/server/configuration"
mocks2 "github.com/armadaproject/armada/internal/server/mocks"
servermocks "github.com/armadaproject/armada/internal/server/mocks"
"github.com/armadaproject/armada/internal/server/permissions"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
Expand Down Expand Up @@ -92,11 +93,11 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Resources: nil,
},
},
LastSeen: testClock.Now().UTC(),
LastSeen: protoutil.ToTimestamp(testClock.Now().UTC()),
ReportingNodeType: "node-type-1",
},
},
LastUpdateTime: testClock.Now().UTC(),
LastUpdateTime: protoutil.ToTimestamp(testClock.Now().UTC()),
UnassignedJobRuns: []string{runId3},
}

Expand Down Expand Up @@ -181,7 +182,9 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmit,
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: tolerations,
Tolerations: armadaslices.Map(tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
Annotations: map[string]string{configuration.PoolAnnotation: "test-pool", "runtime_gang_cardinality": "3"},
},
),
Expand Down Expand Up @@ -310,7 +313,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

runIds, err := runIdsFromLeaseRequest(tc.request)
require.NoError(t, err)
Expand Down Expand Up @@ -377,7 +380,7 @@ func TestExecutorApi_LeaseJobRuns_Unauthorised(t *testing.T) {
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

// set up mocks
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
Expand Down Expand Up @@ -504,7 +507,7 @@ func TestExecutorApi_Publish(t *testing.T) {
mockPulsarPublisher := mocks.NewMockPublisher[*armadaevents.EventSequence](ctrl)
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

// capture all sent messages
var capturedEvents []*armadaevents.EventSequence
Expand Down Expand Up @@ -549,7 +552,7 @@ func TestExecutorApi_Publish_Unauthorised(t *testing.T) {
mockPulsarPublisher := mocks.NewMockPublisher[*armadaevents.EventSequence](ctrl)
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

sequences := []*armadaevents.EventSequence{
{
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/database/executor_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *PostgresExecutorRepository) StoreExecutor(ctx *armadacontext.Context, e
err = queries.UpsertExecutor(ctx, UpsertExecutorParams{
ExecutorID: executor.Id,
LastRequest: compressed,
UpdateTime: executor.LastUpdateTime,
UpdateTime: protoutil.ToStdTime(executor.LastUpdateTime),
})
if err != nil {
return errors.WithStack(err)
Expand Down
16 changes: 10 additions & 6 deletions internal/scheduler/database/executor_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"golang.org/x/exp/slices"

"github.com/armadaproject/armada/internal/common/armadacontext"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func TestExecutorRepository_LoadAndSave(t *testing.T) {
t1 := time.Now().UTC().Round(1 * time.Microsecond) // postgres only stores times with micro precision
t1Proto := protoutil.ToTimestamp(t1)
tests := map[string]struct {
executors []*schedulerobjects.Executor
}{
Expand All @@ -26,10 +28,10 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {
Nodes: []*schedulerobjects.Node{
{
Id: "test-node-1",
LastSeen: t1,
LastSeen: t1Proto,
},
},
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
UnassignedJobRuns: []string{"run1", "run2"},
},
{
Expand All @@ -38,10 +40,10 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {
Nodes: []*schedulerobjects.Node{
{
Id: "test-node-2",
LastSeen: t1,
LastSeen: t1Proto,
},
},
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
UnassignedJobRuns: []string{"run3", "run4"},
},
},
Expand Down Expand Up @@ -86,7 +88,9 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {

func TestExecutorRepository_GetLastUpdateTimes(t *testing.T) {
t1 := time.Now().UTC().Round(1 * time.Microsecond) // postgres only stores times with micro precision
t1Proto := protoutil.ToTimestamp(t1)
t2 := t1.Add(-1 * time.Second)
t2Proto := protoutil.ToTimestamp(t2)
tests := map[string]struct {
executors []*schedulerobjects.Executor
expectedUpdateTimes map[string]time.Time
Expand All @@ -95,11 +99,11 @@ func TestExecutorRepository_GetLastUpdateTimes(t *testing.T) {
executors: []*schedulerobjects.Executor{
{
Id: "test-executor-1",
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
},
{
Id: "test-executor-2",
LastUpdateTime: t2,
LastUpdateTime: t2Proto,
},
},
expectedUpdateTimes: map[string]time.Time{"test-executor-1": t1, "test-executor-2": t2},
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/database/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func TestFetchJobRunLeases(t *testing.T) {
Pool: "test-pool",
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
Tolerations: []*v1.Toleration{
{
Key: "whale",
Value: "true",
Expand Down
34 changes: 22 additions & 12 deletions internal/scheduler/internaltypes/job_scheduling_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

protoutil "github.com/armadaproject/armada/internal/common/proto"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
Expand Down Expand Up @@ -73,17 +74,24 @@ func FromSchedulerObjectsJobSchedulingInfo(j *schedulerobjects.JobSchedulingInfo
if podRequirements == nil {
return nil, errors.Errorf("job must have pod requirements")
}
rr := podRequirements.GetResourceRequirements().DeepCopy()
if rr == nil {
rr = &v1.ResourceRequirements{}
}
return &JobSchedulingInfo{
Lifetime: j.Lifetime,
PriorityClassName: j.PriorityClassName,
SubmitTime: j.SubmitTime,
SubmitTime: protoutil.ToStdTime(j.SubmitTime),
Priority: j.Priority,
PodRequirements: &PodRequirements{
NodeSelector: podRequirements.NodeSelector,
Affinity: podRequirements.Affinity,
Tolerations: podRequirements.Tolerations,
Annotations: podRequirements.Annotations,
ResourceRequirements: podRequirements.ResourceRequirements,
NodeSelector: maps.Clone(podRequirements.NodeSelector),
Affinity: proto.Clone(podRequirements.Affinity).(*v1.Affinity),
Tolerations: armadaslices.Map(podRequirements.Tolerations, func(t *v1.Toleration) v1.Toleration {
cloned := proto.Clone(t).(*v1.Toleration)
return *cloned
}),
Annotations: maps.Clone(podRequirements.Annotations),
ResourceRequirements: *rr,
},
Version: j.Version,
}, nil
Expand All @@ -94,17 +102,19 @@ func ToSchedulerObjectsJobSchedulingInfo(j *JobSchedulingInfo) *schedulerobjects
return &schedulerobjects.JobSchedulingInfo{
Lifetime: j.Lifetime,
PriorityClassName: j.PriorityClassName,
SubmitTime: j.SubmitTime,
SubmitTime: protoutil.ToTimestamp(j.SubmitTime),
Priority: j.Priority,
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
NodeSelector: podRequirements.NodeSelector,
Affinity: podRequirements.Affinity,
Tolerations: podRequirements.Tolerations,
Annotations: podRequirements.Annotations,
ResourceRequirements: podRequirements.ResourceRequirements,
NodeSelector: maps.Clone(podRequirements.NodeSelector),
Affinity: podRequirements.Affinity.DeepCopy(),
Tolerations: armadaslices.Map(podRequirements.Tolerations, func(t v1.Toleration) *v1.Toleration {
return proto.Clone(&t).(*v1.Toleration)
}),
Annotations: maps.Clone(podRequirements.Annotations),
ResourceRequirements: podRequirements.ResourceRequirements.DeepCopy(),
},
},
},
Expand Down
9 changes: 8 additions & 1 deletion internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,21 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
}
allocatableByPriority[EvictedPriority] = allocatableResources

taints := make([]v1.Taint, 0, len(node.Taints))
for _, t := range node.Taints {
if t != nil {
taints = append(taints, *t)
}
}

return CreateNodeAndType(
node.Id,
nodeIndex,
node.Executor,
node.Name,
node.Pool,
node.Unschedulable,
node.Taints,
taints,
node.Labels,
indexedTaints,
indexedNodeLabels,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package schedulerobjects
package internaltypes

import (
"crypto/rand"
Expand All @@ -16,18 +16,6 @@ type SchedulingKey [highwayhash.Size]byte

var EmptySchedulingKey SchedulingKey

// GetAffinityNodeSelector returns the node selector stored in
// req.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.
// It returns nil when req has no Affinity or the Affinity has no NodeAffinity.
// The receiver itself is dereferenced without a nil check, so req must be non-nil.
func (req *PodRequirements) GetAffinityNodeSelector() *v1.NodeSelector {
affinity := req.Affinity
if affinity == nil {
return nil
}
nodeAffinity := affinity.NodeAffinity
if nodeAffinity == nil {
return nil
}
return nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}

// SchedulingKeyGenerator is used to generate scheduling keys efficiently.
// A scheduling key is the canonical hash of the scheduling requirements of a job.
// All memory is allocated up-front and re-used. Thread-safe.
Expand Down
Loading

0 comments on commit 723cf64

Please sign in to comment.