Skip to content

Commit

Permalink
App stop/start waits for service bindings readiness
Browse files Browse the repository at this point in the history
* The app controller sets the ready state if all of the criteria below
  are met:
  - app service bindings (if any) become ready
  - the app actual state is equal to the desired state
  - the current droplet is set
* The app repository waits for the app `Ready` state in
  `SetAppDesiredState` to ensure synchronous app stop/start
* The condition awaiter verifies that the condition observed generation
  matches the object generation. Thus we ensure that the condition is
  not outdated.

Co-authored-by: Georgi Sabev <[email protected]>

fixes: #3217
  • Loading branch information
danail-branekov committed Apr 30, 2024
1 parent 321ea4e commit 0a89343
Show file tree
Hide file tree
Showing 19 changed files with 837 additions and 822 deletions.
14 changes: 7 additions & 7 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func main() {
privilegedCRClient,
userClientFactory,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
)
spaceRepo := repositories.NewSpaceRepo(
namespaceRetriever,
orgRepo,
userClientFactory,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
)
processRepo := repositories.NewProcessRepo(
namespaceRetriever,
Expand All @@ -148,7 +148,7 @@ func main() {
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
)
dropletRepo := repositories.NewDropletRepo(
userClientFactory,
Expand Down Expand Up @@ -184,19 +184,19 @@ func main() {
nsPermissions,
toolsregistry.NewRepositoryCreator(cfg.ContainerRegistryType),
cfg.ContainerRepositoryPrefix,
conditions.NewStateAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
)
serviceInstanceRepo := repositories.NewServiceInstanceRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
)
serviceBindingRepo := repositories.NewServiceBindingRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
)
buildpackRepo := repositories.NewBuildpackRepository(cfg.BuilderName,
userClientFactory,
Expand All @@ -223,7 +223,7 @@ func main() {
userClientFactory,
namespaceRetriever,
nsPermissions,
conditions.NewStateAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
conditions.NewConditionAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
)
metricsRepo := repositories.NewMetricsRepo(userClientFactory)

Expand Down
8 changes: 1 addition & 7 deletions api/repositories/app_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,7 @@ func (f *AppRepo) SetAppDesiredState(ctx context.Context, authInfo authorization
return AppRecord{}, fmt.Errorf("failed to set app desired state: %w", apierrors.FromK8sError(err, AppResourceType))
}

if _, err := f.appAwaiter.AwaitState(ctx, userClient, cfApp, func(actualApp *korifiv1alpha1.CFApp) error {
desiredState := korifiv1alpha1.AppState(message.DesiredState)
if (actualApp.Spec.DesiredState == desiredState) && (actualApp.Status.ActualState == desiredState) {
return nil
}
return fmt.Errorf("expected actual state to be %s; it is currently %s", message.DesiredState, actualApp.Status.ActualState)
}); err != nil {
if _, err := f.appAwaiter.AwaitCondition(ctx, userClient, cfApp, korifiv1alpha1.ReadyConditionType); err != nil {
return AppRecord{}, apierrors.FromK8sError(err, AppResourceType)
}

Expand Down
14 changes: 8 additions & 6 deletions api/repositories/app_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,11 +1218,12 @@ var _ = Describe("AppRepository", func() {
Expect(returnedAppRecord.SpaceGUID).To(Equal(cfSpace.Name))
})

It("waits for the desired state", func() {
Expect(appAwaiter.AwaitStateCallCount()).To(Equal(1))
actualCFApp := appAwaiter.AwaitStateArgsForCall(0)
It("waits for the app ready condition", func() {
Expect(appAwaiter.AwaitConditionCallCount()).To(Equal(1))
actualCFApp, actualCondition := appAwaiter.AwaitConditionArgsForCall(0)
Expect(actualCFApp.GetName()).To(Equal(cfApp.Name))
Expect(actualCFApp.GetNamespace()).To(Equal(cfApp.Namespace))
Expect(actualCondition).To(Equal(korifiv1alpha1.ReadyConditionType))
})

It("changes the desired state of the App", func() {
Expand All @@ -1242,11 +1243,12 @@ var _ = Describe("AppRepository", func() {
Expect(returnedErr).ToNot(HaveOccurred())
})

It("waits for the desired state", func() {
Expect(appAwaiter.AwaitStateCallCount()).To(Equal(1))
actualCFApp := appAwaiter.AwaitStateArgsForCall(0)
It("waits for the app ready condition", func() {
Expect(appAwaiter.AwaitConditionCallCount()).To(Equal(1))
actualCFApp, actualCondition := appAwaiter.AwaitConditionArgsForCall(0)
Expect(actualCFApp.GetName()).To(Equal(cfApp.Name))
Expect(actualCFApp.GetNamespace()).To(Equal(cfApp.Namespace))
Expect(actualCondition).To(Equal(korifiv1alpha1.ReadyConditionType))
})

It("changes the desired state of the App", func() {
Expand Down
36 changes: 22 additions & 14 deletions api/repositories/conditions/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type Awaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]] struc
timeout time.Duration
}

func NewStateAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
func NewConditionAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
return &Awaiter[T, L, PL]{
timeout: timeout,
}
}

func (a *Awaiter[T, L, PL]) AwaitState(ctx context.Context, k8sClient client.WithWatch, object client.Object, checkState func(T) error) (T, error) {
func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
var empty T
objList := PL(new(L))

Expand All @@ -47,29 +47,37 @@ func (a *Awaiter[T, L, PL]) AwaitState(ctx context.Context, k8sClient client.Wit
}
defer watch.Stop()

var stateCheckErr error
var conditionCheckErr error
for e := range watch.ResultChan() {
obj, ok := e.Object.(T)
if !ok {
continue
}

stateCheckErr = checkState(obj)
if stateCheckErr == nil {
conditionCheckErr = checkConditionIsTrue(ctx, obj, conditionType)
if conditionCheckErr == nil {
return obj, nil
}
}

return empty, fmt.Errorf("object %s/%s did not match desired state within %d ms: %s",
object.GetNamespace(), object.GetName(), a.timeout.Milliseconds(), stateCheckErr.Error(),
return empty, fmt.Errorf("object %s/%s status condition %s did not become true in %.2f s: %s",
object.GetNamespace(), object.GetName(), conditionType, a.timeout.Seconds(), conditionCheckErr.Error(),
)
}

func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
return a.AwaitState(ctx, k8sClient, object, func(obj T) error {
if meta.IsStatusConditionTrue(obj.StatusConditions(), conditionType) {
return nil
}
return fmt.Errorf("expected the %s condition to be true", conditionType)
})
func checkConditionIsTrue[T RuntimeObjectWithStatusConditions](ctx context.Context, obj T, conditionType string) error {
condition := meta.FindStatusCondition(obj.StatusConditions(), conditionType)

if condition == nil {
return fmt.Errorf("condition %s not set yet", conditionType)
}

if condition.ObservedGeneration != obj.GetGeneration() {
return fmt.Errorf("condition %s is outdated", conditionType)
}

if condition.Status == metav1.ConditionTrue {
return nil
}
return fmt.Errorf("expected the %s condition to be true", conditionType)
}
94 changes: 48 additions & 46 deletions api/repositories/conditions/await_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conditions_test

import (
"errors"
"sync"
"time"

Expand All @@ -12,9 +11,10 @@ import (
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("StateAwaiter", func() {
var _ = Describe("ConditionAwaiter", func() {
var (
awaiter *conditions.Awaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList, *korifiv1alpha1.CFTaskList]
task *korifiv1alpha1.CFTask
Expand Down Expand Up @@ -42,7 +42,7 @@ var _ = Describe("StateAwaiter", func() {
}

BeforeEach(func() {
awaiter = conditions.NewStateAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](time.Second)
awaiter = conditions.NewConditionAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](time.Second)
awaitedTask = nil
awaitErr = nil

Expand All @@ -56,65 +56,67 @@ var _ = Describe("StateAwaiter", func() {
Expect(k8sClient.Create(ctx, task)).To(Succeed())
})

Describe("AwaitState", func() {
JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitState(ctx, k8sClient, task, func(actualTask *korifiv1alpha1.CFTask) error {
if actualTask.Status.DropletRef.Name == "" {
return errors.New("droplet ref not set")
}
JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitCondition(ctx, k8sClient, task, korifiv1alpha1.TaskInitializedConditionType)
})

It("returns an error", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("condition Initialized not set yet")))
})

return nil
When("the condition becomes false", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
meta.SetStatusCondition(&cfTask.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionFalse,
Reason: "initialized",
ObservedGeneration: task.Generation,
})
})
})

It("returns an error as the desired state is never reached", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("droplet ref not set")))
It("returns an error", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("expected the Initialized condition to be true")))
})
})

When("the desired state is reached", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
cfTask.Status.DropletRef.Name = "some-droplet"
When("the condition becomes true", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
meta.SetStatusCondition(&cfTask.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionTrue,
Reason: "initialized",
ObservedGeneration: task.Generation,
})
})

It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())

Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(awaitedTask.Status.DropletRef.Name).To(Equal("some-droplet"))
})
})
})

Describe("AwaitCondition", func() {
JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitCondition(ctx, k8sClient, task, korifiv1alpha1.TaskInitializedConditionType)
})
It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())

It("returns an error as the condition never becomes true", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("expected the Initialized condition to be true")))
Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(meta.IsStatusConditionTrue(awaitedTask.Status.Conditions, korifiv1alpha1.TaskInitializedConditionType)).To(BeTrue())
})
})

When("the condition becomes true", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
meta.SetStatusCondition(&cfTask.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionTrue,
Reason: "initialized",
})
When("the condition becomes true but is outdated", func() {
BeforeEach(func() {
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(task), task)).To(Succeed())
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
meta.SetStatusCondition(&cfTask.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionTrue,
Reason: "initialized",
ObservedGeneration: task.Generation - 1,
})
})
})

It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())

Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(meta.IsStatusConditionTrue(awaitedTask.Status.Conditions, korifiv1alpha1.TaskInitializedConditionType)).To(BeTrue())
})
It("returns an error", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("condition Initialized is outdated")))
})
})
})
22 changes: 0 additions & 22 deletions api/repositories/fakeawaiter/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,13 @@ import (
)

type FakeAwaiter[T conditions.RuntimeObjectWithStatusConditions, L any, PL conditions.ObjectList[L]] struct {
awaitStateCalls []struct {
obj client.Object
}
awaitConditionCalls []struct {
obj client.Object
conditionType string
}
AwaitStateStub func(context.Context, client.WithWatch, client.Object, func(T) error) (T, error)
AwaitConditionStub func(context.Context, client.WithWatch, client.Object, string) (T, error)
}

func (a *FakeAwaiter[T, L, PL]) AwaitState(ctx context.Context, k8sClient client.WithWatch, object client.Object, checkState func(T) error) (T, error) {
a.awaitStateCalls = append(a.awaitStateCalls, struct {
obj client.Object
}{
object,
})

return object.(T), nil
}

func (a *FakeAwaiter[T, L, PL]) AwaitStateCallCount() int {
return len(a.awaitStateCalls)
}

func (a *FakeAwaiter[T, L, PL]) AwaitStateArgsForCall(i int) client.Object {
return a.awaitStateCalls[i].obj
}

func (a *FakeAwaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
a.awaitConditionCalls = append(a.awaitConditionCalls, struct {
obj client.Object
Expand Down
1 change: 0 additions & 1 deletion api/repositories/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type RepositoryCreator interface {
}

type Awaiter[T runtime.Object] interface {
AwaitState(context.Context, client.WithWatch, client.Object, func(T) error) (T, error)
AwaitCondition(context.Context, client.WithWatch, client.Object, string) (T, error)
}

Expand Down
Loading

0 comments on commit 0a89343

Please sign in to comment.