diff --git a/pkg/controller/external.go b/pkg/controller/external.go index ae468afc..8ced921c 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -9,6 +9,7 @@ import ( "time" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/pkg/errors" @@ -32,6 +33,7 @@ const ( errApply = "cannot apply" errDestroy = "cannot destroy" errStatusUpdate = "cannot update status of custom resource" + errScheduleProvider = "cannot schedule native Terraform provider process" ) // Option allows you to configure Connector. @@ -46,6 +48,13 @@ func WithCallbackProvider(ac CallbackProvider) Option { } } +// WithLogger configures a logger for the Connector. +func WithLogger(l logging.Logger) Option { + return func(c *Connector) { + c.logger = l + } +} + // NewConnector returns a new Connector object. func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector { c := &Connector{ @@ -53,6 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi getTerraformSetup: sf, store: ws, config: cfg, + logger: logging.NewNopLogger(), } for _, f := range opts { f(c) @@ -68,6 +78,7 @@ type Connector struct { getTerraformSetup terraform.SetupFn config *config.Resource callback CallbackProvider + logger logging.Logger } // Connect makes sure the underlying client is ready to issue requests to the @@ -83,22 +94,50 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed return nil, errors.Wrap(err, errGetTerraformSetup) } - tf, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) + ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) if err != nil { return nil, errors.Wrap(err, errGetWorkspace) } - return &external{ - workspace: tf, - config: c.config, - callback: c.callback, + workspace: ws, + config: c.config, + callback: c.callback, + providerScheduler: ts.Scheduler, + providerHandle: ws.ProviderHandle, + logger: c.logger.WithValues("uid", mg.GetUID()), }, nil } type external struct { - workspace Workspace - config *config.Resource - callback CallbackProvider + workspace Workspace + config *config.Resource + callback CallbackProvider + providerScheduler terraform.ProviderScheduler + providerHandle terraform.ProviderHandle + logger logging.Logger +} + +func (e *external) scheduleProvider() error { + if e.providerScheduler == nil || e.workspace == nil { + return nil + } + inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle) + if err != nil { + return errors.Wrap(err, errScheduleProvider) + } + if ps, ok := e.workspace.(ProviderSharer); ok { + ps.UseProvider(inuse, attachmentConfig) + } + return nil +} + +func (e *external) stopProvider() { + if e.providerScheduler == nil { + return + } + if err := e.providerScheduler.Stop(e.providerHandle); err != nil { + e.logger.Info("ExternalClient failed to stop the native provider", "error", err) + } } func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo @@ -106,6 +145,10 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. // and serial. // TODO(muvaf): Look for ways to reduce the cyclomatic complexity without // increasing the difficulty of understanding the flow. + if err := e.scheduleProvider(); err != nil { + return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID()) + } + defer e.stopProvider() tr, ok := mg.(resource.Terraformed) if !ok { return managed.ExternalObservation{}, errors.New(errUnexpectedObject) @@ -220,6 +263,10 @@ func addTTR(mg xpresource.Managed) { } func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -247,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -266,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error { + if err := e.scheduleProvider(); err != nil { + return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy) } diff --git a/pkg/controller/external_test.go b/pkg/controller/external_test.go index 7bc4ce3c..afbb563c 100644 --- a/pkg/controller/external_test.go +++ b/pkg/controller/external_test.go @@ -26,6 +26,10 @@ import ( "github.com/upbound/upjet/pkg/terraform" ) +const ( + testPath = "test/path" +) + var ( errBoom = errors.New("boom") exampleState = &json.StateV4{ @@ -154,7 +158,7 @@ func TestConnect(t *testing.T) { }, store: StoreFns{ WorkspaceFn: func(_ context.Context, _ resource.SecretClient, _ resource.Terraformed, _ terraform.Setup, _ *config.Resource) (*terraform.Workspace, error) { - return nil, nil + return terraform.NewWorkspace(testPath), nil }, }, }, diff --git a/pkg/controller/interfaces.go b/pkg/controller/interfaces.go index e0af8141..d8978e57 100644 --- a/pkg/controller/interfaces.go +++ b/pkg/controller/interfaces.go @@ -26,6 +26,11 @@ type Workspace interface { Plan(context.Context) (terraform.PlanResult, error) } +// ProviderSharer shares a native provider process with the receiver. +type ProviderSharer interface { + UseProvider(inuse terraform.InUse, attachmentConfig string) +} + // Store is where we can get access to the Terraform workspace of given resource. type Store interface { Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 14e3f4bb..31d73f0b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -25,28 +25,6 @@ const ( promSysResource = "resource" ) -// ExecMode is the Terraform CLI execution mode label -type ExecMode int - -const ( - // ModeSync represents the synchronous execution mode - ModeSync ExecMode = iota - // ModeASync represents the asynchronous execution mode - ModeASync -) - -// String converts an execMode to string -func (em ExecMode) String() string { - switch em { - case ModeSync: - return "sync" - case ModeASync: - return "async" - default: - return "unknown" - } -} - var ( // CLITime is the Terraform CLI execution times histogram. CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl index 7dd13a43..e3279001 100644 --- a/pkg/pipeline/templates/controller.go.tmpl +++ b/pkg/pipeline/templates/controller.go.tmpl @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { } r := managed.NewReconciler(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), - managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], + managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), {{- if .UseAsync }} tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))), {{- end}} diff --git a/pkg/terraform/files.go b/pkg/terraform/files.go index 459b646b..12ec9b4e 100644 --- a/pkg/terraform/files.go +++ b/pkg/terraform/files.go @@ -94,7 +94,7 @@ type FileProducer struct { // WriteMainTF writes the content main configuration file that has the desired // state configuration for Terraform. -func (fp *FileProducer) WriteMainTF() error { +func (fp *FileProducer) WriteMainTF() (ProviderHandle, error) { // If the resource is in a deletion process, we need to remove the deletion // protection. fp.parameters["lifecycle"] = map[string]bool{ @@ -129,9 +129,13 @@ func (fp *FileProducer) WriteMainTF() error { } rawMainTF, err := json.JSParser.Marshal(m) if err != nil { - return errors.Wrap(err, "cannot marshal main hcl object") + return InvalidProviderHandle, errors.Wrap(err, "cannot marshal main hcl object") } - return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile) + h, err := fp.Setup.Configuration.ToProviderHandle() + if err != nil { + return InvalidProviderHandle, errors.Wrap(err, "cannot get scheduler handle") + } + return h, errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile) } // EnsureTFState writes the Terraform state that should exist in the filesystem diff --git a/pkg/terraform/files_test.go b/pkg/terraform/files_test.go index 84bfdff1..d43747fb 100644 --- a/pkg/terraform/files_test.go +++ b/pkg/terraform/files_test.go @@ -414,7 +414,7 @@ func TestWriteMainTF(t *testing.T) { if err != nil { t.Errorf("cannot initialize a file producer: %s", err.Error()) } - err = fp.WriteMainTF() + _, err = fp.WriteMainTF() if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { t.Errorf("\n%s\nWriteMainTF(...): -want error, +got error:\n%s", tc.reason, diff) } diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go index 75a1953d..d29446b4 100644 --- a/pkg/terraform/provider_runner.go +++ b/pkg/terraform/provider_runner.go @@ -1,6 +1,16 @@ -/* -Copyright 2022 Upbound Inc. -*/ +// Copyright 2022 Upbound Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package terraform @@ -23,10 +33,9 @@ const ( errFmtTimeout = "timed out after %v while waiting for the reattach configuration string" // an example value would be: '{"registry.terraform.io/hashicorp/aws": {"Protocol": "grpc", "ProtocolVersion":5, "Pid":... "Addr":{"Network": "unix","String": "..."}}}' - fmtReattachEnv = `{"%s":{"Protocol":"grpc","ProtocolVersion":%d,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "%s"}}}` - fmtSetEnv = "%s=%s" - envReattachConfig = "TF_REATTACH_PROVIDERS" - envMagicCookie = "TF_PLUGIN_MAGIC_COOKIE" + fmtReattachEnv = `{"%s":{"Protocol":"grpc","ProtocolVersion":%d,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "%s"}}}` + fmtSetEnv = "%s=%s" + envMagicCookie = "TF_PLUGIN_MAGIC_COOKIE" // Terraform provider plugin expects this magic cookie in its environment // (as the value of key TF_PLUGIN_MAGIC_COOKIE): // https://github.com/hashicorp/terraform/blob/d35bc0531255b496beb5d932f185cbcdb2d61a99/internal/plugin/serve.go#L33 @@ -44,6 +53,7 @@ var ( // gRPC server mode type ProviderRunner interface { Start() (string, error) + Stop() error } // NoOpProviderRunner is a no-op ProviderRunner @@ -59,6 +69,11 @@ func (NoOpProviderRunner) Start() (string, error) { return "", nil } +// Stop takes no action +func (NoOpProviderRunner) Stop() error { + return nil +} + // SharedProvider runs the configured native provider plugin // using the supplied command-line args type SharedProvider struct { @@ -71,20 +86,21 @@ type SharedProvider struct { executor exec.Interface clock clock.Clock mu *sync.Mutex + stopCh chan bool } -// SharedGRPCRunnerOption lets you configure the shared gRPC runner. -type SharedGRPCRunnerOption func(runner *SharedProvider) +// SharedProviderOption lets you configure the shared gRPC runner. +type SharedProviderOption func(runner *SharedProvider) // WithNativeProviderArgs are the arguments to be passed to the native provider -func WithNativeProviderArgs(args ...string) SharedGRPCRunnerOption { +func WithNativeProviderArgs(args ...string) SharedProviderOption { return func(sr *SharedProvider) { sr.nativeProviderArgs = args } } // WithNativeProviderExecutor sets the process executor to be used -func WithNativeProviderExecutor(e exec.Interface) SharedGRPCRunnerOption { +func WithNativeProviderExecutor(e exec.Interface) SharedProviderOption { return func(sr *SharedProvider) { sr.executor = e } @@ -92,23 +108,43 @@ func WithNativeProviderExecutor(e exec.Interface) SharedGRPCRunnerOption { // WithProtocolVersion sets the gRPC protocol version in use between // the Terraform CLI and the native provider. -func WithProtocolVersion(protocolVersion int) SharedGRPCRunnerOption { +func WithProtocolVersion(protocolVersion int) SharedProviderOption { return func(sr *SharedProvider) { sr.protocolVersion = protocolVersion } } -// NewSharedProvider instantiates a SharedProvider with an -// OS executor using the supplied logger -func NewSharedProvider(l logging.Logger, nativeProviderPath, nativeProviderName string, opts ...SharedGRPCRunnerOption) *SharedProvider { +// WithNativeProviderPath configures the Terraform provider executable path +// for the runner. +func WithNativeProviderPath(p string) SharedProviderOption { + return func(sr *SharedProvider) { + sr.nativeProviderPath = p + } +} + +// WithNativeProviderName configures the Terraform provider name +// for the runner. +func WithNativeProviderName(n string) SharedProviderOption { + return func(sr *SharedProvider) { + sr.nativeProviderName = n + } +} + +// WithNativeProviderLogger configures the logger for the runner. +func WithNativeProviderLogger(logger logging.Logger) SharedProviderOption { + return func(sr *SharedProvider) { + sr.logger = logger + } +} + +// NewSharedProvider instantiates a SharedProvider runner with an +// OS executor using the supplied options. +func NewSharedProvider(opts ...SharedProviderOption) *SharedProvider { sr := &SharedProvider{ - logger: l, - nativeProviderPath: nativeProviderPath, - nativeProviderName: nativeProviderName, - protocolVersion: defaultProtocolVersion, - executor: exec.New(), - clock: clock.RealClock{}, - mu: &sync.Mutex{}, + protocolVersion: defaultProtocolVersion, + executor: exec.New(), + clock: clock.RealClock{}, + mu: &sync.Mutex{}, } for _, o := range opts { o(sr) @@ -129,8 +165,10 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig) return sr.reattachConfig, nil } + log.Debug("Provider runner not yet started. Will fork a new native provider.") errCh := make(chan error, 1) reattachCh := make(chan string, 1) + sr.stopCh = make(chan bool, 1) go func() { defer close(errCh) @@ -152,6 +190,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo errCh <- err return } + log.Debug("Forked new native provider.") scanner := bufio.NewScanner(stdout) for scanner.Scan() { t := scanner.Text() @@ -162,9 +201,19 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo reattachCh <- fmt.Sprintf(fmtReattachEnv, sr.nativeProviderName, sr.protocolVersion, os.Getpid(), matches[1]) break } - if err := cmd.Wait(); err != nil { + + waitErrCh := make(chan error, 1) + go func() { + defer close(waitErrCh) + waitErrCh <- cmd.Wait() + }() + select { + case err := <-waitErrCh: log.Info("Native Terraform provider process error", "error", err) errCh <- err + case <-sr.stopCh: + cmd.Stop() + log.Debug("Stopped the provider runner.") } }() @@ -178,3 +227,17 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo return "", errors.Errorf(errFmtTimeout, reattachTimeout) } } + +// Stop attempts to stop a shared gRPC server if it's already running. +func (sr *SharedProvider) Stop() error { + sr.mu.Lock() + defer sr.mu.Unlock() + sr.logger.Debug("Attempting to stop the provider runner.") + if sr.stopCh == nil { + return errors.New("shared provider process not started yet") + } + sr.stopCh <- true + close(sr.stopCh) + sr.stopCh = nil + return nil +} diff --git a/pkg/terraform/provider_runner_test.go b/pkg/terraform/provider_runner_test.go index 5e60037b..7087d482 100644 --- a/pkg/terraform/provider_runner_test.go +++ b/pkg/terraform/provider_runner_test.go @@ -48,8 +48,8 @@ func TestStartSharedServer(t *testing.T) { }, "SuccessfullyStarted": { args: args{ - runner: NewSharedProvider(logging.NewNopLogger(), testPath, testName, WithNativeProviderArgs(testArgs...), - WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, nil))), + runner: NewSharedProvider(WithNativeProviderLogger(logging.NewNopLogger()), WithNativeProviderPath(testPath), + WithNativeProviderName(testName), WithNativeProviderArgs(testArgs...), WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, nil))), }, want: want{ reattachConfig: fmt.Sprintf(`{"provider-test":{"Protocol":"grpc","ProtocolVersion":5,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "test1"}}}`, os.Getpid()), @@ -71,8 +71,8 @@ func TestStartSharedServer(t *testing.T) { }, "NativeProviderError": { args: args{ - runner: NewSharedProvider(logging.NewNopLogger(), testPath, testName, - WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, testErr))), + runner: NewSharedProvider(WithNativeProviderLogger(logging.NewNopLogger()), WithNativeProviderPath(testPath), + WithNativeProviderName(testName), WithNativeProviderArgs(testArgs...), WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, testErr))), }, want: want{ err: testErr, @@ -102,7 +102,7 @@ func TestStartSharedServer(t *testing.T) { if err != nil { return } - if diff := cmp.Diff(reattachConfig, tt.want.reattachConfig); diff != "" { + if diff := cmp.Diff(tt.want.reattachConfig, reattachConfig); diff != "" { t.Errorf("\n%s\nStartSharedServer(): -want reattachConfig, +got reattachConfig:\n%s", name, diff) } }) diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go new file mode 100644 index 00000000..0639b83b --- /dev/null +++ b/pkg/terraform/provider_scheduler.go @@ -0,0 +1,233 @@ +// Copyright 2023 Upbound Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package terraform + +import ( + "sync" + + "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/pkg/errors" +) + +// ProviderHandle represents native plugin (Terraform provider) process +// handles used by the various schedulers to map Terraform workspaces +// to these processes. +type ProviderHandle string + +const ( + // InvalidProviderHandle is an invalid ProviderHandle. + InvalidProviderHandle ProviderHandle = "" + + ttlMargin = 0.1 +) + +// ProviderScheduler represents a shared native plugin process scheduler. +type ProviderScheduler interface { + // Start forks or reuses a native plugin process associated with + // the supplied ProviderHandle. + Start(ProviderHandle) (InUse, string, error) + // Stop terminates the native plugin process, if it exists, for + // the specified ProviderHandle. + Stop(ProviderHandle) error +} + +// InUse keeps track of the usage of a shared resource, +// like a native plugin process. +type InUse interface { + // Increment marks one more user of a shared resource + // such as a native plugin process. + Increment() + // Decrement marks when a user of a shared resource, + // such as a native plugin process, has released the resource. + Decrement() +} + +// noopInUse satisfies the InUse interface and is a noop implementation. +type noopInUse struct{} + +func (noopInUse) Increment() {} + +func (noopInUse) Decrement() {} + +// NoOpProviderScheduler satisfied the ProviderScheduler interface +// and is a noop implementation, i.e., it does not schedule any +// native plugin processes. +type NoOpProviderScheduler struct{} + +// NewNoOpProviderScheduler initializes a new NoOpProviderScheduler. +func NewNoOpProviderScheduler() NoOpProviderScheduler { + return NoOpProviderScheduler{} +} + +func (NoOpProviderScheduler) Start(ProviderHandle) (InUse, string, error) { + return noopInUse{}, "", nil +} + +func (NoOpProviderScheduler) Stop(ProviderHandle) error { + return nil +} + +type schedulerEntry struct { + ProviderRunner + inUse int + invocationCount int +} + +type providerInUse struct { + scheduler *SharedProviderScheduler + handle ProviderHandle +} + +func (p *providerInUse) Increment() { + p.scheduler.mu.Lock() + defer p.scheduler.mu.Unlock() + r := p.scheduler.runners[p.handle] + r.inUse++ + r.invocationCount++ +} + +func (p *providerInUse) Decrement() { + p.scheduler.mu.Lock() + defer p.scheduler.mu.Unlock() + if p.scheduler.runners[p.handle].inUse == 0 { + return + } + p.scheduler.runners[p.handle].inUse-- +} + +// SharedProviderScheduler is a ProviderScheduler that +// shares a native plugin (Terraform provider) process between +// MR reconciliation loops whose MRs yield the same ProviderHandle, i.e., +// whose Terraform resource blocks are configuration-wise identical. +// SharedProviderScheduler is configured with a max TTL and it will gracefully +// attempt to replace ProviderRunners whose TTL exceed this maximum, +// if they are not in-use. +type SharedProviderScheduler struct { + runnerOpts []SharedProviderOption + runners map[ProviderHandle]*schedulerEntry + ttl int + mu *sync.Mutex + logger logging.Logger +} + +// NewSharedProviderScheduler initializes a new SharedProviderScheduler +// with the specified logger and options. +func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedProviderOption) *SharedProviderScheduler { + return &SharedProviderScheduler{ + runnerOpts: opts, + mu: &sync.Mutex{}, + runners: make(map[ProviderHandle]*schedulerEntry), + logger: l, + ttl: ttl, + } +} + +func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { + logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlMargin", ttlMargin) + s.mu.Lock() + defer s.mu.Unlock() + + r := s.runners[h] + switch { + case r != nil && (r.invocationCount < s.ttl || r.inUse > 0): + if r.invocationCount > int(float64(s.ttl)*(1+ttlMargin)) { + logger.Debug("Reuse budget has been exceeded. Caller will need to retry.") + return nil, "", errors.Errorf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, s.ttl) + } + + logger.Debug("Reusing the provider runner", "invocationCount", r.invocationCount) + rc, err := r.Start() + return &providerInUse{ + scheduler: s, + handle: h, + }, rc, errors.Wrapf(err, "cannot use already started provider with handle: %s", h) + case r != nil: + logger.Debug("The provider runner has expired. Attempting to stop...", "invocationCount", r.invocationCount) + if err := r.Stop(); err != nil { + return nil, "", errors.Wrapf(err, "cannot schedule a new shared provider for handle: %s", h) + } + } + + runner := NewSharedProvider(s.runnerOpts...) + r = &schedulerEntry{ + ProviderRunner: runner, + } + runner.logger = logger + s.runners[h] = r + logger.Debug("Starting new shared provider...") + rc, err := s.runners[h].Start() + return &providerInUse{ + scheduler: s, + handle: h, + }, rc, errors.Wrapf(err, "cannot start the shared provider runner for handle: %s", h) +} + +func (s *SharedProviderScheduler) Stop(ProviderHandle) error { + // noop + return nil +} + +// WorkspaceProviderScheduler is a ProviderScheduler that +// shares a native plugin (Terraform provider) process between +// the Terraform CLI invocations in the context of a single +// reconciliation loop (belonging to a single workspace). +// When the managed.ExternalDisconnecter disconnects, +// the scheduler terminates the native plugin process. +type WorkspaceProviderScheduler struct { + runner ProviderRunner + logger logging.Logger + inUse *workspaceInUse +} + +type workspaceInUse struct { + wg *sync.WaitGroup +} + +func (w *workspaceInUse) Increment() { + w.wg.Add(1) +} + +func (w *workspaceInUse) Decrement() { + w.wg.Done() +} + +// NewWorkspaceProviderScheduler initializes a new WorkspaceProviderScheduler. +func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOption) *WorkspaceProviderScheduler { + return &WorkspaceProviderScheduler{ + logger: l, + runner: NewSharedProvider(append([]SharedProviderOption{WithNativeProviderLogger(l)}, opts...)...), + inUse: &workspaceInUse{ + wg: &sync.WaitGroup{}, + }, + } +} + +func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { + s.logger.Debug("Starting workspace scoped provider runner.", "handle", h) + reattachConfig, err := s.runner.Start() + return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner") +} + +func (s *WorkspaceProviderScheduler) Stop(h ProviderHandle) error { + s.logger.Debug("Attempting to stop workspace scoped shared provider runner.", "handle", h) + go func() { + s.inUse.wg.Wait() + s.logger.Debug("Provider runner not in-use, stopping it.", "handle", h) + if err := s.runner.Stop(); err != nil { + s.logger.Info("Failed to stop provider runner", "error", errors.Wrap(err, "cannot stop a workspace provider runner")) + } + }() + return nil +} diff --git a/pkg/terraform/store.go b/pkg/terraform/store.go index 556cdaf8..22143703 100644 --- a/pkg/terraform/store.go +++ b/pkg/terraform/store.go @@ -16,9 +16,11 @@ package terraform import ( "context" + "crypto/sha256" "fmt" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -37,10 +39,6 @@ import ( "github.com/upbound/upjet/pkg/resource" ) -const ( - fmtEnv = "%s=%s" -) - // SetupFn is a function that returns Terraform setup which contains // provider requirement, configuration and Terraform version. type SetupFn func(ctx context.Context, client client.Client, mg xpresource.Managed) (Setup, error) @@ -57,6 +55,48 @@ type ProviderRequirement struct { // ProviderConfiguration holds the setup configuration body type ProviderConfiguration map[string]any +// ToProviderHandle converts a provider configuration to a handle +// for the provider scheduler. +func (pc ProviderConfiguration) ToProviderHandle() (ProviderHandle, error) { + h := strings.Join(getSortedKeyValuePairs("", pc), ",") + hash := sha256.New() + if _, err := hash.Write([]byte(h)); err != nil { + return InvalidProviderHandle, errors.Wrap(err, "cannot convert provider configuration to scheduler handle") + } + return ProviderHandle(fmt.Sprintf("%x", hash.Sum(nil))), nil +} + +func getSortedKeyValuePairs(parent string, m map[string]any) []string { + result := make([]string, 0, len(m)) + sortedKeys := make([]string, 0, len(m)) + for k := range m { + sortedKeys = append(sortedKeys, k) + } + sort.Strings(sortedKeys) + for _, k := range sortedKeys { + v := m[k] + switch t := v.(type) { + case []string: + result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(t, ","))) + case map[string]any: + result = append(result, getSortedKeyValuePairs(parent+k+".", t)...) + case []map[string]any: + cArr := make([]string, 0, len(t)) + for i, e := range t { + cArr = append(cArr, getSortedKeyValuePairs(fmt.Sprintf("%s%s[%d].", parent, k, i), e)...) + } + result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(cArr, ","))) + case *string: + if t != nil { + result = append(result, fmt.Sprintf("%q:%q", parent+k, *t)) + } + default: + result = append(result, fmt.Sprintf("%q:%q", parent+k, t)) + } + } + return result +} + // Setup holds values for the Terraform version and setup // requirements and configuration body type Setup struct { @@ -78,6 +118,12 @@ type Setup struct { // not part of the Terraform AWS Provider configuration, so it could be // made available only by this map. ClientMetadata map[string]string + + // Scheduler specifies the provider scheduler to be used for the Terraform + // workspace being setup. If not set, no scheduler is configured and + // the lifecycle of Terraform provider processes will be managed by + // the Terraform CLI. + Scheduler ProviderScheduler } // Map returns the Setup object in map form. The initial reason was so that @@ -105,13 +151,6 @@ func WithFs(fs afero.Fs) WorkspaceStoreOption { } } -// WithProviderRunner sets the ProviderRunner to be used. -func WithProviderRunner(pr ProviderRunner) WorkspaceStoreOption { - return func(ws *WorkspaceStore) { - ws.providerRunner = pr - } -} - // WithProcessReportInterval enables the upjet.terraform.running_processes // metric, which periodically reports the total number of Terraform CLI and // Terraform provider processes in the system. @@ -121,15 +160,23 @@ func WithProcessReportInterval(d time.Duration) WorkspaceStoreOption { } } +// WithDisableInit disables `terraform init` invocations in case +// workspace initialization is not needed (e.g., when using the +// shared gRPC server runtime). +func WithDisableInit(disable bool) WorkspaceStoreOption { + return func(ws *WorkspaceStore) { + ws.disableInit = disable + } +} + // NewWorkspaceStore returns a new WorkspaceStore. func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore { ws := &WorkspaceStore{ - store: map[types.UID]*Workspace{}, - logger: l, - mu: sync.Mutex{}, - fs: afero.Afero{Fs: afero.NewOsFs()}, - executor: exec.New(), - providerRunner: NewNoOpProviderRunner(), + store: map[types.UID]*Workspace{}, + logger: l, + mu: sync.Mutex{}, + fs: afero.Afero{Fs: afero.NewOsFs()}, + executor: exec.New(), } for _, f := range opts { f(ws) @@ -149,11 +196,11 @@ type WorkspaceStore struct { // cause rehashing in some cases. store map[types.UID]*Workspace logger logging.Logger - providerRunner ProviderRunner mu sync.Mutex processReportInterval time.Duration fs afero.Afero executor exec.Interface + disableInit bool } // Workspace makes sure the Terraform workspace for the given resource is ready @@ -184,35 +231,37 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient if err := fp.EnsureTFState(ctx); err != nil { return nil, errors.Wrap(err, "cannot ensure tfstate file") } - isNeedProviderUpgrade, err := fp.needProviderUpgrade() - if err != nil { - return nil, errors.Wrap(err, "cannot check if a Terraform dependency update is required") + + isNeedProviderUpgrade := false + if !ws.disableInit { + isNeedProviderUpgrade, err = fp.needProviderUpgrade() + if err != nil { + return nil, errors.Wrap(err, "cannot check if a Terraform dependency update is required") + } } - if err := fp.WriteMainTF(); err != nil { + + if w.ProviderHandle, err = fp.WriteMainTF(); err != nil { return nil, errors.Wrap(err, "cannot write main tf file") } if isNeedProviderUpgrade { - out, err := w.runTF(ctx, metrics.ModeSync, "init", "-upgrade", "-input=false") + out, err := w.runTF(ctx, ModeSync, "init", "-upgrade", "-input=false") w.logger.Debug("init -upgrade ended", "out", ts.filterSensitiveInformation(string(out))) if err != nil { return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out))) } } - attachmentConfig, err := ws.providerRunner.Start() - if err != nil { - return nil, err + if ws.disableInit { + return w, nil } _, err = ws.fs.Stat(filepath.Join(dir, ".terraform.lock.hcl")) if xpresource.Ignore(os.IsNotExist, err) != nil { return nil, errors.Wrap(err, "cannot stat init lock file") } - w.env = append(w.env, fmt.Sprintf(fmtEnv, envReattachConfig, attachmentConfig)) - // We need to initialize only if the workspace hasn't been initialized yet. if !os.IsNotExist(err) { return w, nil } - out, err := w.runTF(ctx, metrics.ModeSync, "init", "-input=false") + out, err := w.runTF(ctx, ModeSync, "init", "-input=false") w.logger.Debug("init ended", "out", ts.filterSensitiveInformation(string(out))) return w, errors.Wrapf(err, "cannot init workspace: %s", ts.filterSensitiveInformation(string(out))) } @@ -234,7 +283,7 @@ func (ws *WorkspaceStore) Remove(obj xpresource.Object) error { } func (ws *WorkspaceStore) initMetrics() { - for _, mode := range []metrics.ExecMode{metrics.ModeSync, metrics.ModeASync} { + for _, mode := range []ExecMode{ModeSync, ModeASync} { for _, subcommand := range []string{"init", "apply", "destroy", "plan"} { metrics.CLIExecutions.WithLabelValues(subcommand, mode.String()).Set(0) } diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index 886c4dd9..ae560593 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -16,9 +16,11 @@ package terraform import ( "context" + "fmt" "os" "path/filepath" "strings" + "sync" "time" "github.com/pkg/errors" @@ -34,8 +36,32 @@ import ( const ( defaultAsyncTimeout = 1 * time.Hour + envReattachConfig = "TF_REATTACH_PROVIDERS" + fmtEnv = "%s=%s" ) +// ExecMode is the Terraform CLI execution mode label +type ExecMode int + +const ( + // ModeSync represents the synchronous execution mode + ModeSync ExecMode = iota + // ModeASync represents the asynchronous execution mode + ModeASync +) + +// String converts an execMode to string +func (em ExecMode) String() string { + switch em { + case ModeSync: + return "sync" + case ModeASync: + return "async" + default: + return "unknown" + } +} + // WorkspaceOption allows you to configure Workspace objects. type WorkspaceOption func(*Workspace) @@ -67,12 +93,21 @@ func WithAferoFs(fs afero.Fs) WorkspaceOption { } } +// WithFilterFn configures the debug log sensitive information filtering func. func WithFilterFn(filterFn func(string) string) WorkspaceOption { return func(w *Workspace) { w.filterFn = filterFn } } +// WithProviderInUse configures an InUse for keeping track of +// the shared provider InUse by this Terraform workspace. +func WithProviderInUse(providerInUse InUse) WorkspaceOption { + return func(w *Workspace) { + w.providerInUse = providerInUse + } +} + // NewWorkspace returns a new Workspace object that operates in the given // directory. func NewWorkspace(dir string, opts ...WorkspaceOption) *Workspace { @@ -81,6 +116,8 @@ func NewWorkspace(dir string, opts ...WorkspaceOption) *Workspace { dir: dir, logger: logging.NewNopLogger(), fs: afero.Afero{Fs: afero.NewOsFs()}, + providerInUse: noopInUse{}, + mu: &sync.Mutex{}, } for _, f := range opts { f(w) @@ -97,17 +134,40 @@ type CallbackFn func(error, context.Context) error type Workspace struct { // LastOperation contains information about the last operation performed. LastOperation *Operation + // ProviderHandle is the handle of the associated native Terraform provider + // computed from the generated provider resource configuration block + // of the Terraform workspace. + ProviderHandle ProviderHandle dir string env []string - logger logging.Logger - executor k8sExec.Interface - fs afero.Afero + logger logging.Logger + executor k8sExec.Interface + providerInUse InUse + fs afero.Afero + mu *sync.Mutex filterFn func(string) string } +// UseProvider shares a native provider with the receiver Workspace. +func (w *Workspace) UseProvider(inuse InUse, attachmentConfig string) { + w.mu.Lock() + defer w.mu.Unlock() + // remove existing reattach configs + env := make([]string, 0, len(w.env)) + prefix := fmt.Sprintf(fmtEnv, envReattachConfig, "") + for _, e := range w.env { + if !strings.HasPrefix(e, prefix) { + env = append(env, e) + } + } + env = append(env, prefix+attachmentConfig) + w.env = env + w.providerInUse = inuse +} + // ApplyAsync makes a terraform apply call without blocking and calls the given // function once that apply call finishes. func (w *Workspace) ApplyAsync(callback CallbackFn) error { @@ -115,9 +175,10 @@ func (w *Workspace) ApplyAsync(callback CallbackFn) error { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) + w.providerInUse.Increment() go func() { defer cancel() - out, err := w.runTF(ctx, metrics.ModeASync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeASync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewApplyFailed(out) } @@ -142,7 +203,7 @@ func (w *Workspace) Apply(ctx context.Context) (ApplyResult, error) { if w.LastOperation.IsRunning() { return ApplyResult{}, errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("apply ended", "out", w.filterFn(string(out))) if err != nil { return ApplyResult{}, tferrors.NewApplyFailed(out) @@ -173,9 +234,10 @@ func (w *Workspace) DestroyAsync(callback CallbackFn) error { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) + w.providerInUse.Increment() go func() { defer cancel() - out, err := w.runTF(ctx, metrics.ModeASync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeASync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewDestroyFailed(out) } @@ -195,7 +257,7 @@ func (w *Workspace) Destroy(ctx context.Context) error { if w.LastOperation.IsRunning() { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("destroy ended", "out", w.filterFn(string(out))) if err != nil { return tferrors.NewDestroyFailed(out) @@ -221,7 +283,7 @@ func (w *Workspace) Refresh(ctx context.Context) (RefreshResult, error) { case w.LastOperation.IsEnded(): defer w.LastOperation.Flush() } - out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("refresh ended", "out", w.filterFn(string(out))) if err != nil { return RefreshResult{}, tferrors.NewRefreshFailed(out) @@ -253,7 +315,7 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { if w.LastOperation.IsRunning() { return PlanResult{}, errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "plan", "-refresh=false", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "plan", "-refresh=false", "-input=false", "-lock=false", "-json") w.logger.Debug("plan ended", "out", w.filterFn(string(out))) if err != nil { return PlanResult{}, tferrors.NewPlanFailed(out) @@ -284,10 +346,17 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { }, nil } -func (w *Workspace) runTF(ctx context.Context, execMode metrics.ExecMode, args ...string) ([]byte, error) { +func (w *Workspace) runTF(ctx context.Context, execMode ExecMode, args ...string) ([]byte, error) { if len(args) < 1 { return nil, errors.New("args cannot be empty") } + w.logger.Debug("Running terraform", "args", args) + if execMode == ModeSync { + w.providerInUse.Increment() + } + defer w.providerInUse.Decrement() + w.mu.Lock() + defer w.mu.Unlock() cmd := w.executor.CommandContext(ctx, "terraform", args...) cmd.SetEnv(append(os.Environ(), w.env...)) cmd.SetDir(w.dir) diff --git a/pkg/terraform/workspace_test.go b/pkg/terraform/workspace_test.go index cfef66f7..abbcc1e0 100644 --- a/pkg/terraform/workspace_test.go +++ b/pkg/terraform/workspace_test.go @@ -98,7 +98,7 @@ func TestWorkspaceApply(t *testing.T) { "Success": { args: args{ w: NewWorkspace(directory, WithExecutor(&testingexec.FakeExec{DisableScripts: true}), WithAferoFs(fs), - WithFilterFn(filterFn)), + WithFilterFn(filterFn), WithProviderInUse(noopInUse{})), }, want: want{ r: ApplyResult{ @@ -109,7 +109,7 @@ func TestWorkspaceApply(t *testing.T) { "Failure": { args: args{ w: NewWorkspace(directory, WithExecutor(newFakeExec(errBoom.Error(), errBoom)), WithAferoFs(fs), - WithFilterFn(filterFn)), + WithFilterFn(filterFn), WithProviderInUse(noopInUse{})), }, want: want{ err: tferrors.NewApplyFailed([]byte(errBoom.Error())),