From 7f32de6cf256e21ced948b860b835cb37f89a6b9 Mon Sep 17 00:00:00 2001
From: benclive
Date: Tue, 4 Feb 2025 15:36:48 +0000
Subject: [PATCH] chore(dataobj): Refactor processor & builder to separate
 concerns (#16055)

---
 docs/sources/shared/configuration.md          |  10 +-
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 pkg/dataobj/builder.go                        | 134 ++++++------
 pkg/dataobj/builder_test.go                   |  65 ++++-----
 pkg/dataobj/consumer/config.go                |   7 +
 pkg/dataobj/consumer/metrics.go               |  15 +-
 pkg/dataobj/consumer/partition_processor.go   |  98 ++++++++-----
 pkg/dataobj/consumer/service.go               |   2 +-
 pkg/dataobj/internal/encoding/encoder.go      |   8 ++
 .../internal/sections/streams/streams.go      |  16 +--
 pkg/dataobj/metastore/metastore.go            |  54 ++++---
 pkg/dataobj/metastore/metastore_test.go       |  25 ++--
 pkg/dataobj/metrics.go                        |  44 ++----
 pkg/dataobj/uploader/metrics.go               |  63 ++++++++
 pkg/dataobj/uploader/uploader.go              |  93 ++++++++++++
 .../providers/filesystem/filesystem.go        |  10 +-
 .../thanos-io/objstore/providers/gcs/gcs.go   |   4 +-
 vendor/modules.txt                            |   4 +-
 19 files changed, 383 insertions(+), 275 deletions(-)
 create mode 100644 pkg/dataobj/uploader/metrics.go
 create mode 100644 pkg/dataobj/uploader/uploader.go

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 035c550db7eeb..b53d5266caba6 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -788,10 +788,6 @@ kafka_config:
 
 dataobj_consumer:
   builderconfig:
-    # The size of the SHA prefix to use for the data object builder.
-    # CLI flag: -dataobj-consumer.sha-prefix-size
-    [sha_prefix_size: <int> | default = 2]
-
     # The size of the target page to use for the data object builder.
     # CLI flag: -dataobj-consumer.target-page-size
     [target_page_size: <int> | default = 2MiB]
@@ -808,6 +804,12 @@ dataobj_consumer:
     # CLI flag: -dataobj-consumer.buffer-size
     [buffer_size: <int> | default = 16MiB]
 
+  uploader:
+    # The size of the SHA prefix to use for generating object storage keys for
+    # data objects.
+    # CLI flag: -dataobj-consumer.sha-prefix-size
+    [shaprefixsize: <int> | default = 2]
+
   # The prefix to use for the storage bucket.
   # CLI flag: -dataobj-consumer.storage-bucket-prefix
   [storage_bucket_prefix: <string> | default = "dataobj/"]
diff --git a/go.mod b/go.mod
index c2f1561b05771..2ae12f5d18c2e 100644
--- a/go.mod
+++ b/go.mod
@@ -409,4 +409,4 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push
 
 // leodido fork his project to continue support
 replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
-replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
+replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
diff --git a/go.sum b/go.sum
index 7fd2d908908b6..0e1a136281545 100644
--- a/go.sum
+++ b/go.sum
@@ -628,8 +628,8 @@ github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675 h1:U94jQ2TQr1m3
 github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866 h1:/y3qC0I9kttHjLPxp4bGf+4jcJw60C6hrokTPckHYT8=
-github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
+github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6 h1:SlGPi1Sg15c/OzhGMAd7/EOnYJ03ZX6Wuql8lQ2pRU4=
+github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
 github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg=
 github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU=
 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go
index 87f668bc4fd2c..fefe62cb1c1d8 100644
--- a/pkg/dataobj/builder.go
+++ b/pkg/dataobj/builder.go
@@ -3,8 +3,6 @@ package dataobj
 import (
 	"bytes"
 	"context"
-	"crypto/sha256"
-	"encoding/hex"
 	"errors"
 	"flag"
 	"fmt"
@@ -14,7 +12,6 @@ import (
 	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/objstore"
 
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
@@ -23,16 +20,15 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 )
 
-// ErrBufferFull is returned by [Builder.Append] when the buffer is full and
+// ErrBuilderFull is returned by [Builder.Append] when the buffer is full and
 // needs to flush; call [Builder.Flush] to flush it.
-var ErrBufferFull = errors.New("buffer full")
+var (
+	ErrBuilderFull  = errors.New("builder full")
+	ErrBuilderEmpty = errors.New("builder empty")
+)
 
 // BuilderConfig configures a data object [Builder].
 type BuilderConfig struct {
-	// SHAPrefixSize sets the number of bytes of the SHA filename to use as a
-	// folder path.
-	SHAPrefixSize int `yaml:"sha_prefix_size"`
-
 	// TargetPageSize configures a target size for encoded pages within the data
 	// object. TargetPageSize accounts for encoding, but not for compression.
 	TargetPageSize flagext.Bytes `yaml:"target_page_size"`
@@ -65,7 +61,6 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
 	_ = cfg.BufferSize.Set("16MB")         // Page Size * 8
 	_ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8
 
-	f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.")
 	f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.")
 	f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
 	f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
@@ -76,10 +71,6 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
 func (cfg *BuilderConfig) Validate() error {
 	var errs []error
 
-	if cfg.SHAPrefixSize <= 0 {
-		errs = append(errs, errors.New("SHAPrefixSize must be greater than 0"))
-	}
-
 	if cfg.TargetPageSize <= 0 {
 		errs = append(errs, errors.New("TargetPageSize must be greater than 0"))
 	} else if cfg.TargetPageSize >= cfg.TargetObjectSize {
@@ -108,47 +99,41 @@ func (cfg *BuilderConfig) Validate() error {
 // Methods on Builder are not goroutine-safe; callers are responsible for
 // synchronizing calls.
 type Builder struct {
-	cfg      BuilderConfig
-	metrics  *metrics
-	bucket   objstore.Bucket
-	tenantID string
+	cfg     BuilderConfig
+	metrics *metrics
 
 	labelCache *lru.Cache[string, labels.Labels]
 
 	currentSizeEstimate int
 
-	state   builderState
 	streams *streams.Streams
 	logs    *logs.Logs
 
-	flushBuffer *bytes.Buffer
-	encoder     *encoding.Encoder
+	state builderState
+
+	encoder *encoding.Encoder
 }
 
 type builderState int
 
-type FlushResult struct {
-	Path         string
-	MinTimestamp time.Time
-	MaxTimestamp time.Time
-}
-
 const (
-	// builderStateReady indicates the builder is empty and ready to accept new data.
+	// builderStateEmpty indicates the builder is empty and ready to accept new data.
 	builderStateEmpty builderState = iota
 
 	// builderStateDirty indicates the builder has been modified since the last flush.
 	builderStateDirty
-
-	// builderStateFlushing indicates the builder has data to flush.
-	builderStateFlush
 )
 
+type FlushStats struct {
+	MinTimestamp time.Time
+	MaxTimestamp time.Time
+}
+
 // NewBuilder creates a new Builder which stores data objects for the specified
 // tenant in a bucket.
 //
 // NewBuilder returns an error if BuilderConfig is invalid.
-func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Builder, error) {
+func NewBuilder(cfg BuilderConfig) (*Builder, error) {
 	if err := cfg.Validate(); err != nil {
 		return nil, err
 	}
@@ -160,17 +145,12 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
 	var (
 		metrics = newMetrics()
-
-		flushBuffer = bytes.NewBuffer(make([]byte, 0, int(cfg.TargetObjectSize)))
-		encoder     = encoding.NewEncoder(flushBuffer)
 	)
 	metrics.ObserveConfig(cfg)
 
 	return &Builder{
-		cfg:      cfg,
-		metrics:  metrics,
-		bucket:   bucket,
-		tenantID: tenantID,
+		cfg:     cfg,
+		metrics: metrics,
 
 		labelCache: labelCache,
 
@@ -181,23 +161,17 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
 			SectionSize: int(cfg.TargetSectionSize),
 		}),
 
-		flushBuffer: flushBuffer,
-		encoder:     encoder,
+		encoder: encoding.NewEncoder(nil),
 	}, nil
 }
 
 // Append buffers a stream to be written to a data object. Append returns an
-// error if the stream labels cannot be parsed or [ErrBufferFull] if the
+// error if the stream labels cannot be parsed or [ErrBuilderFull] if the
 // builder is full.
 //
 // Once a Builder is full, call [Builder.Flush] to flush the buffered data,
 // then call Append again with the same entry.
 func (b *Builder) Append(stream logproto.Stream) error {
-	// Don't allow appending to a builder that has data to be flushed.
-	if b.state == builderStateFlush {
-		return ErrBufferFull
-	}
-
 	ls, err := b.parseLabels(stream.Labels)
 	if err != nil {
 		return err
 	}
@@ -210,7 +184,7 @@ func (b *Builder) Append(stream logproto.Stream) error {
 	// b.currentSizeEstimate will always be updated to reflect the size following
 	// the previous append.
 	if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) {
-		return ErrBufferFull
+		return ErrBuilderFull
 	}
 
 	timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -286,60 +260,37 @@ func streamSizeEstimate(stream logproto.Stream) int {
 	return size
 }
 
-// Flush flushes all buffered data to object storage. Calling Flush can result
+// Flush flushes all buffered data to the buffer provided. Calling Flush can result
 // in a no-op if there is no buffered data to flush.
 //
-// If Flush builds an object but fails to upload it to object storage, the
-// built object is cached and can be retried. [Builder.Reset] can be called to
-// discard any pending data and allow new data to be appended.
-func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
-	buf, err := b.FlushToBuffer()
-	if err != nil {
-		return FlushResult{}, fmt.Errorf("flushing buffer: %w", err)
+// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
+func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
+	if b.state == builderStateEmpty {
+		return FlushStats{}, ErrBuilderEmpty
 	}
 
-	timer := prometheus.NewTimer(b.metrics.flushTime)
-	defer timer.ObserveDuration()
-
-	sum := sha256.Sum224(b.flushBuffer.Bytes())
-	sumStr := hex.EncodeToString(sum[:])
-
-	objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
-	if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(buf.Bytes())); err != nil {
-		return FlushResult{}, fmt.Errorf("uploading object: %w", err)
+	err := b.buildObject(output)
+	if err != nil {
+		b.metrics.flushFailures.Inc()
+		return FlushStats{}, fmt.Errorf("building object: %w", err)
 	}
 
-	minTime, maxTime := b.streams.GetBounds()
+	minTime, maxTime := b.streams.TimeRange()
 	b.Reset()
 
-	return FlushResult{
-		Path:         objectPath,
+	return FlushStats{
 		MinTimestamp: minTime,
 		MaxTimestamp: maxTime,
 	}, nil
 }
 
-func (b *Builder) FlushToBuffer() (*bytes.Buffer, error) {
-	switch b.state {
-	case builderStateEmpty:
-		return nil, nil // Nothing to flush
-	case builderStateDirty:
-		if err := b.buildObject(); err != nil {
-			return nil, fmt.Errorf("building object: %w", err)
-		}
-		b.state = builderStateFlush
-	}
-
-	return b.flushBuffer, nil
-}
-
-func (b *Builder) buildObject() error {
+func (b *Builder) buildObject(output *bytes.Buffer) error {
 	timer := prometheus.NewTimer(b.metrics.buildTime)
 	defer timer.ObserveDuration()
 
-	// We reset after a successful flush, but we also reset the buffer before
-	// building for safety.
-	b.flushBuffer.Reset()
+	initialBufferSize := output.Len()
+
+	b.encoder.Reset(output)
 
 	if err := b.streams.EncodeTo(b.encoder); err != nil {
 		return fmt.Errorf("encoding streams: %w", err)
@@ -349,12 +300,12 @@ func (b *Builder) buildObject() error {
 		return fmt.Errorf("encoding object: %w", err)
 	}
 
-	b.metrics.builtSize.Observe(float64(b.flushBuffer.Len()))
+	b.metrics.builtSize.Observe(float64(output.Len() - initialBufferSize))
 
 	// We pass context.Background() below to avoid allowing building an object to
 	// time out; timing out on build would discard anything we built and would
 	// cause data loss.
-	dec := encoding.ReaderAtDecoder(bytes.NewReader(b.flushBuffer.Bytes()), int64(b.flushBuffer.Len()))
+	dec := encoding.ReaderAtDecoder(bytes.NewReader(output.Bytes()[initialBufferSize:]), int64(output.Len()-initialBufferSize))
 	return b.metrics.encoding.Observe(context.Background(), dec)
 }
 
@@ -363,9 +314,9 @@ func (b *Builder) Reset() {
 	b.logs.Reset()
 	b.streams.Reset()
-	b.state = builderStateEmpty
-	b.flushBuffer.Reset()
 	b.metrics.sizeEstimate.Set(0)
+	b.currentSizeEstimate = 0
+	b.state = builderStateEmpty
 }
 
 // RegisterMetrics registers metrics about builder to report to reg. All
@@ -374,13 +325,10 @@ func (b *Builder) Reset() {
 // If multiple Builders for the same tenant are running in the same process,
 // reg must contain additional labels to differentiate between them.
 func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
-	reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)
-
 	return b.metrics.Register(reg)
 }
 
 // UnregisterMetrics unregisters metrics about builder from reg.
 func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) {
-	reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)
-
 	b.metrics.Unregister(reg)
 }
diff --git a/pkg/dataobj/builder_test.go b/pkg/dataobj/builder_test.go
index ff86e8dcb941c..20e9d0db9b4f7 100644
--- a/pkg/dataobj/builder_test.go
+++ b/pkg/dataobj/builder_test.go
@@ -1,25 +1,21 @@
 package dataobj
 
 import (
+	"bytes"
 	"context"
 	"errors"
-	"fmt"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
-	"github.com/thanos-io/objstore"
 
 	"github.com/grafana/loki/pkg/push"
-	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
 var testBuilderConfig = BuilderConfig{
-	SHAPrefixSize: 2,
-
 	TargetPageSize:    2048,
 	TargetObjectSize:  4096,
 	TargetSectionSize: 4096,
@@ -28,7 +24,8 @@ var testBuilderConfig = BuilderConfig{
 }
 
 func TestBuilder(t *testing.T) {
-	bucket := objstore.NewInMemBucket()
+	buf := bytes.NewBuffer(nil)
+	dirtyBuf := bytes.NewBuffer([]byte("dirty"))
 
 	streams := []logproto.Stream{
 		{
@@ -75,22 +72,40 @@ func TestBuilder(t *testing.T) {
 	}
 
 	t.Run("Build", func(t *testing.T) {
-		builder, err := NewBuilder(testBuilderConfig, bucket, "fake")
+		builder, err := NewBuilder(testBuilderConfig)
 		require.NoError(t, err)
 
 		for _, entry := range streams {
 			require.NoError(t, builder.Append(entry))
 		}
 
-		_, err = builder.Flush(context.Background())
+		_, err = builder.Flush(buf)
 		require.NoError(t, err)
 	})
 
 	t.Run("Read", func(t *testing.T) {
-		objects, err := result.Collect(listObjects(context.Background(), bucket, "fake"))
+		obj := FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+		md, err := obj.Metadata(context.Background())
+		require.NoError(t, err)
+		require.Equal(t, 1, md.StreamsSections)
+		require.Equal(t, 1, md.LogsSections)
+	})
+
+	t.Run("BuildWithDirtyBuffer", func(t *testing.T) {
+		builder, err := NewBuilder(testBuilderConfig)
 		require.NoError(t, err)
-		require.Len(t, objects, 1)
 
-		obj := FromBucket(bucket, objects[0])
+		for _, entry := range streams {
+			require.NoError(t, builder.Append(entry))
+		}
+
+		_, err = builder.Flush(dirtyBuf)
+		require.NoError(t, err)
+
+		require.Equal(t, buf.Len(), dirtyBuf.Len()-5)
+	})
+
+	t.Run("ReadFromDirtyBuffer", func(t *testing.T) {
+		obj := FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
 		md, err := obj.Metadata(context.Background())
 		require.NoError(t, err)
 		require.Equal(t, 1, md.StreamsSections)
 		require.Equal(t, 1, md.LogsSections)
@@ -104,9 +119,7 @@ func TestBuilder_Append(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 	defer cancel()
 
-	bucket := objstore.NewInMemBucket()
-
-	builder, err := NewBuilder(testBuilderConfig, bucket, "fake")
+	builder, err := NewBuilder(testBuilderConfig)
 	require.NoError(t, err)
 
 	for {
@@ -119,31 +132,9 @@ func TestBuilder_Append(t *testing.T) {
 				Line: strings.Repeat("a", 1024),
 			}},
 		})
-		if errors.Is(err, ErrBufferFull) {
+		if errors.Is(err, ErrBuilderFull) {
 			break
 		}
 		require.NoError(t, err)
 	}
 }
-
-func listObjects(ctx context.Context, bucket objstore.Bucket, tenant string) result.Seq[string] {
-	tenantPath := fmt.Sprintf("tenant-%s/objects/", tenant)
-
-	return result.Iter(func(yield func(string) bool) error {
-		errIterationStopped := errors.New("iteration stopped")
-
-		err := bucket.Iter(ctx, tenantPath, func(name string) error {
-			if !yield(name) {
-				return errIterationStopped
-			}
-			return nil
-		}, objstore.WithRecursiveIter())
-
-		switch {
-		case errors.Is(err, errIterationStopped):
-			return nil
-		default:
-			return err
-		}
-	})
-}
diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go
index c62ae612193cb..04b1ba58bc2ba 100644
--- a/pkg/dataobj/consumer/config.go
+++ b/pkg/dataobj/consumer/config.go
@@ -4,15 +4,21 @@ import (
 	"flag"
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
 )
 
 type Config struct {
 	dataobj.BuilderConfig
+	UploaderConfig uploader.Config `yaml:"uploader"`
 
 	// StorageBucketPrefix is the prefix to use for the storage bucket.
 	StorageBucketPrefix string `yaml:"storage_bucket_prefix"`
 }
 
 func (cfg *Config) Validate() error {
+	if err := cfg.UploaderConfig.Validate(); err != nil {
+		return err
+	}
+
 	return cfg.BuilderConfig.Validate()
 }
 
@@ -22,5 +28,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
+	cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
 	f.StringVar(&cfg.StorageBucketPrefix, prefix+"storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.")
 }
diff --git a/pkg/dataobj/consumer/metrics.go b/pkg/dataobj/consumer/metrics.go
index 4525cb512de3b..f2ef8f35f9555 100644
--- a/pkg/dataobj/consumer/metrics.go
+++ b/pkg/dataobj/consumer/metrics.go
@@ -13,7 +13,6 @@ type partitionOffsetMetrics struct {
 	lastOffset atomic.Int64
 
 	// Error counters
-	flushFailures  prometheus.Counter
 	commitFailures prometheus.Counter
 	appendFailures prometheus.Counter
 
@@ -23,10 +22,6 @@ type partitionOffsetMetrics struct {
 
 func newPartitionOffsetMetrics() *partitionOffsetMetrics {
 	p := &partitionOffsetMetrics{
-		flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: "loki_dataobj_consumer_flush_failures_total",
-			Help: "Total number of flush failures",
-		}),
 		commitFailures: prometheus.NewCounter(prometheus.CounterOpts{
 			Name: "loki_dataobj_consumer_commit_failures_total",
 			Help: "Total number of commit failures",
@@ -62,10 +57,9 @@ func (p *partitionOffsetMetrics) getCurrentOffset() float64 {
 
 func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error {
 	collectors := []prometheus.Collector{
-		p.currentOffset,
-		p.flushFailures,
 		p.commitFailures,
 		p.appendFailures,
+		p.currentOffset,
 		p.processingDelay,
 	}
 
@@ -81,10 +75,9 @@ func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error {
 
 func (p *partitionOffsetMetrics) unregister(reg prometheus.Registerer) {
 	collectors := []prometheus.Collector{
-		p.currentOffset,
-		p.flushFailures,
 		p.commitFailures,
 		p.appendFailures,
+		p.currentOffset,
 		p.processingDelay,
 	}
 
@@ -97,10 +90,6 @@ func (p *partitionOffsetMetrics) updateOffset(offset int64) {
 	p.lastOffset.Store(offset)
 }
 
-func (p *partitionOffsetMetrics) incFlushFailures() {
-	p.flushFailures.Inc()
-}
-
 func (p *partitionOffsetMetrics) incCommitFailures() {
 	p.commitFailures.Inc()
 }
diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go
index 0033e04c640f5..37e63a87fd7db 100644
--- a/pkg/dataobj/consumer/partition_processor.go
+++ b/pkg/dataobj/consumer/partition_processor.go
@@ -16,6 +16,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
 	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
+	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
 	"github.com/grafana/loki/v3/pkg/kafka"
 )
 
@@ -26,15 +27,18 @@ type partitionProcessor struct {
 	partition int32
 	tenantID  []byte
 	// Processing pipeline
-	records chan *kgo.Record
-	builder *dataobj.Builder
-	decoder *kafka.Decoder
+	records          chan *kgo.Record
+	builder          *dataobj.Builder
+	decoder          *kafka.Decoder
+	uploader         *uploader.Uploader
+	metastoreManager *metastore.Manager
 
 	// Builder initialization
-	builderOnce      sync.Once
-	builderCfg       dataobj.BuilderConfig
-	bucket           objstore.Bucket
-	metastoreManager *metastore.Manager
+	builderOnce sync.Once
+	builderCfg  dataobj.BuilderConfig
+	bucket      objstore.Bucket
+	flushBuffer *bytes.Buffer
+
 	// Metrics
 	metrics *partitionOffsetMetrics
 
@@ -46,7 +50,7 @@ type partitionProcessor struct {
 	logger log.Logger
 }
 
-func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
+func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
 	ctx, cancel := context.WithCancel(ctx)
 
 	decoder, err := kafka.NewDecoder()
 	if err != nil {
@@ -55,6 +59,7 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
 	reg = prometheus.WrapRegistererWith(prometheus.Labels{
 		"shard":     strconv.Itoa(int(virtualShard)),
 		"partition": strconv.Itoa(int(partition)),
+		"tenant":    tenantID,
 		"topic":     topic,
 	}, reg)
 
@@ -63,11 +68,14 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
 		level.Error(logger).Log("msg", "failed to register partition metrics", "err", err)
 	}
 
-	metastoreManager, err := metastore.NewMetastoreManager(bucket, tenantID, logger, reg)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to create metastore manager", "err", err)
-		cancel()
-		return nil
+	uploader := uploader.New(uploaderCfg, bucket, tenantID)
+	if err := uploader.RegisterMetrics(reg); err != nil {
+		level.Error(logger).Log("msg", "failed to register uploader metrics", "err", err)
+	}
+
+	metastoreManager := metastore.NewManager(bucket, tenantID, logger)
+	if err := metastoreManager.RegisterMetrics(reg); err != nil {
+		level.Error(logger).Log("msg", "failed to register metastore manager metrics", "err", err)
 	}
 
 	return &partitionProcessor{
@@ -84,6 +92,7 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
 		bucket:           bucket,
 		tenantID:         []byte(tenantID),
 		metrics:          metrics,
+		uploader:         uploader,
 		metastoreManager: metastoreManager,
 	}
 }
@@ -117,6 +126,7 @@ func (p *partitionProcessor) stop() {
 		p.builder.UnregisterMetrics(p.reg)
 	}
 	p.metrics.unregister(p.reg)
+	p.uploader.UnregisterMetrics(p.reg)
 }
 
 // Drops records from the channel if the processor is stopped.
@@ -137,7 +147,8 @@ func (p *partitionProcessor) Append(records []*kgo.Record) bool {
 func (p *partitionProcessor) initBuilder() error {
 	var initErr error
 	p.builderOnce.Do(func() {
-		builder, err := dataobj.NewBuilder(p.builderCfg, p.bucket, string(p.tenantID))
+		// Dataobj builder
+		builder, err := dataobj.NewBuilder(p.builderCfg)
 		if err != nil {
 			initErr = err
 			return
@@ -147,6 +158,7 @@ func (p *partitionProcessor) initBuilder() error {
 			return
 		}
 		p.builder = builder
+		p.flushBuffer = bytes.NewBuffer(make([]byte, 0, p.builderCfg.TargetObjectSize))
 	})
 	return initErr
 }
@@ -176,42 +188,32 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 	}
 
 	if err := p.builder.Append(stream); err != nil {
-		if err != dataobj.ErrBufferFull {
+		if err != dataobj.ErrBuilderFull {
 			level.Error(p.logger).Log("msg", "failed to append stream", "err", err)
 			p.metrics.incAppendFailures()
 			return
 		}
 
-		backoff := backoff.New(p.ctx, backoff.Config{
-			MinBackoff: 100 * time.Millisecond,
-			MaxBackoff: 10 * time.Second,
-		})
-
-		var flushResult dataobj.FlushResult
-		for backoff.Ongoing() {
-			flushResult, err = p.builder.Flush(p.ctx)
-			if err == nil {
-				break
-			}
+		flushedDataobjStats, err := p.builder.Flush(p.flushBuffer)
+		if err != nil {
 			level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
-			p.metrics.incFlushFailures()
-			backoff.Wait()
+			return
+		}
+
+		objectPath, err := p.uploader.Upload(p.ctx, p.flushBuffer)
+		if err != nil {
+			level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
+			return
 		}
 
-		if err := p.metastoreManager.UpdateMetastore(p.ctx, flushResult); err != nil {
+		if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
 			level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
 			return
 		}
 
-		backoff.Reset()
-		for backoff.Ongoing() {
-			err = p.client.CommitRecords(p.ctx, record)
-			if err == nil {
-				break
-			}
+		if err := p.commitRecords(record); err != nil {
 			level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
-			p.metrics.incCommitFailures()
-			backoff.Wait()
+			return
 		}
 
 		if err := p.builder.Append(stream); err != nil {
@@ -220,3 +222,25 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 		}
 	}
 }
+
+func (p *partitionProcessor) commitRecords(record *kgo.Record) error {
+	backoff := backoff.New(p.ctx, backoff.Config{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 10 * time.Second,
+		MaxRetries: 20,
+	})
+
+	var lastErr error
+	backoff.Reset()
+	for backoff.Ongoing() {
+		err := p.client.CommitRecords(p.ctx, record)
+		if err == nil {
+			return nil
+		}
+		level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
+		p.metrics.incCommitFailures()
+		lastErr = err
+		backoff.Wait()
+	}
+	return lastErr
+}
diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go
index aee69dbd0dfd3..3522d3a9492b1 100644
--- a/pkg/dataobj/consumer/service.go
+++ b/pkg/dataobj/consumer/service.go
@@ -95,7 +95,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
 	}
 
 	for _, partition := range parts {
-		processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg)
+		processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg)
 		s.partitionHandlers[topic][partition] = processor
 		processor.start()
 	}
diff --git a/pkg/dataobj/internal/encoding/encoder.go b/pkg/dataobj/internal/encoding/encoder.go
index a022d1795a28a..3ed57f5b3e2cd 100644
--- a/pkg/dataobj/internal/encoding/encoder.go
+++ b/pkg/dataobj/internal/encoding/encoder.go
@@ -167,6 +167,14 @@ func (enc *Encoder) Flush() error {
 	return nil
 }
 
+func (enc *Encoder) Reset(w streamio.Writer) {
+	enc.data.Reset()
+	enc.sections = nil
+	enc.curSection = nil
+	enc.w = w
+	enc.startOffset = len(magic)
+}
+
 func (enc *Encoder) append(data, metadata []byte) error {
 	if enc.curSection == nil {
 		return errElementNoExist
diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go
index 138de989cc3cd..4d208fb1676d5 100644
--- a/pkg/dataobj/internal/sections/streams/streams.go
+++ b/pkg/dataobj/internal/sections/streams/streams.go
@@ -17,7 +17,6 @@ import (
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
-	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
 )
@@ -35,6 +34,7 @@ type Stream struct {
 	Rows int // Number of rows in the stream.
 }
 
+// Reset zeroes all values in the stream struct so it can be reused.
 func (s *Stream) Reset() {
 	s.ID = 0
 	s.Labels = nil
@@ -82,18 +82,8 @@ func New(metrics *Metrics, pageSize int) *Streams {
 	}
 }
 
-func (s *Streams) Iter() result.Seq[Stream] {
-	return result.Iter(func(yield func(Stream) bool) error {
-		for _, stream := range s.ordered {
-			if !yield(*stream) {
-				return nil
-			}
-		}
-		return nil
-	})
-}
-
-func (s *Streams) GetBounds() (time.Time, time.Time) {
+// TimeRange returns the minimum and maximum timestamp across all streams.
+func (s *Streams) TimeRange() (time.Time, time.Time) {
 	return s.globalMinTimestamp, s.globalMaxTimestamp
 }
 
diff --git a/pkg/dataobj/metastore/metastore.go b/pkg/dataobj/metastore/metastore.go
index 08c0b00364c9a..ffa14672804b8 100644
--- a/pkg/dataobj/metastore/metastore.go
+++ b/pkg/dataobj/metastore/metastore.go
@@ -25,7 +25,6 @@ const (
 
 var (
 	// Define our own builder config because metastore objects are significantly smaller.
 	metastoreBuilderCfg = dataobj.BuilderConfig{
-		SHAPrefixSize:     2,
 		TargetObjectSize:  32 * 1024 * 1024,
 		TargetPageSize:    4 * 1024 * 1024,
 		BufferSize:        32 * 1024 * 1024, // 8x page size
@@ -40,15 +39,13 @@ type Manager struct {
 	bucket  objstore.Bucket
 	logger  log.Logger
 	backoff *backoff.Backoff
+	buf     *bytes.Buffer
 
 	builderOnce sync.Once
 }
 
-func NewMetastoreManager(bucket objstore.Bucket, tenantID string, logger log.Logger, reg prometheus.Registerer) (*Manager, error) {
+func NewManager(bucket objstore.Bucket, tenantID string, logger log.Logger) *Manager {
 	metrics := newMetastoreMetrics()
-	if err := metrics.register(reg); err != nil {
-		return nil, err
-	}
 
 	return &Manager{
 		bucket:  bucket,
@@ -60,62 +57,73 @@ func NewMetastoreManager(bucket objstore.Bucket, tenantID string, logger log.Log
 			MaxBackoff: 10 * time.Second,
 		}),
 		builderOnce: sync.Once{},
-	}, nil
+	}
+}
+
+func (m *Manager) RegisterMetrics(reg prometheus.Registerer) error {
+	return m.metrics.register(reg)
+}
+
+func (m *Manager) UnregisterMetrics(reg prometheus.Registerer) {
+	m.metrics.unregister(reg)
 }
 
 func (m *Manager) initBuilder() error {
 	var initErr error
 	m.builderOnce.Do(func() {
-		metastoreBuilder, err := dataobj.NewBuilder(metastoreBuilderCfg, m.bucket, m.tenantID)
+		metastoreBuilder, err := dataobj.NewBuilder(metastoreBuilderCfg)
 		if err != nil {
 			initErr = err
 			return
 		}
 
+		m.buf = bytes.NewBuffer(make([]byte, 0, metastoreBuilderCfg.TargetObjectSize))
 		m.metastoreBuilder = metastoreBuilder
 	})
 	return initErr
 }
 
-func (m *Manager) UpdateMetastore(ctx context.Context, flushResult dataobj.FlushResult) error {
+// UpdateMetastore adds provided dataobj path to the metastore. Flush stats are used to determine the stored metadata about this dataobj.
+func (m *Manager) UpdateMetastore(ctx context.Context, dataobjPath string, flushStats dataobj.FlushStats) error {
 	var err error
-	start := time.Now()
-	defer m.metrics.observeMetastoreProcessing(start)
+	processingTime := prometheus.NewTimer(m.metrics.metastoreProcessingTime)
+	defer processingTime.ObserveDuration()
 
 	// Initialize builder if this is the first call for this partition
 	if err := m.initBuilder(); err != nil {
 		return err
 	}
 
-	minTimestamp, maxTimestamp := flushResult.MinTimestamp, flushResult.MaxTimestamp
+	minTimestamp, maxTimestamp := flushStats.MinTimestamp, flushStats.MaxTimestamp
 
 	// Work our way through the metastore objects window by window, updating & creating them as needed.
 	// Each one handles its own retries in order to keep making progress in the event of a failure.
 	minMetastoreWindow := minTimestamp.Truncate(metastoreWindowSize)
 	maxMetastoreWindow := maxTimestamp.Truncate(metastoreWindowSize)
 
-	for metastoreWindow := minMetastoreWindow; metastoreWindow.Compare(maxMetastoreWindow) <= 0; metastoreWindow = metastoreWindow.Add(metastoreWindowSize) {
+	for metastoreWindow := minMetastoreWindow; !metastoreWindow.After(maxMetastoreWindow); metastoreWindow = metastoreWindow.Add(metastoreWindowSize) {
 		metastorePath := fmt.Sprintf("tenant-%s/metastore/%s.store", m.tenantID, metastoreWindow.Format(time.RFC3339))
 		m.backoff.Reset()
 		for m.backoff.Ongoing() {
 			err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) {
-				buf, err := io.ReadAll(existing)
+				m.buf.Reset()
+				_, err := io.Copy(m.buf, existing)
 				if err != nil {
 					return nil, err
 				}
 
 				m.metastoreBuilder.Reset()
 
-				if len(buf) > 0 {
-					replayStart := time.Now()
-					object := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf)))
+				if m.buf.Len() > 0 {
+					replayDuration := prometheus.NewTimer(m.metrics.metastoreReplayTime)
+					object := dataobj.FromReaderAt(bytes.NewReader(m.buf.Bytes()), int64(m.buf.Len()))
 					if err := m.readFromExisting(ctx, object); err != nil {
 						return nil, err
 					}
-					m.metrics.observeMetastoreReplay(replayStart)
+					replayDuration.ObserveDuration()
 				}
 
-				encodingStart := time.Now()
+				encodingDuration := prometheus.NewTimer(m.metrics.metastoreEncodingTime)
 
-				ls := fmt.Sprintf("{__start__=\"%d\", __end__=\"%d\", __path__=\"%s\"}", minTimestamp.UnixNano(), maxTimestamp.UnixNano(), flushResult.Path)
+				ls := fmt.Sprintf("{__start__=\"%d\", __end__=\"%d\", __path__=\"%s\"}", minTimestamp.UnixNano(), maxTimestamp.UnixNano(), dataobjPath)
 				err = m.metastoreBuilder.Append(logproto.Stream{
 					Labels:  ls,
 					Entries: []logproto.Entry{{Line: ""}},
@@ -124,12 +132,13 @@ func (m *Manager) UpdateMetastore(ctx context.Context, flushResult dataobj.Flush
 					return nil, err
 				}
 
-				newMetastore, err := m.metastoreBuilder.FlushToBuffer()
+				m.buf.Reset()
+				_, err = m.metastoreBuilder.Flush(m.buf)
 				if err != nil {
 					return nil, err
 				}
-				m.metrics.observeMetastoreEncoding(encodingStart)
-				return newMetastore, nil
+				encodingDuration.ObserveDuration()
+				return m.buf, nil
 			})
 			if err == nil {
 				level.Info(m.logger).Log("msg", "successfully merged & updated metastore", "metastore", metastorePath)
@@ -145,6 +154,7 @@ func (m *Manager) UpdateMetastore(ctx context.Context, flushResult dataobj.Flush
 	return err
 }
 
+// readFromExisting reads the provided metastore object and appends the streams to the builder so it can be later modified.
 func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object) error {
 	// Fetch sections
 	si, err := object.Metadata(ctx)
diff --git a/pkg/dataobj/metastore/metastore_test.go b/pkg/dataobj/metastore/metastore_test.go
index 582882917b5e9..363e99a6b8b56 100644
--- a/pkg/dataobj/metastore/metastore_test.go
+++ b/pkg/dataobj/metastore/metastore_test.go
@@ -2,12 +2,10 @@ package metastore
 
 import (
 	"context"
-	"fmt"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/dskit/backoff"
@@ -21,8 +19,7 @@ func BenchmarkWriteMetastores(t *testing.B) {
 	bucket := objstore.NewInMemBucket()
 	tenantID := "test-tenant"
 
-	m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer)
-	require.NoError(t, err)
+	m := NewManager(bucket, tenantID, log.NewNopLogger())
 
 	// Set limits for the test
 	m.backoff = backoff.New(context.TODO(), backoff.Config{
@@ -34,10 +31,9 @@ func BenchmarkWriteMetastores(t *testing.B) {
 
 	// Add test data spanning multiple metastore windows
 	now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)
-	flushResults := make([]dataobj.FlushResult, 1000)
+	flushStats := make([]dataobj.FlushStats, 1000)
 	for i := 0; i < 1000; i++ {
-		flushResults[i] = dataobj.FlushResult{
-			Path:         fmt.Sprintf("test-dataobj-path-%d", i),
+		flushStats[i] = dataobj.FlushStats{
 			MinTimestamp: now.Add(-1 * time.Hour).Add(time.Duration(i) * time.Millisecond),
 			MaxTimestamp: now,
 		}
@@ -47,7 +43,7 @@ func BenchmarkWriteMetastores(t *testing.B) {
 	t.ReportAllocs()
 	for i := 0; i < t.N; i++ {
 		// Test writing metastores
-		err = m.UpdateMetastore(ctx, flushResults[i%len(flushResults)])
+		err := m.UpdateMetastore(ctx, "path", flushStats[i%len(flushStats)])
 		require.NoError(t, err)
 	}
 
@@ -59,8 +55,7 @@ func TestWriteMetastores(t *testing.T) {
 	bucket := objstore.NewInMemBucket()
 	tenantID := "test-tenant"
 
-	m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer)
-	require.NoError(t, err)
+	m := NewManager(bucket, tenantID, log.NewNopLogger())
 
 	// Set limits for the test
 	m.backoff = backoff.New(context.TODO(), backoff.Config{
@@ -72,8 +67,7 @@ func TestWriteMetastores(t *testing.T) {
 
 	// Add test data spanning multiple metastore windows
 	now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)
-	flushResult := dataobj.FlushResult{
-		Path:         "test-dataobj-path",
+	flushStats := dataobj.FlushStats{
 		MinTimestamp: now.Add(-1 * time.Hour),
 		MaxTimestamp: now,
 	}
@@ -81,7 +75,7 @@ func TestWriteMetastores(t *testing.T) {
 	require.Len(t, bucket.Objects(), 0)
 
 	// Test writing metastores
-	err = m.UpdateMetastore(ctx, flushResult)
+	err := m.UpdateMetastore(ctx, "test-dataobj-path", flushStats)
 	require.NoError(t, err)
 
 	require.Len(t, bucket.Objects(), 1)
@@ -90,13 +84,12 @@ func TestWriteMetastores(t *testing.T) {
 		originalSize = len(obj)
 	}
 
-	flushResult2 := dataobj.FlushResult{
-		Path:         "different-test-dataobj-path",
+	flushResult2 := dataobj.FlushStats{
 		MinTimestamp: now.Add(-15 * time.Minute),
 		MaxTimestamp: now,
 	}
 
-	err = m.UpdateMetastore(ctx, flushResult2)
+	err = m.UpdateMetastore(ctx, "different-dataobj-path", flushResult2)
 	require.NoError(t, err)
 
 	require.Len(t, bucket.Objects(), 1)
diff --git a/pkg/dataobj/metrics.go b/pkg/dataobj/metrics.go
index fa18d5159d0b2..9a029363e56da 100644
--- a/pkg/dataobj/metrics.go
+++ b/pkg/dataobj/metrics.go
@@ -16,13 +16,12 @@ type metrics struct {
 	streams  *streams.Metrics
 	encoding *encoding.Metrics
 
-	shaPrefixSize    prometheus.Gauge
 	targetPageSize   prometheus.Gauge
 	targetObjectSize prometheus.Gauge
 
-	appendTime prometheus.Histogram
-	buildTime  prometheus.Histogram
-	flushTime  prometheus.Histogram
+	appendTime    prometheus.Histogram
+	buildTime     prometheus.Histogram
+	flushFailures prometheus.Counter
 
 	sizeEstimate prometheus.Gauge
 	builtSize    prometheus.Histogram
@@ -34,15 +33,6 @@ func newMetrics() *metrics {
 		logs:     logs.NewMetrics(),
 		streams:  streams.NewMetrics(),
 		encoding: encoding.NewMetrics(),
-
-		shaPrefixSize: prometheus.NewGauge(prometheus.GaugeOpts{
-			Namespace: "loki",
-			Subsystem: "dataobj",
-			Name:      "config_sha_prefix_size",
-
-			Help: "Configured SHA prefix size.",
-		}),
-
 		targetPageSize: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: "loki",
 			Subsystem: "dataobj",
@@ -85,19 +75,6 @@ func newMetrics() *metrics {
 			NativeHistogramMinResetDuration: 0,
 		}),
 
-		flushTime: prometheus.NewHistogram(prometheus.HistogramOpts{
-			Namespace: "loki",
-			Subsystem: "dataobj",
-			Name:      "flush_time_seconds",
-
-			Help: "Time taken flushing data objects to object storage.",
-
-			Buckets:                         prometheus.DefBuckets,
-			NativeHistogramBucketFactor:     1.1,
-			NativeHistogramMaxBucketNumber:  100,
-			NativeHistogramMinResetDuration: 0,
-		}),
-
 		sizeEstimate: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: "loki",
 			Subsystem: "dataobj",
@@ -117,12 +94,19 @@ func newMetrics() *metrics {
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: 0,
 		}),
+
+		flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: "loki",
+			Subsystem: "dataobj",
+			Name:      "flush_failures_total",
+
+			Help: "Total number of flush failures.",
+		}),
 	}
 }
 
 // ObserveConfig updates config metrics based on the provided [BuilderConfig].
 func (m *metrics) ObserveConfig(cfg BuilderConfig) {
-	m.shaPrefixSize.Set(float64(cfg.SHAPrefixSize))
 	m.targetPageSize.Set(float64(cfg.TargetPageSize))
 	m.targetObjectSize.Set(float64(cfg.TargetObjectSize))
 }
@@ -135,16 +119,15 @@ func (m *metrics) Register(reg prometheus.Registerer) error {
 	errs = append(errs, m.streams.Register(reg))
 	errs = append(errs, m.encoding.Register(reg))
 
-	errs = append(errs, reg.Register(m.shaPrefixSize))
 	errs = append(errs, reg.Register(m.targetPageSize))
 	errs = append(errs, reg.Register(m.targetObjectSize))
 
 	errs = append(errs, reg.Register(m.appendTime))
 	errs = append(errs, reg.Register(m.buildTime))
-	errs = append(errs, reg.Register(m.flushTime))
 
 	errs = append(errs, reg.Register(m.sizeEstimate))
 	errs = append(errs, reg.Register(m.builtSize))
+	errs = append(errs, reg.Register(m.flushFailures))
 
 	return errors.Join(errs...)
 }
 
@@ -155,14 +138,13 @@ func (m *metrics) Unregister(reg prometheus.Registerer) {
 	m.streams.Unregister(reg)
 	m.encoding.Unregister(reg)
 
-	reg.Unregister(m.shaPrefixSize)
 	reg.Unregister(m.targetPageSize)
 	reg.Unregister(m.targetObjectSize)
 
 	reg.Unregister(m.appendTime)
 	reg.Unregister(m.buildTime)
-	reg.Unregister(m.flushTime)
 
 	reg.Unregister(m.sizeEstimate)
 	reg.Unregister(m.builtSize)
+	reg.Unregister(m.flushFailures)
 }
diff --git a/pkg/dataobj/uploader/metrics.go b/pkg/dataobj/uploader/metrics.go
new file mode 100644
index 0000000000000..49a1d7dc82d94
--- /dev/null
+++ b/pkg/dataobj/uploader/metrics.go
@@ -0,0 +1,63 @@
+package uploader
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type metrics struct {
+	uploadTime     prometheus.Histogram
+	uploadFailures prometheus.Counter
+	shaPrefixSize  prometheus.Gauge
+}
+
+func newMetrics(shaPrefixSize int) *metrics {
+	metrics := &metrics{
+		uploadFailures: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: "loki_dataobj_consumer_upload_failures_total",
+			Help: "Total number of upload failures",
+		}),
+		uploadTime: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Namespace: "loki",
+			Subsystem: "dataobj",
+			Name:      "upload_time_seconds",
+			Help:      "Time taken writing data objects to object storage.",
+			Buckets:   prometheus.DefBuckets,
+		}),
+		shaPrefixSize: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki",
+			Subsystem: "dataobj",
+			Name:      "sha_prefix_size",
+			Help:      "The size of the SHA prefix used for object storage keys.",
+		}),
+	}
+
+	metrics.shaPrefixSize.Set(float64(shaPrefixSize))
+	return metrics
+}
+
+func (m *metrics) register(reg prometheus.Registerer) error {
+	collectors := []prometheus.Collector{
+		m.uploadFailures,
+		m.uploadTime,
+	}
+
+	for _, collector := range collectors {
+		if err := reg.Register(collector); err != nil {
+			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (m *metrics) unregister(reg prometheus.Registerer) {
+	collectors := []prometheus.Collector{
+		m.uploadFailures,
+		m.uploadTime,
+	}
+
+	for _, collector := range collectors {
+		reg.Unregister(collector)
+	}
+}
diff --git a/pkg/dataobj/uploader/uploader.go b/pkg/dataobj/uploader/uploader.go
new file mode 100644
index 0000000000000..b19a8699b82aa
--- /dev/null
+++ b/pkg/dataobj/uploader/uploader.go
@@ -0,0 +1,93 @@
+package uploader
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"flag"
+	"fmt"
+	"time"
+
+	"github.com/grafana/dskit/backoff"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/thanos-io/objstore"
+)
+
+type Config struct {
+	// SHAPrefixSize is the size of the SHA prefix used for splitting object storage keys
+	SHAPrefixSize int
+}
+
+// RegisterFlagsWithPrefix registers flags with the given prefix.
+func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for generating object storage keys for data objects.")
+}
+
+func (cfg *Config) Validate() error {
+	if cfg.SHAPrefixSize <= 0 {
+		return fmt.Errorf("SHAPrefixSize must be greater than 0")
+	}
+	return nil
+}
+
+type Uploader struct {
+	SHAPrefixSize int
+	bucket        objstore.Bucket
+	tenantID      string
+	metrics       *metrics
+}
+
+func New(cfg Config, bucket objstore.Bucket, tenantID string) *Uploader {
+	metrics := newMetrics(cfg.SHAPrefixSize)
+
+	return &Uploader{
+		SHAPrefixSize: cfg.SHAPrefixSize,
+		bucket:        bucket,
+		tenantID:      tenantID,
+		metrics:       metrics,
+	}
+}
+
+func (d *Uploader) RegisterMetrics(reg prometheus.Registerer) error {
+	return d.metrics.register(reg)
+}
+
+func (d *Uploader) UnregisterMetrics(reg prometheus.Registerer) {
+	d.metrics.unregister(reg)
+}
+
+// getKey determines the key in object storage to upload the object to, based on our path scheme.
+func (d *Uploader) getKey(object *bytes.Buffer) string {
+	sum := sha256.Sum224(object.Bytes())
+	sumStr := hex.EncodeToString(sum[:])
+
+	return fmt.Sprintf("tenant-%s/objects/%s/%s", d.tenantID, sumStr[:d.SHAPrefixSize], sumStr[d.SHAPrefixSize:])
+}
+
+// Upload uploads an object to the configured bucket and returns the key.
+func (d *Uploader) Upload(ctx context.Context, object *bytes.Buffer) (string, error) {
+	timer := prometheus.NewTimer(d.metrics.uploadTime)
+	defer timer.ObserveDuration()
+
+	objectPath := d.getKey(object)
+
+	backoff := backoff.New(ctx, backoff.Config{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 10 * time.Second,
+		MaxRetries: 20,
+	})
+
+	var lastErr error
+	for backoff.Ongoing() {
+		err := d.bucket.Upload(ctx, objectPath, bytes.NewReader(object.Bytes()))
+		if err == nil {
+			return objectPath, nil
+		}
+		lastErr = err
+		backoff.Wait()
+	}
+
+	d.metrics.uploadFailures.Inc()
+	return "", fmt.Errorf("uploading object after %d retries: %w", backoff.NumRetries(), lastErr)
+}
diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
index 920f3ace74715..f6538b894edb4 100644
--- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
+++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
@@ -4,6 +4,7 @@
 package filesystem
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -292,7 +293,7 @@ func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reade
 		defer r.Close()
 	}
 
-	newContent, err := f(r)
+	newContent, err := f(wrapReader(r))
 	if err != nil {
 		return err
 	}
@@ -305,6 +306,13 @@ func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reade
 	return os.WriteFile(file, content, 0600)
 }
 
+func wrapReader(r io.Reader) io.Reader {
+	if r == nil {
+		return bytes.NewReader(nil)
+	}
+	return r
+}
+
 func isDirEmpty(name string) (ok bool, err error) {
 	f, err := os.Open(filepath.Clean(name))
 	if os.IsNotExist(err) {
diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
index cd9105ca31c1b..ee702963190e4 100644
--- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
+++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
@@ -395,9 +395,9 @@ func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reade
 	return b.upload(ctx, name, newContent, generation, mustNotExist)
 }
-func wrapReader(r *storage.Reader) io.ReadCloser {
+func wrapReader(r *storage.Reader) io.Reader {
 	if r == nil {
-		return io.NopCloser(bytes.NewReader(nil))
+		return bytes.NewReader(nil)
 	}
 
 	return r
diff --git a/vendor/modules.txt b/vendor/modules.txt
index a135d97391855..92cceb83f10b3 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1633,7 +1633,7 @@ github.com/stretchr/testify/assert/yaml
 github.com/stretchr/testify/mock
 github.com/stretchr/testify/require
 github.com/stretchr/testify/suite
-# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
+# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
 ## explicit; go 1.22
 github.com/thanos-io/objstore
 github.com/thanos-io/objstore/clientutil
@@ -2562,4 +2562,4 @@ sigs.k8s.io/yaml/goyaml.v2
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
 # github.com/grafana/loki/pkg/push => ./pkg/push
 # github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
-# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
+# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
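
---

Note (not part of the patch): below is a minimal sketch of how the three concerns separated by this refactor compose — the Builder encodes into a caller-owned buffer, the Uploader writes the bytes under a SHA-prefixed key, and the metastore Manager indexes the returned path. It uses only the signatures introduced above; the function name buildUploadAndRecord and the NopLogger wiring are illustrative assumptions (the real integration point is processRecord in partition_processor.go, which also handles metrics and commit retries).

// Sketch only; assumes bucket and tenantID are supplied by the caller.
package consumer

import (
	"bytes"
	"context"
	"errors"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// buildUploadAndRecord (hypothetical name) appends streams to a builder and,
// whenever the builder reports ErrBuilderFull, flushes it through the
// uploader and metastore before retrying the append that hit the limit.
func buildUploadAndRecord(ctx context.Context, cfg Config, bucket objstore.Bucket, tenantID string, streams []logproto.Stream) error {
	builder, err := dataobj.NewBuilder(cfg.BuilderConfig)
	if err != nil {
		return err
	}
	up := uploader.New(cfg.UploaderConfig, bucket, tenantID)
	mgr := metastore.NewManager(bucket, tenantID, log.NewNopLogger()) // real code passes its own logger

	// Flush appends to whatever is already in the buffer (see the
	// BuildWithDirtyBuffer test), so the caller owns the buffer and resets
	// it between flushes.
	buf := bytes.NewBuffer(make([]byte, 0, int(cfg.BuilderConfig.TargetObjectSize)))

	for _, s := range streams {
		err := builder.Append(s)
		if !errors.Is(err, dataobj.ErrBuilderFull) {
			if err != nil {
				return err
			}
			continue
		}

		// Builder is full: encode into the buffer (Flush resets the builder
		// on success), upload by content hash, then record the object path
		// and its time range in the metastore.
		buf.Reset()
		stats, err := builder.Flush(buf)
		if err != nil {
			return err
		}
		path, err := up.Upload(ctx, buf)
		if err != nil {
			return err
		}
		if err := mgr.UpdateMetastore(ctx, path, stats); err != nil {
			return err
		}

		// Retry the append that signalled ErrBuilderFull.
		if err := builder.Append(s); err != nil {
			return err
		}
	}
	return nil
}

The design point the sketch illustrates: after this change the Builder knows nothing about object storage or tenants — SHA-prefixed key generation, upload retries, and upload metrics live in uploader.Uploader, and index maintenance lives in metastore.Manager — so each piece can be configured, tested, and retried independently.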