From 3686f5f5f7077612539d57f8e82410b706a7ff2d Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 22 Sep 2023 21:12:26 +0200 Subject: [PATCH 1/2] Add stream metadata support Signed-off-by: Tomasz Pietrek --- controllers/jetstream/stream.go | 8 ++++++++ deploy/crds.yml | 5 +++++ .../apis/jetstream/v1beta2/streamtypes.go | 1 + .../jetstream/v1beta2/zz_generated.deepcopy.go | 7 +++++++ .../jetstream/v1beta2/streamspec.go | 15 +++++++++++++++ 5 files changed, 36 insertions(+) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 9103ba3a..4a757060 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -434,6 +434,10 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e opts = append(opts, jsm.FirstSequence(uint64(spec.FirstSequence))) } + if spec.Metadata != nil { + opts = append(opts, jsm.StreamMetadata(spec.Metadata)) + } + _, err = c.NewStream(ctx, spec.Name, opts) return err } @@ -519,6 +523,10 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e config.Sources[i] = jss } + if spec.Metadata != nil { + config.Metadata = spec.Metadata + } + switch spec.Compression { case "s2": config.Compression = api.S2Compression diff --git a/deploy/crds.yml b/deploy/crds.yml index ef457e31..8443e30d 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -172,6 +172,11 @@ spec: dest: description: Destination subject. type: string + metadata: + description: Additional Stream metadata. + type: object + additionalProperties: + type: string servers: description: A list of servers for creating stream type: array diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index 1020b853..c93063a4 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -49,6 +49,7 @@ type StreamSpec struct { SubjectTransform *SubjectTransform `json:"subjectTransform"` FirstSequence uint64 `json:"firstSequence"` Compression string `json:"compression"` + Metadata map[string]string `json:"metadata"` Retention string `json:"retention"` Servers []string `json:"servers"` Sources []*StreamSource `json:"sources"` diff --git a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go index ebb3aebe..c1ce657f 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go @@ -441,6 +441,13 @@ func (in *StreamSpec) DeepCopyInto(out *StreamSpec) { *out = new(SubjectTransform) **out = **in } + if in.Metadata != nil { + in, out := &in.Metadata, &out.Metadata + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Servers != nil { in, out := &in.Servers, &out.Servers *out = make([]string, len(*in)) diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go index 779e9371..e5d310b0 100644 --- a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go @@ -49,6 +49,7 @@ type StreamSpecApplyConfiguration struct { SubjectTransform *SubjectTransformApplyConfiguration `json:"subjectTransform,omitempty"` FirstSequence *uint64 `json:"firstSequence,omitempty"` Compression *string `json:"compression,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` Retention *string `json:"retention,omitempty"` Servers []string `json:"servers,omitempty"` Sources []*jetstreamv1beta2.StreamSource `json:"sources,omitempty"` @@ -279,6 +280,20 @@ func (b *StreamSpecApplyConfiguration) WithCompression(value string) *StreamSpec return b } +// WithMetadata puts the entries into the Metadata field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Metadata field, +// overwriting an existing map entries in Metadata field with the same key. +func (b *StreamSpecApplyConfiguration) WithMetadata(entries map[string]string) *StreamSpecApplyConfiguration { + if b.Metadata == nil && len(entries) > 0 { + b.Metadata = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Metadata[k] = v + } + return b +} + // WithRetention sets the Retention field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the Retention field is set to the value of the last call. From f4c520b84e7b8cedac0118219c0e94d5a6c55e10 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 22 Sep 2023 21:17:45 +0200 Subject: [PATCH 2/2] Add consumer metadata Signed-off-by: Tomasz Pietrek --- controllers/jetstream/consumer.go | 4 ++ deploy/crds.yml | 5 ++ .../apis/jetstream/v1beta2/consumertypes.go | 69 ++++++++++--------- .../v1beta2/zz_generated.deepcopy.go | 7 ++ .../jetstream/v1beta2/consumerspec.go | 15 ++++ 5 files changed, 66 insertions(+), 34 deletions(-) diff --git a/controllers/jetstream/consumer.go b/controllers/jetstream/consumer.go index dfab1902..c26393d2 100644 --- a/controllers/jetstream/consumer.go +++ b/controllers/jetstream/consumer.go @@ -433,6 +433,10 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) { opts = append(opts, jsm.MaxDeliveryAttempts(spec.MaxDeliver)) } + if spec.Metadata != nil { + opts = append(opts, jsm.ConsumerMetadata(spec.Metadata)) + } + return opts, nil } diff --git a/deploy/crds.yml b/deploy/crds.yml index 8443e30d..203be8f8 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -600,6 +600,11 @@ spec: description: Force the consumer state to be kept in memory rather than inherit the setting from the stream. type: boolean default: false + metadata: + description: Additional Consumer metadata. + type: object + additionalProperties: + type: string tls: description: A client's TLS certs and keys. type: object diff --git a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go index 9575f697..d565e5c5 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go @@ -22,40 +22,41 @@ func (c *Consumer) GetSpec() interface{} { // ConsumerSpec is the spec for a Consumer resource type ConsumerSpec struct { - AckPolicy string `json:"ackPolicy"` - AckWait string `json:"ackWait"` - BackOff []string `json:"backoff"` - Creds string `json:"creds"` - DeliverGroup string `json:"deliverGroup"` - DeliverPolicy string `json:"deliverPolicy"` - DeliverSubject string `json:"deliverSubject"` - Description string `json:"description"` - PreventDelete bool `json:"preventDelete"` - PreventUpdate bool `json:"preventUpdate"` - DurableName string `json:"durableName"` - FilterSubject string `json:"filterSubject"` - FilterSubjects []string `json:"filterSubjects"` - FlowControl bool `json:"flowControl"` - HeadersOnly bool `json:"headersOnly"` - HeartbeatInterval string `json:"heartbeatInterval"` - MaxAckPending int `json:"maxAckPending"` - MaxDeliver int `json:"maxDeliver"` - MaxRequestBatch int `json:"maxRequestBatch"` - MaxRequestExpires string `json:"maxRequestExpires"` - MaxRequestMaxBytes int `json:"maxRequestMaxBytes"` - MaxWaiting int `json:"maxWaiting"` - MemStorage bool `json:"memStorage"` - Nkey string `json:"nkey"` - OptStartSeq int `json:"optStartSeq"` - OptStartTime string `json:"optStartTime"` - RateLimitBps int `json:"rateLimitBps"` - ReplayPolicy string `json:"replayPolicy"` - Replicas int `json:"replicas"` - SampleFreq string `json:"sampleFreq"` - Servers []string `json:"servers"` - StreamName string `json:"streamName"` - TLS TLS `json:"tls"` - Account string `json:"account"` + AckPolicy string `json:"ackPolicy"` + AckWait string `json:"ackWait"` + BackOff []string `json:"backoff"` + Creds string `json:"creds"` + DeliverGroup string `json:"deliverGroup"` + DeliverPolicy string `json:"deliverPolicy"` + DeliverSubject string `json:"deliverSubject"` + Description string `json:"description"` + PreventDelete bool `json:"preventDelete"` + PreventUpdate bool `json:"preventUpdate"` + DurableName string `json:"durableName"` + FilterSubject string `json:"filterSubject"` + FilterSubjects []string `json:"filterSubjects"` + FlowControl bool `json:"flowControl"` + HeadersOnly bool `json:"headersOnly"` + HeartbeatInterval string `json:"heartbeatInterval"` + MaxAckPending int `json:"maxAckPending"` + MaxDeliver int `json:"maxDeliver"` + MaxRequestBatch int `json:"maxRequestBatch"` + MaxRequestExpires string `json:"maxRequestExpires"` + MaxRequestMaxBytes int `json:"maxRequestMaxBytes"` + MaxWaiting int `json:"maxWaiting"` + MemStorage bool `json:"memStorage"` + Nkey string `json:"nkey"` + OptStartSeq int `json:"optStartSeq"` + OptStartTime string `json:"optStartTime"` + RateLimitBps int `json:"rateLimitBps"` + ReplayPolicy string `json:"replayPolicy"` + Replicas int `json:"replicas"` + SampleFreq string `json:"sampleFreq"` + Servers []string `json:"servers"` + StreamName string `json:"streamName"` + TLS TLS `json:"tls"` + Account string `json:"account"` + Metadata map[string]string `json:"metadata"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go index c1ce657f..5525e111 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go @@ -210,6 +210,13 @@ func (in *ConsumerSpec) DeepCopyInto(out *ConsumerSpec) { copy(*out, *in) } in.TLS.DeepCopyInto(&out.TLS) + if in.Metadata != nil { + in, out := &in.Metadata, &out.Metadata + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go index 313a7b4f..02683d96 100644 --- a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go @@ -52,6 +52,7 @@ type ConsumerSpecApplyConfiguration struct { StreamName *string `json:"streamName,omitempty"` TLS *TLSApplyConfiguration `json:"tls,omitempty"` Account *string `json:"account,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` } // ConsumerSpecApplyConfiguration constructs an declarative configuration of the ConsumerSpec type for use with @@ -337,3 +338,17 @@ func (b *ConsumerSpecApplyConfiguration) WithAccount(value string) *ConsumerSpec b.Account = &value return b } + +// WithMetadata puts the entries into the Metadata field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Metadata field, +// overwriting an existing map entries in Metadata field with the same key. +func (b *ConsumerSpecApplyConfiguration) WithMetadata(entries map[string]string) *ConsumerSpecApplyConfiguration { + if b.Metadata == nil && len(entries) > 0 { + b.Metadata = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Metadata[k] = v + } + return b +}