Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error for unhealthy backend discovery during subresrource observeBackend calls #298

Merged
merged 7 commits into from
Nov 4, 2024
23 changes: 15 additions & 8 deletions internal/controller/bucket/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (l *ACLClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backen
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NoAction here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}
observationChan <- l.observeBackend(bucket, beName)
}()
}
Expand All @@ -57,14 +66,6 @@ func (l *ACLClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backen
func (l *ACLClient) observeBackend(bucket *v1alpha1.Bucket, backendName string) ResourceStatus {
l.log.Debug("Observing subresource acl on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning Updated.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return Updated
}

// If your bucket uses the bucket owner enforced setting for S3 Object
// Ownership, ACLs are disabled and no longer affect permissions.
if s3types.ObjectOwnership(aws.ToString(bucket.Spec.ForProvider.ObjectOwnership)) == s3types.ObjectOwnershipBucketOwnerEnforced {
Expand Down Expand Up @@ -92,6 +93,12 @@ func (l *ACLClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendName
ctx, span := otel.Tracer("").Start(ctx, "bucket.ACLClient.Handle")
defer span.End()

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

switch l.observeBackend(b, backendName) {
case NoAction, Updated:
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/bucket/consts.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package bucket

import "github.com/crossplane/crossplane-runtime/pkg/errors"

var errUnhealthyBackend = errors.New("backend marked as unhealthy in backendstore")

const (
// k8s error messages.
errNotBucket = "managed resource is not a Bucket custom resource"
Expand Down
25 changes: 16 additions & 9 deletions internal/controller/bucket/lifecycleconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (l *LifecycleConfigurationClient) Observe(ctx context.Context, bucket *v1al
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NoAction here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := l.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -80,18 +90,9 @@ func (l *LifecycleConfigurationClient) Observe(ctx context.Context, bucket *v1al
return Updated, nil
}

//nolint:gocyclo,cyclop // Function requires multiple checks.
func (l *LifecycleConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
l.log.Debug("Observing subresource lifecycle configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return NoAction, nil
}

s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -149,6 +150,12 @@ func (l *LifecycleConfigurationClient) Handle(ctx context.Context, b *v1alpha1.B
ctx, span := otel.Tracer("").Start(ctx, "bucket.LifecycleConfigurationClient.Handle")
defer span.End()

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := l.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandleLifecycleConfig)
Expand Down
25 changes: 25 additions & 0 deletions internal/controller/bucket/lifecycleconfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,31 @@ func TestHandle(t *testing.T) {
args args
want want
}{
"Unhealthy backend": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
},
Spec: v1alpha1.BucketSpec{
LifecycleConfigurationDisabled: false,
},
},
backendName: beName,
},
want: want{
err: errUnhealthyBackend,
},
},
"Lifecycle config deletes successfully": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/bucket/objectlockconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func (l *ObjectLockConfigurationClient) Observe(ctx context.Context, bucket *v1a
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NoAction here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := l.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -88,14 +98,6 @@ func (l *ObjectLockConfigurationClient) Observe(ctx context.Context, bucket *v1a
func (l *ObjectLockConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
l.log.Debug("Observing subresource object lock configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return NoAction, nil
}

s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -129,6 +131,12 @@ func (l *ObjectLockConfigurationClient) Handle(ctx context.Context, b *v1alpha1.
return nil
}

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := l.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandleVersioningConfig)
Expand Down
66 changes: 35 additions & 31 deletions internal/controller/bucket/objectlockconfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,6 @@ func TestObjectLockConfigObserveBackend(t *testing.T) {
args args
want want
}{
"Attempt to observe object lock config on unhealthy backend (consider it NoAction to unblock)": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}

bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: "bucket",
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockConfiguration: &v1alpha1.ObjectLockConfiguration{
ObjectLockEnabled: &objLockEnabled,
},
},
},
},
backendName: "s3-backend-1",
},
want: want{
status: NoAction,
err: nil,
},
},
"External error getting object lock": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down Expand Up @@ -265,6 +234,41 @@ func TestObjectLockConfigurationHandle(t *testing.T) {
args args
want want
}{
"Unhealthy backend": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend(beName, &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockEnabledForBucket: &enabledTrue,
ObjectLockConfiguration: &v1alpha1.ObjectLockConfiguration{
ObjectLockEnabled: &objLockEnabled,
Rule: &v1alpha1.ObjectLockRule{
DefaultRetention: &v1alpha1.DefaultRetention{
Mode: v1alpha1.ModeGovernance,
},
},
},
},
},
},
backendName: beName,
},
want: want{
err: errUnhealthyBackend,
},
},
"Object lock is not enabled for Bucket CR - nil value": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/bucket/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func (p *PolicyClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, bac
for _, backendName := range backendNames {
beName := backendName
go func() {
if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NoAction here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := p.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -77,14 +87,6 @@ func (p *PolicyClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, bac
func (p *PolicyClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
p.log.Debug("Observing subresource policy on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning Updated.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return Updated, nil
}

s3Client, err := p.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -131,6 +133,12 @@ func (p *PolicyClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendNa
ctx, span := otel.Tracer("").Start(ctx, "bucket.PolicyClient.Handle")
defer span.End()

if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := p.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandlePolicy)
Expand Down
1 change: 0 additions & 1 deletion internal/controller/bucket/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestPolicyObserveBackend(t *testing.T) {
status: Updated,
},
},

"s3 error": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
25 changes: 17 additions & 8 deletions internal/controller/bucket/versioningconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (l *VersioningConfigurationClient) Observe(ctx context.Context, bucket *v1a
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NoAction here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := l.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -83,18 +93,11 @@ func (l *VersioningConfigurationClient) Observe(ctx context.Context, bucket *v1a
func (l *VersioningConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
l.log.Debug("Observing subresource versioning configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return NoAction, nil
}

s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
}

response, err := rgw.GetBucketVersioning(ctx, s3Client, aws.String(bucket.Name))
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -144,6 +147,12 @@ func (l *VersioningConfigurationClient) Handle(ctx context.Context, b *v1alpha1.
ctx, span := otel.Tracer("").Start(ctx, "bucket.VersioningConfigurationClient.Handle")
defer span.End()

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := l.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandleVersioningConfig)
Expand Down
27 changes: 27 additions & 0 deletions internal/controller/bucket/versioningconfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,33 @@ func TestVersioningConfigurationHandle(t *testing.T) {
args args
want want
}{
"Unhealthy backend": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockEnabledForBucket: &enabledTrue,
},
},
},
backendName: beName,
},
want: want{
err: errUnhealthyBackend,
},
},
"Object lock enabled for bucket but no versioning config so set default enabled versioning": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
Loading