Skip to content

Commit

Permalink
Add more logging to terraform.ProviderRunner
Browse files Browse the repository at this point in the history
Signed-off-by: Alper Rifat Ulucinar <[email protected]>
  • Loading branch information
ulucinar committed Mar 24, 2023
1 parent 1fd1238 commit 9426eae
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 36 deletions.
83 changes: 50 additions & 33 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
Expand Down Expand Up @@ -47,11 +48,10 @@ func WithCallbackProvider(ac CallbackProvider) Option {
}
}

// WithProviderScheduler sets the native Terraform provider scheduler to be used
// by a Connector.
func WithProviderScheduler(s terraform.ProviderScheduler) Option {
// WithLogger configures a logger for the Connector.
func WithLogger(l logging.Logger) Option {
return func(c *Connector) {
c.providerScheduler = s
c.logger = l
}
}

Expand All @@ -62,7 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi
getTerraformSetup: sf,
store: ws,
config: cfg,
providerScheduler: terraform.NewNoOpProviderScheduler(),
logger: logging.NewNopLogger(),
}
for _, f := range opts {
f(c)
Expand All @@ -78,8 +78,7 @@ type Connector struct {
getTerraformSetup terraform.SetupFn
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

// Connect makes sure the underlying client is ready to issue requests to the
Expand All @@ -94,56 +93,62 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
if err != nil {
return nil, errors.Wrap(err, errGetTerraformSetup)
}
if ts.Scheduler != nil {
c.providerScheduler = ts.Scheduler
}

ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
if err != nil {
return nil, errors.Wrap(err, errGetWorkspace)
}
if err := c.scheduleProvider(ws); err != nil {
return nil, errors.Wrap(err, errScheduleProvider)
}
return &external{
workspace: ws,
config: c.config,
callback: c.callback,
workspace: ws,
config: c.config,
callback: c.callback,
providerScheduler: ts.Scheduler,
providerHandle: ws.ProviderHandle,
logger: c.logger.WithValues("uid", mg.GetUID()),
}, nil
}

func (c *Connector) scheduleProvider(ws *terraform.Workspace) error {
if c.providerScheduler == nil || ws == nil {
type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

func (e *external) scheduleProvider() error {
if e.providerScheduler == nil || e.workspace == nil {
return nil
}
inuse, attachmentConfig, err := c.providerScheduler.Start(ws.ProviderHandle)
inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
if err != nil {
return errors.Wrap(err, "cannot schedule a shared provider for the workspace")
return errors.Wrap(err, errScheduleProvider)
}
if ps, ok := e.workspace.(ProviderSharer); ok {
ps.UseProvider(inuse, attachmentConfig)
}
ws.UseSharedProvider(inuse, attachmentConfig)
c.providerHandle = ws.ProviderHandle
return nil
}

// Disconnect releases any resources held by the Connector.
func (c *Connector) Disconnect(_ context.Context) error {
if c.providerScheduler == nil {
return nil
func (e *external) stopProvider() {
if e.providerScheduler == nil {
return
}
if err := e.providerScheduler.Stop(e.providerHandle); err != nil {
e.logger.Info("ExternalClient failed to stop the native provider", "error", err)
}
return errors.Wrap(c.providerScheduler.Stop(c.providerHandle), "cannot stop the shared provider for the workspace")
}

type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
}

func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo
// We skip the gocyclo check because most of the operations are straight-forward
// and serial.
// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
// increasing the difficulty of understanding the flow.
if err := e.scheduleProvider(); err != nil {
return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
}
defer e.stopProvider()
tr, ok := mg.(resource.Terraformed)
if !ok {
return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
Expand Down Expand Up @@ -258,6 +263,10 @@ func addTTR(mg xpresource.Managed) {
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand Down Expand Up @@ -285,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand All @@ -304,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
if err := e.scheduleProvider(); err != nil {
return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type Workspace interface {
Plan(context.Context) (terraform.PlanResult, error)
}

// ProviderSharer shares a native provider process with the receiver.
type ProviderSharer interface {
UseProvider(inuse terraform.InUse, attachmentConfig string)
}

// Store is where we can get access to the Terraform workspace of given resource.
type Store interface {
Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
}
r := managed.NewReconciler(mgr,
xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind),
managed.WithExternalConnectDisconnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"],
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))),
{{- end}}
Expand Down
4 changes: 4 additions & 0 deletions pkg/terraform/provider_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig)
return sr.reattachConfig, nil
}
log.Debug("Provider runner not yet started. Will fork a new native provider.")
errCh := make(chan error, 1)
reattachCh := make(chan string, 1)
sr.stopCh = make(chan bool, 1)
Expand All @@ -189,6 +190,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
errCh <- err
return
}
log.Debug("Forked new native provider.")
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
t := scanner.Text()
Expand All @@ -211,6 +213,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
errCh <- err
case <-sr.stopCh:
cmd.Stop()
log.Debug("Stopped the provider runner.")
}
}()

Expand All @@ -229,6 +232,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
func (sr *SharedProvider) Stop() error {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.logger.Debug("Attempting to stop the provider runner.")
if sr.stopCh == nil {
return errors.New("shared provider process not started yet")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/terraform/provider_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOptio
}

func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) {
s.logger.Debug("Starting workspace scoped shared provider runner.", "handle", h)
s.logger.Debug("Starting workspace scoped provider runner.", "handle", h)
reattachConfig, err := s.runner.Start()
return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/terraform/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ type Workspace struct {
filterFn func(string) string
}

func (w *Workspace) UseSharedProvider(inuse InUse, attachmentConfig string) {
// UseProvider shares a native provider with the receiver Workspace.
func (w *Workspace) UseProvider(inuse InUse, attachmentConfig string) {
w.mu.Lock()
defer w.mu.Unlock()
// remove existing reattach configs
Expand Down

0 comments on commit 9426eae

Please sign in to comment.