Skip to content

Commit

Permalink
[WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Sep 21, 2023
1 parent 385714f commit e05dec3
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 130 deletions.
8 changes: 4 additions & 4 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ type allocRunner struct {
// registering services and checks
consulClient serviceregistration.Handler

// consulProxiesClient is the client used by the envoy version hook for
// consulProxiesClientFunc gets a client used by the envoy version hook for
// looking up supported envoy versions of the consul agent.
consulProxiesClient consul.SupportedProxiesAPI
consulProxiesClientFunc consul.SupportedProxiesAPIFunc

// sidsClient is the client used by the service identity hook for
// managing SI tokens
Expand Down Expand Up @@ -226,7 +226,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
consulProxiesClient: config.ConsulProxies,
consulProxiesClientFunc: config.ConsulProxiesFunc,
sidsClient: config.ConsulSI,
vaultClientFunc: config.VaultFunc,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
Expand Down Expand Up @@ -302,7 +302,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
StateUpdater: ar,
DynamicRegistry: ar.dynamicRegistry,
Consul: ar.consulClient,
ConsulProxies: ar.consulProxiesClient,
ConsulProxiesFunc: ar.consulProxiesClientFunc,
ConsulSI: ar.sidsClient,
VaultFunc: ar.vaultClientFunc,
DeviceStatsReporter: ar.deviceStatsReporter,
Expand Down
31 changes: 17 additions & 14 deletions client/allocrunner/taskrunner/envoy_version_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ const (
)

type envoyVersionHookConfig struct {
alloc *structs.Allocation
proxiesClient consul.SupportedProxiesAPI
logger hclog.Logger
alloc *structs.Allocation
proxiesClientFunc consul.SupportedProxiesAPIFunc
logger hclog.Logger
}

func newEnvoyVersionHookConfig(alloc *structs.Allocation, proxiesClient consul.SupportedProxiesAPI, logger hclog.Logger) *envoyVersionHookConfig {
func newEnvoyVersionHookConfig(alloc *structs.Allocation, proxiesClientFunc consul.SupportedProxiesAPIFunc, logger hclog.Logger) *envoyVersionHookConfig {
return &envoyVersionHookConfig{
alloc: alloc,
logger: logger,
proxiesClient: proxiesClient,
alloc: alloc,
logger: logger,
proxiesClientFunc: proxiesClientFunc,
}
}

Expand All @@ -45,19 +45,19 @@ type envoyVersionHook struct {
// alloc is the allocation with the envoy task being rewritten.
alloc *structs.Allocation

// proxiesClient is the subset of the Consul API for getting information
// from Consul about the versions of Envoy it supports.
proxiesClient consul.SupportedProxiesAPI
// proxiesClientFunc gets an interface for the subset of the Consul API for
// getting information from Consul about the versions of Envoy it supports.
proxiesClientFunc consul.SupportedProxiesAPIFunc

// logger is used to log things.
logger hclog.Logger
}

func newEnvoyVersionHook(c *envoyVersionHookConfig) *envoyVersionHook {
return &envoyVersionHook{
alloc: c.alloc,
proxiesClient: c.proxiesClient,
logger: c.logger.Named(envoyVersionHookName),
alloc: c.alloc,
proxiesClientFunc: c.proxiesClientFunc,
logger: c.logger.Named(envoyVersionHookName),
}
}

Expand All @@ -81,7 +81,10 @@ func (h *envoyVersionHook) Prestart(_ context.Context, request *ifs.TaskPrestart

// We either need to acquire Consul's preferred Envoy version or fallback
// to the legacy default. Query Consul and use the (possibly empty) result.
proxies, err := h.proxiesClient.Proxies()
//
// TODO: how do we select the right cluster here if we have multiple
// services which could have their own cluster field value?
proxies, err := h.proxiesClientFunc("default").Proxies()
if err != nil {
return fmt.Errorf("error retrieving supported Envoy versions from Consul: %w", err)
}
Expand Down
84 changes: 42 additions & 42 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ type TaskRunner struct {
// registering services and checks
consulServiceClient serviceregistration.Handler

// consulProxiesClient is the client used by the envoy version hook for
// consulProxiesClientFunc gets a client used by the envoy version hook for
// asking consul what version of envoy nomad should inject into the connect
// sidecar or gateway task.
consulProxiesClient consul.SupportedProxiesAPI
consulProxiesClientFunc consul.SupportedProxiesAPIFunc

// sidsClient is the client used by the service identity hook for managing
// service identity tokens
Expand Down Expand Up @@ -282,9 +282,9 @@ type Config struct {
// Consul is the client to use for managing Consul service registrations
Consul serviceregistration.Handler

// ConsulProxies is the client to use for looking up supported envoy versions
// ConsulProxiesFunc gets a client to use for looking up supported envoy versions
// from Consul.
ConsulProxies consul.SupportedProxiesAPI
ConsulProxiesFunc consul.SupportedProxiesAPIFunc

// ConsulSI is the client to use for managing Consul SI tokens
ConsulSI consul.ServiceIdentityAPI
Expand Down Expand Up @@ -369,44 +369,44 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
}

tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
dynamicRegistry: config.DynamicRegistry,
consulServiceClient: config.Consul,
consulProxiesClient: config.ConsulProxies,
siClient: config.ConsulSI,
vaultClientFunc: config.VaultFunc,
state: tstate,
localState: state.NewLocalState(),
allocHookResources: config.AllocHookResources,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
restartCh: make(chan struct{}, restartChCap),
waitCh: make(chan struct{}),
csiManager: config.CSIManager,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCh: config.StartConditionMetCh,
shutdownDelayCtx: config.ShutdownDelayCtx,
shutdownDelayCancelFn: config.ShutdownDelayCancelFn,
serviceRegWrapper: config.ServiceRegWrapper,
getter: config.Getter,
wranglers: config.Wranglers,
widmgr: config.WIDMgr,
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
dynamicRegistry: config.DynamicRegistry,
consulServiceClient: config.Consul,
consulProxiesClientFunc: config.ConsulProxiesFunc,
siClient: config.ConsulSI,
vaultClientFunc: config.VaultFunc,
state: tstate,
localState: state.NewLocalState(),
allocHookResources: config.AllocHookResources,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
restartCh: make(chan struct{}, restartChCap),
waitCh: make(chan struct{}),
csiManager: config.CSIManager,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCh: config.StartConditionMetCh,
shutdownDelayCtx: config.ShutdownDelayCtx,
shutdownDelayCancelFn: config.ShutdownDelayCancelFn,
serviceRegWrapper: config.ServiceRegWrapper,
getter: config.Getter,
wranglers: config.Wranglers,
widmgr: config.WIDMgr,
}

// Create the logger based on the allocation ID
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (tr *TaskRunner) initHooks() {

if task.UsesConnectSidecar() {
tr.runnerHooks = append(tr.runnerHooks,
newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClient, hookLogger)),
newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClientFunc, hookLogger)),
newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, tr.clientConfig.ConsulConfig, consulNamespace, hookLogger)),
)
} else if task.Kind.IsConnectNative() {
Expand Down
36 changes: 19 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ type Client struct {
// pendingUpdates stores allocations that need to be synced to the server.
pendingUpdates *pendingClientUpdates

// consulService is the Consul handler implementation for managing services
// and checks.
consulService serviceregistration.Handler
// consulServices gets a Consul handler implementation for managing
// services and checks.
consulServices serviceregistration.Handler

// nomadService is the Nomad handler implementation for managing service
// registrations.
Expand All @@ -237,12 +237,13 @@ type Client struct {
// this without needing to identify which backend provider should be used.
serviceRegWrapper *wrapper.HandlerWrapper

// consulProxies is Nomad's custom Consul client for looking up supported
// envoy versions
consulProxies consulApi.SupportedProxiesAPI
// consulProxiesFunc gets an interface to Nomad's custom Consul client for
// looking up supported envoy versions
consulProxiesFunc consulApi.SupportedProxiesAPIFunc

// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI
// consulCatalogFunc gets an interface for the subset of Consul's Catalog
// API Nomad uses.
consulCatalogFunc consul.CatalogAPIFunc

// HostStatsCollector collects host resource usage stats
hostStatsCollector *hoststats.HostStatsCollector
Expand Down Expand Up @@ -351,7 +352,7 @@ var (
// registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place
// of the client's normal RPC handlers. This allows server tests to override
// the behavior of the client.
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalogFunc consul.CatalogAPIFunc, consulProxiesFunc consulApi.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand All @@ -375,9 +376,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
// Create the client
c := &Client{
config: cfg,
consulCatalog: consulCatalog,
consulProxies: consulProxies,
consulService: consulService,
consulCatalogFunc: consulCatalogFunc,
consulProxiesFunc: consulProxiesFunc,
consulServices: consulServices,
start: time.Now(),
connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
Expand Down Expand Up @@ -529,7 +530,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
// implementations. The Nomad implementation is only ever used on the
// client, so we do that here rather than within the agent.
c.setupNomadServiceRegistrationHandler()
c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService)
c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulServices, c.nomadService)

// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup.
Expand Down Expand Up @@ -2746,8 +2747,8 @@ func (c *Client) newAllocRunnerConfig(
CSIManager: c.csimanager,
CheckStore: c.checkStore,
ClientConfig: c.GetConfig(),
Consul: c.consulService,
ConsulProxies: c.consulProxies,
Consul: c.consulServices,
ConsulProxiesFunc: c.consulProxiesFunc,
ConsulSI: c.tokensClient,
DeviceManager: c.devicemanager,
DeviceStatsReporter: c,
Expand All @@ -2770,6 +2771,7 @@ func (c *Client) newAllocRunnerConfig(

// setupConsulTokenClient configures a tokenClient for managing consul service
// identity tokens.
// DEPRECATED: remove in 1.9.0
func (c *Client) setupConsulTokenClient() error {
tc := consulApi.NewIdentitiesClient(c.logger, c.deriveSIToken)
c.tokensClient = tc
Expand Down Expand Up @@ -3035,7 +3037,7 @@ func (c *Client) consulDiscovery() {
func (c *Client) consulDiscoveryImpl() error {
consulLogger := c.logger.Named("consul")

dcs, err := c.consulCatalog.Datacenters()
dcs, err := c.consulCatalogFunc("default").Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
Expand Down Expand Up @@ -3063,7 +3065,7 @@ DISCOLOOP:
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
consulServices, _, err := c.consulCatalogFunc("default").Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
Expand Down
6 changes: 3 additions & 3 deletions client/config/arconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type AllocRunnerConfig struct {
// Consul is the Consul client used to register task services and checks
Consul serviceregistration.Handler

// ConsulProxies is the Consul client used to lookup supported envoy versions
// of the Consul agent.
ConsulProxies consul.SupportedProxiesAPI
// ConsulProxiesFunc gets a Consul client used to lookup supported envoy
// versions of the Consul agent.
ConsulProxiesFunc consul.SupportedProxiesAPIFunc

// ConsulSI is the Consul client used to manage service identity tokens.
ConsulSI consul.ServiceIdentityAPI
Expand Down
2 changes: 2 additions & 0 deletions client/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type SupportedProxiesAPI interface {
Proxies() (map[string][]string, error)
}

type SupportedProxiesAPIFunc func(string) SupportedProxiesAPI

// JWTLoginRequest is an object representing a login request with JWT
type JWTLoginRequest struct {
JWT string
Expand Down
2 changes: 2 additions & 0 deletions client/serviceregistration/service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Handler interface {
UpdateTTL(id, namespace, output, status string) error
}

type HandlerFunc func(string) Handler

// WorkloadRestarter allows the checkWatcher to restart tasks or entire task
// groups.
type WorkloadRestarter interface {
Expand Down
5 changes: 5 additions & 0 deletions client/serviceregistration/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type WorkloadServices struct {

// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork

// Tokens are explicit API tokens that should be used by the caller when
// synchronizing services and check; currently this only used for Consul
// services and only when the Workload Identity workflow is used.
Tokens map[string]string // .Services[].Name -> token
}

// RegistrationProvider identifies the service registration provider for the
Expand Down
4 changes: 2 additions & 2 deletions client/serviceregistration/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type HandlerWrapper struct {
log hclog.Logger

// consulServiceProvider is the handler for services where Consul is the
// consulServiceProviderFunc gets the handler for services where Consul is the
// provider. This provider is always created and available.
consulServiceProvider serviceregistration.Handler

Expand All @@ -33,7 +33,7 @@ type HandlerWrapper struct {
// implementation to allow future flexibility and is initially only intended
// for use with the alloc and task runner service hooks.
func NewHandlerWrapper(
log hclog.Logger, consulProvider, nomadProvider serviceregistration.Handler) *HandlerWrapper {
log hclog.Logger, consulProvider serviceregistration.Handler, nomadProvider serviceregistration.Handler) *HandlerWrapper {
return &HandlerWrapper{
log: log,
nomadServiceProvider: nomadProvider,
Expand Down
4 changes: 3 additions & 1 deletion client/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/command/agent/consul"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
Expand Down Expand Up @@ -56,8 +57,9 @@ func TestClientWithRPCs(t testing.T, cb func(c *config.Config), rpcs map[string]
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader)
}
mockCatalog := agentconsul.NewMockCatalog(logger)
mockCatalogFunc := func(_ string) consul.CatalogAPI { return mockCatalog }
mockService := mock.NewServiceRegistrationHandler(logger)
client, err := NewClient(conf, mockCatalog, nil, mockService, rpcs)
client, err := NewClient(conf, mockCatalogFunc, nil, mockService, rpcs)
if err != nil {
cleanup()
t.Fatalf("err: %v", err)
Expand Down
Loading

0 comments on commit e05dec3

Please sign in to comment.