Skip to content

Commit

Permalink
🔊 switch to structured klog logging
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanmeschke committed Dec 19, 2023
1 parent f0a2c98 commit 90e6c9a
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 28 deletions.
4 changes: 2 additions & 2 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func run() error {
ReadOnly: *readOnly,
})

klog.Infof("Starting %s v%s...", os.Args[0], Version)
klog.InfoS(fmt.Sprintf("Starting %s v%s...", os.Args[0], Version))
if *readOnly {
klog.Infof("Running in read-only mode: JetStream state in server will not be changed")
klog.InfoS("Running in read-only mode: JetStream state in server will not be changed")
}
go handleSignals(cancel)
return ctrl.Run()
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func deleteConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e
}()

if spec.PreventDelete {
klog.Infof("Consumer %q is configured to preventDelete on stream %q:", stream, consumer)
klog.InfoS(fmt.Sprintf("Consumer %q is configured to preventDelete on stream %q", stream, consumer), "stream", stream, "consumer", consumer)
return nil
}

Expand Down
34 changes: 17 additions & 17 deletions controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewController(opt Options) *Controller {
if opt.Recorder == nil {
utilruntime.Must(scheme.AddToScheme(k8sscheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartLogging(klog.InfoS)
eventBroadcaster.StartRecordingToSink(&k8styped.EventSinkImpl{
Interface: opt.KubeIface.CoreV1().Events(""),
})
Expand Down Expand Up @@ -291,35 +291,35 @@ func (c *Controller) cleanupStreams() error {
case <-tick.C:
streams, err := c.strLister.List(labels.Everything())
if err != nil {
klog.Infof("failed to list streams for cleanup: %s", err)
klog.InfoS("failed to list streams for cleanup: %s", err)
continue
}
sm := streamsMap(streams)
missing := selectMissingStreamsFromList(prevStreams, sm)
for _, s := range missing {
// A stream that we were tracking but that for some reason
// was not part of the latest list shared by informer.
// Need to double check whether the stream is present before
// Need to double-check whether the stream is present before
// considering deletion.
klog.Infof("stream %s/%s might be missing, looking it up...", s.Namespace, s.Name)
klog.InfoS(fmt.Sprintf("stream %s/%s might be missing, looking it up...", s.Namespace, s.Name), "namespace", s.Namespace, "stream", s.Name)
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
_, err := c.ji.Streams(s.Namespace).Get(ctx, s.Name, k8smeta.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Infof("stream %s/%s was not found anymore, deleting from JetStream", s.Namespace, s.Name)
klog.InfoS(fmt.Sprintf("stream %s/%s was not found anymore, deleting from JetStream", s.Namespace, s.Name), "namespace", s.Namespace, "stream", s.Name)
t := k8smeta.NewTime(time.Now())
s.DeletionTimestamp = &t
if err := c.processStreamObject(s, c.RealJSMC); err != nil && !k8serrors.IsNotFound(err) {
klog.Infof("failed to delete stream %s/%s: %s", s.Namespace, s.Name, err)
klog.InfoS(fmt.Sprintf("failed to delete stream %s/%s: %s", s.Namespace, s.Name, err), "namespace", s.Namespace, "stream", s.Name, "error", err)
continue
}
klog.Infof("deleted stream %s/%s from JetStream", s.Namespace, s.Name)
klog.InfoS(fmt.Sprintf("deleted stream %s/%s from JetStream", s.Namespace, s.Name), "namespace", s.Namespace, "stream", s.Name)
} else {
klog.Warningf("error looking up stream %s/%s", s.Namespace, s.Name)
klog.InfoS(fmt.Sprintf("error looking up stream %s/%s", s.Namespace, s.Name), "namespace", s.Namespace, "stream", s.Name)
}
} else {
klog.Infof("found stream %s/%s, no further action needed", s.Namespace, s.Name)
klog.InfoS(fmt.Sprintf("found stream %s/%s, no further action needed", s.Namespace, s.Name), "namespace", s.Namespace, "stream", s.Name)
}
}
prevStreams = sm
Expand Down Expand Up @@ -361,35 +361,35 @@ func (c *Controller) cleanupConsumers() error {
case <-tick.C:
consumers, err := c.cnsLister.List(labels.Everything())
if err != nil {
klog.Infof("failed to list consumers for cleanup: %s", err)
klog.InfoS(fmt.Sprintf("failed to list consumers for cleanup: %s", err), "error", err)
continue
}
cm := consumerMap(consumers)
missing := selectMissingConsumersFromList(prevConsumers, cm)
for _, cns := range missing {
// A consumer that we were tracking but that for some reason
// was not part of the latest list shared by informer.
// Need to double check whether the consumer is present before
// Need to double-check whether the consumer is present before
// considering deletion.
klog.Infof("consumer %s/%s might be missing, looking it up...", cns.Namespace, cns.Name)
klog.InfoS(fmt.Sprintf("consumer %s/%s might be missing, looking it up...", cns.Namespace, cns.Name), "namespace", cns.Namespace, "consumer", cns.Name)
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
_, err := c.ji.Consumers(cns.Namespace).Get(ctx, cns.Name, k8smeta.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Infof("consumer %s/%s was not found anymore, deleting from JetStream", cns.Namespace, cns.Name)
klog.InfoS(fmt.Sprintf("consumer %s/%s was not found anymore, deleting from JetStream", cns.Namespace, cns.Name), "namespace", cns.Namespace, "consumer", cns.Name)
t := k8smeta.NewTime(time.Now())
cns.DeletionTimestamp = &t
if err := c.processConsumerObject(cns, c.RealJSMC); err != nil && !k8serrors.IsNotFound(err) {
klog.Infof("failed to delete consumer %s/%s: %s", cns.Namespace, cns.Name, err)
klog.InfoS(fmt.Sprintf("failed to delete consumer %s/%s: %s", cns.Namespace, cns.Name, err), "namespace", cns.Namespace, "consumer", cns.Name, "error", err)
continue
}
klog.Infof("deleted consumer %s/%s from JetStream", cns.Namespace, cns.Name)
klog.InfoS(fmt.Sprintf("deleted consumer %s/%s from JetStream", cns.Namespace, cns.Name), "namespace", cns.Namespace, "consumer", cns.Name)
} else {
klog.Warningf("error looking up consumer %s/%s", cns.Namespace, cns.Name)
klog.InfoS(fmt.Sprintf("error looking up consumer %s/%s", cns.Namespace, cns.Name), "namespace", cns.Namespace, "consumer", cns.Name)
}
} else {
klog.Infof("found consumer %s/%s, no further action needed", cns.Namespace, cns.Name)
klog.InfoS(fmt.Sprintf("found consumer %s/%s, no further action needed", cns.Namespace, cns.Name), "namespace", cns.Namespace, "consumer", cns.Name)
}
}
prevConsumers = cm
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func deleteStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
}()

if spec.PreventDelete {
klog.Infof("Stream %q is configured to preventDelete:\n", name)
klog.InfoS(fmt.Sprintf("Stream %q is configured to preventDelete", name), "stream", name)
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/bootconfig/bootconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *Controller) Run(ctx context.Context) error {

nodeName := os.Getenv("KUBERNETES_NODE_NAME")
if nodeName == "" {
return errors.New("Target node name is missing")
return errors.New("target node name is missing")
}
log.Infof("Pod running on node %q", nodeName)

Expand All @@ -114,7 +114,7 @@ func (c *Controller) Run(ctx context.Context) error {
if !ok {
externalAddress, ok = node.Labels[c.opts.TargetTag]
if !ok || len(externalAddress) == 0 {
return errors.New("Could not find external IP address.")
return errors.New("could not find external IP address")
}
}
log.Infof("Pod is running on node with external IP: %s", externalAddress)
Expand All @@ -123,15 +123,15 @@ func (c *Controller) Run(ctx context.Context) error {

err = os.WriteFile(c.opts.ClientAdvertiseFileName, []byte(clientAdvertiseConfig), 0644)
if err != nil {
return fmt.Errorf("Could not write client advertise config: %s", err)
return fmt.Errorf("could not write client advertise config: %s", err)
}
log.Infof("Successfully wrote client advertise config to %q", c.opts.ClientAdvertiseFileName)

gatewayAdvertiseConfig := fmt.Sprintf("\nadvertise = \"%s\"\n\n", externalAddress)

err = os.WriteFile(c.opts.GatewayAdvertiseFileName, []byte(gatewayAdvertiseConfig), 0644)
if err != nil {
return fmt.Errorf("Could not write gateway advertise config: %s", err)
return fmt.Errorf("could not write gateway advertise config: %s", err)
}
log.Infof("Successfully wrote gateway advertise config to %q", c.opts.GatewayAdvertiseFileName)

Expand Down
3 changes: 0 additions & 3 deletions tests/nats.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ config:
memoryStore:
enabled: true
maxSize: 256Mi

memoryStore:
enabled: true
pvc:
enabled: true
size: 256Mi

0 comments on commit 90e6c9a

Please sign in to comment.