diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index b4f52193af8..95726e47da1 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -74,9 +74,9 @@ type allocRunner struct { // registering services and checks consulClient serviceregistration.Handler - // consulProxiesClient is the client used by the envoy version hook for + // consulProxiesClientFunc gets a client used by the envoy version hook for // looking up supported envoy versions of the consul agent. - consulProxiesClient consul.SupportedProxiesAPI + consulProxiesClientFunc consul.SupportedProxiesAPIFunc // sidsClient is the client used by the service identity hook for // managing SI tokens @@ -226,7 +226,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e alloc: alloc, clientConfig: config.ClientConfig, consulClient: config.Consul, - consulProxiesClient: config.ConsulProxies, + consulProxiesClientFunc: config.ConsulProxiesFunc, sidsClient: config.ConsulSI, vaultClientFunc: config.VaultFunc, tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), @@ -302,7 +302,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { StateUpdater: ar, DynamicRegistry: ar.dynamicRegistry, Consul: ar.consulClient, - ConsulProxies: ar.consulProxiesClient, + ConsulProxiesFunc: ar.consulProxiesClientFunc, ConsulSI: ar.sidsClient, VaultFunc: ar.vaultClientFunc, DeviceStatsReporter: ar.deviceStatsReporter, diff --git a/client/allocrunner/taskrunner/envoy_version_hook.go b/client/allocrunner/taskrunner/envoy_version_hook.go index 2e1b9317c24..7f38ec1239d 100644 --- a/client/allocrunner/taskrunner/envoy_version_hook.go +++ b/client/allocrunner/taskrunner/envoy_version_hook.go @@ -24,16 +24,16 @@ const ( ) type envoyVersionHookConfig struct { - alloc *structs.Allocation - proxiesClient consul.SupportedProxiesAPI - logger hclog.Logger + alloc *structs.Allocation + proxiesClientFunc consul.SupportedProxiesAPIFunc + logger hclog.Logger } -func newEnvoyVersionHookConfig(alloc *structs.Allocation, proxiesClient consul.SupportedProxiesAPI, logger hclog.Logger) *envoyVersionHookConfig { +func newEnvoyVersionHookConfig(alloc *structs.Allocation, proxiesClientFunc consul.SupportedProxiesAPIFunc, logger hclog.Logger) *envoyVersionHookConfig { return &envoyVersionHookConfig{ - alloc: alloc, - logger: logger, - proxiesClient: proxiesClient, + alloc: alloc, + logger: logger, + proxiesClientFunc: proxiesClientFunc, } } @@ -45,9 +45,9 @@ type envoyVersionHook struct { // alloc is the allocation with the envoy task being rewritten. alloc *structs.Allocation - // proxiesClient is the subset of the Consul API for getting information - // from Consul about the versions of Envoy it supports. - proxiesClient consul.SupportedProxiesAPI + // proxiesClientFunc gets an interface for the subset of the Consul API for + // getting information from Consul about the versions of Envoy it supports. + proxiesClientFunc consul.SupportedProxiesAPIFunc // logger is used to log things. logger hclog.Logger @@ -55,9 +55,9 @@ type envoyVersionHook struct { func newEnvoyVersionHook(c *envoyVersionHookConfig) *envoyVersionHook { return &envoyVersionHook{ - alloc: c.alloc, - proxiesClient: c.proxiesClient, - logger: c.logger.Named(envoyVersionHookName), + alloc: c.alloc, + proxiesClientFunc: c.proxiesClientFunc, + logger: c.logger.Named(envoyVersionHookName), } } @@ -81,7 +81,10 @@ func (h *envoyVersionHook) Prestart(_ context.Context, request *ifs.TaskPrestart // We either need to acquire Consul's preferred Envoy version or fallback // to the legacy default. Query Consul and use the (possibly empty) result. - proxies, err := h.proxiesClient.Proxies() + // + // TODO: how do we select the right cluster here if we have multiple + // services which could have their own cluster field value? + proxies, err := h.proxiesClientFunc("default").Proxies() if err != nil { return fmt.Errorf("error retrieving supported Envoy versions from Consul: %w", err) } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 9cf953c031f..991aaa17a53 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -185,10 +185,10 @@ type TaskRunner struct { // registering services and checks consulServiceClient serviceregistration.Handler - // consulProxiesClient is the client used by the envoy version hook for + // consulProxiesClientFunc gets a client used by the envoy version hook for // asking consul what version of envoy nomad should inject into the connect // sidecar or gateway task. - consulProxiesClient consul.SupportedProxiesAPI + consulProxiesClientFunc consul.SupportedProxiesAPIFunc // sidsClient is the client used by the service identity hook for managing // service identity tokens @@ -282,9 +282,9 @@ type Config struct { // Consul is the client to use for managing Consul service registrations Consul serviceregistration.Handler - // ConsulProxies is the client to use for looking up supported envoy versions + // ConsulProxiesFunc gets a client to use for looking up supported envoy versions // from Consul. - ConsulProxies consul.SupportedProxiesAPI + ConsulProxiesFunc consul.SupportedProxiesAPIFunc // ConsulSI is the client to use for managing Consul SI tokens ConsulSI consul.ServiceIdentityAPI @@ -369,44 +369,44 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { } tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - taskLeader: config.Task.Leader, - envBuilder: envBuilder, - dynamicRegistry: config.DynamicRegistry, - consulServiceClient: config.Consul, - consulProxiesClient: config.ConsulProxies, - siClient: config.ConsulSI, - vaultClientFunc: config.VaultFunc, - state: tstate, - localState: state.NewLocalState(), - allocHookResources: config.AllocHookResources, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - deviceStatsReporter: config.DeviceStatsReporter, - killCtx: killCtx, - killCtxCancel: killCancel, - shutdownCtx: trCtx, - shutdownCtxCancel: trCancel, - triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), - restartCh: make(chan struct{}, restartChCap), - waitCh: make(chan struct{}), - csiManager: config.CSIManager, - devicemanager: config.DeviceManager, - driverManager: config.DriverManager, - maxEvents: defaultMaxEvents, - serversContactedCh: config.ServersContactedCh, - startConditionMetCh: config.StartConditionMetCh, - shutdownDelayCtx: config.ShutdownDelayCtx, - shutdownDelayCancelFn: config.ShutdownDelayCancelFn, - serviceRegWrapper: config.ServiceRegWrapper, - getter: config.Getter, - wranglers: config.Wranglers, - widmgr: config.WIDMgr, + alloc: config.Alloc, + allocID: config.Alloc.ID, + clientConfig: config.ClientConfig, + task: config.Task, + taskDir: config.TaskDir, + taskName: config.Task.Name, + taskLeader: config.Task.Leader, + envBuilder: envBuilder, + dynamicRegistry: config.DynamicRegistry, + consulServiceClient: config.Consul, + consulProxiesClientFunc: config.ConsulProxiesFunc, + siClient: config.ConsulSI, + vaultClientFunc: config.VaultFunc, + state: tstate, + localState: state.NewLocalState(), + allocHookResources: config.AllocHookResources, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + deviceStatsReporter: config.DeviceStatsReporter, + killCtx: killCtx, + killCtxCancel: killCancel, + shutdownCtx: trCtx, + shutdownCtxCancel: trCancel, + triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + restartCh: make(chan struct{}, restartChCap), + waitCh: make(chan struct{}), + csiManager: config.CSIManager, + devicemanager: config.DeviceManager, + driverManager: config.DriverManager, + maxEvents: defaultMaxEvents, + serversContactedCh: config.ServersContactedCh, + startConditionMetCh: config.StartConditionMetCh, + shutdownDelayCtx: config.ShutdownDelayCtx, + shutdownDelayCancelFn: config.ShutdownDelayCancelFn, + serviceRegWrapper: config.ServiceRegWrapper, + getter: config.Getter, + wranglers: config.Wranglers, + widmgr: config.WIDMgr, } // Create the logger based on the allocation ID diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 9e849ec395d..c137fcd51b3 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -152,7 +152,7 @@ func (tr *TaskRunner) initHooks() { if task.UsesConnectSidecar() { tr.runnerHooks = append(tr.runnerHooks, - newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClient, hookLogger)), + newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClientFunc, hookLogger)), newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, tr.clientConfig.ConsulConfig, consulNamespace, hookLogger)), ) } else if task.Kind.IsConnectNative() { diff --git a/client/client.go b/client/client.go index a4eb7b82f69..691971fa10d 100644 --- a/client/client.go +++ b/client/client.go @@ -220,9 +220,9 @@ type Client struct { // pendingUpdates stores allocations that need to be synced to the server. pendingUpdates *pendingClientUpdates - // consulService is the Consul handler implementation for managing services - // and checks. - consulService serviceregistration.Handler + // consulServices gets a Consul handler implementation for managing + // services and checks. + consulServices serviceregistration.Handler // nomadService is the Nomad handler implementation for managing service // registrations. @@ -237,12 +237,13 @@ type Client struct { // this without needing to identify which backend provider should be used. serviceRegWrapper *wrapper.HandlerWrapper - // consulProxies is Nomad's custom Consul client for looking up supported - // envoy versions - consulProxies consulApi.SupportedProxiesAPI + // consulProxiesFunc gets an interface to Nomad's custom Consul client for + // looking up supported envoy versions + consulProxiesFunc consulApi.SupportedProxiesAPIFunc - // consulCatalog is the subset of Consul's Catalog API Nomad uses. - consulCatalog consul.CatalogAPI + // consulCatalogFunc gets an interface for the subset of Consul's Catalog + // API Nomad uses. + consulCatalogFunc consul.CatalogAPIFunc // HostStatsCollector collects host resource usage stats hostStatsCollector *hoststats.HostStatsCollector @@ -351,7 +352,7 @@ var ( // registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place // of the client's normal RPC handlers. This allows server tests to override // the behavior of the client. -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalogFunc consul.CatalogAPIFunc, consulProxiesFunc consulApi.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { @@ -375,9 +376,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie // Create the client c := &Client{ config: cfg, - consulCatalog: consulCatalog, - consulProxies: consulProxies, - consulService: consulService, + consulCatalogFunc: consulCatalogFunc, + consulProxiesFunc: consulProxiesFunc, + consulServices: consulServices, start: time.Now(), connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap), tlsWrap: tlsWrap, @@ -529,7 +530,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie // implementations. The Nomad implementation is only ever used on the // client, so we do that here rather than within the agent. c.setupNomadServiceRegistrationHandler() - c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService) + c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulServices, c.nomadService) // Batching of initial fingerprints is done to reduce the number of node // updates sent to the server on startup. @@ -2746,8 +2747,8 @@ func (c *Client) newAllocRunnerConfig( CSIManager: c.csimanager, CheckStore: c.checkStore, ClientConfig: c.GetConfig(), - Consul: c.consulService, - ConsulProxies: c.consulProxies, + Consul: c.consulServices, + ConsulProxiesFunc: c.consulProxiesFunc, ConsulSI: c.tokensClient, DeviceManager: c.devicemanager, DeviceStatsReporter: c, @@ -2770,6 +2771,7 @@ func (c *Client) newAllocRunnerConfig( // setupConsulTokenClient configures a tokenClient for managing consul service // identity tokens. +// DEPRECATED: remove in 1.9.0 func (c *Client) setupConsulTokenClient() error { tc := consulApi.NewIdentitiesClient(c.logger, c.deriveSIToken) c.tokensClient = tc @@ -3035,7 +3037,7 @@ func (c *Client) consulDiscovery() { func (c *Client) consulDiscoveryImpl() error { consulLogger := c.logger.Named("consul") - dcs, err := c.consulCatalog.Datacenters() + dcs, err := c.consulCatalogFunc("default").Datacenters() if err != nil { return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err) } @@ -3063,7 +3065,7 @@ DISCOLOOP: Near: "_agent", WaitTime: consul.DefaultQueryWaitDuration, } - consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts) + consulServices, _, err := c.consulCatalogFunc("default").Service(serviceName, consul.ServiceTagRPC, consulOpts) if err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err)) continue diff --git a/client/config/arconfig.go b/client/config/arconfig.go index 3b5603aa7ea..ee03434fb27 100644 --- a/client/config/arconfig.go +++ b/client/config/arconfig.go @@ -52,9 +52,9 @@ type AllocRunnerConfig struct { // Consul is the Consul client used to register task services and checks Consul serviceregistration.Handler - // ConsulProxies is the Consul client used to lookup supported envoy versions - // of the Consul agent. - ConsulProxies consul.SupportedProxiesAPI + // ConsulProxiesFunc gets a Consul client used to lookup supported envoy + // versions of the Consul agent. + ConsulProxiesFunc consul.SupportedProxiesAPIFunc // ConsulSI is the Consul client used to manage service identity tokens. ConsulSI consul.ServiceIdentityAPI diff --git a/client/consul/consul.go b/client/consul/consul.go index 6054489345a..a6dd82806ad 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -38,6 +38,8 @@ type SupportedProxiesAPI interface { Proxies() (map[string][]string, error) } +type SupportedProxiesAPIFunc func(string) SupportedProxiesAPI + // JWTLoginRequest is an object representing a login request with JWT type JWTLoginRequest struct { JWT string diff --git a/client/serviceregistration/service_registration.go b/client/serviceregistration/service_registration.go index 62f3bd27c21..783ebea8717 100644 --- a/client/serviceregistration/service_registration.go +++ b/client/serviceregistration/service_registration.go @@ -48,6 +48,8 @@ type Handler interface { UpdateTTL(id, namespace, output, status string) error } +type HandlerFunc func(string) Handler + // WorkloadRestarter allows the checkWatcher to restart tasks or entire task // groups. type WorkloadRestarter interface { diff --git a/client/serviceregistration/workload.go b/client/serviceregistration/workload.go index 7ccc352c1fa..5b72eec7ef3 100644 --- a/client/serviceregistration/workload.go +++ b/client/serviceregistration/workload.go @@ -47,6 +47,11 @@ type WorkloadServices struct { // DriverNetwork is the network specified by the driver and may be nil. DriverNetwork *drivers.DriverNetwork + + // Tokens are explicit API tokens that should be used by the caller when + // synchronizing services and check; currently this only used for Consul + // services and only when the Workload Identity workflow is used. + Tokens map[string]string // .Services[].Name -> token } // RegistrationProvider identifies the service registration provider for the diff --git a/client/serviceregistration/wrapper/wrapper.go b/client/serviceregistration/wrapper/wrapper.go index 1086b1f2bf5..a9901beab48 100644 --- a/client/serviceregistration/wrapper/wrapper.go +++ b/client/serviceregistration/wrapper/wrapper.go @@ -18,7 +18,7 @@ import ( type HandlerWrapper struct { log hclog.Logger - // consulServiceProvider is the handler for services where Consul is the + // consulServiceProviderFunc gets the handler for services where Consul is the // provider. This provider is always created and available. consulServiceProvider serviceregistration.Handler @@ -33,7 +33,7 @@ type HandlerWrapper struct { // implementation to allow future flexibility and is initially only intended // for use with the alloc and task runner service hooks. func NewHandlerWrapper( - log hclog.Logger, consulProvider, nomadProvider serviceregistration.Handler) *HandlerWrapper { + log hclog.Logger, consulProvider serviceregistration.Handler, nomadProvider serviceregistration.Handler) *HandlerWrapper { return &HandlerWrapper{ log: log, nomadServiceProvider: nomadProvider, diff --git a/client/testing.go b/client/testing.go index d4ecab57162..2546ca7a493 100644 --- a/client/testing.go +++ b/client/testing.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/servers" "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/command/agent/consul" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/pluginutils/singleton" @@ -56,8 +57,9 @@ func TestClientWithRPCs(t testing.T, cb func(c *config.Config), rpcs map[string] conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader) } mockCatalog := agentconsul.NewMockCatalog(logger) + mockCatalogFunc := func(_ string) consul.CatalogAPI { return mockCatalog } mockService := mock.NewServiceRegistrationHandler(logger) - client, err := NewClient(conf, mockCatalog, nil, mockService, rpcs) + client, err := NewClient(conf, mockCatalogFunc, nil, mockService, rpcs) if err != nil { cleanup() t.Fatalf("err: %v", err) diff --git a/command/agent/agent.go b/command/agent/agent.go index 53195f6ca64..832f54f8ce1 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -23,6 +23,7 @@ import ( uuidparse "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + clientconsul "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/state" @@ -79,20 +80,28 @@ type Agent struct { // EnterpriseAgent holds information and methods for enterprise functionality EnterpriseAgent *EnterpriseAgent - // consulService is Nomad's custom Consul client for managing services - // and checks. - consulService *consul.ServiceClient + // consulServices is Nomad's custom Consul client for managing services + // and checks. Used by both client and server. + consulServices *consul.ServiceClientWrapper // consulProxies is the subset of Consul's Agent API Nomad uses. - consulProxies *consul.ConnectProxies + // Used by client only. + consulProxies map[string]*consul.ConnectProxies + consulProxiesFunc clientconsul.SupportedProxiesAPIFunc // consulCatalog is the subset of Consul's Catalog API Nomad uses. - consulCatalog consul.CatalogAPI + // Used by both client and server. + consulCatalogs map[string]consul.CatalogAPI + consulCatalogFunc consul.CatalogAPIFunc - // consulConfigEntries is the subset of Consul's Configuration Entries API Nomad uses. - consulConfigEntries consul.ConfigAPI + // consulConfigEntries is the subset of Consul's Configuration Entries API + // Nomad uses. Used by server only, but used for jobs so needs to support + // multi-cluster + consulConfigEntries map[string]consul.ConfigAPI + consulConfigEntriesFunc consul.ConfigAPIFunc // consulACLs is Nomad's subset of Consul's ACL API Nomad uses. + // Used by server only, so only needs default cfg. consulACLs consul.ACLsAPI // client is the launched Nomad Client. Can be nil if the agent isn't @@ -143,7 +152,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i // Global logger should match internal logger as much as possible golog.SetFlags(golog.LstdFlags | golog.Lmicroseconds) - if err := a.setupConsul(config.Consul); err != nil { + if err := a.setupConsuls(config.Consuls); err != nil { return nil, fmt.Errorf("Failed to initialize Consul client: %v", err) } @@ -914,7 +923,7 @@ func (a *Agent) setupServer() error { } // Create the server - server, err := nomad.NewServer(conf, a.consulCatalog, a.consulConfigEntries, a.consulACLs) + server, err := nomad.NewServer(conf, a.consulCatalogs["default"], a.consulConfigEntries["default"], a.consulACLs) if err != nil { return fmt.Errorf("server setup failed: %v", err) } @@ -974,7 +983,7 @@ func (a *Agent) setupServer() error { serfServ, httpServ, } - if err := a.consulService.RegisterAgent(consulRoleServer, consulServices); err != nil { + if err := a.consulServices.RegisterAgent(consulRoleServer, consulServices); err != nil { return err } } @@ -1107,7 +1116,7 @@ func (a *Agent) setupClient() error { conf.APIListenerRegistrar = a.taskAPIServer nomadClient, err := client.NewClient( - conf, a.consulCatalog, a.consulProxies, a.consulService, nil) + conf, a.consulCatalogFunc, a.consulProxiesFunc, a.consulServices, nil) if err != nil { return fmt.Errorf("client setup failed: %v", err) } @@ -1124,7 +1133,7 @@ func (a *Agent) setupClient() error { if check := a.agentHTTPCheck(isServer); check != nil { httpServ.Checks = []*structs.ServiceCheck{check} } - if err := a.consulService.RegisterAgent(consulRoleClient, []*structs.Service{httpServ}); err != nil { + if err := a.consulServices.RegisterAgent(consulRoleClient, []*structs.Service{httpServ}); err != nil { return err } } @@ -1227,7 +1236,7 @@ func (a *Agent) Shutdown() error { } } - if err := a.consulService.Shutdown(); err != nil { + if err := a.consulServices.Shutdown(); err != nil { a.logger.Error("shutting down Consul client failed", "error", err) } @@ -1395,43 +1404,107 @@ func (a *Agent) GetMetricsSink() *metrics.InmemSink { return a.inmemSink } -// setupConsul creates the Consul client and starts its main Run loop. -func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error { - apiConf, err := consulConfig.ApiConfig() - if err != nil { - return err - } +func (a *Agent) setupConsuls(cfgs map[string]*config.ConsulConfig) error { - consulClient, err := consulapi.NewClient(apiConf) - if err != nil { - return err + isClient := false + if a.config.Client != nil && a.config.Client.Enabled { + isClient = true } - // Create Consul Catalog client for service discovery. - a.consulCatalog = consulClient.Catalog() + a.consulServices = consul.NewServiceClientWrapper() + consulProxies := map[string]*consul.ConnectProxies{} + consulCatalogs := map[string]consul.CatalogAPI{} + consulConfigEntries := map[string]consul.ConfigAPI{} - // Create Consul ConfigEntries client for managing Config Entries. - a.consulConfigEntries = consulClient.ConfigEntries() + for cluster, consulConfig := range cfgs { + apiConf, err := consulConfig.ApiConfig() + if err != nil { + return err + } - // Create Consul ACL client for managing tokens. - a.consulACLs = consulClient.ACL() + consulClient, err := consulapi.NewClient(apiConf) + if err != nil { + return err + } - // Create Consul Service client for service advertisement and checks. - isClient := false - if a.config.Client != nil && a.config.Client.Enabled { - isClient = true + // Create Consul Catalog client for service discovery. + a.consulCatalogs[cluster] = consulClient.Catalog() + + // Create Consul ConfigEntries client for managing Config Entries. + a.consulConfigEntries[cluster] = consulClient.ConfigEntries() + + // Create Consul ACL client for managing tokens in the legacy + // workflow on the server + if cluster == "default" { + a.consulACLs = consulClient.ACL() + } + + // Create Consul Service client for service advertisement and checks. + consulAgentClient := consulClient.Agent() + namespacesClient := consul.NewNamespacesClient(consulClient.Namespaces(), consulAgentClient) + + a.consulServices.AddClient(cluster, + consul.NewServiceClient(consulAgentClient, namespacesClient, a.logger, isClient)) + a.consulProxies[cluster] = consul.NewConnectProxiesClient(consulAgentClient) } - // Create Consul Agent client for looking info about the agent. - consulAgentClient := consulClient.Agent() - namespacesClient := consul.NewNamespacesClient(consulClient.Namespaces(), consulAgentClient) - a.consulService = consul.NewServiceClient(consulAgentClient, namespacesClient, a.logger, isClient) - a.consulProxies = consul.NewConnectProxiesClient(consulAgentClient) - // Run the Consul service client's sync'ing main loop - go a.consulService.Run() + a.consulCatalogFunc = func(cluster string) consul.CatalogAPI { + return consulCatalogs[cluster] + } + + a.consulProxiesFunc = func(cluster string) clientconsul.SupportedProxiesAPI { + return consulProxies[cluster] + } + + a.consulConfigEntriesFunc = func(cluster string) consul.ConfigAPI { + return consulConfigEntries[cluster] + } + + // Run the each Consul service client's sync'ing main loop (will spawn a + // goroutine for each one) + a.consulServices.Run() + return nil } +// // setupConsul creates the Consul client and starts its main Run loop. +// func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error { +// apiConf, err := consulConfig.ApiConfig() +// if err != nil { +// return err +// } + +// consulClient, err := consulapi.NewClient(apiConf) +// if err != nil { +// return err +// } + +// // Create Consul Catalog client for service discovery. +// a.consulCatalogs["default"] = consulClient.Catalog() + +// // Create Consul ConfigEntries client for managing Config Entries. +// a.consulConfigEntries["default"] = consulClient.ConfigEntries() + +// // Create Consul ACL client for managing tokens. +// a.consulACLs["default"] = consulClient.ACL() + +// // Create Consul Service client for service advertisement and checks. +// isClient := false +// if a.config.Client != nil && a.config.Client.Enabled { +// isClient = true +// } +// // Create Consul Agent client for looking info about the agent. +// consulAgentClient := consulClient.Agent() +// namespacesClient := consul.NewNamespacesClient(consulClient.Namespaces(), consulAgentClient) +// a.consulServices["default"] = consul.NewServiceClient(consulAgentClient, namespacesClient, a.logger, isClient) +// a.defaultConsulService = a.consulServices["default"] +// a.consulProxies["default"] = consul.NewConnectProxiesClient(consulAgentClient) + +// // Run the Consul service client's sync'ing main loop +// go a.defaultConsulService.Run() +// return nil +// } + // noOpAuditor is a no-op Auditor that fulfills the // event.Auditor interface. type noOpAuditor struct{} diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 1cca3facfa6..a4fa955c627 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -14,6 +14,14 @@ import ( "github.com/hashicorp/go-hclog" ) +type MockClient struct { + MockCatalog + MockNamespaces +} + +var _ NamespaceAPI = (*MockClient)(nil) +var _ CatalogAPI = (*MockClient)(nil) + // MockNamespaces is a mock implementation of NamespaceAPI. type MockNamespaces struct { namespaces []*api.Namespace @@ -265,7 +273,7 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { } // CheckRegister implements AgentAPI -func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { +func (c *MockAgent) CheckRegisterOpts(check *api.AgentCheckRegistration, _ *api.QueryOptions) error { c.mu.Lock() defer c.mu.Unlock() return c.checkRegister(check) @@ -322,8 +330,10 @@ func (c *MockAgent) CheckDeregisterOpts(checkID string, q *api.QueryOptions) err return nil } -// ServiceRegister implements AgentAPI -func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error { +// ServiceRegisterOpts implements AgentAPI +func (c *MockAgent) ServiceRegisterOpts( + service *api.AgentServiceRegistration, opts api.ServiceRegisterOpts) error { + c.mu.Lock() defer c.mu.Unlock() diff --git a/command/agent/consul/connect_proxies.go b/command/agent/consul/connect_proxies.go index 0e8433b3c7b..202b4626368 100644 --- a/command/agent/consul/connect_proxies.go +++ b/command/agent/consul/connect_proxies.go @@ -7,6 +7,8 @@ import ( "errors" ) +type ConnectProxiesFunc func(string) *ConnectProxies + // ConnectProxies implements SupportedProxiesAPI by using the Consul Agent API. type ConnectProxies struct { agentAPI AgentAPI diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index ea11dcdeb8a..80a1e538c1d 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -5,6 +5,7 @@ package consul import ( "context" + "errors" "fmt" "maps" "net" @@ -21,6 +22,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/helper" @@ -102,6 +104,9 @@ type CatalogAPI interface { Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error) } +// CatalogAPIFunc returns a CatalogAPI interface for the specific cluster +type CatalogAPIFunc func(string) CatalogAPI + // NamespaceAPI is the consul/api.Namespace API used by Nomad. // // ACL requirements @@ -110,24 +115,30 @@ type NamespaceAPI interface { List(q *api.QueryOptions) ([]*api.Namespace, *api.QueryMeta, error) } +// NamespaceAPIFunc returns a NamespaceAPI interface for the specific cluster +type NamespaceAPIFunc func(string) NamespaceAPI + // AgentAPI is the consul/api.Agent API used by Nomad. // // ACL requirements // - agent:read // - service:write type AgentAPI interface { - CheckRegister(check *api.AgentCheckRegistration) error + CheckRegisterOpts(check *api.AgentCheckRegistration, q *api.QueryOptions) error CheckDeregisterOpts(checkID string, q *api.QueryOptions) error ChecksWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentCheck, error) UpdateTTLOpts(id, output, status string, q *api.QueryOptions) error - ServiceRegister(service *api.AgentServiceRegistration) error + ServiceRegisterOpts(service *api.AgentServiceRegistration, opts api.ServiceRegisterOpts) error ServiceDeregisterOpts(serviceID string, q *api.QueryOptions) error ServicesWithFilterOpts(filter string, q *api.QueryOptions) (map[string]*api.AgentService, error) Self() (map[string]map[string]interface{}, error) } +// AgentAPIFunc returns an AgentAPI interface for the specific cluster +type AgentAPIFunc func(string) AgentAPI + // ConfigAPI is the consul/api.ConfigEntries API subset used by Nomad Server. // // ACL requirements @@ -137,6 +148,9 @@ type ConfigAPI interface { // Delete(kind, name string, w *api.WriteOptions) (*api.WriteMeta, error) (not used) } +// ConfigAPIFunc returns a ConfigAPI interface for the specific cluster +type ConfigAPIFunc func(clusterName string) ConfigAPI + // ACLsAPI is the consul/api.ACL API subset used by Nomad Server. // // ACL requirements @@ -423,6 +437,167 @@ func (o *operations) String() string { return fmt.Sprintf("<%d, %d, %d, %d>", len(o.regServices), len(o.regChecks), len(o.deregServices), len(o.deregChecks)) } +type ServiceClientWrapper struct { + serviceClients map[string]*ServiceClient +} + +func NewServiceClientWrapper() *ServiceClientWrapper { + return &ServiceClientWrapper{ + serviceClients: map[string]*ServiceClient{}, + } +} + +func (scw *ServiceClientWrapper) AddClient(name string, client *ServiceClient) { + scw.serviceClients[name] = client +} + +func (scw *ServiceClientWrapper) Run() { + for _, serviceClient := range scw.serviceClients { + go serviceClient.Run() + } +} + +func (scw *ServiceClientWrapper) Shutdown() error { + var merr *multierror.Error + for _, serviceClient := range scw.serviceClients { + err := serviceClient.Shutdown() + if err != nil { + merr = multierror.Append(merr, err) + } + } + return merr.ErrorOrNil() +} + +func (scw *ServiceClientWrapper) RegisterAgent(role string, services []*structs.Service) error { + serviceClient, ok := scw.serviceClients["default"] + if !ok { + return errors.New("no default Consul services client") + } + return serviceClient.RegisterAgent(role, services) +} + +func (scw *ServiceClientWrapper) RegisterWorkload(workload *serviceregistration.WorkloadServices) error { + clusters := scw.clustersInWorkload(workload) + if len(clusters) == 1 { + return scw.serviceClients[clusters[0]].RegisterWorkload(workload) + } + + workloadsByCluster := scw.sliceWorkloadsByCluster(workload, clusters) + for cluster, workload := range workloadsByCluster { + err := scw.serviceClients[cluster].RegisterWorkload(workload) + if err != nil { + return err + } + } + + return nil +} + +func (scw *ServiceClientWrapper) RemoveWorkload(workload *serviceregistration.WorkloadServices) { + clusters := scw.clustersInWorkload(workload) + if len(clusters) == 1 { + scw.serviceClients[clusters[0]].RemoveWorkload(workload) + return + } + + workloadsByCluster := scw.sliceWorkloadsByCluster(workload, clusters) + for cluster, workload := range workloadsByCluster { + scw.serviceClients[cluster].RemoveWorkload(workload) + } +} + +func (scw *ServiceClientWrapper) UpdateWorkload( + old, newTask *serviceregistration.WorkloadServices) error { + + clusters := scw.clustersInWorkload(newTask) + if len(clusters) == 1 { + return scw.serviceClients[clusters[0]].UpdateWorkload(old, newTask) + } + + newWorkloadsByCluster := scw.sliceWorkloadsByCluster(newTask, clusters) + oldWorkloadsByCluster := scw.sliceWorkloadsByCluster(old, clusters) + for cluster, old := range oldWorkloadsByCluster { + newTask := newWorkloadsByCluster[cluster] + err := scw.serviceClients[cluster].UpdateWorkload(old, newTask) + if err != nil { + return err + } + } + + return nil +} + +func (scw *ServiceClientWrapper) AllocRegistrations(allocID string) ( + *serviceregistration.AllocRegistration, error) { + + if len(scw.serviceClients) == 1 { + return scw.serviceClients["default"].AllocRegistrations(allocID) + } + + allocReg := &serviceregistration.AllocRegistration{ + Tasks: map[string]*serviceregistration.ServiceRegistrations{}, + } + for _, serviceClient := range scw.serviceClients { + reg, err := serviceClient.AllocRegistrations(allocID) + if err != nil { + return nil, err + } + if reg != nil { + for t, task := range reg.Tasks { + // TODO: will these overwrite? + allocReg.Tasks[t] = task + } + } + } + return allocReg, nil +} + +func (scw *ServiceClientWrapper) UpdateTTL(id, namespace, output, status string) error { + if len(scw.serviceClients) == 0 { + return scw.serviceClients["default"].UpdateTTL(id, namespace, output, status) + } + + for _, serviceClient := range scw.serviceClients { + // TODO: is this actually a correct match for the ID? + if serviceClient.agentServices.Contains(id) { + return serviceClient.UpdateTTL(id, namespace, output, status) + } + } + + return nil +} + +// clustersInWorkload returns a de-duplicated set of clusters in the workload, +// always returning at least the default workload +func (scw *ServiceClientWrapper) clustersInWorkload(workload *serviceregistration.WorkloadServices) []string { + clusters := set.From([]string{"default"}) + for _, service := range workload.Services { + if service.IsConsul() && service.Cluster != "" { + clusters.Insert(service.Cluster) + } + } + + return clusters.Slice() +} + +// sliceWorkloadsByCluster returns a map of clusters to WorkloadServices. This +// does some expensive copying of the services so callers should check there's +// actually multiple clusters first with clustersInWorkload +func (scw *ServiceClientWrapper) sliceWorkloadsByCluster(workload *serviceregistration.WorkloadServices, clusters []string) map[string]*serviceregistration.WorkloadServices { + + workloadsByCluster := make(map[string]*serviceregistration.WorkloadServices, len(clusters)) + for _, cluster := range clusters { + clusterWorkload := workload.Copy() + slices.DeleteFunc(clusterWorkload.Services, func(service *structs.Service) bool { + return service.IsConsul() && + service.Cluster != cluster || + (cluster == "default" && service.Cluster == "") + }) + workloadsByCluster[cluster] = clusterWorkload + } + return workloadsByCluster +} + // ServiceClient handles task and agent service registration with Consul. type ServiceClient struct { agentAPI AgentAPI @@ -838,7 +1013,8 @@ func (c *ServiceClient) sync(reason syncReason) error { if !exists || c.agentServiceUpdateRequired(reason, serviceInNomad, serviceInConsul, sidecarInConsul) { c.logger.Trace("must register service", "id", id, "exists", exists, "reason", reason) - if err = c.agentAPI.ServiceRegister(serviceInNomad); err != nil { + if err = c.agentAPI.ServiceRegisterOpts( + serviceInNomad, api.ServiceRegisterOpts{}); err != nil { metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err } @@ -906,7 +1082,7 @@ func (c *ServiceClient) sync(reason syncReason) error { // Already in Consul; skipping continue } - if err := c.agentAPI.CheckRegister(check); err != nil { + if err := c.agentAPI.CheckRegisterOpts(check, nil); err != nil { metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err }