Skip to content

Commit

Permalink
Merge pull request #107 from vinted/vinted_fixes_0035
Browse files Browse the repository at this point in the history
*: reduce CPU usage
  • Loading branch information
GiedriusS authored May 30, 2024
2 parents 958e626 + 4e3ccd6 commit a8ee89f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 11 deletions.
12 changes: 7 additions & 5 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,18 @@ func NewMultiTSDB(
type localClient struct {
storepb.StoreClient
store *store.TSDBStore
desc string
}

func newLocalClient(c storepb.StoreClient, store *store.TSDBStore) *localClient {
mint, maxt := store.TimeRange()
return &localClient{
StoreClient: c,
store: store,
desc: fmt.Sprintf(
"LabelSets: %v MinTime: %d MaxTime: %d",
labelpb.PromLabelSetsToString(labelpb.ZLabelSetsToPromLabelSets(store.LabelSet()...)), mint, maxt,
),
}
}

Expand Down Expand Up @@ -133,11 +139,7 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo {
}

func (l *localClient) String() string {
mint, maxt := l.store.TimeRange()
return fmt.Sprintf(
"LabelSets: %v MinTime: %d MaxTime: %d",
labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt,
)
return l.desc
}

func (l *localClient) Addr() (string, bool) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func newAsyncRespSet(
var closeSeries context.CancelFunc

storeAddr, isLocalStore := st.Addr()
storeID := labelpb.PromLabelSetsToString(st.LabelSets())
storeID := st.String()
if storeID == "" {
storeID = "Store Gateway"
}
Expand Down Expand Up @@ -556,7 +556,7 @@ func newAsyncRespSet(
return newLazyRespSet(
span,
frameTimeout,
st.String(),
storeID,
st.LabelSets(),
closeSeries,
cl,
Expand All @@ -568,7 +568,7 @@ func newAsyncRespSet(
return newEagerRespSet(
span,
frameTimeout,
st.String(),
storeID,
st.LabelSets(),
closeSeries,
cl,
Expand Down
85 changes: 82 additions & 3 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"bytes"
"encoding/binary"
"fmt"
"os"
"sort"
"strconv"
"strings"
"sync"

"github.com/cespare/xxhash/v2"
"github.com/gogo/protobuf/types"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -374,9 +378,12 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
return res, nil
}

// MatchersToPromMatchers returns Prometheus matchers from proto matchers.
// NOTE: It allocates memory.
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
type matchersCache struct {
hasherPool sync.Pool
c *lru.Cache
}

func (mc *matchersCache) convertMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
var t labels.MatchType
Expand All @@ -402,6 +409,78 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
return res, nil
}

func copyMatcherSlice(in []*labels.Matcher) []*labels.Matcher {
out := make([]*labels.Matcher, 0, len(in))
out = append(out, in...)

return out
}

func (mc *matchersCache) getOrSetMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
var hasher *xxhash.Digest

hasherPooled := mc.hasherPool.Get()
if hasherPooled == nil {
hasher = xxhash.New()
} else {
hasher = hasherPooled.(*xxhash.Digest)
}
hasher.Reset()
defer mc.hasherPool.Put(hasher)

for _, m := range ms {
_, _ = hasher.WriteString(m.Name)
_, _ = hasher.WriteString(m.Value)
// NOTE(GiedriusS): these can only be 0-4 so we can safely cast to byte.
_, _ = hasher.Write([]byte{byte(m.Type)})
}

h := hasher.Sum64()

if val, ok := mc.c.Get(h); ok {
return copyMatcherSlice(val.([]*labels.Matcher)), nil
}

convert, err := mc.convertMatchers(ms...)
if err != nil {
return nil, fmt.Errorf("converting matchers: %w", err)
}

mc.c.Add(h, convert)
return copyMatcherSlice(convert), nil
}

func mustNewLRU() *lru.Cache {
var lruSize int

lruSizeParam := os.Getenv("THANOS_LRU_SIZE")
if lruSizeParam == "" {
lruSize = 1000
} else {
parsedLRUSize, err := strconv.Atoi(lruSizeParam)
if err != nil {
panic(fmt.Sprintf("parsing %s: %v", lruSizeParam, err))
}
lruSize = parsedLRUSize
}

c, err := lru.New(lruSize)
if err != nil {
panic(err)
}
return c
}

var mc *matchersCache = &matchersCache{
c: mustNewLRU(),
}

// MatchersToPromMatchers returns Prometheus matchers from proto matchers.
// NOTE: It allocates memory.
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
return mc.getOrSetMatchers(ms...)
}

// MatchersToString converts label matchers to string format.
// String should be parsable as a valid PromQL query metric selector.
func MatchersToString(ms ...LabelMatcher) string {
Expand Down

0 comments on commit a8ee89f

Please sign in to comment.