From 3c4d66856976ca00a4ae9122811a6b4a62194c77 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 28 Feb 2024 21:09:36 -0800 Subject: [PATCH 01/20] first --- cmd/server.go | 9 ++++++++- server/legacy/user_config.go | 3 +++ server/legacy/user_config_test.go | 8 ++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cmd/server.go b/cmd/server.go index 58b5f1a9c..ed88f0cad 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -231,7 +231,8 @@ var stringFlags = map[string]stringFlag{ "default: Runs atlantis with default event handler that processes events within same.\n" + "gateway: Runs atlantis with gateway event handler that publishes events through sns.\n" + "worker: Runs atlantis with a sqs handler that polls for events in the queue to process.\n" + - "hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.", + "hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.\n" + + "terraformadmin: Runs atlantis in a mode that allows for running terraform commands on behalf of other users.", defaultValue: "", }, LyftWorkerQueueURLFlag: { @@ -344,6 +345,7 @@ func NewServerCmd(v *viper.Viper, version string) *ServerCmd { GatewayCreator: &GatewayCreator{}, WorkerCreator: &WorkerCreator{}, TemporalWorkerCreator: &TemporalWorker{}, + TerraformAdminCreator: &TerraformAdmin{}, }, Viper: v, AtlantisVersion: version, @@ -374,6 +376,7 @@ type ServerCreatorProxy struct { GatewayCreator ServerCreator WorkerCreator ServerCreator TemporalWorkerCreator ServerCreator + TerraformAdminCreator ServerCreator } func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config server.Config) (ServerStarter, error) { @@ -389,6 +392,10 @@ func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config serv return d.TemporalWorkerCreator.NewServer(userConfig, config) } + if userConfig.ToLyftMode() == server.TerraformAdmin { + return d.TerraformAdminCreator.NewServer(userConfig, config) + } + return d.WorkerCreator.NewServer(userConfig, config) } diff --git a/server/legacy/user_config.go b/server/legacy/user_config.go index fb84431db..980a38a01 100644 --- a/server/legacy/user_config.go +++ b/server/legacy/user_config.go @@ -11,6 +11,7 @@ const ( Gateway Worker TemporalWorker + TerraformAdmin ) // UserConfig holds config values passed in by the user. @@ -104,6 +105,8 @@ func (u UserConfig) ToLyftMode() Mode { return Worker case "temporalworker": return TemporalWorker + case "terraformadmin": + return TerraformAdmin } return Default } diff --git a/server/legacy/user_config_test.go b/server/legacy/user_config_test.go index 52e0a85d2..de2e91335 100644 --- a/server/legacy/user_config_test.go +++ b/server/legacy/user_config_test.go @@ -70,6 +70,14 @@ func TestUserConfig_ToLyftMode(t *testing.T) { "", server.Default, }, + { + "terraformadmin", + server.TerraformAdmin, + }, + { + "temporalworker", + server.TemporalWorker, + }, } for _, c := range cases { From 8e3b9610d5347673e0e3bddfca81270e40bbffcb Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 29 Feb 2024 08:47:41 -0800 Subject: [PATCH 02/20] oops --- cmd/terraformadmin.go | 81 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 cmd/terraformadmin.go diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go new file mode 100644 index 000000000..95c357c05 --- /dev/null +++ b/cmd/terraformadmin.go @@ -0,0 +1,81 @@ +package cmd + +import ( + "github.com/pkg/errors" + cfgParser "github.com/runatlantis/atlantis/server/config" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/legacy" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/neptune/temporalworker" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" +) + +type TerraformAdmin struct{} + +// NewServer returns the real Atlantis server object. +func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { + ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel()) + if err != nil { + return nil, errors.Wrap(err, "failed to build context logger") + } + + globalCfg := valid.NewGlobalCfg(userConfig.DataDir) + validator := &cfgParser.ParserValidator{} + if userConfig.RepoConfig != "" { + globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) + if err != nil { + return nil, errors.Wrapf(err, "parsing %s file", userConfig.RepoConfig) + } + } + parsedURL, err := legacy.ParseAtlantisURL(userConfig.AtlantisURL) + if err != nil { + return nil, errors.Wrapf(err, + "parsing atlantis url %q", userConfig.AtlantisURL) + } + + // TODO: we should just supply a yaml file with this info and load it directly into the + // app config struct + appConfig, err := createGHAppConfig(userConfig) + if err != nil { + return nil, err + } + + cfg := &neptune.Config{ + AuthCfg: neptune.AuthConfig{ + SslCertFile: userConfig.SSLCertFile, + SslKeyFile: userConfig.SSLKeyFile, + }, + ServerCfg: neptune.ServerConfig{ + URL: parsedURL, + Version: config.AtlantisVersion, + Port: userConfig.Port, + }, + FeatureConfig: neptune.FeatureConfig{ + FFOwner: userConfig.FFOwner, + FFRepo: userConfig.FFRepo, + FFPath: userConfig.FFPath, + FFBranch: userConfig.FFBranch, + }, + TerraformCfg: neptune.TerraformConfig{ + DefaultVersion: userConfig.DefaultTFVersion, + DownloadURL: userConfig.TFDownloadURL, + LogFilters: globalCfg.TerraformLogFilter, + }, + ValidationConfig: neptune.ValidationConfig{ + DefaultVersion: globalCfg.PolicySets.Version, + Policies: globalCfg.PolicySets, + }, + JobConfig: globalCfg.PersistenceConfig.Jobs, + DeploymentConfig: globalCfg.PersistenceConfig.Deployments, + DataDir: userConfig.DataDir, + TemporalCfg: globalCfg.Temporal, + GithubCfg: globalCfg.Github, + App: appConfig, + CtxLogger: ctxLogger, + StatsNamespace: userConfig.StatsNamespace, + Metrics: globalCfg.Metrics, + LyftAuditJobsSnsTopicArn: userConfig.LyftAuditJobsSnsTopicArn, + RevisionSetter: globalCfg.RevisionSetter, + } + return temporalworker.NewServer(cfg) +} From d84574f256b94a83dc992e1209d34d287f9a0a74 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 29 Feb 2024 15:08:09 -0800 Subject: [PATCH 03/20] ok --- cmd/server.go | 2 +- server/neptune/terraformadmin/server.go | 440 ++++++++++++++++++++++++ 2 files changed, 441 insertions(+), 1 deletion(-) create mode 100644 server/neptune/terraformadmin/server.go diff --git a/cmd/server.go b/cmd/server.go index ed88f0cad..8c9f754ab 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -232,7 +232,7 @@ var stringFlags = map[string]stringFlag{ "gateway: Runs atlantis with gateway event handler that publishes events through sns.\n" + "worker: Runs atlantis with a sqs handler that polls for events in the queue to process.\n" + "hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.\n" + - "terraformadmin: Runs atlantis in a mode that allows for running terraform commands on behalf of other users.", + "terraformadmin: Runs atlantis in a mode that allows for running terraform commands.", defaultValue: "", }, LyftWorkerQueueURLFlag: { diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go new file mode 100644 index 000000000..ed71c3920 --- /dev/null +++ b/server/neptune/terraformadmin/server.go @@ -0,0 +1,440 @@ +package terraformadmin + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/palantir/go-githubapp/githubapp" + "github.com/runatlantis/atlantis/server/neptune/lyft/feature" + ghClient "github.com/runatlantis/atlantis/server/neptune/workflows/activities/github" + "github.com/runatlantis/atlantis/server/vcs/provider/github" + + assetfs "github.com/elazarl/go-bindata-assetfs" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/metrics" + neptune_http "github.com/runatlantis/atlantis/server/neptune/http" + lyftActivities "github.com/runatlantis/atlantis/server/neptune/lyft/activities" + "github.com/runatlantis/atlantis/server/neptune/lyft/executor" + "github.com/runatlantis/atlantis/server/neptune/lyft/notifier" + lyftWorkflows "github.com/runatlantis/atlantis/server/neptune/lyft/workflows" + "github.com/runatlantis/atlantis/server/neptune/temporal" + "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" + "github.com/runatlantis/atlantis/server/neptune/temporalworker/controllers" + "github.com/runatlantis/atlantis/server/neptune/temporalworker/job" + "github.com/runatlantis/atlantis/server/neptune/workflows" + "github.com/runatlantis/atlantis/server/neptune/workflows/activities" + "github.com/runatlantis/atlantis/server/neptune/workflows/plugins" + "github.com/runatlantis/atlantis/server/static" + "github.com/uber-go/tally/v4" + "github.com/urfave/negroni" + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +const ( + ProjectJobsViewRouteName = "project-jobs-detail" + + // to make this clear, + // time t event + // 0 min sigterm received from kube + // 50 min activity ctx canceled + // 50 + x min sigkill received from kube + // + // Note: x must be configured outside atlantis and is the grace period effectively. + TemporalWorkerTimeout = 50 * time.Minute + + // allow any in-progress PRRevision workflow executions to gracefully exit which shouldn't take longer than 10 minutes + PRRevisionWorkerTimeout = 10 * time.Minute + + // 5 minutes to allow cleaning up the job store + StreamHandlerTimeout = 5 * time.Minute + PRRevisionTaskQueueActivitiesPerSecond = 2 +) + +type Server struct { + Logger logging.Logger + HTTPServerProxy *neptune_http.ServerProxy + Port int + StatsScope tally.Scope + StatsCloser io.Closer + TemporalClient *temporal.ClientWrapper + JobStreamHandler *job.StreamHandler + DeployActivities *activities.Deploy + TerraformActivities *activities.Terraform + GithubActivities *activities.Github + RevisionSetterActivities *lyftActivities.RevisionSetter + // Temporary until we move this into our private code + AuditActivity *lyftActivities.Audit + PRRevisionGithubActivities *lyftActivities.Github + TerraformTaskQueue string + RevisionSetterConfig valid.RevisionSetter + AdditionalNotifiers []plugins.TerraformWorkflowNotifier +} + +func NewServer(config *config.Config) (*Server, error) { + statsReporter, err := metrics.NewReporter(config.Metrics, config.CtxLogger) + + if err != nil { + return nil, err + } + + scope, statsCloser := metrics.NewScopeWithReporter(config.Metrics, config.CtxLogger, config.StatsNamespace, statsReporter) + if err != nil { + return nil, err + } + + scope = scope.Tagged(map[string]string{ + "mode": "worker", + }) + + // Build dependencies required for output handler and jobs controller + jobStore, err := job.NewStorageBackendStore(config.JobConfig, scope.SubScope("job.store"), config.CtxLogger) + if err != nil { + return nil, errors.Wrapf(err, "initializing job store") + } + receiverRegistry := job.NewReceiverRegistry() + + // terraform job output handler + jobStreamHandler := job.NewStreamHandler(jobStore, receiverRegistry, config.TerraformCfg.LogFilters, config.CtxLogger) + jobsController := controllers.NewJobsController(jobStore, receiverRegistry, config.ServerCfg, scope, config.CtxLogger) + + // temporal client + worker initialization + opts := &temporal.Options{ + StatsReporter: statsReporter, + } + opts = opts.WithClientInterceptors(temporal.NewMetricsInterceptor(scope)) + temporalClient, err := temporal.NewClient(config.CtxLogger, config.TemporalCfg, opts) + if err != nil { + return nil, errors.Wrap(err, "initializing temporal client") + } + + // router initialization + router := mux.NewRouter() + router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) + router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo})) + router.HandleFunc("/jobs/{job-id}", jobsController.GetProjectJobs).Methods(http.MethodGet).Name(ProjectJobsViewRouteName) + router.HandleFunc("/jobs/{job-id}/ws", jobsController.GetProjectJobsWS).Methods(http.MethodGet) + n := negroni.New(&negroni.Recovery{ + Logger: log.New(os.Stdout, "", log.LstdFlags), + PrintStack: false, + StackAll: false, + StackSize: 1024 * 8, + }) + n.UseHandler(router) + httpServerProxy := &neptune_http.ServerProxy{ + SSLCertFile: config.AuthCfg.SslCertFile, + SSLKeyFile: config.AuthCfg.SslKeyFile, + Server: &http.Server{Addr: fmt.Sprintf(":%d", config.ServerCfg.Port), Handler: n, ReadHeaderTimeout: time.Second * 10}, + Logger: config.CtxLogger, + } + + auditActivity, err := lyftActivities.NewAuditActivity(config.LyftAuditJobsSnsTopicArn) + if err != nil { + return nil, errors.Wrap(err, "initializing lyft activities") + } + deployActivities, err := activities.NewDeploy(config.DeploymentConfig) + if err != nil { + return nil, errors.Wrap(err, "initializing deploy activities") + } + + terraformActivities, err := activities.NewTerraform( + config.TerraformCfg, + config.ValidationConfig, + config.App, + config.DataDir, + config.ServerCfg.URL, + config.TemporalCfg.TerraformTaskQueue, + config.GithubCfg.TemporalAppInstallationID, + jobStreamHandler, + ) + if err != nil { + return nil, errors.Wrap(err, "initializing terraform activities") + } + clientCreator, err := githubapp.NewDefaultCachingClientCreator( + config.App, + githubapp.WithClientMiddleware( + ghClient.ClientMetrics(scope.SubScope("app")), + )) + if err != nil { + return nil, errors.Wrap(err, "client creator") + } + repoConfig := feature.RepoConfig{ + Owner: config.FeatureConfig.FFOwner, + Repo: config.FeatureConfig.FFRepo, + Branch: config.FeatureConfig.FFBranch, + Path: config.FeatureConfig.FFPath, + } + installationFetcher := &github.InstallationRetriever{ + ClientCreator: clientCreator, + } + fileFetcher := &github.SingleFileContentsFetcher{ + ClientCreator: clientCreator, + } + retriever := &feature.CustomGithubInstallationRetriever{ + InstallationFetcher: installationFetcher, + FileContentsFetcher: fileFetcher, + Cfg: repoConfig, + } + featureAllocator, err := feature.NewGHSourcedAllocator(retriever, config.CtxLogger) + if err != nil { + return nil, errors.Wrap(err, "initializing feature allocator") + } + + githubActivities, err := activities.NewGithub( + clientCreator, + config.GithubCfg.TemporalAppInstallationID, + config.DataDir, + featureAllocator, + ) + if err != nil { + return nil, errors.Wrap(err, "initializing github activities") + } + + revisionSetterActivities, err := lyftActivities.NewRevisionSetter(config.RevisionSetter) + if err != nil { + return nil, errors.Wrap(err, "initializing revision setter activities") + } + + prRevisionGithubActivities := &lyftActivities.Github{ + ClientCreator: clientCreator, + InstallationID: config.GithubCfg.TemporalAppInstallationID, + } + + server := Server{ + Logger: config.CtxLogger, + HTTPServerProxy: httpServerProxy, + Port: config.ServerCfg.Port, + StatsScope: scope, + StatsCloser: statsCloser, + TemporalClient: temporalClient, + JobStreamHandler: jobStreamHandler, + DeployActivities: deployActivities, + TerraformActivities: terraformActivities, + GithubActivities: githubActivities, + RevisionSetterActivities: revisionSetterActivities, + TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, + RevisionSetterConfig: config.RevisionSetter, + AuditActivity: auditActivity, + PRRevisionGithubActivities: prRevisionGithubActivities, + } + return &server, nil +} + +func (s Server) Start() error { + defer s.shutdown() + + ctx := context.Background() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + deployWorker := s.buildDeployWorker() + if err := deployWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start deploy worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down deploy worker, resource clean up may still be occurring in the background") + }() + + wg.Add(1) + go func() { + defer wg.Done() + prWorker := worker.New(s.TemporalClient.Client, workflows.PRTaskQueue, worker.Options{ + WorkerStopTimeout: TemporalWorkerTimeout, + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + }) + prWorker.RegisterActivity(s.GithubActivities) + prWorker.RegisterActivity(s.TerraformActivities) + prWorker.RegisterWorkflow(workflows.PR) + prWorker.RegisterWorkflow(workflows.Terraform) + if err := prWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start pr worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down pr worker, resource clean up may still be occurring in the background") + }() + + wg.Add(1) + go func() { + defer wg.Done() + + terraformWorker := s.buildTerraformWorker() + if err := terraformWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start terraform worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") + }() + + // Spinning up a new worker process here adds complexity to the shutdown logic for this worker + // TODO: Investigate the feasibility of deploying this worker process in it's own worker + wg.Add(1) + go func() { + defer wg.Done() + + prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionTaskQueue, worker.Options{ + WorkerStopTimeout: PRRevisionWorkerTimeout, + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.DefaultTaskQueue.ActivitiesPerSecond, + }) + prRevisionWorker.RegisterWorkflow(lyftWorkflows.PRRevision) + prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities) + prRevisionWorker.RegisterActivity(s.RevisionSetterActivities) + + if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start pr revision default worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down pr revision default worker, resource clean up may still be occurring in the background") + }() + + wg.Add(1) + go func() { + defer wg.Done() + + prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionSlowTaskQueue, worker.Options{ + WorkerStopTimeout: PRRevisionWorkerTimeout, + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.SlowTaskQueue.ActivitiesPerSecond, + }) + prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities) + + if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start pr revision slow worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down pr revision slow worker, resource clean up may still be occurring in the background") + }() + + // Ensure server gracefully drains connections when stopped. + stop := make(chan os.Signal, 1) + // Stop on SIGINTs and SIGTERMs. + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + s.Logger.Info(fmt.Sprintf("Atlantis started - listening on port %v", s.Port)) + + go func() { + err := s.HTTPServerProxy.ListenAndServe() + + if err != nil && err != http.ErrServerClosed { + s.Logger.Error(err.Error()) + } + }() + + <-stop + wg.Wait() + + return nil +} + +func (s Server) shutdown() { + // On cleanup, stream handler closes all active receivers and persists in memory jobs to storage + ctx, cancel := context.WithTimeout(context.Background(), StreamHandlerTimeout) + defer cancel() + if err := s.JobStreamHandler.CleanUp(ctx); err != nil { + s.Logger.Error(err.Error()) + } + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { + s.Logger.Error(err.Error()) + } + + s.TemporalClient.Close() + + // flush stats before shutdown + if err := s.StatsCloser.Close(); err != nil { + s.Logger.Error(err.Error()) + } + + s.Logger.Close() +} + +// TODO: consider building these before initializing the server so that the server is just responsible +// for running the workers and has no knowledge of their dependencies. +func (s Server) buildDeployWorker() worker.Worker { + // pass the underlying client otherwise this will panic() + deployWorker := worker.New(s.TemporalClient.Client, workflows.DeployTaskQueue, worker.Options{ + WorkerStopTimeout: TemporalWorkerTimeout, + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + }) + deployWorker.RegisterActivity(s.DeployActivities) + deployWorker.RegisterActivity(s.GithubActivities) + deployWorker.RegisterActivity(s.AuditActivity) + deployWorker.RegisterActivity(s.TerraformActivities) + deployWorker.RegisterWorkflowWithOptions(workflows.GetDeployWithPlugins( + func(ctx workflow.Context, dr workflows.DeployRequest) (plugins.Deploy, error) { + var a *lyftActivities.Audit + + return plugins.Deploy{ + Notifiers: []plugins.TerraformWorkflowNotifier{ + ¬ifier.SNSNotifier{ + Activity: a, + }, + }, + PostDeployExecutors: []plugins.PostDeployExecutor{ + &executor.PRRevisionWorkflowExecutor{TaskQueue: lyftWorkflows.PRRevisionTaskQueue}, + }, + }, nil + }, + ), workflow.RegisterOptions{ + Name: workflows.Deploy, + }) + deployWorker.RegisterWorkflow(workflows.Terraform) + return deployWorker +} + +func (s Server) buildTerraformWorker() worker.Worker { + // pass the underlying client otherwise this will panic() + terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ + WorkerStopTimeout: TemporalWorkerTimeout, + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + MaxConcurrentActivityExecutionSize: 30, + }) + terraformWorker.RegisterActivity(s.TerraformActivities) + terraformWorker.RegisterActivity(s.GithubActivities) + terraformWorker.RegisterWorkflow(workflows.Terraform) + return terraformWorker +} + +// Healthz returns the health check response. It always returns a 200 currently. +func Healthz(w http.ResponseWriter, _ *http.Request) { + data, err := json.MarshalIndent(&struct { + Status string `json:"status"` + }{ + Status: "ok", + }, "", " ") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Error creating status json response: %s", err) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(data) // nolint: errcheck +} From 4ab7d4046f1e5ab715f0d94bbc7931e556bde9cb Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 29 Feb 2024 16:03:42 -0800 Subject: [PATCH 04/20] ok --- cmd/terraformadmin.go | 15 +++++++-------- server/neptune/terraformadmin/server.go | 14 +++++++------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go index 95c357c05..735645a6c 100644 --- a/cmd/terraformadmin.go +++ b/cmd/terraformadmin.go @@ -6,8 +6,8 @@ import ( "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/legacy" "github.com/runatlantis/atlantis/server/logging" - "github.com/runatlantis/atlantis/server/neptune/temporalworker" neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" + "github.com/runatlantis/atlantis/server/neptune/terraformadmin" ) type TerraformAdmin struct{} @@ -21,12 +21,15 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C globalCfg := valid.NewGlobalCfg(userConfig.DataDir) validator := &cfgParser.ParserValidator{} + + // TODO: should terraformadminmode pass in this stuff? if userConfig.RepoConfig != "" { globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) if err != nil { return nil, errors.Wrapf(err, "parsing %s file", userConfig.RepoConfig) } } + parsedURL, err := legacy.ParseAtlantisURL(userConfig.AtlantisURL) if err != nil { return nil, errors.Wrapf(err, @@ -40,6 +43,8 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C return nil, err } + // we don't need the feature config + cfg := &neptune.Config{ AuthCfg: neptune.AuthConfig{ SslCertFile: userConfig.SSLCertFile, @@ -50,12 +55,6 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C Version: config.AtlantisVersion, Port: userConfig.Port, }, - FeatureConfig: neptune.FeatureConfig{ - FFOwner: userConfig.FFOwner, - FFRepo: userConfig.FFRepo, - FFPath: userConfig.FFPath, - FFBranch: userConfig.FFBranch, - }, TerraformCfg: neptune.TerraformConfig{ DefaultVersion: userConfig.DefaultTFVersion, DownloadURL: userConfig.TFDownloadURL, @@ -77,5 +76,5 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C LyftAuditJobsSnsTopicArn: userConfig.LyftAuditJobsSnsTopicArn, RevisionSetter: globalCfg.RevisionSetter, } - return temporalworker.NewServer(cfg) + return terraformadmin.NewServer(cfg) } diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go index ed71c3920..2a8fe6255 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/terraformadmin/server.go @@ -77,7 +77,6 @@ type Server struct { GithubActivities *activities.Github RevisionSetterActivities *lyftActivities.RevisionSetter // Temporary until we move this into our private code - AuditActivity *lyftActivities.Audit PRRevisionGithubActivities *lyftActivities.Github TerraformTaskQueue string RevisionSetterConfig valid.RevisionSetter @@ -141,10 +140,8 @@ func NewServer(config *config.Config) (*Server, error) { Logger: config.CtxLogger, } - auditActivity, err := lyftActivities.NewAuditActivity(config.LyftAuditJobsSnsTopicArn) - if err != nil { - return nil, errors.Wrap(err, "initializing lyft activities") - } + // we don't need audit activities + deployActivities, err := activities.NewDeploy(config.DeploymentConfig) if err != nil { return nil, errors.Wrap(err, "initializing deploy activities") @@ -227,12 +224,16 @@ func NewServer(config *config.Config) (*Server, error) { RevisionSetterActivities: revisionSetterActivities, TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, RevisionSetterConfig: config.RevisionSetter, - AuditActivity: auditActivity, PRRevisionGithubActivities: prRevisionGithubActivities, } return &server, nil } +/* + * BIG TODO FOR ATLANTIS DEPRECATION: + * Figure out what workers / activities / stuff and things we don't need and remove them + */ + func (s Server) Start() error { defer s.shutdown() @@ -384,7 +385,6 @@ func (s Server) buildDeployWorker() worker.Worker { }) deployWorker.RegisterActivity(s.DeployActivities) deployWorker.RegisterActivity(s.GithubActivities) - deployWorker.RegisterActivity(s.AuditActivity) deployWorker.RegisterActivity(s.TerraformActivities) deployWorker.RegisterWorkflowWithOptions(workflows.GetDeployWithPlugins( func(ctx workflow.Context, dr workflows.DeployRequest) (plugins.Deploy, error) { From 8f4893b72a5deb82695f0801abf040ce30edd1a2 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Fri, 1 Mar 2024 11:02:20 -0800 Subject: [PATCH 05/20] ok --- cmd/terraformadmin.go | 3 - server/neptune/terraformadmin/server.go | 199 +++--------------------- 2 files changed, 18 insertions(+), 184 deletions(-) diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go index 735645a6c..357db9687 100644 --- a/cmd/terraformadmin.go +++ b/cmd/terraformadmin.go @@ -36,15 +36,12 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C "parsing atlantis url %q", userConfig.AtlantisURL) } - // TODO: we should just supply a yaml file with this info and load it directly into the - // app config struct appConfig, err := createGHAppConfig(userConfig) if err != nil { return nil, err } // we don't need the feature config - cfg := &neptune.Config{ AuthCfg: neptune.AuthConfig{ SslCertFile: userConfig.SSLCertFile, diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go index 2a8fe6255..3ea5bed6f 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/terraformadmin/server.go @@ -26,13 +26,8 @@ import ( "github.com/runatlantis/atlantis/server/metrics" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" lyftActivities "github.com/runatlantis/atlantis/server/neptune/lyft/activities" - "github.com/runatlantis/atlantis/server/neptune/lyft/executor" - "github.com/runatlantis/atlantis/server/neptune/lyft/notifier" - lyftWorkflows "github.com/runatlantis/atlantis/server/neptune/lyft/workflows" "github.com/runatlantis/atlantis/server/neptune/temporal" "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" - "github.com/runatlantis/atlantis/server/neptune/temporalworker/controllers" - "github.com/runatlantis/atlantis/server/neptune/temporalworker/job" "github.com/runatlantis/atlantis/server/neptune/workflows" "github.com/runatlantis/atlantis/server/neptune/workflows/activities" "github.com/runatlantis/atlantis/server/neptune/workflows/plugins" @@ -41,7 +36,6 @@ import ( "github.com/urfave/negroni" "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/worker" - "go.temporal.io/sdk/workflow" ) const ( @@ -65,17 +59,14 @@ const ( ) type Server struct { - Logger logging.Logger - HTTPServerProxy *neptune_http.ServerProxy - Port int - StatsScope tally.Scope - StatsCloser io.Closer - TemporalClient *temporal.ClientWrapper - JobStreamHandler *job.StreamHandler - DeployActivities *activities.Deploy - TerraformActivities *activities.Terraform - GithubActivities *activities.Github - RevisionSetterActivities *lyftActivities.RevisionSetter + Logger logging.Logger + HTTPServerProxy *neptune_http.ServerProxy + Port int + StatsScope tally.Scope + StatsCloser io.Closer + TemporalClient *temporal.ClientWrapper + TerraformActivities *activities.Terraform + GithubActivities *activities.Github // Temporary until we move this into our private code PRRevisionGithubActivities *lyftActivities.Github TerraformTaskQueue string @@ -96,20 +87,9 @@ func NewServer(config *config.Config) (*Server, error) { } scope = scope.Tagged(map[string]string{ - "mode": "worker", + "mode": "terraformadmin", }) - // Build dependencies required for output handler and jobs controller - jobStore, err := job.NewStorageBackendStore(config.JobConfig, scope.SubScope("job.store"), config.CtxLogger) - if err != nil { - return nil, errors.Wrapf(err, "initializing job store") - } - receiverRegistry := job.NewReceiverRegistry() - - // terraform job output handler - jobStreamHandler := job.NewStreamHandler(jobStore, receiverRegistry, config.TerraformCfg.LogFilters, config.CtxLogger) - jobsController := controllers.NewJobsController(jobStore, receiverRegistry, config.ServerCfg, scope, config.CtxLogger) - // temporal client + worker initialization opts := &temporal.Options{ StatsReporter: statsReporter, @@ -124,8 +104,6 @@ func NewServer(config *config.Config) (*Server, error) { router := mux.NewRouter() router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo})) - router.HandleFunc("/jobs/{job-id}", jobsController.GetProjectJobs).Methods(http.MethodGet).Name(ProjectJobsViewRouteName) - router.HandleFunc("/jobs/{job-id}/ws", jobsController.GetProjectJobsWS).Methods(http.MethodGet) n := negroni.New(&negroni.Recovery{ Logger: log.New(os.Stdout, "", log.LstdFlags), PrintStack: false, @@ -140,13 +118,6 @@ func NewServer(config *config.Config) (*Server, error) { Logger: config.CtxLogger, } - // we don't need audit activities - - deployActivities, err := activities.NewDeploy(config.DeploymentConfig) - if err != nil { - return nil, errors.Wrap(err, "initializing deploy activities") - } - terraformActivities, err := activities.NewTerraform( config.TerraformCfg, config.ValidationConfig, @@ -155,7 +126,7 @@ func NewServer(config *config.Config) (*Server, error) { config.ServerCfg.URL, config.TemporalCfg.TerraformTaskQueue, config.GithubCfg.TemporalAppInstallationID, - jobStreamHandler, + nil, ) if err != nil { return nil, errors.Wrap(err, "initializing terraform activities") @@ -200,40 +171,19 @@ func NewServer(config *config.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing github activities") } - revisionSetterActivities, err := lyftActivities.NewRevisionSetter(config.RevisionSetter) - if err != nil { - return nil, errors.Wrap(err, "initializing revision setter activities") - } - - prRevisionGithubActivities := &lyftActivities.Github{ - ClientCreator: clientCreator, - InstallationID: config.GithubCfg.TemporalAppInstallationID, - } - server := Server{ - Logger: config.CtxLogger, - HTTPServerProxy: httpServerProxy, - Port: config.ServerCfg.Port, - StatsScope: scope, - StatsCloser: statsCloser, - TemporalClient: temporalClient, - JobStreamHandler: jobStreamHandler, - DeployActivities: deployActivities, - TerraformActivities: terraformActivities, - GithubActivities: githubActivities, - RevisionSetterActivities: revisionSetterActivities, - TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, - RevisionSetterConfig: config.RevisionSetter, - PRRevisionGithubActivities: prRevisionGithubActivities, + Logger: config.CtxLogger, + HTTPServerProxy: httpServerProxy, + Port: config.ServerCfg.Port, + StatsScope: scope, + StatsCloser: statsCloser, + TemporalClient: temporalClient, + TerraformActivities: terraformActivities, + GithubActivities: githubActivities, } return &server, nil } -/* - * BIG TODO FOR ATLANTIS DEPRECATION: - * Figure out what workers / activities / stuff and things we don't need and remove them - */ - func (s Server) Start() error { defer s.shutdown() @@ -241,37 +191,6 @@ func (s Server) Start() error { var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - deployWorker := s.buildDeployWorker() - if err := deployWorker.Run(worker.InterruptCh()); err != nil { - log.Fatalln("unable to start deploy worker", err) - } - - s.Logger.InfoContext(ctx, "Shutting down deploy worker, resource clean up may still be occurring in the background") - }() - - wg.Add(1) - go func() { - defer wg.Done() - prWorker := worker.New(s.TemporalClient.Client, workflows.PRTaskQueue, worker.Options{ - WorkerStopTimeout: TemporalWorkerTimeout, - Interceptors: []interceptor.WorkerInterceptor{ - temporal.NewWorkerInterceptor(), - }, - }) - prWorker.RegisterActivity(s.GithubActivities) - prWorker.RegisterActivity(s.TerraformActivities) - prWorker.RegisterWorkflow(workflows.PR) - prWorker.RegisterWorkflow(workflows.Terraform) - if err := prWorker.Run(worker.InterruptCh()); err != nil { - log.Fatalln("unable to start pr worker", err) - } - - s.Logger.InfoContext(ctx, "Shutting down pr worker, resource clean up may still be occurring in the background") - }() - wg.Add(1) go func() { defer wg.Done() @@ -284,50 +203,6 @@ func (s Server) Start() error { s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") }() - // Spinning up a new worker process here adds complexity to the shutdown logic for this worker - // TODO: Investigate the feasibility of deploying this worker process in it's own worker - wg.Add(1) - go func() { - defer wg.Done() - - prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionTaskQueue, worker.Options{ - WorkerStopTimeout: PRRevisionWorkerTimeout, - Interceptors: []interceptor.WorkerInterceptor{ - temporal.NewWorkerInterceptor(), - }, - TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.DefaultTaskQueue.ActivitiesPerSecond, - }) - prRevisionWorker.RegisterWorkflow(lyftWorkflows.PRRevision) - prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities) - prRevisionWorker.RegisterActivity(s.RevisionSetterActivities) - - if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil { - log.Fatalln("unable to start pr revision default worker", err) - } - - s.Logger.InfoContext(ctx, "Shutting down pr revision default worker, resource clean up may still be occurring in the background") - }() - - wg.Add(1) - go func() { - defer wg.Done() - - prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionSlowTaskQueue, worker.Options{ - WorkerStopTimeout: PRRevisionWorkerTimeout, - Interceptors: []interceptor.WorkerInterceptor{ - temporal.NewWorkerInterceptor(), - }, - TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.SlowTaskQueue.ActivitiesPerSecond, - }) - prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities) - - if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil { - log.Fatalln("unable to start pr revision slow worker", err) - } - - s.Logger.InfoContext(ctx, "Shutting down pr revision slow worker, resource clean up may still be occurring in the background") - }() - // Ensure server gracefully drains connections when stopped. stop := make(chan os.Signal, 1) // Stop on SIGINTs and SIGTERMs. @@ -353,9 +228,6 @@ func (s Server) shutdown() { // On cleanup, stream handler closes all active receivers and persists in memory jobs to storage ctx, cancel := context.WithTimeout(context.Background(), StreamHandlerTimeout) defer cancel() - if err := s.JobStreamHandler.CleanUp(ctx); err != nil { - s.Logger.Error(err.Error()) - } ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -373,41 +245,6 @@ func (s Server) shutdown() { s.Logger.Close() } -// TODO: consider building these before initializing the server so that the server is just responsible -// for running the workers and has no knowledge of their dependencies. -func (s Server) buildDeployWorker() worker.Worker { - // pass the underlying client otherwise this will panic() - deployWorker := worker.New(s.TemporalClient.Client, workflows.DeployTaskQueue, worker.Options{ - WorkerStopTimeout: TemporalWorkerTimeout, - Interceptors: []interceptor.WorkerInterceptor{ - temporal.NewWorkerInterceptor(), - }, - }) - deployWorker.RegisterActivity(s.DeployActivities) - deployWorker.RegisterActivity(s.GithubActivities) - deployWorker.RegisterActivity(s.TerraformActivities) - deployWorker.RegisterWorkflowWithOptions(workflows.GetDeployWithPlugins( - func(ctx workflow.Context, dr workflows.DeployRequest) (plugins.Deploy, error) { - var a *lyftActivities.Audit - - return plugins.Deploy{ - Notifiers: []plugins.TerraformWorkflowNotifier{ - ¬ifier.SNSNotifier{ - Activity: a, - }, - }, - PostDeployExecutors: []plugins.PostDeployExecutor{ - &executor.PRRevisionWorkflowExecutor{TaskQueue: lyftWorkflows.PRRevisionTaskQueue}, - }, - }, nil - }, - ), workflow.RegisterOptions{ - Name: workflows.Deploy, - }) - deployWorker.RegisterWorkflow(workflows.Terraform) - return deployWorker -} - func (s Server) buildTerraformWorker() worker.Worker { // pass the underlying client otherwise this will panic() terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ From a62fcc248e270dce01b2e0deaf870dba203ebf5d Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Tue, 5 Mar 2024 12:02:12 -0800 Subject: [PATCH 06/20] ok --- cmd/terraformadmin.go | 3 +- server/neptune/terraformadmin/server.go | 39 ++----------------------- 2 files changed, 4 insertions(+), 38 deletions(-) diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go index 357db9687..ac40cd1ac 100644 --- a/cmd/terraformadmin.go +++ b/cmd/terraformadmin.go @@ -43,10 +43,12 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C // we don't need the feature config cfg := &neptune.Config{ + // we need the authCfg and ssl stuff for the http server AuthCfg: neptune.AuthConfig{ SslCertFile: userConfig.SSLCertFile, SslKeyFile: userConfig.SSLKeyFile, }, + // we need the servercfg stuff, see setAtlantisURL ServerCfg: neptune.ServerConfig{ URL: parsedURL, Version: config.AtlantisVersion, @@ -61,7 +63,6 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C DefaultVersion: globalCfg.PolicySets.Version, Policies: globalCfg.PolicySets, }, - JobConfig: globalCfg.PersistenceConfig.Jobs, DeploymentConfig: globalCfg.PersistenceConfig.Deployments, DataDir: userConfig.DataDir, TemporalCfg: globalCfg.Temporal, diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go index 3ea5bed6f..191668e8c 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/terraformadmin/server.go @@ -38,26 +38,6 @@ import ( "go.temporal.io/sdk/worker" ) -const ( - ProjectJobsViewRouteName = "project-jobs-detail" - - // to make this clear, - // time t event - // 0 min sigterm received from kube - // 50 min activity ctx canceled - // 50 + x min sigkill received from kube - // - // Note: x must be configured outside atlantis and is the grace period effectively. - TemporalWorkerTimeout = 50 * time.Minute - - // allow any in-progress PRRevision workflow executions to gracefully exit which shouldn't take longer than 10 minutes - PRRevisionWorkerTimeout = 10 * time.Minute - - // 5 minutes to allow cleaning up the job store - StreamHandlerTimeout = 5 * time.Minute - PRRevisionTaskQueueActivitiesPerSecond = 2 -) - type Server struct { Logger logging.Logger HTTPServerProxy *neptune_http.ServerProxy @@ -139,12 +119,8 @@ func NewServer(config *config.Config) (*Server, error) { if err != nil { return nil, errors.Wrap(err, "client creator") } - repoConfig := feature.RepoConfig{ - Owner: config.FeatureConfig.FFOwner, - Repo: config.FeatureConfig.FFRepo, - Branch: config.FeatureConfig.FFBranch, - Path: config.FeatureConfig.FFPath, - } + // TODO fill in details here + repoConfig := feature.RepoConfig{} installationFetcher := &github.InstallationRetriever{ ClientCreator: clientCreator, } @@ -225,16 +201,6 @@ func (s Server) Start() error { } func (s Server) shutdown() { - // On cleanup, stream handler closes all active receivers and persists in memory jobs to storage - ctx, cancel := context.WithTimeout(context.Background(), StreamHandlerTimeout) - defer cancel() - - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { - s.Logger.Error(err.Error()) - } - s.TemporalClient.Close() // flush stats before shutdown @@ -248,7 +214,6 @@ func (s Server) shutdown() { func (s Server) buildTerraformWorker() worker.Worker { // pass the underlying client otherwise this will panic() terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ - WorkerStopTimeout: TemporalWorkerTimeout, Interceptors: []interceptor.WorkerInterceptor{ temporal.NewWorkerInterceptor(), }, From 6edbdd09a5f59f77ac6961422257367294e9bc6a Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Tue, 5 Mar 2024 16:51:43 -0800 Subject: [PATCH 07/20] ok --- server/neptune/terraformadmin/server.go | 59 ++++++++++++++++++++----- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go index 191668e8c..923ae4666 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/terraformadmin/server.go @@ -21,16 +21,13 @@ import ( assetfs "github.com/elazarl/go-bindata-assetfs" "github.com/gorilla/mux" "github.com/pkg/errors" - "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/logging" "github.com/runatlantis/atlantis/server/metrics" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" - lyftActivities "github.com/runatlantis/atlantis/server/neptune/lyft/activities" "github.com/runatlantis/atlantis/server/neptune/temporal" "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" "github.com/runatlantis/atlantis/server/neptune/workflows" "github.com/runatlantis/atlantis/server/neptune/workflows/activities" - "github.com/runatlantis/atlantis/server/neptune/workflows/plugins" "github.com/runatlantis/atlantis/server/static" "github.com/uber-go/tally/v4" "github.com/urfave/negroni" @@ -47,11 +44,17 @@ type Server struct { TemporalClient *temporal.ClientWrapper TerraformActivities *activities.Terraform GithubActivities *activities.Github - // Temporary until we move this into our private code - PRRevisionGithubActivities *lyftActivities.Github - TerraformTaskQueue string - RevisionSetterConfig valid.RevisionSetter - AdditionalNotifiers []plugins.TerraformWorkflowNotifier + // differences from temporal worker: + // - no additional notifiers + // - no revision setter + // - no PRRevisionGithubActivities + // - no AuditActivity + // - no RevisionSetterActivities + // - no DeployActivities + // - no JobStreamHandler + // - no CronScheduler + // - no crons + TerraformTaskQueue string } func NewServer(config *config.Config) (*Server, error) { @@ -70,6 +73,8 @@ func NewServer(config *config.Config) (*Server, error) { "mode": "terraformadmin", }) + // difference from temporalworker: no job stuff (handler, controller, etc) + // temporal client + worker initialization opts := &temporal.Options{ StatsReporter: statsReporter, @@ -80,6 +85,7 @@ func NewServer(config *config.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing temporal client") } + // difference from temporalworker: no job endpoints // router initialization router := mux.NewRouter() router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) @@ -98,6 +104,8 @@ func NewServer(config *config.Config) (*Server, error) { Logger: config.CtxLogger, } + // difference from temporalworker - no audit or deploy activities + terraformActivities, err := activities.NewTerraform( config.TerraformCfg, config.ValidationConfig, @@ -107,6 +115,7 @@ func NewServer(config *config.Config) (*Server, error) { config.TemporalCfg.TerraformTaskQueue, config.GithubCfg.TemporalAppInstallationID, nil, + // difference from temporalworker: no jobstreamhandler TODO: test if this actually works ) if err != nil { return nil, errors.Wrap(err, "initializing terraform activities") @@ -119,8 +128,13 @@ func NewServer(config *config.Config) (*Server, error) { if err != nil { return nil, errors.Wrap(err, "client creator") } - // TODO fill in details here - repoConfig := feature.RepoConfig{} + // TODO fill in details here - in another PR pull from globalCfg since we have the items in the repo_template + repoConfig := feature.RepoConfig{ + Owner: config.FeatureConfig.FFOwner, + Repo: config.FeatureConfig.FFRepo, + Branch: config.FeatureConfig.FFBranch, + Path: config.FeatureConfig.FFPath, + } installationFetcher := &github.InstallationRetriever{ ClientCreator: clientCreator, } @@ -147,6 +161,10 @@ func NewServer(config *config.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing github activities") } + // difference from temporalworker: + // - no revisionSetterActivities or prRevisionGithubActivities + // - no cron scheduler or crons + server := Server{ Logger: config.CtxLogger, HTTPServerProxy: httpServerProxy, @@ -179,6 +197,11 @@ func (s Server) Start() error { s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") }() + // note the difference from temporalworker: + // - no default prRevisionWorker + // - no slow prRevisionWorker + // - no deployWorker + // Ensure server gracefully drains connections when stopped. stop := make(chan os.Signal, 1) // Stop on SIGINTs and SIGTERMs. @@ -201,6 +224,16 @@ func (s Server) Start() error { } func (s Server) shutdown() { + // this differs from temporalworker in that we: + // - don't shut down the jobstreamhandler + // - don't shutdown the cron scheduler + // as we don't need them + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { + s.Logger.Error(err.Error()) + } + s.TemporalClient.Close() // flush stats before shutdown @@ -211,12 +244,18 @@ func (s Server) shutdown() { s.Logger.Close() } +// TODO: add deployWorker if we need it + func (s Server) buildTerraformWorker() worker.Worker { // pass the underlying client otherwise this will panic() terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ Interceptors: []interceptor.WorkerInterceptor{ temporal.NewWorkerInterceptor(), }, + // default is 1k, however Nish's comment says that: + // "30 falls in line with our current cpu resourcing for each worker and should be the + // maximum number of concurrent tf operations we allow." + // We shouldn't need too many as we are only doing one thing here on this batch pod MaxConcurrentActivityExecutionSize: 30, }) terraformWorker.RegisterActivity(s.TerraformActivities) From fd7c8ba2208c1704885e5635af11b338a66d107b Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Tue, 5 Mar 2024 19:47:56 -0800 Subject: [PATCH 08/20] ok --- cmd/terraformadmin.go | 33 ++++++++++++++++--------- server/neptune/terraformadmin/server.go | 5 ++++ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go index ac40cd1ac..bb15f571e 100644 --- a/cmd/terraformadmin.go +++ b/cmd/terraformadmin.go @@ -48,31 +48,42 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C SslCertFile: userConfig.SSLCertFile, SslKeyFile: userConfig.SSLKeyFile, }, - // we need the servercfg stuff, see setAtlantisURL + // we need the servercfg stuff, see setAtlantisURL() TODO: is this true? ServerCfg: neptune.ServerConfig{ URL: parsedURL, Version: config.AtlantisVersion, Port: userConfig.Port, }, + // we need the terraformcfg stuff, since we need terraformActivities TerraformCfg: neptune.TerraformConfig{ DefaultVersion: userConfig.DefaultTFVersion, DownloadURL: userConfig.TFDownloadURL, LogFilters: globalCfg.TerraformLogFilter, }, + // also passed to terraform activities, even though we don't need conf test OPA stuff + // TODO: But we have to introduce branching if we remove this... ValidationConfig: neptune.ValidationConfig{ DefaultVersion: globalCfg.PolicySets.Version, Policies: globalCfg.PolicySets, }, - DeploymentConfig: globalCfg.PersistenceConfig.Deployments, - DataDir: userConfig.DataDir, - TemporalCfg: globalCfg.Temporal, - GithubCfg: globalCfg.Github, - App: appConfig, - CtxLogger: ctxLogger, - StatsNamespace: userConfig.StatsNamespace, - Metrics: globalCfg.Metrics, - LyftAuditJobsSnsTopicArn: userConfig.LyftAuditJobsSnsTopicArn, - RevisionSetter: globalCfg.RevisionSetter, + + // Do not need deployment config + // do need datadir, we will save the archive there + DataDir: userConfig.DataDir, + // do need temporalconfig since we use temporal + TemporalCfg: globalCfg.Temporal, + // do need githubcfg, since we use github to get the archive + GithubCfg: globalCfg.Github, + // same as above + App: appConfig, + // we do need logging + CtxLogger: ctxLogger, + // we do need stats + StatsNamespace: userConfig.StatsNamespace, + // we do need metrics + Metrics: globalCfg.Metrics, + // no SnsTopicArn since we don't use the auditing + // no revision setter } return terraformadmin.NewServer(cfg) } diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go index 923ae4666..88d25c1ce 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/terraformadmin/server.go @@ -246,6 +246,9 @@ func (s Server) shutdown() { // TODO: add deployWorker if we need it +// Note that we will need to do things similar to how gateway does it to get the metadata we need +// specifically the root + func (s Server) buildTerraformWorker() worker.Worker { // pass the underlying client otherwise this will panic() terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ @@ -264,6 +267,8 @@ func (s Server) buildTerraformWorker() worker.Worker { return terraformWorker } +// TODO: eventually we can make it so the pod is ready when the repo is done cloning... + // Healthz returns the health check response. It always returns a 200 currently. func Healthz(w http.ResponseWriter, _ *http.Request) { data, err := json.MarshalIndent(&struct { From 1ad555f226e92402aab3dfda938a5099281be5f7 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 13:08:27 -0800 Subject: [PATCH 09/20] still making progress --- cmd/server.go | 2 +- cmd/terraformadmin.go | 12 ++-- server/legacy/user_config.go | 6 +- server/legacy/user_config_test.go | 4 +- server/neptune/admin/config/adminconfig.go | 61 +++++++++++++++++++ .../{terraformadmin => admin}/server.go | 21 ++----- 6 files changed, 79 insertions(+), 27 deletions(-) create mode 100644 server/neptune/admin/config/adminconfig.go rename server/neptune/{terraformadmin => admin}/server.go (93%) diff --git a/cmd/server.go b/cmd/server.go index 8c9f754ab..39a719fe8 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -232,7 +232,7 @@ var stringFlags = map[string]stringFlag{ "gateway: Runs atlantis with gateway event handler that publishes events through sns.\n" + "worker: Runs atlantis with a sqs handler that polls for events in the queue to process.\n" + "hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.\n" + - "terraformadmin: Runs atlantis in a mode that allows for running terraform commands.", + "admin: Runs atlantis in an admin mode that allows for running terraform commands.", defaultValue: "", }, LyftWorkerQueueURLFlag: { diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go index bb15f571e..06a3cbfe5 100644 --- a/cmd/terraformadmin.go +++ b/cmd/terraformadmin.go @@ -6,8 +6,8 @@ import ( "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/legacy" "github.com/runatlantis/atlantis/server/logging" - neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" "github.com/runatlantis/atlantis/server/neptune/terraformadmin" + adminconfig "github.com/runatlantis/atlantis/server/neptune/terraformadmin/config" ) type TerraformAdmin struct{} @@ -42,27 +42,27 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C } // we don't need the feature config - cfg := &neptune.Config{ + cfg := &adminconfig.Config{ // we need the authCfg and ssl stuff for the http server - AuthCfg: neptune.AuthConfig{ + AuthCfg: adminconfig.AuthConfig{ SslCertFile: userConfig.SSLCertFile, SslKeyFile: userConfig.SSLKeyFile, }, // we need the servercfg stuff, see setAtlantisURL() TODO: is this true? - ServerCfg: neptune.ServerConfig{ + ServerCfg: adminconfig.ServerConfig{ URL: parsedURL, Version: config.AtlantisVersion, Port: userConfig.Port, }, // we need the terraformcfg stuff, since we need terraformActivities - TerraformCfg: neptune.TerraformConfig{ + TerraformCfg: adminconfig.TerraformConfig{ DefaultVersion: userConfig.DefaultTFVersion, DownloadURL: userConfig.TFDownloadURL, LogFilters: globalCfg.TerraformLogFilter, }, // also passed to terraform activities, even though we don't need conf test OPA stuff // TODO: But we have to introduce branching if we remove this... - ValidationConfig: neptune.ValidationConfig{ + ValidationConfig: adminconfig.ValidationConfig{ DefaultVersion: globalCfg.PolicySets.Version, Policies: globalCfg.PolicySets, }, diff --git a/server/legacy/user_config.go b/server/legacy/user_config.go index 980a38a01..0f17931b8 100644 --- a/server/legacy/user_config.go +++ b/server/legacy/user_config.go @@ -11,7 +11,7 @@ const ( Gateway Worker TemporalWorker - TerraformAdmin + Admin ) // UserConfig holds config values passed in by the user. @@ -105,8 +105,8 @@ func (u UserConfig) ToLyftMode() Mode { return Worker case "temporalworker": return TemporalWorker - case "terraformadmin": - return TerraformAdmin + case "admin": + return Admin } return Default } diff --git a/server/legacy/user_config_test.go b/server/legacy/user_config_test.go index de2e91335..b266d0d22 100644 --- a/server/legacy/user_config_test.go +++ b/server/legacy/user_config_test.go @@ -71,8 +71,8 @@ func TestUserConfig_ToLyftMode(t *testing.T) { server.Default, }, { - "terraformadmin", - server.TerraformAdmin, + "admin", + server.Admin, }, { "temporalworker", diff --git a/server/neptune/admin/config/adminconfig.go b/server/neptune/admin/config/adminconfig.go new file mode 100644 index 000000000..2a4d11cf9 --- /dev/null +++ b/server/neptune/admin/config/adminconfig.go @@ -0,0 +1,61 @@ +package adminconfig + +import ( + "net/url" + + "github.com/hashicorp/go-version" + + "github.com/palantir/go-githubapp/githubapp" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/logging" +) + +type AuthConfig struct { + SslCertFile string + SslKeyFile string +} + +type ServerConfig struct { + URL *url.URL + Version string + Port int +} + +type TerraformConfig struct { + DefaultVersion string + DownloadURL string + LogFilters valid.TerraformLogFilters +} + +type ValidationConfig struct { + DefaultVersion *version.Version + Policies valid.PolicySets +} + +type FeatureConfig struct { + FFOwner string + FFRepo string + FFPath string + FFBranch string +} + +// Config is TerraformAdmin (admin mode) specific user config +type Config struct { + AuthCfg AuthConfig + ServerCfg ServerConfig + FeatureConfig FeatureConfig + TemporalCfg valid.Temporal + GithubCfg valid.Github + TerraformCfg TerraformConfig + ValidationConfig ValidationConfig + DeploymentConfig valid.StoreConfig + JobConfig valid.StoreConfig + Metrics valid.Metrics + + StatsNamespace string + + DataDir string + CtxLogger logging.Logger + App githubapp.Config + LyftAuditJobsSnsTopicArn string +} diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/admin/server.go similarity index 93% rename from server/neptune/terraformadmin/server.go rename to server/neptune/admin/server.go index 88d25c1ce..848b15e1e 100644 --- a/server/neptune/terraformadmin/server.go +++ b/server/neptune/admin/server.go @@ -1,4 +1,4 @@ -package terraformadmin +package admin import ( "context" @@ -25,7 +25,7 @@ import ( "github.com/runatlantis/atlantis/server/metrics" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" "github.com/runatlantis/atlantis/server/neptune/temporal" - "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" + adminconfig "github.com/runatlantis/atlantis/server/neptune/terraformadmin/config" "github.com/runatlantis/atlantis/server/neptune/workflows" "github.com/runatlantis/atlantis/server/neptune/workflows/activities" "github.com/runatlantis/atlantis/server/static" @@ -44,20 +44,11 @@ type Server struct { TemporalClient *temporal.ClientWrapper TerraformActivities *activities.Terraform GithubActivities *activities.Github - // differences from temporal worker: - // - no additional notifiers - // - no revision setter - // - no PRRevisionGithubActivities - // - no AuditActivity - // - no RevisionSetterActivities - // - no DeployActivities - // - no JobStreamHandler - // - no CronScheduler - // - no crons + TerraformTaskQueue string } -func NewServer(config *config.Config) (*Server, error) { +func NewServer(config *adminconfig.Config) (*Server, error) { statsReporter, err := metrics.NewReporter(config.Metrics, config.CtxLogger) if err != nil { @@ -70,7 +61,7 @@ func NewServer(config *config.Config) (*Server, error) { } scope = scope.Tagged(map[string]string{ - "mode": "terraformadmin", + "mode": "admin", }) // difference from temporalworker: no job stuff (handler, controller, etc) @@ -115,7 +106,7 @@ func NewServer(config *config.Config) (*Server, error) { config.TemporalCfg.TerraformTaskQueue, config.GithubCfg.TemporalAppInstallationID, nil, - // difference from temporalworker: no jobstreamhandler TODO: test if this actually works + // difference from temporalworker: no jobstreamhandler TODO: test if this actually works ) if err != nil { return nil, errors.Wrap(err, "initializing terraform activities") From 7046388f7791c0c124ba0ee75b68d889ad274f9d Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 13:14:50 -0800 Subject: [PATCH 10/20] still making progress --- cmd/server.go | 8 +-- cmd/terraformadmin.go | 89 ------------------------------- server/config/raw/global_cfg.go | 12 ++--- server/config/valid/global_cfg.go | 4 +- server/neptune/admin/server.go | 2 +- 5 files changed, 13 insertions(+), 102 deletions(-) delete mode 100644 cmd/terraformadmin.go diff --git a/cmd/server.go b/cmd/server.go index 39a719fe8..7130ac4ee 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -345,7 +345,7 @@ func NewServerCmd(v *viper.Viper, version string) *ServerCmd { GatewayCreator: &GatewayCreator{}, WorkerCreator: &WorkerCreator{}, TemporalWorkerCreator: &TemporalWorker{}, - TerraformAdminCreator: &TerraformAdmin{}, + AdminCreator: &Admin{}, }, Viper: v, AtlantisVersion: version, @@ -376,7 +376,7 @@ type ServerCreatorProxy struct { GatewayCreator ServerCreator WorkerCreator ServerCreator TemporalWorkerCreator ServerCreator - TerraformAdminCreator ServerCreator + AdminCreator ServerCreator } func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config server.Config) (ServerStarter, error) { @@ -392,8 +392,8 @@ func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config serv return d.TemporalWorkerCreator.NewServer(userConfig, config) } - if userConfig.ToLyftMode() == server.TerraformAdmin { - return d.TerraformAdminCreator.NewServer(userConfig, config) + if userConfig.ToLyftMode() == server.Admin { + return d.AdminCreator.NewServer(userConfig, config) } return d.WorkerCreator.NewServer(userConfig, config) diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go deleted file mode 100644 index 06a3cbfe5..000000000 --- a/cmd/terraformadmin.go +++ /dev/null @@ -1,89 +0,0 @@ -package cmd - -import ( - "github.com/pkg/errors" - cfgParser "github.com/runatlantis/atlantis/server/config" - "github.com/runatlantis/atlantis/server/config/valid" - "github.com/runatlantis/atlantis/server/legacy" - "github.com/runatlantis/atlantis/server/logging" - "github.com/runatlantis/atlantis/server/neptune/terraformadmin" - adminconfig "github.com/runatlantis/atlantis/server/neptune/terraformadmin/config" -) - -type TerraformAdmin struct{} - -// NewServer returns the real Atlantis server object. -func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { - ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel()) - if err != nil { - return nil, errors.Wrap(err, "failed to build context logger") - } - - globalCfg := valid.NewGlobalCfg(userConfig.DataDir) - validator := &cfgParser.ParserValidator{} - - // TODO: should terraformadminmode pass in this stuff? - if userConfig.RepoConfig != "" { - globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) - if err != nil { - return nil, errors.Wrapf(err, "parsing %s file", userConfig.RepoConfig) - } - } - - parsedURL, err := legacy.ParseAtlantisURL(userConfig.AtlantisURL) - if err != nil { - return nil, errors.Wrapf(err, - "parsing atlantis url %q", userConfig.AtlantisURL) - } - - appConfig, err := createGHAppConfig(userConfig) - if err != nil { - return nil, err - } - - // we don't need the feature config - cfg := &adminconfig.Config{ - // we need the authCfg and ssl stuff for the http server - AuthCfg: adminconfig.AuthConfig{ - SslCertFile: userConfig.SSLCertFile, - SslKeyFile: userConfig.SSLKeyFile, - }, - // we need the servercfg stuff, see setAtlantisURL() TODO: is this true? - ServerCfg: adminconfig.ServerConfig{ - URL: parsedURL, - Version: config.AtlantisVersion, - Port: userConfig.Port, - }, - // we need the terraformcfg stuff, since we need terraformActivities - TerraformCfg: adminconfig.TerraformConfig{ - DefaultVersion: userConfig.DefaultTFVersion, - DownloadURL: userConfig.TFDownloadURL, - LogFilters: globalCfg.TerraformLogFilter, - }, - // also passed to terraform activities, even though we don't need conf test OPA stuff - // TODO: But we have to introduce branching if we remove this... - ValidationConfig: adminconfig.ValidationConfig{ - DefaultVersion: globalCfg.PolicySets.Version, - Policies: globalCfg.PolicySets, - }, - - // Do not need deployment config - // do need datadir, we will save the archive there - DataDir: userConfig.DataDir, - // do need temporalconfig since we use temporal - TemporalCfg: globalCfg.Temporal, - // do need githubcfg, since we use github to get the archive - GithubCfg: globalCfg.Github, - // same as above - App: appConfig, - // we do need logging - CtxLogger: ctxLogger, - // we do need stats - StatsNamespace: userConfig.StatsNamespace, - // we do need metrics - Metrics: globalCfg.Metrics, - // no SnsTopicArn since we don't use the auditing - // no revision setter - } - return terraformadmin.NewServer(cfg) -} diff --git a/server/config/raw/global_cfg.go b/server/config/raw/global_cfg.go index 9f760ff44..486975fd7 100644 --- a/server/config/raw/global_cfg.go +++ b/server/config/raw/global_cfg.go @@ -26,22 +26,22 @@ type GlobalCfg struct { Persistence Persistence `yaml:"persistence" json:"persistence"` RevisionSetter RevisionSetter `yaml:"revision_setter" json:"revision_setter"` Admin Admin `yaml:"admin" json:"admin"` - TerraformAdminMode TerraformAdminMode `yaml:"terraform_admin_mode" json:"terraform_admin_mode"` + AdminMode AdminMode `yaml:"admin_mode" json:"admin_mode"` } -type TerraformAdminMode struct { +type AdminMode struct { Repo string `yaml:"repo" json:"repo"` Root string `yaml:"root" json:"root"` } -func (t TerraformAdminMode) ToValid() valid.TerraformAdminMode { - return valid.TerraformAdminMode{ +func (t AdminMode) ToValid() valid.AdminMode { + return valid.AdminMode{ Repo: t.Repo, Root: t.Root, } } -func (t TerraformAdminMode) Validate() error { +func (t AdminMode) Validate() error { // We don't need to validate the inputs so we can just return nil return nil } @@ -214,7 +214,7 @@ func (g GlobalCfg) ToValid(defaultCfg valid.GlobalCfg) valid.GlobalCfg { Github: g.Github.ToValid(), Admin: g.Admin.ToValid(), RevisionSetter: g.RevisionSetter.ToValid(), - TerraformAdminMode: g.TerraformAdminMode.ToValid(), + AdminMode: g.AdminMode.ToValid(), } } diff --git a/server/config/valid/global_cfg.go b/server/config/valid/global_cfg.go index 7c2b45f8b..baabeabdd 100644 --- a/server/config/valid/global_cfg.go +++ b/server/config/valid/global_cfg.go @@ -64,10 +64,10 @@ type GlobalCfg struct { Github Github RevisionSetter RevisionSetter Admin Admin - TerraformAdminMode TerraformAdminMode + AdminMode AdminMode } -type TerraformAdminMode struct { +type AdminMode struct { Repo string Root string } diff --git a/server/neptune/admin/server.go b/server/neptune/admin/server.go index 848b15e1e..2a0164b78 100644 --- a/server/neptune/admin/server.go +++ b/server/neptune/admin/server.go @@ -23,9 +23,9 @@ import ( "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/logging" "github.com/runatlantis/atlantis/server/metrics" + adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" "github.com/runatlantis/atlantis/server/neptune/temporal" - adminconfig "github.com/runatlantis/atlantis/server/neptune/terraformadmin/config" "github.com/runatlantis/atlantis/server/neptune/workflows" "github.com/runatlantis/atlantis/server/neptune/workflows/activities" "github.com/runatlantis/atlantis/server/static" From e8056c53ef31452b02f5f2b493f35c217df124dc Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 13:25:05 -0800 Subject: [PATCH 11/20] still making progress --- server/neptune/admin/config/adminconfig.go | 44 +++------------------- server/neptune/admin/server.go | 3 +- 2 files changed, 7 insertions(+), 40 deletions(-) diff --git a/server/neptune/admin/config/adminconfig.go b/server/neptune/admin/config/adminconfig.go index 2a4d11cf9..fe6892054 100644 --- a/server/neptune/admin/config/adminconfig.go +++ b/server/neptune/admin/config/adminconfig.go @@ -1,53 +1,21 @@ package adminconfig import ( - "net/url" - - "github.com/hashicorp/go-version" - "github.com/palantir/go-githubapp/githubapp" "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/logging" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" ) -type AuthConfig struct { - SslCertFile string - SslKeyFile string -} - -type ServerConfig struct { - URL *url.URL - Version string - Port int -} - -type TerraformConfig struct { - DefaultVersion string - DownloadURL string - LogFilters valid.TerraformLogFilters -} - -type ValidationConfig struct { - DefaultVersion *version.Version - Policies valid.PolicySets -} - -type FeatureConfig struct { - FFOwner string - FFRepo string - FFPath string - FFBranch string -} - // Config is TerraformAdmin (admin mode) specific user config type Config struct { - AuthCfg AuthConfig - ServerCfg ServerConfig - FeatureConfig FeatureConfig + AuthCfg neptune.AuthConfig + ServerCfg neptune.ServerConfig + FeatureConfig neptune.FeatureConfig TemporalCfg valid.Temporal GithubCfg valid.Github - TerraformCfg TerraformConfig - ValidationConfig ValidationConfig + TerraformCfg neptune.TerraformConfig + ValidationConfig neptune.ValidationConfig DeploymentConfig valid.StoreConfig JobConfig valid.StoreConfig Metrics valid.Metrics diff --git a/server/neptune/admin/server.go b/server/neptune/admin/server.go index 2a0164b78..7d7e37150 100644 --- a/server/neptune/admin/server.go +++ b/server/neptune/admin/server.go @@ -164,6 +164,7 @@ func NewServer(config *adminconfig.Config) (*Server, error) { StatsCloser: statsCloser, TemporalClient: temporalClient, TerraformActivities: terraformActivities, + TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, GithubActivities: githubActivities, } return &server, nil @@ -235,8 +236,6 @@ func (s Server) shutdown() { s.Logger.Close() } -// TODO: add deployWorker if we need it - // Note that we will need to do things similar to how gateway does it to get the metadata we need // specifically the root From 56d94a5f2aadc0313c013b45a5f4b334788a6713 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 13:49:22 -0800 Subject: [PATCH 12/20] still making progress --- server/neptune/admin/config/adminconfig.go | 1 - server/neptune/admin/server.go | 24 ++-------------------- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/server/neptune/admin/config/adminconfig.go b/server/neptune/admin/config/adminconfig.go index fe6892054..9fa087713 100644 --- a/server/neptune/admin/config/adminconfig.go +++ b/server/neptune/admin/config/adminconfig.go @@ -15,7 +15,6 @@ type Config struct { TemporalCfg valid.Temporal GithubCfg valid.Github TerraformCfg neptune.TerraformConfig - ValidationConfig neptune.ValidationConfig DeploymentConfig valid.StoreConfig JobConfig valid.StoreConfig Metrics valid.Metrics diff --git a/server/neptune/admin/server.go b/server/neptune/admin/server.go index 7d7e37150..f477f8da6 100644 --- a/server/neptune/admin/server.go +++ b/server/neptune/admin/server.go @@ -26,6 +26,7 @@ import ( adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" "github.com/runatlantis/atlantis/server/neptune/temporal" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" "github.com/runatlantis/atlantis/server/neptune/workflows" "github.com/runatlantis/atlantis/server/neptune/workflows/activities" "github.com/runatlantis/atlantis/server/static" @@ -64,9 +65,6 @@ func NewServer(config *adminconfig.Config) (*Server, error) { "mode": "admin", }) - // difference from temporalworker: no job stuff (handler, controller, etc) - - // temporal client + worker initialization opts := &temporal.Options{ StatsReporter: statsReporter, } @@ -76,8 +74,6 @@ func NewServer(config *adminconfig.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing temporal client") } - // difference from temporalworker: no job endpoints - // router initialization router := mux.NewRouter() router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo})) @@ -95,18 +91,15 @@ func NewServer(config *adminconfig.Config) (*Server, error) { Logger: config.CtxLogger, } - // difference from temporalworker - no audit or deploy activities - terraformActivities, err := activities.NewTerraform( config.TerraformCfg, - config.ValidationConfig, + neptune.ValidationConfig{}, config.App, config.DataDir, config.ServerCfg.URL, config.TemporalCfg.TerraformTaskQueue, config.GithubCfg.TemporalAppInstallationID, nil, - // difference from temporalworker: no jobstreamhandler TODO: test if this actually works ) if err != nil { return nil, errors.Wrap(err, "initializing terraform activities") @@ -152,10 +145,6 @@ func NewServer(config *adminconfig.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing github activities") } - // difference from temporalworker: - // - no revisionSetterActivities or prRevisionGithubActivities - // - no cron scheduler or crons - server := Server{ Logger: config.CtxLogger, HTTPServerProxy: httpServerProxy, @@ -189,11 +178,6 @@ func (s Server) Start() error { s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") }() - // note the difference from temporalworker: - // - no default prRevisionWorker - // - no slow prRevisionWorker - // - no deployWorker - // Ensure server gracefully drains connections when stopped. stop := make(chan os.Signal, 1) // Stop on SIGINTs and SIGTERMs. @@ -216,10 +200,6 @@ func (s Server) Start() error { } func (s Server) shutdown() { - // this differs from temporalworker in that we: - // - don't shut down the jobstreamhandler - // - don't shutdown the cron scheduler - // as we don't need them ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { From 0e831f35d04dfd9570d2672e4a670994a70341ef Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 13:49:31 -0800 Subject: [PATCH 13/20] still making progress --- cmd/admin.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 cmd/admin.go diff --git a/cmd/admin.go b/cmd/admin.go new file mode 100644 index 000000000..7292ba96b --- /dev/null +++ b/cmd/admin.go @@ -0,0 +1,80 @@ +package cmd + +import ( + "github.com/pkg/errors" + cfgParser "github.com/runatlantis/atlantis/server/config" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/legacy" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/neptune/admin" + adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" +) + +type Admin struct{} + +// NewServer returns the real Atlantis server object. +func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { + ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel()) + if err != nil { + return nil, errors.Wrap(err, "failed to build context logger") + } + + globalCfg := valid.NewGlobalCfg(userConfig.DataDir) + validator := &cfgParser.ParserValidator{} + + // TODO: should terraformadminmode pass in this stuff? + if userConfig.RepoConfig != "" { + globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) + if err != nil { + return nil, errors.Wrapf(err, "parsing %s file", userConfig.RepoConfig) + } + } + + parsedURL, err := legacy.ParseAtlantisURL(userConfig.AtlantisURL) + if err != nil { + return nil, errors.Wrapf(err, + "parsing atlantis url %q", userConfig.AtlantisURL) + } + + appConfig, err := createGHAppConfig(userConfig) + if err != nil { + return nil, err + } + + cfg := &adminconfig.Config{ + AuthCfg: neptune.AuthConfig{ + SslCertFile: userConfig.SSLCertFile, + SslKeyFile: userConfig.SSLKeyFile, + }, + ServerCfg: neptune.ServerConfig{ + URL: parsedURL, + Version: config.AtlantisVersion, + Port: userConfig.Port, + }, + // we need the terraformcfg stuff, since we need terraformActivities + TerraformCfg: neptune.TerraformConfig{ + DefaultVersion: userConfig.DefaultTFVersion, + DownloadURL: userConfig.TFDownloadURL, + LogFilters: globalCfg.TerraformLogFilter, + }, + // Do not need deployment config + // do need datadir, we will save the archive there + DataDir: userConfig.DataDir, + // do need temporalconfig since we use temporal + TemporalCfg: globalCfg.Temporal, + // do need githubcfg, since we use github to get the archive + GithubCfg: globalCfg.Github, + // same as above + App: appConfig, + // we do need logging + CtxLogger: ctxLogger, + // we do need stats + StatsNamespace: userConfig.StatsNamespace, + // we do need metrics + Metrics: globalCfg.Metrics, + // no SnsTopicArn since we don't use the auditing + // no revision setter + } + return admin.NewServer(cfg) +} From 5f84555b91c1f21cf99604c90130beadc72cb9d7 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 14:06:25 -0800 Subject: [PATCH 14/20] still making progress --- server/neptune/admin/server.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/server/neptune/admin/server.go b/server/neptune/admin/server.go index f477f8da6..af84ae864 100644 --- a/server/neptune/admin/server.go +++ b/server/neptune/admin/server.go @@ -15,6 +15,7 @@ import ( "github.com/palantir/go-githubapp/githubapp" "github.com/runatlantis/atlantis/server/neptune/lyft/feature" + "github.com/runatlantis/atlantis/server/neptune/sync/crons" ghClient "github.com/runatlantis/atlantis/server/neptune/workflows/activities/github" "github.com/runatlantis/atlantis/server/vcs/provider/github" @@ -25,6 +26,7 @@ import ( "github.com/runatlantis/atlantis/server/metrics" adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" neptune_http "github.com/runatlantis/atlantis/server/neptune/http" + internalSync "github.com/runatlantis/atlantis/server/neptune/sync" "github.com/runatlantis/atlantis/server/neptune/temporal" neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" "github.com/runatlantis/atlantis/server/neptune/workflows" @@ -38,6 +40,8 @@ import ( type Server struct { Logger logging.Logger + CronScheduler *internalSync.CronScheduler + Crons []*internalSync.Cron HTTPServerProxy *neptune_http.ServerProxy Port int StatsScope tally.Scope @@ -145,8 +149,17 @@ func NewServer(config *adminconfig.Config) (*Server, error) { return nil, errors.Wrap(err, "initializing github activities") } + cronScheduler := internalSync.NewCronScheduler(config.CtxLogger) + server := Server{ - Logger: config.CtxLogger, + Logger: config.CtxLogger, + CronScheduler: cronScheduler, + Crons: []*internalSync.Cron{ + { + Executor: crons.NewRuntimeStats(scope).Run, + Frequency: 1 * time.Minute, + }, + }, HTTPServerProxy: httpServerProxy, Port: config.ServerCfg.Port, StatsScope: scope, From 659b3cbbc412e5a3d4df8a179eedca755cf0c4dc Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Wed, 6 Mar 2024 14:07:33 -0800 Subject: [PATCH 15/20] still making progress --- cmd/admin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/admin.go b/cmd/admin.go index 7292ba96b..3081f381a 100644 --- a/cmd/admin.go +++ b/cmd/admin.go @@ -23,7 +23,7 @@ func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (S globalCfg := valid.NewGlobalCfg(userConfig.DataDir) validator := &cfgParser.ParserValidator{} - // TODO: should terraformadminmode pass in this stuff? + // TODO: fill in values from globalCfg if userConfig.RepoConfig != "" { globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) if err != nil { From f2e037f169f1941a9708c2aed7bb5a2c4cdcc21d Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 7 Mar 2024 12:30:54 -0800 Subject: [PATCH 16/20] still making progress --- cmd/admin.go | 33 +-- server/config/raw/global_cfg.go | 12 +- server/config/valid/global_cfg.go | 4 +- server/neptune/admin/config/adminconfig.go | 28 --- server/neptune/admin/server.go | 269 --------------------- 5 files changed, 18 insertions(+), 328 deletions(-) delete mode 100644 server/neptune/admin/config/adminconfig.go delete mode 100644 server/neptune/admin/server.go diff --git a/cmd/admin.go b/cmd/admin.go index 3081f381a..99b2795f0 100644 --- a/cmd/admin.go +++ b/cmd/admin.go @@ -6,14 +6,13 @@ import ( "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/legacy" "github.com/runatlantis/atlantis/server/logging" - "github.com/runatlantis/atlantis/server/neptune/admin" - adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" + adhoc "github.com/runatlantis/atlantis/server/neptune/adhoc" + adhocconfig "github.com/runatlantis/atlantis/server/neptune/adhoc/config" neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" ) type Admin struct{} -// NewServer returns the real Atlantis server object. func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel()) if err != nil { @@ -23,7 +22,6 @@ func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (S globalCfg := valid.NewGlobalCfg(userConfig.DataDir) validator := &cfgParser.ParserValidator{} - // TODO: fill in values from globalCfg if userConfig.RepoConfig != "" { globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg) if err != nil { @@ -42,7 +40,7 @@ func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (S return nil, err } - cfg := &adminconfig.Config{ + cfg := &adhocconfig.Config{ AuthCfg: neptune.AuthConfig{ SslCertFile: userConfig.SSLCertFile, SslKeyFile: userConfig.SSLKeyFile, @@ -52,29 +50,18 @@ func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (S Version: config.AtlantisVersion, Port: userConfig.Port, }, - // we need the terraformcfg stuff, since we need terraformActivities TerraformCfg: neptune.TerraformConfig{ DefaultVersion: userConfig.DefaultTFVersion, DownloadURL: userConfig.TFDownloadURL, LogFilters: globalCfg.TerraformLogFilter, }, - // Do not need deployment config - // do need datadir, we will save the archive there - DataDir: userConfig.DataDir, - // do need temporalconfig since we use temporal - TemporalCfg: globalCfg.Temporal, - // do need githubcfg, since we use github to get the archive - GithubCfg: globalCfg.Github, - // same as above - App: appConfig, - // we do need logging - CtxLogger: ctxLogger, - // we do need stats + DataDir: userConfig.DataDir, + TemporalCfg: globalCfg.Temporal, + GithubCfg: globalCfg.Github, + App: appConfig, + CtxLogger: ctxLogger, StatsNamespace: userConfig.StatsNamespace, - // we do need metrics - Metrics: globalCfg.Metrics, - // no SnsTopicArn since we don't use the auditing - // no revision setter + Metrics: globalCfg.Metrics, } - return admin.NewServer(cfg) + return adhoc.NewServer(cfg) } diff --git a/server/config/raw/global_cfg.go b/server/config/raw/global_cfg.go index 486975fd7..03870625a 100644 --- a/server/config/raw/global_cfg.go +++ b/server/config/raw/global_cfg.go @@ -26,22 +26,22 @@ type GlobalCfg struct { Persistence Persistence `yaml:"persistence" json:"persistence"` RevisionSetter RevisionSetter `yaml:"revision_setter" json:"revision_setter"` Admin Admin `yaml:"admin" json:"admin"` - AdminMode AdminMode `yaml:"admin_mode" json:"admin_mode"` + AdhocMode AdhocMode `yaml:"adhoc_mode" json:"adhoc_mode"` } -type AdminMode struct { +type AdhocMode struct { Repo string `yaml:"repo" json:"repo"` Root string `yaml:"root" json:"root"` } -func (t AdminMode) ToValid() valid.AdminMode { - return valid.AdminMode{ +func (t AdhocMode) ToValid() valid.AdhocMode { + return valid.AdhocMode{ Repo: t.Repo, Root: t.Root, } } -func (t AdminMode) Validate() error { +func (t AdhocMode) Validate() error { // We don't need to validate the inputs so we can just return nil return nil } @@ -214,7 +214,7 @@ func (g GlobalCfg) ToValid(defaultCfg valid.GlobalCfg) valid.GlobalCfg { Github: g.Github.ToValid(), Admin: g.Admin.ToValid(), RevisionSetter: g.RevisionSetter.ToValid(), - AdminMode: g.AdminMode.ToValid(), + AdhocMode: g.AdhocMode.ToValid(), } } diff --git a/server/config/valid/global_cfg.go b/server/config/valid/global_cfg.go index baabeabdd..fb847af43 100644 --- a/server/config/valid/global_cfg.go +++ b/server/config/valid/global_cfg.go @@ -64,10 +64,10 @@ type GlobalCfg struct { Github Github RevisionSetter RevisionSetter Admin Admin - AdminMode AdminMode + AdhocMode AdhocMode } -type AdminMode struct { +type AdhocMode struct { Repo string Root string } diff --git a/server/neptune/admin/config/adminconfig.go b/server/neptune/admin/config/adminconfig.go deleted file mode 100644 index 9fa087713..000000000 --- a/server/neptune/admin/config/adminconfig.go +++ /dev/null @@ -1,28 +0,0 @@ -package adminconfig - -import ( - "github.com/palantir/go-githubapp/githubapp" - "github.com/runatlantis/atlantis/server/config/valid" - "github.com/runatlantis/atlantis/server/logging" - neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" -) - -// Config is TerraformAdmin (admin mode) specific user config -type Config struct { - AuthCfg neptune.AuthConfig - ServerCfg neptune.ServerConfig - FeatureConfig neptune.FeatureConfig - TemporalCfg valid.Temporal - GithubCfg valid.Github - TerraformCfg neptune.TerraformConfig - DeploymentConfig valid.StoreConfig - JobConfig valid.StoreConfig - Metrics valid.Metrics - - StatsNamespace string - - DataDir string - CtxLogger logging.Logger - App githubapp.Config - LyftAuditJobsSnsTopicArn string -} diff --git a/server/neptune/admin/server.go b/server/neptune/admin/server.go deleted file mode 100644 index af84ae864..000000000 --- a/server/neptune/admin/server.go +++ /dev/null @@ -1,269 +0,0 @@ -package admin - -import ( - "context" - "encoding/json" - "fmt" - "io" - "log" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/palantir/go-githubapp/githubapp" - "github.com/runatlantis/atlantis/server/neptune/lyft/feature" - "github.com/runatlantis/atlantis/server/neptune/sync/crons" - ghClient "github.com/runatlantis/atlantis/server/neptune/workflows/activities/github" - "github.com/runatlantis/atlantis/server/vcs/provider/github" - - assetfs "github.com/elazarl/go-bindata-assetfs" - "github.com/gorilla/mux" - "github.com/pkg/errors" - "github.com/runatlantis/atlantis/server/logging" - "github.com/runatlantis/atlantis/server/metrics" - adminconfig "github.com/runatlantis/atlantis/server/neptune/admin/config" - neptune_http "github.com/runatlantis/atlantis/server/neptune/http" - internalSync "github.com/runatlantis/atlantis/server/neptune/sync" - "github.com/runatlantis/atlantis/server/neptune/temporal" - neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" - "github.com/runatlantis/atlantis/server/neptune/workflows" - "github.com/runatlantis/atlantis/server/neptune/workflows/activities" - "github.com/runatlantis/atlantis/server/static" - "github.com/uber-go/tally/v4" - "github.com/urfave/negroni" - "go.temporal.io/sdk/interceptor" - "go.temporal.io/sdk/worker" -) - -type Server struct { - Logger logging.Logger - CronScheduler *internalSync.CronScheduler - Crons []*internalSync.Cron - HTTPServerProxy *neptune_http.ServerProxy - Port int - StatsScope tally.Scope - StatsCloser io.Closer - TemporalClient *temporal.ClientWrapper - TerraformActivities *activities.Terraform - GithubActivities *activities.Github - - TerraformTaskQueue string -} - -func NewServer(config *adminconfig.Config) (*Server, error) { - statsReporter, err := metrics.NewReporter(config.Metrics, config.CtxLogger) - - if err != nil { - return nil, err - } - - scope, statsCloser := metrics.NewScopeWithReporter(config.Metrics, config.CtxLogger, config.StatsNamespace, statsReporter) - if err != nil { - return nil, err - } - - scope = scope.Tagged(map[string]string{ - "mode": "admin", - }) - - opts := &temporal.Options{ - StatsReporter: statsReporter, - } - opts = opts.WithClientInterceptors(temporal.NewMetricsInterceptor(scope)) - temporalClient, err := temporal.NewClient(config.CtxLogger, config.TemporalCfg, opts) - if err != nil { - return nil, errors.Wrap(err, "initializing temporal client") - } - - router := mux.NewRouter() - router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) - router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo})) - n := negroni.New(&negroni.Recovery{ - Logger: log.New(os.Stdout, "", log.LstdFlags), - PrintStack: false, - StackAll: false, - StackSize: 1024 * 8, - }) - n.UseHandler(router) - httpServerProxy := &neptune_http.ServerProxy{ - SSLCertFile: config.AuthCfg.SslCertFile, - SSLKeyFile: config.AuthCfg.SslKeyFile, - Server: &http.Server{Addr: fmt.Sprintf(":%d", config.ServerCfg.Port), Handler: n, ReadHeaderTimeout: time.Second * 10}, - Logger: config.CtxLogger, - } - - terraformActivities, err := activities.NewTerraform( - config.TerraformCfg, - neptune.ValidationConfig{}, - config.App, - config.DataDir, - config.ServerCfg.URL, - config.TemporalCfg.TerraformTaskQueue, - config.GithubCfg.TemporalAppInstallationID, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "initializing terraform activities") - } - clientCreator, err := githubapp.NewDefaultCachingClientCreator( - config.App, - githubapp.WithClientMiddleware( - ghClient.ClientMetrics(scope.SubScope("app")), - )) - if err != nil { - return nil, errors.Wrap(err, "client creator") - } - // TODO fill in details here - in another PR pull from globalCfg since we have the items in the repo_template - repoConfig := feature.RepoConfig{ - Owner: config.FeatureConfig.FFOwner, - Repo: config.FeatureConfig.FFRepo, - Branch: config.FeatureConfig.FFBranch, - Path: config.FeatureConfig.FFPath, - } - installationFetcher := &github.InstallationRetriever{ - ClientCreator: clientCreator, - } - fileFetcher := &github.SingleFileContentsFetcher{ - ClientCreator: clientCreator, - } - retriever := &feature.CustomGithubInstallationRetriever{ - InstallationFetcher: installationFetcher, - FileContentsFetcher: fileFetcher, - Cfg: repoConfig, - } - featureAllocator, err := feature.NewGHSourcedAllocator(retriever, config.CtxLogger) - if err != nil { - return nil, errors.Wrap(err, "initializing feature allocator") - } - - githubActivities, err := activities.NewGithub( - clientCreator, - config.GithubCfg.TemporalAppInstallationID, - config.DataDir, - featureAllocator, - ) - if err != nil { - return nil, errors.Wrap(err, "initializing github activities") - } - - cronScheduler := internalSync.NewCronScheduler(config.CtxLogger) - - server := Server{ - Logger: config.CtxLogger, - CronScheduler: cronScheduler, - Crons: []*internalSync.Cron{ - { - Executor: crons.NewRuntimeStats(scope).Run, - Frequency: 1 * time.Minute, - }, - }, - HTTPServerProxy: httpServerProxy, - Port: config.ServerCfg.Port, - StatsScope: scope, - StatsCloser: statsCloser, - TemporalClient: temporalClient, - TerraformActivities: terraformActivities, - TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, - GithubActivities: githubActivities, - } - return &server, nil -} - -func (s Server) Start() error { - defer s.shutdown() - - ctx := context.Background() - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - - terraformWorker := s.buildTerraformWorker() - if err := terraformWorker.Run(worker.InterruptCh()); err != nil { - log.Fatalln("unable to start terraform worker", err) - } - - s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") - }() - - // Ensure server gracefully drains connections when stopped. - stop := make(chan os.Signal, 1) - // Stop on SIGINTs and SIGTERMs. - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - - s.Logger.Info(fmt.Sprintf("Atlantis started - listening on port %v", s.Port)) - - go func() { - err := s.HTTPServerProxy.ListenAndServe() - - if err != nil && err != http.ErrServerClosed { - s.Logger.Error(err.Error()) - } - }() - - <-stop - wg.Wait() - - return nil -} - -func (s Server) shutdown() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { - s.Logger.Error(err.Error()) - } - - s.TemporalClient.Close() - - // flush stats before shutdown - if err := s.StatsCloser.Close(); err != nil { - s.Logger.Error(err.Error()) - } - - s.Logger.Close() -} - -// Note that we will need to do things similar to how gateway does it to get the metadata we need -// specifically the root - -func (s Server) buildTerraformWorker() worker.Worker { - // pass the underlying client otherwise this will panic() - terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ - Interceptors: []interceptor.WorkerInterceptor{ - temporal.NewWorkerInterceptor(), - }, - // default is 1k, however Nish's comment says that: - // "30 falls in line with our current cpu resourcing for each worker and should be the - // maximum number of concurrent tf operations we allow." - // We shouldn't need too many as we are only doing one thing here on this batch pod - MaxConcurrentActivityExecutionSize: 30, - }) - terraformWorker.RegisterActivity(s.TerraformActivities) - terraformWorker.RegisterActivity(s.GithubActivities) - terraformWorker.RegisterWorkflow(workflows.Terraform) - return terraformWorker -} - -// TODO: eventually we can make it so the pod is ready when the repo is done cloning... - -// Healthz returns the health check response. It always returns a 200 currently. -func Healthz(w http.ResponseWriter, _ *http.Request) { - data, err := json.MarshalIndent(&struct { - Status string `json:"status"` - }{ - Status: "ok", - }, "", " ") - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Error creating status json response: %s", err) - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(data) // nolint: errcheck -} From 9f0733d13d43694ab945f559bc1b9c1559506805 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 7 Mar 2024 12:39:18 -0800 Subject: [PATCH 17/20] still making progress --- server/neptune/adhoc/config/config.go | 27 +++ server/neptune/adhoc/server.go | 265 ++++++++++++++++++++++++++ 2 files changed, 292 insertions(+) create mode 100644 server/neptune/adhoc/config/config.go create mode 100644 server/neptune/adhoc/server.go diff --git a/server/neptune/adhoc/config/config.go b/server/neptune/adhoc/config/config.go new file mode 100644 index 000000000..ec1997c86 --- /dev/null +++ b/server/neptune/adhoc/config/config.go @@ -0,0 +1,27 @@ +package adhocconfig + +import ( + "github.com/palantir/go-githubapp/githubapp" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/logging" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" +) + +// Config is TerraformAdmin (Adhoc mode) specific user config +type Config struct { + AuthCfg neptune.AuthConfig + ServerCfg neptune.ServerConfig + FeatureConfig neptune.FeatureConfig + TemporalCfg valid.Temporal + GithubCfg valid.Github + TerraformCfg neptune.TerraformConfig + DeploymentConfig valid.StoreConfig + JobConfig valid.StoreConfig + Metrics valid.Metrics + + StatsNamespace string + + DataDir string + CtxLogger logging.Logger + App githubapp.Config +} diff --git a/server/neptune/adhoc/server.go b/server/neptune/adhoc/server.go new file mode 100644 index 000000000..b6ac517b6 --- /dev/null +++ b/server/neptune/adhoc/server.go @@ -0,0 +1,265 @@ +package admin + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/palantir/go-githubapp/githubapp" + "github.com/runatlantis/atlantis/server/neptune/lyft/feature" + "github.com/runatlantis/atlantis/server/neptune/sync/crons" + ghClient "github.com/runatlantis/atlantis/server/neptune/workflows/activities/github" + "github.com/runatlantis/atlantis/server/vcs/provider/github" + + assetfs "github.com/elazarl/go-bindata-assetfs" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/metrics" + adhocconfig "github.com/runatlantis/atlantis/server/neptune/adhoc/config" + neptune_http "github.com/runatlantis/atlantis/server/neptune/http" + internalSync "github.com/runatlantis/atlantis/server/neptune/sync" + "github.com/runatlantis/atlantis/server/neptune/temporal" + neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" + "github.com/runatlantis/atlantis/server/neptune/workflows" + "github.com/runatlantis/atlantis/server/neptune/workflows/activities" + "github.com/runatlantis/atlantis/server/static" + "github.com/uber-go/tally/v4" + "github.com/urfave/negroni" + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/worker" +) + +type Server struct { + Logger logging.Logger + CronScheduler *internalSync.CronScheduler + Crons []*internalSync.Cron + HTTPServerProxy *neptune_http.ServerProxy + Port int + StatsScope tally.Scope + StatsCloser io.Closer + TemporalClient *temporal.ClientWrapper + TerraformActivities *activities.Terraform + GithubActivities *activities.Github + + TerraformTaskQueue string +} + +func NewServer(config *adhocconfig.Config) (*Server, error) { + statsReporter, err := metrics.NewReporter(config.Metrics, config.CtxLogger) + + if err != nil { + return nil, err + } + + scope, statsCloser := metrics.NewScopeWithReporter(config.Metrics, config.CtxLogger, config.StatsNamespace, statsReporter) + if err != nil { + return nil, err + } + + scope = scope.Tagged(map[string]string{ + "mode": "adhoc", + }) + + opts := &temporal.Options{ + StatsReporter: statsReporter, + } + opts = opts.WithClientInterceptors(temporal.NewMetricsInterceptor(scope)) + temporalClient, err := temporal.NewClient(config.CtxLogger, config.TemporalCfg, opts) + if err != nil { + return nil, errors.Wrap(err, "initializing temporal client") + } + + router := mux.NewRouter() + router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet) + router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo})) + n := negroni.New(&negroni.Recovery{ + Logger: log.New(os.Stdout, "", log.LstdFlags), + PrintStack: false, + StackAll: false, + StackSize: 1024 * 8, + }) + n.UseHandler(router) + httpServerProxy := &neptune_http.ServerProxy{ + SSLCertFile: config.AuthCfg.SslCertFile, + SSLKeyFile: config.AuthCfg.SslKeyFile, + Server: &http.Server{Addr: fmt.Sprintf(":%d", config.ServerCfg.Port), Handler: n, ReadHeaderTimeout: time.Second * 10}, + Logger: config.CtxLogger, + } + + terraformActivities, err := activities.NewTerraform( + config.TerraformCfg, + neptune.ValidationConfig{}, + config.App, + config.DataDir, + config.ServerCfg.URL, + config.TemporalCfg.TerraformTaskQueue, + config.GithubCfg.TemporalAppInstallationID, + nil, + ) + if err != nil { + return nil, errors.Wrap(err, "initializing terraform activities") + } + clientCreator, err := githubapp.NewDefaultCachingClientCreator( + config.App, + githubapp.WithClientMiddleware( + ghClient.ClientMetrics(scope.SubScope("app")), + )) + if err != nil { + return nil, errors.Wrap(err, "client creator") + } + // TODO fill in details here - in another PR pull from globalCfg since we have the items in the repo_template + repoConfig := feature.RepoConfig{ + Owner: config.FeatureConfig.FFOwner, + Repo: config.FeatureConfig.FFRepo, + Branch: config.FeatureConfig.FFBranch, + Path: config.FeatureConfig.FFPath, + } + installationFetcher := &github.InstallationRetriever{ + ClientCreator: clientCreator, + } + fileFetcher := &github.SingleFileContentsFetcher{ + ClientCreator: clientCreator, + } + retriever := &feature.CustomGithubInstallationRetriever{ + InstallationFetcher: installationFetcher, + FileContentsFetcher: fileFetcher, + Cfg: repoConfig, + } + featureAllocator, err := feature.NewGHSourcedAllocator(retriever, config.CtxLogger) + if err != nil { + return nil, errors.Wrap(err, "initializing feature allocator") + } + + githubActivities, err := activities.NewGithub( + clientCreator, + config.GithubCfg.TemporalAppInstallationID, + config.DataDir, + featureAllocator, + ) + if err != nil { + return nil, errors.Wrap(err, "initializing github activities") + } + + cronScheduler := internalSync.NewCronScheduler(config.CtxLogger) + + server := Server{ + Logger: config.CtxLogger, + CronScheduler: cronScheduler, + Crons: []*internalSync.Cron{ + { + Executor: crons.NewRuntimeStats(scope).Run, + Frequency: 1 * time.Minute, + }, + }, + HTTPServerProxy: httpServerProxy, + Port: config.ServerCfg.Port, + StatsScope: scope, + StatsCloser: statsCloser, + TemporalClient: temporalClient, + TerraformActivities: terraformActivities, + TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue, + GithubActivities: githubActivities, + } + return &server, nil +} + +func (s Server) Start() error { + defer s.shutdown() + + ctx := context.Background() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + terraformWorker := s.buildTerraformWorker() + if err := terraformWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("unable to start terraform worker", err) + } + + s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background") + }() + + // Ensure server gracefully drains connections when stopped. + stop := make(chan os.Signal, 1) + // Stop on SIGINTs and SIGTERMs. + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + s.Logger.Info(fmt.Sprintf("Atlantis started - listening on port %v", s.Port)) + + go func() { + err := s.HTTPServerProxy.ListenAndServe() + + if err != nil && err != http.ErrServerClosed { + s.Logger.Error(err.Error()) + } + }() + + <-stop + wg.Wait() + + return nil +} + +func (s Server) shutdown() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.HTTPServerProxy.Shutdown(ctx); err != nil { + s.Logger.Error(err.Error()) + } + + s.TemporalClient.Close() + + // flush stats before shutdown + if err := s.StatsCloser.Close(); err != nil { + s.Logger.Error(err.Error()) + } + + s.Logger.Close() +} + +// Note that we will need to do things similar to how gateway does it to get the metadata we need +// specifically the root + +func (s Server) buildTerraformWorker() worker.Worker { + // pass the underlying client otherwise this will panic() + terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{ + Interceptors: []interceptor.WorkerInterceptor{ + temporal.NewWorkerInterceptor(), + }, + MaxConcurrentActivityExecutionSize: 30, + }) + terraformWorker.RegisterActivity(s.TerraformActivities) + terraformWorker.RegisterActivity(s.GithubActivities) + terraformWorker.RegisterWorkflow(workflows.Terraform) + return terraformWorker +} + +// TODO: eventually we can make it so the pod is ready when the repo is done cloning... + +// Healthz returns the health check response. It always returns a 200 currently. +func Healthz(w http.ResponseWriter, _ *http.Request) { + data, err := json.MarshalIndent(&struct { + Status string `json:"status"` + }{ + Status: "ok", + }, "", " ") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Error creating status json response: %s", err) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(data) // nolint: errcheck +} From afb271530b8abd44666f51345f0921eb1d6fad93 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 7 Mar 2024 12:41:35 -0800 Subject: [PATCH 18/20] still making progress --- cmd/admin.go | 4 ++-- cmd/server.go | 10 +++++----- server/legacy/user_config.go | 6 +++--- server/legacy/user_config_test.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/admin.go b/cmd/admin.go index 99b2795f0..21b93c0fb 100644 --- a/cmd/admin.go +++ b/cmd/admin.go @@ -11,9 +11,9 @@ import ( neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config" ) -type Admin struct{} +type Adhoc struct{} -func (a *Admin) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { +func (a *Adhoc) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) { ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel()) if err != nil { return nil, errors.Wrap(err, "failed to build context logger") diff --git a/cmd/server.go b/cmd/server.go index 7130ac4ee..e7a24de2d 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -232,7 +232,7 @@ var stringFlags = map[string]stringFlag{ "gateway: Runs atlantis with gateway event handler that publishes events through sns.\n" + "worker: Runs atlantis with a sqs handler that polls for events in the queue to process.\n" + "hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.\n" + - "admin: Runs atlantis in an admin mode that allows for running terraform commands.", + "adhoc: Runs atlantis in an admin mode that allows for running adhoc terraform commands.", defaultValue: "", }, LyftWorkerQueueURLFlag: { @@ -345,7 +345,7 @@ func NewServerCmd(v *viper.Viper, version string) *ServerCmd { GatewayCreator: &GatewayCreator{}, WorkerCreator: &WorkerCreator{}, TemporalWorkerCreator: &TemporalWorker{}, - AdminCreator: &Admin{}, + AdhocCreator: &Adhoc{}, }, Viper: v, AtlantisVersion: version, @@ -376,7 +376,7 @@ type ServerCreatorProxy struct { GatewayCreator ServerCreator WorkerCreator ServerCreator TemporalWorkerCreator ServerCreator - AdminCreator ServerCreator + AdhocCreator ServerCreator } func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config server.Config) (ServerStarter, error) { @@ -392,8 +392,8 @@ func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config serv return d.TemporalWorkerCreator.NewServer(userConfig, config) } - if userConfig.ToLyftMode() == server.Admin { - return d.AdminCreator.NewServer(userConfig, config) + if userConfig.ToLyftMode() == server.Adhoc { + return d.AdhocCreator.NewServer(userConfig, config) } return d.WorkerCreator.NewServer(userConfig, config) diff --git a/server/legacy/user_config.go b/server/legacy/user_config.go index 0f17931b8..486e45999 100644 --- a/server/legacy/user_config.go +++ b/server/legacy/user_config.go @@ -11,7 +11,7 @@ const ( Gateway Worker TemporalWorker - Admin + Adhoc ) // UserConfig holds config values passed in by the user. @@ -105,8 +105,8 @@ func (u UserConfig) ToLyftMode() Mode { return Worker case "temporalworker": return TemporalWorker - case "admin": - return Admin + case "adhoc": + return Adhoc } return Default } diff --git a/server/legacy/user_config_test.go b/server/legacy/user_config_test.go index b266d0d22..669e27cd7 100644 --- a/server/legacy/user_config_test.go +++ b/server/legacy/user_config_test.go @@ -71,8 +71,8 @@ func TestUserConfig_ToLyftMode(t *testing.T) { server.Default, }, { - "admin", - server.Admin, + "adhoc", + server.Adhoc, }, { "temporalworker", From 5efe4851fc228c992199c6bf4a2a8422c061227e Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 7 Mar 2024 12:43:50 -0800 Subject: [PATCH 19/20] still making progress --- server/neptune/adhoc/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/neptune/adhoc/server.go b/server/neptune/adhoc/server.go index b6ac517b6..447b929f0 100644 --- a/server/neptune/adhoc/server.go +++ b/server/neptune/adhoc/server.go @@ -116,7 +116,7 @@ func NewServer(config *adhocconfig.Config) (*Server, error) { if err != nil { return nil, errors.Wrap(err, "client creator") } - // TODO fill in details here - in another PR pull from globalCfg since we have the items in the repo_template + repoConfig := feature.RepoConfig{ Owner: config.FeatureConfig.FFOwner, Repo: config.FeatureConfig.FFRepo, From 4b78357ed90a959ff4c5a8d0c3f76f4b02e7fbd5 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 7 Mar 2024 12:45:54 -0800 Subject: [PATCH 20/20] still making progress --- server/neptune/adhoc/config/config.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/neptune/adhoc/config/config.go b/server/neptune/adhoc/config/config.go index ec1997c86..dd77db7f1 100644 --- a/server/neptune/adhoc/config/config.go +++ b/server/neptune/adhoc/config/config.go @@ -9,15 +9,13 @@ import ( // Config is TerraformAdmin (Adhoc mode) specific user config type Config struct { - AuthCfg neptune.AuthConfig - ServerCfg neptune.ServerConfig - FeatureConfig neptune.FeatureConfig - TemporalCfg valid.Temporal - GithubCfg valid.Github - TerraformCfg neptune.TerraformConfig - DeploymentConfig valid.StoreConfig - JobConfig valid.StoreConfig - Metrics valid.Metrics + AuthCfg neptune.AuthConfig + ServerCfg neptune.ServerConfig + FeatureConfig neptune.FeatureConfig + TemporalCfg valid.Temporal + GithubCfg valid.Github + TerraformCfg neptune.TerraformConfig + Metrics valid.Metrics StatsNamespace string