diff --git a/pkg/cache/application.go b/pkg/cache/application.go index 3420cc432..b4c4f0121 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -518,6 +518,10 @@ func (app *Application) onReserving() { // onReservationStateChange is called when there is an add or a release of a placeholder // If we have all the required placeholders progress the application status, otherwise nothing happens func (app *Application) onReservationStateChange() { + if app.originatingTask != nil { + events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "GangScheduling", + "Placeholder Allocated", "Application %s placeholder has been allocated.", app.applicationID) + } desireCounts := make(map[string]int32, len(app.taskGroups)) for _, tg := range app.taskGroups { desireCounts[tg.Name] = tg.MinMember @@ -542,6 +546,12 @@ func (app *Application) onReservationStateChange() { return } } + + if app.originatingTask != nil { + // Now that all placeholders has been allocated, send a final conclusion message + events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "GangScheduling", + "Gang reservations completed. All placeholders are allocated.", "Application %s all placeholders are allocated. Transitioning to running state.", app.applicationID) + } dispatcher.Dispatch(NewRunApplicationEvent(app.applicationID)) } diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go index 56c9e8184..d6b45cb36 100644 --- a/pkg/cache/application_test.go +++ b/pkg/cache/application_test.go @@ -1220,6 +1220,11 @@ func TestApplication_onReservationStateChange(t *testing.T) { dispatcher.Start() defer dispatcher.Stop() + recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) + if !ok { + t.Fatal("the EventRecorder is expected to be of type FakeRecorder") + } + app := NewApplication(appID, "root.a", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI()) context.addApplicationToContext(app) @@ -1265,6 +1270,7 @@ func TestApplication_onReservationStateChange(t *testing.T) { app.addTask(task1) app.addTask(task2) app.addTask(task3) + app.setOriginatingTask(task1) // app stays in accepted with taskgroups defined none bound app.onReservationStateChange() @@ -1287,6 +1293,28 @@ func TestApplication_onReservationStateChange(t *testing.T) { task3.setTaskGroupName("test-group-2") app.onReservationStateChange() assertAppState(t, app, ApplicationStates().Running, 1*time.Second) + + message := "placeholder has been allocated" + reason := "GangScheduling" + counter := 0 + // check that the event has been published + err := utils.WaitForCondition(func() bool { + for { + select { + case event := <-recorder.Events: + print(event) + if strings.Contains(event, reason) && strings.Contains(event, message) { + counter++ + if counter == 4 { + return true + } + } + default: + return false + } + } + }, 5*time.Millisecond, time.Second) + assert.NilError(t, err, "event should have been emitted") } func TestTaskRemoval(t *testing.T) {