chore(dataobj): Refactor processor & builder to separate concerns (#1…
benclive authored Feb 4, 2025
1 parent 536dbc6 commit 7f32de6
Showing 19 changed files with 383 additions and 275 deletions.
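
For orientation before the per-file diffs: a sketch of the Builder's public surface after this change, restated as a Go interface. The signatures and error values come from pkg/dataobj/builder.go below; the interface itself and the package name are illustrative only, not part of the commit.

```go
package dataobjsketch

import (
	"bytes"
	"time"

	"github.com/grafana/loki/v3/pkg/logproto"
)

// builderAPI restates the Builder's surface after this refactor, purely for
// illustration; the real type is a concrete struct in pkg/dataobj (diff
// below). Note what is gone: no bucket, no tenant ID, and no storage path
// in the flush result.
type builderAPI interface {
	// Append buffers a stream; returns ErrBuilderFull once at capacity.
	Append(stream logproto.Stream) error
	// Flush encodes the buffered object into output and resets the builder;
	// it no longer uploads anything itself.
	Flush(output *bytes.Buffer) (FlushStats, error)
	Reset()
}

// FlushStats mirrors the new return type: time bounds only, replacing the
// old FlushResult, which also carried the uploaded object's storage path.
type FlushStats struct {
	MinTimestamp time.Time
	MaxTimestamp time.Time
}
```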
10 changes: 6 additions & 4 deletions docs/sources/shared/configuration.md
@@ -788,10 +788,6 @@ kafka_config:

dataobj_consumer:
builderconfig:
# The size of the SHA prefix to use for the data object builder.
# CLI flag: -dataobj-consumer.sha-prefix-size
[sha_prefix_size: <int> | default = 2]

# The size of the target page to use for the data object builder.
# CLI flag: -dataobj-consumer.target-page-size
[target_page_size: <int> | default = 2MiB]
@@ -808,6 +804,12 @@ dataobj_consumer:
# CLI flag: -dataobj-consumer.buffer-size
[buffer_size: <int> | default = 16MiB]

uploader:
# The size of the SHA prefix to use for generating object storage keys for
# data objects.
# CLI flag: -dataobj-consumer.sha-prefix-size
[shaprefixsize: <int> | default = 2]

# The prefix to use for the storage bucket.
# CLI flag: -dataobj-consumer.storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
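
The sha-prefix-size flag now belongs to the uploader, which derives object storage keys. The uploader's code is not part of this excerpt, so the sketch below reconstructs the key scheme from the logic removed from Builder.Flush in the builder.go diff further down; the function name and placement are assumptions.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// objectKey reconstructs the storage-key scheme that moved out of the
// Builder: a SHA-224 of the encoded object, split after shaPrefixSize hex
// characters to create a directory fan-out. The hashing and formatting
// match the code removed from Builder.Flush below.
func objectKey(tenantID string, shaPrefixSize int, object []byte) string {
	sum := sha256.Sum224(object)
	sumStr := hex.EncodeToString(sum[:])
	return fmt.Sprintf("tenant-%s/objects/%s/%s",
		tenantID, sumStr[:shaPrefixSize], sumStr[shaPrefixSize:])
}

func main() {
	// With the default sha-prefix-size of 2, keys look like
	// "tenant-dev/objects/ab/cdef...".
	fmt.Println(objectKey("dev", 2, []byte("example data object")))
}
```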
2 changes: 1 addition & 1 deletion go.mod
@@ -409,4 +409,4 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push
// leodido fork his project to continue support
replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0

replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
4 changes: 2 additions & 2 deletions go.sum
@@ -628,8 +628,8 @@ github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675 h1:U94jQ2TQr1m3
github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866 h1:/y3qC0I9kttHjLPxp4bGf+4jcJw60C6hrokTPckHYT8=
github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6 h1:SlGPi1Sg15c/OzhGMAd7/EOnYJ03ZX6Wuql8lQ2pRU4=
github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
134 changes: 41 additions & 93 deletions pkg/dataobj/builder.go
@@ -3,8 +3,6 @@ package dataobj
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"flag"
"fmt"
@@ -14,7 +12,6 @@ import (
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
@@ -23,16 +20,15 @@
"github.com/grafana/loki/v3/pkg/logql/syntax"
)

// ErrBufferFull is returned by [Builder.Append] when the buffer is full and
// ErrBuilderFull is returned by [Builder.Append] when the buffer is full and
// needs to flush; call [Builder.Flush] to flush it.
var ErrBufferFull = errors.New("buffer full")
var (
ErrBuilderFull = errors.New("builder full")
ErrBuilderEmpty = errors.New("builder empty")
)

// BuilderConfig configures a data object [Builder].
type BuilderConfig struct {
// SHAPrefixSize sets the number of bytes of the SHA filename to use as a
// folder path.
SHAPrefixSize int `yaml:"sha_prefix_size"`

// TargetPageSize configures a target size for encoded pages within the data
// object. TargetPageSize accounts for encoding, but not for compression.
TargetPageSize flagext.Bytes `yaml:"target_page_size"`
Expand Down Expand Up @@ -65,7 +61,6 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
_ = cfg.BufferSize.Set("16MB") // Page Size * 8
_ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8

f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.")
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.")
f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
@@ -76,10 +71,6 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
func (cfg *BuilderConfig) Validate() error {
var errs []error

if cfg.SHAPrefixSize <= 0 {
errs = append(errs, errors.New("SHAPrefixSize must be greater than 0"))
}

if cfg.TargetPageSize <= 0 {
errs = append(errs, errors.New("TargetPageSize must be greater than 0"))
} else if cfg.TargetPageSize >= cfg.TargetObjectSize {
@@ -108,47 +99,41 @@ func (cfg *BuilderConfig) Validate() error {
// Methods on Builder are not goroutine-safe; callers are responsible for
// synchronizing calls.
type Builder struct {
cfg BuilderConfig
metrics *metrics
bucket objstore.Bucket
tenantID string
cfg BuilderConfig
metrics *metrics

labelCache *lru.Cache[string, labels.Labels]

currentSizeEstimate int
state builderState

streams *streams.Streams
logs *logs.Logs

flushBuffer *bytes.Buffer
encoder *encoding.Encoder
state builderState

encoder *encoding.Encoder
}

type builderState int

type FlushResult struct {
Path string
MinTimestamp time.Time
MaxTimestamp time.Time
}

const (
// builderStateReady indicates the builder is empty and ready to accept new data.
// builderStateEmpty indicates the builder is empty and ready to accept new data.
builderStateEmpty builderState = iota

// builderStateDirty indicates the builder has been modified since the last flush.
builderStateDirty

// builderStateFlushing indicates the builder has data to flush.
builderStateFlush
)

type FlushStats struct {
MinTimestamp time.Time
MaxTimestamp time.Time
}

// NewBuilder creates a new Builder which stores data objects for the specified
// tenant in a bucket.
//
// NewBuilder returns an error if BuilderConfig is invalid.
func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Builder, error) {
func NewBuilder(cfg BuilderConfig) (*Builder, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
@@ -160,17 +145,12 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu

var (
metrics = newMetrics()

flushBuffer = bytes.NewBuffer(make([]byte, 0, int(cfg.TargetObjectSize)))
encoder = encoding.NewEncoder(flushBuffer)
)
metrics.ObserveConfig(cfg)

return &Builder{
cfg: cfg,
metrics: metrics,
bucket: bucket,
tenantID: tenantID,
cfg: cfg,
metrics: metrics,

labelCache: labelCache,

@@ -181,23 +161,17 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
SectionSize: int(cfg.TargetSectionSize),
}),

flushBuffer: flushBuffer,
encoder: encoder,
encoder: encoding.NewEncoder(nil),
}, nil
}

// Append buffers a stream to be written to a data object. Append returns an
// error if the stream labels cannot be parsed or [ErrBufferFull] if the
// error if the stream labels cannot be parsed or [ErrBuilderFull] if the
// builder is full.
//
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
// then call Append again with the same entry.
func (b *Builder) Append(stream logproto.Stream) error {
// Don't allow appending to a builder that has data to be flushed.
if b.state == builderStateFlush {
return ErrBufferFull
}

ls, err := b.parseLabels(stream.Labels)
if err != nil {
return err
@@ -210,7 +184,7 @@ func (b *Builder) Append(stream logproto.Stream) error {
// b.currentSizeEstimate will always be updated to reflect the size following
// the previous append.
if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) {
return ErrBufferFull
return ErrBuilderFull
}

timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -286,60 +260,37 @@ func streamSizeEstimate(stream logproto.Stream) int {
return size
}

// Flush flushes all buffered data to object storage. Calling Flush can result
// Flush flushes all buffered data to the buffer provided. Calling Flush can result
// in a no-op if there is no buffered data to flush.
//
// If Flush builds an object but fails to upload it to object storage, the
// built object is cached and can be retried. [Builder.Reset] can be called to
// discard any pending data and allow new data to be appended.
func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
buf, err := b.FlushToBuffer()
if err != nil {
return FlushResult{}, fmt.Errorf("flushing buffer: %w", err)
// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
if b.state == builderStateEmpty {
return FlushStats{}, ErrBuilderEmpty
}

timer := prometheus.NewTimer(b.metrics.flushTime)
defer timer.ObserveDuration()

sum := sha256.Sum224(b.flushBuffer.Bytes())
sumStr := hex.EncodeToString(sum[:])

objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(buf.Bytes())); err != nil {
return FlushResult{}, fmt.Errorf("uploading object: %w", err)
err := b.buildObject(output)
if err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("building object: %w", err)
}

minTime, maxTime := b.streams.GetBounds()
minTime, maxTime := b.streams.TimeRange()

b.Reset()
return FlushResult{
Path: objectPath,
return FlushStats{
MinTimestamp: minTime,
MaxTimestamp: maxTime,
}, nil
}

func (b *Builder) FlushToBuffer() (*bytes.Buffer, error) {
switch b.state {
case builderStateEmpty:
return nil, nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return nil, fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
}

return b.flushBuffer, nil
}

func (b *Builder) buildObject() error {
func (b *Builder) buildObject(output *bytes.Buffer) error {
timer := prometheus.NewTimer(b.metrics.buildTime)
defer timer.ObserveDuration()

// We reset after a successful flush, but we also reset the buffer before
// building for safety.
b.flushBuffer.Reset()
initialBufferSize := output.Len()

b.encoder.Reset(output)

if err := b.streams.EncodeTo(b.encoder); err != nil {
return fmt.Errorf("encoding streams: %w", err)
@@ -349,12 +300,12 @@ func (b *Builder) buildObject() error {
return fmt.Errorf("encoding object: %w", err)
}

b.metrics.builtSize.Observe(float64(b.flushBuffer.Len()))
b.metrics.builtSize.Observe(float64(output.Len() - initialBufferSize))

// We pass context.Background() below to avoid allowing building an object to
// time out; timing out on build would discard anything we built and would
// cause data loss.
dec := encoding.ReaderAtDecoder(bytes.NewReader(b.flushBuffer.Bytes()), int64(b.flushBuffer.Len()))
dec := encoding.ReaderAtDecoder(bytes.NewReader(output.Bytes()[initialBufferSize:]), int64(output.Len()-initialBufferSize))
return b.metrics.encoding.Observe(context.Background(), dec)
}

@@ -363,9 +314,9 @@ func (b *Builder) Reset() {
b.logs.Reset()
b.streams.Reset()

b.state = builderStateEmpty
b.flushBuffer.Reset()
b.metrics.sizeEstimate.Set(0)
b.currentSizeEstimate = 0
b.state = builderStateEmpty
}

// RegisterMetrics registers metrics about builder to report to reg. All
@@ -374,13 +325,10 @@ func (b *Builder) Reset() {
// If multiple Builders for the same tenant are running in the same process,
// reg must contain additional labels to differentiate between them.
func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)

return b.metrics.Register(reg)
}

// UnregisterMetrics unregisters metrics about builder from reg.
func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)
b.metrics.Unregister(reg)
}
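
Putting the new API together: a minimal sketch of the append/flush cycle a caller (such as the consumer's processor) now drives. The helper name and wiring are assumptions; the error values, signatures, and the retry-after-flush pattern come from the Builder doc comments in the diff above.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// appendOrFlush appends a stream, and when the builder reports
// ErrBuilderFull, flushes the buffered object into buf and retries the
// same stream, as the Append doc comment prescribes.
func appendOrFlush(b *dataobj.Builder, buf *bytes.Buffer, stream logproto.Stream) error {
	err := b.Append(stream)
	if !errors.Is(err, dataobj.ErrBuilderFull) {
		return err // nil, or a real error such as unparsable labels
	}

	// Builder is full: flush, then retry the append. Flush resets the
	// builder on success, so no explicit Reset is needed here.
	stats, err := b.Flush(buf)
	if err != nil {
		return fmt.Errorf("flushing full builder: %w", err)
	}
	fmt.Println("flushed object covering", stats.MinTimestamp, "to", stats.MaxTimestamp)

	return b.Append(stream)
}
```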