Skip to content

Commit

Permalink
Improve connection config priority. Add missing option from consumer …
Browse files Browse the repository at this point in the history
…CRD.
  • Loading branch information
samuelattwood committed Jan 31, 2025
1 parent 5db0cff commit ad4cf12
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 101 deletions.
6 changes: 4 additions & 2 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
)

var (
Expand Down Expand Up @@ -94,7 +95,6 @@ func run() error {
klog.Warning("Starting jetStream controller in experimental control loop mode")

natsCfg := &controller.NatsConfig{
CRDConnect: *crdConnect,
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
Expand Down Expand Up @@ -167,9 +167,11 @@ func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, control
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))

log.SetLogger(klog.NewKlogr())

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: klog.NewKlogr().WithName("controller-runtime"),
Logger: log.Log,
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
Expand Down
77 changes: 36 additions & 41 deletions controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,40 +459,40 @@ func (c *Controller) getAccountOverrides(account string, ns string) (*accountOve
return nil, err
}

filesToWrite := make(map[string]string)
var certData, keyData []byte
var certPath, keyPath string

getSecretValue := func(key string) string {
value, ok := secret.Data[key]
if !ok {
return ""
for k, v := range secret.Data {
switch k {
case acc.Spec.TLS.ClientCert:
certPath = filepath.Join(accDir, k)
certData = v
case acc.Spec.TLS.ClientKey:
keyPath = filepath.Join(accDir, k)
keyData = v
case acc.Spec.TLS.RootCAs:
overrides.remoteRootCA = filepath.Join(accDir, k)
if err := os.WriteFile(overrides.remoteRootCA, v, 0o644); err != nil {
return nil, err
}
}
return string(value)
}

remoteClientCertValue := getSecretValue(acc.Spec.TLS.ClientCert)
remoteClientKeyValue := getSecretValue(acc.Spec.TLS.ClientKey)
if remoteClientCertValue != "" && remoteClientKeyValue != "" {
overrides.remoteClientCert = filepath.Join(accDir, acc.Spec.TLS.ClientCert)
overrides.remoteClientKey = filepath.Join(accDir, acc.Spec.TLS.ClientKey)

filesToWrite[acc.Spec.TLS.ClientCert] = remoteClientCertValue
filesToWrite[acc.Spec.TLS.ClientKey] = remoteClientKeyValue
}

remoteRootCAValue := getSecretValue(acc.Spec.TLS.RootCAs)
if remoteRootCAValue != "" {
overrides.remoteRootCA = filepath.Join(accDir, acc.Spec.TLS.RootCAs)
filesToWrite[acc.Spec.TLS.RootCAs] = remoteRootCAValue
}
if certData != nil && keyData != nil {
overrides.remoteClientCert = certPath
overrides.remoteClientKey = keyPath

for file, v := range filesToWrite {
if err := os.WriteFile(filepath.Join(accDir, file), []byte(v), 0o644); err != nil {
if err := os.WriteFile(certPath, certData, 0o644); err != nil {
return nil, err
}
if err := os.WriteFile(keyPath, keyData, 0o644); err != nil {
return nil, err
}
}
}

// Lookup the UserCredentials.
if acc.Spec.Creds != nil {
if acc.Spec.Creds != nil && acc.Spec.Creds.Secret != nil {
secretName := acc.Spec.Creds.Secret.Name
secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{})
if err != nil {
Expand All @@ -504,12 +504,11 @@ func (c *Controller) getAccountOverrides(account string, ns string) (*accountOve
if err := os.MkdirAll(accDir, 0o755); err != nil {
return nil, err
}
for k, v := range secret.Data {
if k == acc.Spec.Creds.File {
overrides.userCreds = filepath.Join(c.cacheDir, ns, account, k)
if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil {
return nil, err
}

if credsBytes, ok := secret.Data[acc.Spec.Creds.File]; ok {
overrides.userCreds = filepath.Join(accDir, acc.Spec.Creds.File)
if err := os.WriteFile(overrides.userCreds, credsBytes, 0o644); err != nil {
return nil, err
}
}
}
Expand All @@ -522,10 +521,8 @@ func (c *Controller) getAccountOverrides(account string, ns string) (*accountOve
return nil, err
}

for k, v := range secret.Data {
if k == acc.Spec.Token.Token {
overrides.token = string(v)
}
if token, ok := secret.Data[acc.Spec.Token.Token]; ok {
overrides.token = string(token)
}
}

Expand All @@ -537,13 +534,11 @@ func (c *Controller) getAccountOverrides(account string, ns string) (*accountOve
return nil, err
}

for k, v := range secret.Data {
if k == acc.Spec.User.User {
overrides.user = string(v)
}
if k == acc.Spec.User.Password {
overrides.password = string(v)
}
userBytes := secret.Data[acc.Spec.User.User]
passwordBytes := secret.Data[acc.Spec.User.Password]
if userBytes != nil && passwordBytes != nil {
overrides.user = string(userBytes)
overrides.password = string(passwordBytes)
}
}

Expand Down
3 changes: 3 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ spec:
maxRequestMaxBytes:
description: The maximum max_bytes value that maybe set when dong a pull on a Pull Consumer.
type: integer
inactiveThreshold:
description: The idle time an Ephemeral Consumer allows before it is removed.
type: string
replicas:
description: When set do not inherit the replica count from the stream but specifically set it to this amount.
type: integer
Expand Down
82 changes: 72 additions & 10 deletions internal/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,93 @@ import (
)

type NatsConfig struct {
CRDConnect bool
ClientName string
Credentials string
NKey string
ServerURL string
CAs []string
Certificate string
Key string
TLSFirst bool
CAs []string
Credentials string
NKey string
Token string
User string
Password string
}

func (o *NatsConfig) Overlay(overlay *NatsConfig) {
if overlay.ClientName != "" {
o.ClientName = overlay.ClientName
}

if overlay.ServerURL != "" {
o.ServerURL = overlay.ServerURL
}

if overlay.Certificate != "" && overlay.Key != "" {
o.Certificate = overlay.Certificate
o.Key = overlay.Key
}

if len(overlay.CAs) > 0 {
o.CAs = overlay.CAs
}

if overlay.TLSFirst {
o.TLSFirst = overlay.TLSFirst
}

if !overlay.HasAuth() {
return
}

o.UnsetAuth()

if overlay.Credentials != "" {
o.Credentials = overlay.Credentials
} else if overlay.NKey != "" {
o.NKey = overlay.NKey
} else if overlay.Token != "" {
o.Token = overlay.Token
} else if overlay.User != "" && overlay.Password != "" {
o.User = overlay.User
o.Password = overlay.Password
}
}

func (o *NatsConfig) HasAuth() bool {
return o.Credentials != "" || o.NKey != "" || o.Token != "" || (o.User != "" && o.Password != "")
}

func (o *NatsConfig) UnsetAuth() {
o.Credentials = ""
o.NKey = ""
o.User = ""
o.Password = ""
o.Token = ""
}

// buildOptions creates options from the config to be used in nats.Connect.
func (o *NatsConfig) buildOptions() ([]nats.Option, error) {
opts := make([]nats.Option, 0)

if o.ClientName != "" {
opts = append(opts, nats.Name(o.ClientName))
}

if o.ServerURL == "" {
return nil, fmt.Errorf("server url is required")
}

if o.Certificate != "" && o.Key != "" {
opts = append(opts, nats.ClientCert(o.Certificate, o.Key))
}

if o.TLSFirst {
opts = append(opts, nats.TLSHandshakeFirst())
}

if o.ClientName != "" {
opts = append(opts, nats.Name(o.ClientName))
if len(o.CAs) > 0 {
opts = append(opts, nats.RootCAs(o.CAs...))
}

if o.Credentials != "" {
Expand All @@ -48,12 +110,12 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) {
opts = append(opts, opt)
}

if o.Certificate != "" && o.Key != "" {
opts = append(opts, nats.ClientCert(o.Certificate, o.Key))
if o.Token != "" {
opts = append(opts, nats.Token(o.Token))
}

if len(o.CAs) > 0 {
opts = append(opts, nats.RootCAs(o.CAs...))
if o.User != "" && o.Password != "" {
opts = append(opts, nats.UserInfo(o.User, o.Password))
}

return opts, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
consumer := &api.Consumer{}
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Consumer deleted.", "consumerName", req.NamespacedName.String())
log.Info("Consumer resource deleted.", "consumerName", req.NamespacedName.String())
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err)
Expand Down
Loading

0 comments on commit ad4cf12

Please sign in to comment.