diff --git a/pkg/controller/external.go b/pkg/controller/external.go index daf9cad0..8ced921c 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -9,6 +9,7 @@ import ( "time" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/pkg/errors" @@ -47,11 +48,10 @@ func WithCallbackProvider(ac CallbackProvider) Option { } } -// WithProviderScheduler sets the native Terraform provider scheduler to be used -// by a Connector. -func WithProviderScheduler(s terraform.ProviderScheduler) Option { +// WithLogger configures a logger for the Connector. +func WithLogger(l logging.Logger) Option { return func(c *Connector) { - c.providerScheduler = s + c.logger = l } } @@ -62,7 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi getTerraformSetup: sf, store: ws, config: cfg, - providerScheduler: terraform.NewNoOpProviderScheduler(), + logger: logging.NewNopLogger(), } for _, f := range opts { f(c) @@ -78,8 +78,7 @@ type Connector struct { getTerraformSetup terraform.SetupFn config *config.Resource callback CallbackProvider - providerScheduler terraform.ProviderScheduler - providerHandle terraform.ProviderHandle + logger logging.Logger } // Connect makes sure the underlying client is ready to issue requests to the @@ -94,49 +93,51 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed if err != nil { return nil, errors.Wrap(err, errGetTerraformSetup) } - if ts.Scheduler != nil { - c.providerScheduler = ts.Scheduler - } ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) if err != nil { return nil, errors.Wrap(err, errGetWorkspace) } - if err := c.scheduleProvider(ws); err != nil { - return nil, errors.Wrap(err, errScheduleProvider) - } return &external{ - workspace: ws, - config: c.config, - callback: c.callback, + workspace: ws, + config: c.config, + callback: c.callback, + providerScheduler: ts.Scheduler, + providerHandle: ws.ProviderHandle, + logger: c.logger.WithValues("uid", mg.GetUID()), }, nil } -func (c *Connector) scheduleProvider(ws *terraform.Workspace) error { - if c.providerScheduler == nil || ws == nil { +type external struct { + workspace Workspace + config *config.Resource + callback CallbackProvider + providerScheduler terraform.ProviderScheduler + providerHandle terraform.ProviderHandle + logger logging.Logger +} + +func (e *external) scheduleProvider() error { + if e.providerScheduler == nil || e.workspace == nil { return nil } - inuse, attachmentConfig, err := c.providerScheduler.Start(ws.ProviderHandle) + inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle) if err != nil { - return errors.Wrap(err, "cannot schedule a shared provider for the workspace") + return errors.Wrap(err, errScheduleProvider) + } + if ps, ok := e.workspace.(ProviderSharer); ok { + ps.UseProvider(inuse, attachmentConfig) } - ws.UseSharedProvider(inuse, attachmentConfig) - c.providerHandle = ws.ProviderHandle return nil } -// Disconnect releases any resources held by the Connector. -func (c *Connector) Disconnect(_ context.Context) error { - if c.providerScheduler == nil { - return nil +func (e *external) stopProvider() { + if e.providerScheduler == nil { + return + } + if err := e.providerScheduler.Stop(e.providerHandle); err != nil { + e.logger.Info("ExternalClient failed to stop the native provider", "error", err) } - return errors.Wrap(c.providerScheduler.Stop(c.providerHandle), "cannot stop the shared provider for the workspace") -} - -type external struct { - workspace Workspace - config *config.Resource - callback CallbackProvider } func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo @@ -144,6 +145,10 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. // and serial. // TODO(muvaf): Look for ways to reduce the cyclomatic complexity without // increasing the difficulty of understanding the flow. + if err := e.scheduleProvider(); err != nil { + return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID()) + } + defer e.stopProvider() tr, ok := mg.(resource.Terraformed) if !ok { return managed.ExternalObservation{}, errors.New(errUnexpectedObject) @@ -258,6 +263,10 @@ func addTTR(mg xpresource.Managed) { } func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -285,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -304,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error { + if err := e.scheduleProvider(); err != nil { + return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy) } diff --git a/pkg/controller/interfaces.go b/pkg/controller/interfaces.go index e0af8141..d8978e57 100644 --- a/pkg/controller/interfaces.go +++ b/pkg/controller/interfaces.go @@ -26,6 +26,11 @@ type Workspace interface { Plan(context.Context) (terraform.PlanResult, error) } +// ProviderSharer shares a native provider process with the receiver. +type ProviderSharer interface { + UseProvider(inuse terraform.InUse, attachmentConfig string) +} + // Store is where we can get access to the Terraform workspace of given resource. type Store interface { Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error) diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl index cfc422c9..e3279001 100644 --- a/pkg/pipeline/templates/controller.go.tmpl +++ b/pkg/pipeline/templates/controller.go.tmpl @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { } r := managed.NewReconciler(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), - managed.WithExternalConnectDisconnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], + managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), {{- if .UseAsync }} tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))), {{- end}} diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go index 3ecfebc4..d29446b4 100644 --- a/pkg/terraform/provider_runner.go +++ b/pkg/terraform/provider_runner.go @@ -165,6 +165,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig) return sr.reattachConfig, nil } + log.Debug("Provider runner not yet started. Will fork a new native provider.") errCh := make(chan error, 1) reattachCh := make(chan string, 1) sr.stopCh = make(chan bool, 1) @@ -189,6 +190,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo errCh <- err return } + log.Debug("Forked new native provider.") scanner := bufio.NewScanner(stdout) for scanner.Scan() { t := scanner.Text() @@ -211,6 +213,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo errCh <- err case <-sr.stopCh: cmd.Stop() + log.Debug("Stopped the provider runner.") } }() @@ -229,6 +232,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo func (sr *SharedProvider) Stop() error { sr.mu.Lock() defer sr.mu.Unlock() + sr.logger.Debug("Attempting to stop the provider runner.") if sr.stopCh == nil { return errors.New("shared provider process not started yet") } diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go index 98abfdf1..578c8fe0 100644 --- a/pkg/terraform/provider_scheduler.go +++ b/pkg/terraform/provider_scheduler.go @@ -215,7 +215,7 @@ func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOptio } func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { - s.logger.Debug("Starting workspace scoped shared provider runner.", "handle", h) + s.logger.Debug("Starting workspace scoped provider runner.", "handle", h) reattachConfig, err := s.runner.Start() return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner") } diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index 62e2375e..ae560593 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -151,7 +151,8 @@ type Workspace struct { filterFn func(string) string } -func (w *Workspace) UseSharedProvider(inuse InUse, attachmentConfig string) { +// UseProvider shares a native provider with the receiver Workspace. +func (w *Workspace) UseProvider(inuse InUse, attachmentConfig string) { w.mu.Lock() defer w.mu.Unlock() // remove existing reattach configs