diff --git a/cmd/smith/app/BUILD.bazel b/cmd/smith/app/BUILD.bazel index 5ea798e..85186d1 100644 --- a/cmd/smith/app/BUILD.bazel +++ b/cmd/smith/app/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/smith/v1:go_default_library", + "//pkg/catalog:go_default_library", "//pkg/cleanup:go_default_library", "//pkg/cleanup/types:go_default_library", "//pkg/client:go_default_library", diff --git a/cmd/smith/app/app.go b/cmd/smith/app/app.go index 2d96603..c082b28 100644 --- a/cmd/smith/app/app.go +++ b/cmd/smith/app/app.go @@ -10,6 +10,7 @@ import ( "time" smith_v1 "github.com/atlassian/smith/pkg/apis/smith/v1" + "github.com/atlassian/smith/pkg/catalog" "github.com/atlassian/smith/pkg/cleanup" clean_types "github.com/atlassian/smith/pkg/cleanup/types" "github.com/atlassian/smith/pkg/client" @@ -117,6 +118,10 @@ func (a *App) Run(ctx context.Context) error { if err != nil { return err } + var catalogger *catalog.Catalog + if a.ServiceCatalogSupport { + catalogger = catalog.NewCatalog(scClient, a.ResyncPeriod) + } // Ready Checker readyTypes := []map[schema.GroupKind]readychecker.IsObjectReady{ready_types.MainKnownTypes} @@ -187,6 +192,7 @@ func (a *App) Run(ctx context.Context) error { Namespace: a.Namespace, PluginContainers: pluginContainers, Scheme: scheme, + Catalog: catalogger, } cntrlr.Prepare(ctx, crdInf, resourceInfs) @@ -198,13 +204,19 @@ func (a *App) Run(ctx context.Context) error { defer stgr.Shutdown() stage := stgr.NextStage() - infs := make([]cache.InformerSynced, 0, len(resourceInfs)) + infs := make([]cache.InformerSynced, 0) // Add all informers to Multi store and start them for gvk, inf := range resourceInfs { multiStore.AddInformer(gvk, inf) stage.StartWithChannel(inf.Run) // Must be after AddInformer() infs = append(infs, inf.HasSynced) } + if catalogger != nil { + for _, inf := range catalogger.InformersToRegister() { + stage.StartWithChannel(inf.Run) + infs = append(infs, inf.HasSynced) + } + } a.Logger.Info("Waiting for informers to sync") if !cache.WaitForCacheSync(ctx.Done(), infs...) { return ctx.Err() diff --git a/pkg/catalog/BUILD.bazel b/pkg/catalog/BUILD.bazel new file mode 100644 index 0000000..40b0525 --- /dev/null +++ b/pkg/catalog/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["catalog.go"], + importpath = "github.com/atlassian/smith/pkg/catalog", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/kubernetes-incubator/service-catalog/pkg/apis/servicecatalog/v1beta1:go_default_library", + "//vendor/github.com/kubernetes-incubator/service-catalog/pkg/client/clientset_generated/clientset:go_default_library", + "//vendor/github.com/kubernetes-incubator/service-catalog/pkg/client/informers_generated/externalversions/servicecatalog/v1beta1:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + ], +) diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go new file mode 100644 index 0000000..f692250 --- /dev/null +++ b/pkg/catalog/catalog.go @@ -0,0 +1,127 @@ +// Package catalog handles interacting with the OSB catalog endpoint +// (i.e. informers/helpers for ClusterServiceClass and ClusterServicePlan) +package catalog + +import ( + "time" + + sc_v1b1 "github.com/kubernetes-incubator/service-catalog/pkg/apis/servicecatalog/v1beta1" + clientset "github.com/kubernetes-incubator/service-catalog/pkg/client/clientset_generated/clientset" + sc_v1b1inf "github.com/kubernetes-incubator/service-catalog/pkg/client/informers_generated/externalversions/servicecatalog/v1beta1" + "github.com/pkg/errors" + "k8s.io/client-go/tools/cache" +) + +const ( + serviceClassExternalNameIndex = "ServiceClassExternalNameIndex" + serviceClassAndPlanExternalNameIndex = "ServiceClassAndPlanExternalNameIndex" +) + +// Catalog is a convenience interface to access OSB catalog information +type Catalog struct { + serviceClassInf cache.SharedIndexInformer + servicePlanInf cache.SharedIndexInformer +} + +func NewCatalog(scClient clientset.Interface, resyncPeriod time.Duration) *Catalog { + return &Catalog{ + serviceClassInf: sc_v1b1inf.NewClusterServiceClassInformer(scClient, resyncPeriod, cache.Indexers{ + serviceClassExternalNameIndex: func(obj interface{}) ([]string, error) { + serviceClass := obj.(*sc_v1b1.ClusterServiceClass) + return []string{serviceClass.Spec.ExternalName}, nil + }, + }), + servicePlanInf: sc_v1b1inf.NewClusterServicePlanInformer(scClient, resyncPeriod, cache.Indexers{ + serviceClassAndPlanExternalNameIndex: func(obj interface{}) ([]string, error) { + servicePlan := obj.(*sc_v1b1.ClusterServicePlan) + return []string{serviceClassAndPlanExternalNameIndexKey(servicePlan.Spec.ClusterServiceClassRef.Name, servicePlan.Spec.ExternalName)}, nil + }, + }), + } +} + +func serviceClassAndPlanExternalNameIndexKey(serviceClassName string, servicePlanExternalName string) string { + return serviceClassName + "/" + servicePlanExternalName +} + +func (c *Catalog) InformersToRegister() []cache.SharedIndexInformer { + return []cache.SharedIndexInformer{c.servicePlanInf, c.serviceClassInf} +} + +func (c *Catalog) GetClassOf(serviceInstanceSpec *sc_v1b1.ServiceInstanceSpec) (*sc_v1b1.ClusterServiceClass, error) { + if serviceInstanceSpec.ClusterServiceClassName == "" && serviceInstanceSpec.ClusterServiceClassExternalName == "" { + return nil, errors.Errorf("ServiceInstance must have at least ClusterServiceClassName or ClusterServiceExternalName") + } + if serviceInstanceSpec.ClusterServiceClassName != "" && serviceInstanceSpec.ClusterServiceClassExternalName != "" { + // Not sure if this is true. Maybe ok if they match? But silly. + return nil, errors.Errorf("ServiceInstance must have only one of ClusterServiceClassName or ClusterServiceExternalName") + } + + if serviceInstanceSpec.ClusterServiceClassName != "" { + item, exists, err := c.serviceClassInf.GetIndexer().GetByKey(serviceInstanceSpec.ClusterServiceClassName) + if err != nil { + return nil, errors.WithStack(err) + } + if !exists { + return nil, errors.Errorf("ServiceInstance refers to non-existant ClusterServiceClass %q", serviceInstanceSpec.ClusterServiceClassName) + } + return item.(*sc_v1b1.ClusterServiceClass), nil + } + + items, err := c.serviceClassInf.GetIndexer().ByIndex(serviceClassExternalNameIndex, serviceInstanceSpec.ClusterServiceClassExternalName) + if err != nil { + return nil, errors.WithStack(err) + } + switch len(items) { + case 0: + return nil, errors.Errorf("ServiceInstance refers to non-existant ClusterServiceClass %q", serviceInstanceSpec.ClusterServiceClassExternalName) + case 1: + return items[0].(*sc_v1b1.ClusterServiceClass), nil + default: + return nil, errors.Errorf("informer reported multiple instances for ClusterServiceClass %q", serviceInstanceSpec.ClusterServiceClassExternalName) + } + +} + +func (c *Catalog) GetPlanOf(serviceInstanceSpec *sc_v1b1.ServiceInstanceSpec) (*sc_v1b1.ClusterServicePlan, error) { + if serviceInstanceSpec.ClusterServicePlanName == "" && serviceInstanceSpec.ClusterServicePlanExternalName == "" { + // TODO actually, this is ok according to Service Catalog if there is only one plan. + // This is annoying to do well, though, because I think we have to setup another index. + return nil, errors.Errorf("ServiceInstance must have at least ClusterServicePlanName or ClusterServiceExternalName") + } + if serviceInstanceSpec.ClusterServicePlanName != "" && serviceInstanceSpec.ClusterServicePlanExternalName != "" { + // Not sure if this is true. Maybe ok if they match? But silly. + return nil, errors.Errorf("ServiceInstance must have only one of ClusterServicePlanName or ClusterServiceExternalName") + } + + if serviceInstanceSpec.ClusterServicePlanName != "" { + item, exists, err := c.servicePlanInf.GetIndexer().GetByKey(serviceInstanceSpec.ClusterServicePlanName) + if err != nil { + return nil, errors.WithStack(err) + } + if !exists { + return nil, errors.Errorf("ServiceInstance refers to non-existant plan %q", serviceInstanceSpec.ClusterServicePlanName) + } + return item.(*sc_v1b1.ClusterServicePlan), nil + } + + // If we don't have the plan UUID, we need to look up the class to find its UUID + serviceClass, err := c.GetClassOf(serviceInstanceSpec) + if err != nil { + return nil, err + } + + planKey := serviceClassAndPlanExternalNameIndexKey(serviceClass.Name, serviceInstanceSpec.ClusterServicePlanExternalName) + items, err := c.servicePlanInf.GetIndexer().ByIndex(serviceClassAndPlanExternalNameIndex, planKey) + if err != nil { + return nil, errors.WithStack(err) + } + switch len(items) { + case 0: + return nil, errors.Errorf("ServiceInstance refers to non-existant ClusterServicePlan %q", planKey) + case 1: + return items[0].(*sc_v1b1.ClusterServicePlan), nil + default: + return nil, errors.Errorf("informer reported multiple instances for ClusterServicePlan %q", planKey) + } +} diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 971e5bc..abbb6eb 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -19,6 +19,7 @@ go_library( deps = [ "//:go_default_library", "//pkg/apis/smith/v1:go_default_library", + "//pkg/catalog:go_default_library", "//pkg/client/clientset_generated/clientset/typed/smith/v1:go_default_library", "//pkg/plugin:go_default_library", "//pkg/resources:go_default_library", diff --git a/pkg/controller/bundle_sync_task.go b/pkg/controller/bundle_sync_task.go index 2ccfbaa..89b6037 100644 --- a/pkg/controller/bundle_sync_task.go +++ b/pkg/controller/bundle_sync_task.go @@ -5,6 +5,7 @@ import ( "fmt" smith_v1 "github.com/atlassian/smith/pkg/apis/smith/v1" + "github.com/atlassian/smith/pkg/catalog" smithClient_v1 "github.com/atlassian/smith/pkg/client/clientset_generated/clientset/typed/smith/v1" "github.com/atlassian/smith/pkg/plugin" "github.com/atlassian/smith/pkg/util/graph" @@ -30,6 +31,7 @@ type bundleSyncTask struct { processedResources map[smith_v1.ResourceName]*resourceInfo pluginContainers map[smith_v1.PluginName]plugin.PluginContainer scheme *runtime.Scheme + catalog *catalog.Catalog } // Parse bundle, build resource graph, traverse graph, assert each resource exists. @@ -74,6 +76,7 @@ func (st *bundleSyncTask) process() (retriableError bool, e error) { processedResources: st.processedResources, pluginContainers: st.pluginContainers, scheme: st.scheme, + catalog: st.catalog, } resInfo := rst.processResource(&res) if retriable, err := resInfo.fetchError(); err != nil && api_errors.IsConflict(errors.Cause(err)) { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e4393ce..cf7b039 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,6 +6,7 @@ import ( "time" smith_v1 "github.com/atlassian/smith/pkg/apis/smith/v1" + "github.com/atlassian/smith/pkg/catalog" smithClient_v1 "github.com/atlassian/smith/pkg/client/clientset_generated/clientset/typed/smith/v1" "github.com/atlassian/smith/pkg/plugin" "github.com/atlassian/smith/pkg/util/logz" @@ -53,6 +54,8 @@ type BundleController struct { PluginContainers map[smith_v1.PluginName]plugin.PluginContainer Scheme *runtime.Scheme + + Catalog *catalog.Catalog } // Prepare prepares the controller to be run. diff --git a/pkg/controller/controller_worker.go b/pkg/controller/controller_worker.go index a374632..497da83 100644 --- a/pkg/controller/controller_worker.go +++ b/pkg/controller/controller_worker.go @@ -106,6 +106,7 @@ func (c *BundleController) ProcessKey(logger *zap.Logger, key string) (retriable bundle: bundleObj.(*smith_v1.Bundle).DeepCopy(), // Deep-copy otherwise we are mutating our cache. pluginContainers: c.PluginContainers, scheme: c.Scheme, + catalog: c.Catalog, } retriable, err := st.process() return st.handleProcessResult(retriable, err) diff --git a/pkg/controller/resource_sync_task.go b/pkg/controller/resource_sync_task.go index 4fe7377..71b0b03 100644 --- a/pkg/controller/resource_sync_task.go +++ b/pkg/controller/resource_sync_task.go @@ -3,6 +3,7 @@ package controller import ( "github.com/atlassian/smith" smith_v1 "github.com/atlassian/smith/pkg/apis/smith/v1" + "github.com/atlassian/smith/pkg/catalog" "github.com/atlassian/smith/pkg/plugin" "github.com/atlassian/smith/pkg/util" "github.com/atlassian/smith/pkg/util/logz" @@ -73,11 +74,22 @@ type resourceSyncTask struct { processedResources map[smith_v1.ResourceName]*resourceInfo pluginContainers map[smith_v1.PluginName]plugin.PluginContainer scheme *runtime.Scheme + catalog *catalog.Catalog } func (st *resourceSyncTask) processResource(res *smith_v1.Resource) resourceInfo { st.logger.Debug("Processing resource") + // Do as much prevalidation of the spec as we can before dependencies are resolved. + // (e.g. plugin/service instance/service binding schemas) + if err := st.prevalidate(res); err != nil { + return resourceInfo{ + status: resourceStatusError{ + err: err, + }, + } + } + // Check if all resource dependencies are ready (so we can start processing this one) notReadyDependencies := st.checkAllDependenciesAreReady(res) if len(notReadyDependencies) > 0 { @@ -263,6 +275,39 @@ func (st *resourceSyncTask) getActualObject(res *smith_v1.Resource) (runtime.Obj return actual, nil } +// prevalidate does as much validation as possible before doing any real work. +func (st *resourceSyncTask) prevalidate(res *smith_v1.Resource) error { + serviceInstanceGvk := sc_v1b1.SchemeGroupVersion.WithKind("ServiceInstance") + if res.Spec.Object != nil { + if res.Spec.Object.GetObjectKind().GroupVersionKind() == serviceInstanceGvk { + if st.catalog == nil { + // can't do anything, since service catalog wasn't enabled. + return nil + } + actual, err := st.scheme.ConvertToVersion(res.Spec.Object, serviceInstanceGvk.GroupVersion()) + if err != nil { + return errors.WithStack(err) + } + serviceInstance := actual.(*sc_v1b1.ServiceInstance) + servicePlan, err := st.catalog.GetPlanOf(&serviceInstance.Spec) + if err != nil { + return err + } + + // TODO preprocess and actually validate + // performance implications of this may be horrifying... + _ = servicePlan.Spec.ServiceInstanceCreateParameterSchema + } + // TODO validate service binding parameters + return nil + } else if res.Spec.Plugin != nil { + // TODO validate plugin against schema + return nil + } else { + return nil + } +} + // evalSpec evaluates the resource specification and returns the result. func (st *resourceSyncTask) evalSpec(res *smith_v1.Resource, actual runtime.Object) (*unstructured.Unstructured, error) { // Process the spec diff --git a/pkg/controller_test/BUILD.bazel b/pkg/controller_test/BUILD.bazel index 4f2f153..6cb5296 100644 --- a/pkg/controller_test/BUILD.bazel +++ b/pkg/controller_test/BUILD.bazel @@ -25,6 +25,7 @@ go_test( "//examples/sleeper:go_default_library", "//examples/sleeper/pkg/apis/sleeper/v1:go_default_library", "//pkg/apis/smith/v1:go_default_library", + "//pkg/catalog:go_default_library", "//pkg/cleanup:go_default_library", "//pkg/cleanup/types:go_default_library", "//pkg/client:go_default_library", diff --git a/pkg/controller_test/no_actions_for_blocked_resources_test.go b/pkg/controller_test/no_actions_for_blocked_resources_test.go index 54478c7..79f256a 100644 --- a/pkg/controller_test/no_actions_for_blocked_resources_test.go +++ b/pkg/controller_test/no_actions_for_blocked_resources_test.go @@ -48,6 +48,7 @@ func TestNoActionsForBlockedResources(t *testing.T) { ObjectMeta: meta_v1.ObjectMeta{ Name: si1, }, + Spec: serviceInstanceSpec, }, }, }, diff --git a/pkg/controller_test/no_deletions_while_in_progress_test.go b/pkg/controller_test/no_deletions_while_in_progress_test.go index cf17887..82d575c 100644 --- a/pkg/controller_test/no_deletions_while_in_progress_test.go +++ b/pkg/controller_test/no_deletions_while_in_progress_test.go @@ -39,6 +39,7 @@ func TestNoDeletionsWhileInProgress(t *testing.T) { ObjectMeta: meta_v1.ObjectMeta{ Name: si1, }, + Spec: serviceInstanceSpec, }, }, }, diff --git a/pkg/controller_test/plugin_spec_processed_test.go b/pkg/controller_test/plugin_spec_processed_test.go index b55b1dc..3a20ed9 100644 --- a/pkg/controller_test/plugin_spec_processed_test.go +++ b/pkg/controller_test/plugin_spec_processed_test.go @@ -69,6 +69,7 @@ func TestPluginSpecProcessed(t *testing.T) { ObjectMeta: meta_v1.ObjectMeta{ Name: si1, }, + Spec: serviceInstanceSpec, }, }, }, diff --git a/pkg/controller_test/processing_continues_after_error_test.go b/pkg/controller_test/processing_continues_after_error_test.go index e7a2b94..d745a84 100644 --- a/pkg/controller_test/processing_continues_after_error_test.go +++ b/pkg/controller_test/processing_continues_after_error_test.go @@ -49,6 +49,7 @@ func TestProcessingContinuesAfterNonBlockingError(t *testing.T) { ObjectMeta: meta_v1.ObjectMeta{ Name: si1, }, + Spec: serviceInstanceSpec, }, }, }, diff --git a/pkg/controller_test/resolve_binding_secret_references_test.go b/pkg/controller_test/resolve_binding_secret_references_test.go index a155980..e4823e7 100644 --- a/pkg/controller_test/resolve_binding_secret_references_test.go +++ b/pkg/controller_test/resolve_binding_secret_references_test.go @@ -76,6 +76,7 @@ func TestResolveBindingSecretReferences(t *testing.T) { }, }, }, + Spec: serviceInstanceSpec, Status: sc_v1b1.ServiceInstanceStatus{ Conditions: []sc_v1b1.ServiceInstanceCondition{ { @@ -105,6 +106,7 @@ func TestResolveBindingSecretReferences(t *testing.T) { ObjectMeta: meta_v1.ObjectMeta{ Name: si1, }, + Spec: serviceInstanceSpec, }, }, }, @@ -148,6 +150,7 @@ func TestResolveBindingSecretReferences(t *testing.T) { "Secret": "{{" + resSb1 + ":bindsecret#Data.mysecret}}", }, }, + Spec: serviceInstanceSpec, }, }, }, diff --git a/pkg/controller_test/zz_plumbing_for_test.go b/pkg/controller_test/zz_plumbing_for_test.go index 643a28c..e8a1187 100644 --- a/pkg/controller_test/zz_plumbing_for_test.go +++ b/pkg/controller_test/zz_plumbing_for_test.go @@ -13,6 +13,7 @@ import ( "github.com/atlassian/smith/examples/sleeper" sleeper_v1 "github.com/atlassian/smith/examples/sleeper/pkg/apis/sleeper/v1" smith_v1 "github.com/atlassian/smith/pkg/apis/smith/v1" + "github.com/atlassian/smith/pkg/catalog" "github.com/atlassian/smith/pkg/cleanup" clean_types "github.com/atlassian/smith/pkg/cleanup/types" "github.com/atlassian/smith/pkg/client" @@ -123,6 +124,20 @@ const ( pluginSimpleConfigMap = "simpleConfigMap" pluginConfigMapWithDeps = "configMapWithDeps" + + serviceClassName = "uid-1" + serviceClassExternalName = "database" + servicePlanName = "uid-2" + servicePlanExternalName = "default" +) + +var ( + serviceInstanceSpec = sc_v1b1.ServiceInstanceSpec{ + PlanReference: sc_v1b1.PlanReference{ + ClusterServiceClassExternalName: serviceClassExternalName, + ClusterServicePlanExternalName: servicePlanExternalName, + }, + } ) func (tc *testCase) run(t *testing.T) { @@ -148,13 +163,49 @@ func (tc *testCase) run(t *testing.T) { crdClient.AddReactor(reactor.verb, reactor.resource, reactor.reactor(t)) } var scClient scClientset.Interface + var catalogger *catalog.Catalog if tc.enableServiceCatalog { - scClientFake := scFake.NewSimpleClientset(tc.scClientObjects...) + scClientFake := scFake.NewSimpleClientset(append( + tc.scClientObjects, + []runtime.Object{ + &sc_v1b1.ClusterServiceClass{ + TypeMeta: meta_v1.TypeMeta{ + Kind: "ClusterServiceClass", + APIVersion: sc_v1b1.SchemeGroupVersion.String(), + }, + ObjectMeta: meta_v1.ObjectMeta{ + Name: serviceClassName, + }, + Spec: sc_v1b1.ClusterServiceClassSpec{ + ExternalName: serviceClassExternalName, + ExternalID: serviceClassName, + }, + }, + &sc_v1b1.ClusterServicePlan{ + TypeMeta: meta_v1.TypeMeta{ + Kind: "ClusterServicePlan", + APIVersion: sc_v1b1.SchemeGroupVersion.String(), + }, + ObjectMeta: meta_v1.ObjectMeta{ + Name: servicePlanName, + }, + Spec: sc_v1b1.ClusterServicePlanSpec{ + ClusterServiceClassRef: sc_v1b1.ClusterObjectReference{ + Name: serviceClassName, + }, + ExternalName: servicePlanExternalName, + ExternalID: servicePlanName, + }, + }, + }...)..., + ) + tc.scFake = &scClientFake.Fake scClient = scClientFake for _, reactor := range tc.scReactors { scClientFake.AddReactor(reactor.verb, reactor.resource, reactor.reactor(t)) } + catalogger = catalog.NewCatalog(scClient, 0) } crdInf := apiext_v1b1inf.NewCustomResourceDefinitionInformer(crdClient, 0, cache.Indexers{}) @@ -247,6 +298,7 @@ func (tc *testCase) run(t *testing.T) { Namespace: tc.namespace, PluginContainers: pluginContainers, Scheme: scheme, + Catalog: catalogger, } prepare := func(ctx context.Context) { cntrlr.Prepare(ctx, crdInf, resourceInfs) @@ -260,6 +312,12 @@ func (tc *testCase) run(t *testing.T) { stage.StartWithChannel(inf.Run) infs = append(infs, inf.HasSynced) } + if catalogger != nil { + for _, inf := range catalogger.InformersToRegister() { + stage.StartWithChannel(inf.Run) + infs = append(infs, inf.HasSynced) + } + } require.True(t, cache.WaitForCacheSync(ctx.Done(), infs...)) } @@ -635,6 +693,7 @@ func serviceInstance(ready, inProgress, error bool) *sc_v1b1.ServiceInstance { }, }, Status: status, + Spec: serviceInstanceSpec, } }