Skip to content

Commit

Permalink
Merge pull request #58 from vinted/20230607_vinted
Browse files Browse the repository at this point in the history
*: bring in our patches for newest main
  • Loading branch information
GiedriusS authored Jun 7, 2023
2 parents 6622110 + e3e897a commit 11b7737
Show file tree
Hide file tree
Showing 20 changed files with 760 additions and 208 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ type prometheusConfig struct {
getConfigInterval time.Duration
getConfigTimeout time.Duration
httpClient *extflag.PathOrContent
useCompressedXOR bool
}

func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("compressed.xor", "Use the new compact, compressed XOR stream.").Default("false").BoolVar(&pc.useCompressedXOR)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
Expand Down
24 changes: 13 additions & 11 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.limitMaxMatchedSeries, conf.prometheus.useCompressedXOR)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -477,16 +477,17 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.SeriesSelectLimits
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.SeriesSelectLimits
limitMaxMatchedSeries int
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -501,4 +502,5 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.storeRateLimits.RegisterFlags(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
cmd.Flag("max-matched-series", "Maximum number of series can be matched before reading series data").Default("0").IntVar(&sc.limitMaxMatchedSeries)
}
33 changes: 27 additions & 6 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/relabel"
"github.com/thanos-io/objstore/client"

commonmodel "github.com/prometheus/common/model"
Expand Down Expand Up @@ -326,13 +327,15 @@ func runStore(

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),
metadataFilterFactory(
conf.filterConf.MinTime,
conf.filterConf.MaxTime,
relabelConfig,
logger,
reg,
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency),
})
conf.blockMetaFetchConcurrency,
conf.consistencyDelay))
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down Expand Up @@ -495,3 +498,21 @@ func runStore(
level.Info(logger).Log("msg", "starting store node")
return nil
}

func metadataFilterFactory(
minTime, maxTime model.TimeOrDurationValue,
relabelConfig []*relabel.Config,
logger log.Logger,
reg prometheus.Registerer,
ignoreDeletionMarkFilter block.MetadataFilter,
fetchConcurrency int,
consistencyDelay commonmodel.Duration,
) []block.MetadataFilter {
return []block.MetadataFilter{
block.NewTimePartitionMetaFilter(minTime, maxTime),
block.NewDeduplicateFilter(fetchConcurrency),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, time.Duration(consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
}
}
149 changes: 149 additions & 0 deletions cmd/thanos/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"context"
"crypto/rand"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
cmodel "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
)

type noopMetaFilter struct{}

func (f *noopMetaFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
return nil
}

func TestShardingHandlesDedupCorrectly(t *testing.T) {
const minTime = `0000-01-01T00:00:00Z`
const maxTime = `9999-12-31T23:59:59Z`

mdvMinTime := model.TimeOrDurationValue{}
testutil.Ok(t, mdvMinTime.Set(minTime))

mdvMaxTime := model.TimeOrDurationValue{}
testutil.Ok(t, mdvMaxTime.Set(maxTime))

monotonicReader := ulid.Monotonic(rand.Reader, 1)

// Generate a bunch of blocks and then there will be one with all children.
parentSources := []ulid.ULID{}
metas := map[ulid.ULID]metadata.Meta{}

for i := 0; i < 100; i++ {
ul := ulid.MustNew(0, monotonicReader)

metas[ul] = metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ul,
Compaction: tsdb.BlockMetaCompaction{
Sources: []ulid.ULID{ul},
},
},
}

parentSources = append(parentSources, ul)
}
ulidParent := ulid.MustNew(0, monotonicReader)
metas[ulidParent] = metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulidParent,
Compaction: tsdb.BlockMetaCompaction{
Sources: parentSources,
},
},
}

metaCopy := make(map[ulid.ULID]*metadata.Meta)
for k, v := range metas {
v := v
metaCopy[k] = &v
}

// There must be exactly one block in total of both shards.
filters := metadataFilterFactory(mdvMinTime,
mdvMaxTime,
[]*relabel.Config{
{
Action: relabel.HashMod,
SourceLabels: cmodel.LabelNames{"__block_id"},
TargetLabel: "shard",
Modulus: 2,
},
{
Action: relabel.Keep,
SourceLabels: cmodel.LabelNames{"shard"},
Regex: relabel.MustNewRegexp("1"),
},
},
log.NewNopLogger(),
prometheus.NewRegistry(),
&noopMetaFilter{},
1, 0,
)

shard1 := applyFilters(t, metaCopy, filters)

metaCopy = make(map[ulid.ULID]*metadata.Meta)
for k, v := range metas {
v := v
metaCopy[k] = &v
}

filters = metadataFilterFactory(mdvMinTime,
mdvMaxTime,
[]*relabel.Config{
{
Action: relabel.HashMod,
SourceLabels: cmodel.LabelNames{"__block_id"},
TargetLabel: "shard",
Modulus: 2,
},
{
Action: relabel.Keep,
SourceLabels: cmodel.LabelNames{"shard"},
Regex: relabel.MustNewRegexp("0"),
},
},
log.NewNopLogger(),
prometheus.NewRegistry(),
&noopMetaFilter{},
1, 0,
)
shard2 := applyFilters(t, metaCopy, filters)

testutil.Equals(t, 1, len(shard1)+len(shard2))
}

func applyFilters(t *testing.T, blocks map[ulid.ULID]*metadata.Meta, filters []block.MetadataFilter) map[ulid.ULID]*metadata.Meta {
t.Helper()

r := prometheus.NewRegistry()

synced := promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "random_metric_name",
}, []string{"duplicate"})

modified := promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "not_a_random_metric_name",
}, []string{"duplicate"})

for _, f := range filters {
testutil.Ok(t, f.Filter(context.TODO(), blocks, synced, modified))
}

return blocks
}
3 changes: 3 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ usage: thanos sidecar [<flags>]
Sidecar for Prometheus server.
Flags:
--compressed.xor Use the new compact, compressed XOR stream.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down Expand Up @@ -112,6 +113,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--max-matched-series=0 Maximum number of series can be matched before
reading series data
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
sidecar will serve only metrics, which happened
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,7 @@ replace (
k8s.io/klog => github.com/simonpasquier/klog-gokit v0.3.0
k8s.io/klog/v2 => github.com/simonpasquier/klog-gokit/v3 v3.0.0
)

replace github.com/prometheus/prometheus => github.com/vinted/prometheus v1.8.2-0.20230605130252-fb29ac940e9f

replace github.com/thanos-io/promql-engine => github.com/vinted/promql-engine v0.0.0-20230607131632-52afa29e6b44
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,6 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/prometheus v0.44.1-0.20230524110021-37e5249e33e4 h1:4O512Pmg+w4/uCs7Ho3z0zRSC/03831QvcRE+8nOOrQ=
github.com/prometheus/prometheus v0.44.1-0.20230524110021-37e5249e33e4/go.mod h1:jb6RryShYMiWFAIDoSDvichHUejQuzFl4nFWhMnsvAY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.2-go1.18 h1:ZmiZSZY9Htzn/Ri+vZ5o1snD2inOoqKjezypNqwAgKk=
github.com/redis/rueidis v1.0.2-go1.18/go.mod h1:aJiezBQL+bZKAZ+d7YOuj6xKQhrXvEPBiOfotEhG5R8=
Expand Down Expand Up @@ -895,8 +893,6 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204 h1:W4w5Iph7j32Sf1QFWLJDCqvO0WgZS0jHGID+qnq3wV0=
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204/go.mod h1:STSgpY8M6EKF2G/raUFdbIMf2U9GgYlEjAEHJxjvpAo=
github.com/thanos-io/promql-engine v0.0.0-20230524050847-9a1a33217cf2 h1:bz8GzwMTYV0cRxdgeESmqrRRXE2yH/NiyMD+zVVtWJo=
github.com/thanos-io/promql-engine v0.0.0-20230524050847-9a1a33217cf2/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand All @@ -917,6 +913,10 @@ github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVK
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vinted/prometheus v1.8.2-0.20230605130252-fb29ac940e9f h1:8QsV9dYie4rVwq7Nx8Tn0sgUqW5Zpj2HNltYINaWB3g=
github.com/vinted/prometheus v1.8.2-0.20230605130252-fb29ac940e9f/go.mod h1:jb6RryShYMiWFAIDoSDvichHUejQuzFl4nFWhMnsvAY=
github.com/vinted/promql-engine v0.0.0-20230607131632-52afa29e6b44 h1:DI/OYxoE7ItPyboeLqF9sc9jG46mSSyvKG3SANqbETQ=
github.com/vinted/promql-engine v0.0.0-20230607131632-52afa29e6b44/go.mod h1:dWMfKCnZEUIA25eiUV/FtirPaujoYDEjlZSLnCoaRic=
github.com/vultr/govultr/v2 v2.17.2 h1:gej/rwr91Puc/tgh+j33p/BLR16UrIPnSr+AIwYWZQs=
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d h1:9Z/HiqeGN+LOnmotAMpFEQjuXZ4AGAVFG0rC1laP5Go=
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d/go.mod h1:Fnq3+U51tMkPRMC6Wr7zKGUeFFYX4YjNrNK50iU0fcE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ func TestParseStoreDebugMatchersParam(t *testing.T) {
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: promgate.New(4),
gate: promgate.New(4, fmt.Sprintf("%d", i), prometheus.NewRegistry()),
queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{
Name: "query_range_hist",
}),
Expand Down
5 changes: 5 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
Expand Down Expand Up @@ -145,6 +146,10 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "encode meta file")
}

if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}

if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestUpload(t *testing.T) {
{
// Full block.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc))
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestUpload(t *testing.T) {
{
// Test Upload is idempotent.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc))
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
Expand All @@ -207,7 +207,7 @@ func TestUpload(t *testing.T) {
err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.NotOk(t, err)
testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error())
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 4, len(bkt.Objects()))
}
{
// No external labels with UploadPromBlocks.
Expand All @@ -221,7 +221,7 @@ func TestUpload(t *testing.T) {
testutil.Ok(t, err)
err = UploadPromBlock(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 8, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
Expand Down
Loading

0 comments on commit 11b7737

Please sign in to comment.