Skip to content

Commit

Permalink
Merge pull request #101 from vinted/pull_compactor_fixes
Browse files Browse the repository at this point in the history
*: pull compactor fixes
  • Loading branch information
GiedriusS authored May 6, 2024
2 parents cd2c681 + 6906f18 commit 23078f0
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 4 deletions.
9 changes: 8 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,20 @@ func runCompact(
conf.blockFilesConcurrency,
conf.compactBlocksFetchConcurrency,
)
var planner compact.Planner

tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
largeIndexFilterPlanner := compact.WithLargeTotalIndexSizeFilter(
tsdbPlanner,
insBkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
if enableVerticalCompaction {
planner = compact.WithVerticalCompactionDownsampleFilter(largeIndexFilterPlanner, insBkt, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.DownsampleVerticalCompactionNoCompactReason))
} else {
planner = largeIndexFilterPlanner
}
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
logger,
Expand Down
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ const (
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is a reason to not compact a block whose index contains out-of-order chunks, so that the compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
// DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks, as doing so does not make sense (e.g. how would one vertically compact the average?).
DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
74 changes: 71 additions & 3 deletions pkg/compact/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,67 @@ type largeTotalIndexSizeFilter struct {

var _ Planner = &largeTotalIndexSizeFilter{}

type verticalCompactionDownsampleFilter struct {
bkt objstore.Bucket
markedForNoCompact prometheus.Counter

*largeTotalIndexSizeFilter
}

var _ Planner = &verticalCompactionDownsampleFilter{}

// WithVerticalCompactionDownsampleFilter wraps the given largeTotalIndexSizeFilter
// with a verticalCompactionDownsampleFilter and returns it as a Planner.
// bkt and markedForNoCompact are stored on the filter and used when it marks
// blocks for no compaction.
func WithVerticalCompactionDownsampleFilter(with *largeTotalIndexSizeFilter, bkt objstore.Bucket, markedForNoCompact prometheus.Counter) Planner {
	filter := new(verticalCompactionDownsampleFilter)
	filter.largeTotalIndexSizeFilter = with
	filter.bkt = bkt
	filter.markedForNoCompact = markedForNoCompact
	return filter
}

// Plan asks the wrapped largeTotalIndexSizeFilter for a plan and returns it
// as-is when it contains no overlapping blocks. When the plan does overlap,
// every block in it with a non-zero downsample resolution is marked for no
// compaction and planning is repeated with those blocks excluded. A plan that
// overlaps but contains no downsampled blocks is returned unchanged.
//
// The error channel and the extra argument are ignored.
func (v *verticalCompactionDownsampleFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
	// Blocks marked during this call; passed to the inner planner so that the
	// next planning round no longer selects them.
	extraMarks := map[ulid.ULID]*metadata.NoCompactMark{}

	for {
		candidate, err := v.plan(ctx, extraMarks, metasByMinTime)
		if err != nil {
			return nil, err
		}

		if len(selectOverlappingMetas(candidate)) == 0 {
			return candidate, nil
		}

		// If we have downsampled blocks, we need to mark them as no compact because it's impossible to do that with vertical compaction.
		// Technically, the resolution is part of the group key but do not attach ourselves to that level of detail.
		anyMarked := false
		for _, meta := range candidate {
			if meta.Thanos.Downsample.Resolution != 0 {
				markErr := block.MarkForNoCompact(
					ctx,
					v.logger,
					v.bkt,
					meta.ULID,
					metadata.DownsampleVerticalCompactionNoCompactReason,
					"verticalCompactionDownsampleFilter: Downsampled block, see https://github.com/thanos-io/thanos/issues/6775",
					v.markedForNoCompact,
				)
				if markErr != nil {
					return nil, errors.Wrapf(markErr, "mark %v for no compaction", meta.ULID.String())
				}
				extraMarks[meta.ULID] = &metadata.NoCompactMark{ID: meta.ULID, Version: metadata.NoCompactMarkVersion1}
				anyMarked = true
			}
		}

		// Nothing was excluded, so re-planning would yield the same result.
		if !anyMarked {
			return candidate, nil
		}
	}
}

// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
// When found, it marks block for no compaction by placing no-compact-mark.json and updating cache.
// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
Expand All @@ -243,16 +304,19 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
}

func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
func (t *largeTotalIndexSizeFilter) plan(ctx context.Context, extraNoCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
noCompactMarked := t.noCompBlocksFunc()
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)+len(extraNoCompactMarked))
for k, v := range noCompactMarked {
copiedNoCompactMarked[k] = v
}
for k, v := range extraNoCompactMarked {
copiedNoCompactMarked[k] = v
}

PlanLoop:
for {
plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
plan, err := t.tsdbBasedPlanner.plan(copiedNoCompactMarked, metasByMinTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,3 +367,7 @@ PlanLoop:
return plan, nil
}
}

// Plan implements Planner by delegating to plan without any extra no-compact
// marks. The error channel and the extra argument are ignored.
func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
	// A nil map: there are no additional blocks to exclude at this level.
	var noExtraMarks map[ulid.ULID]*metadata.NoCompactMark
	return t.plan(ctx, noExtraMarks, metasByMinTime)
}
77 changes: 77 additions & 0 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,80 @@ func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))

}

// TestCompactorIssue6775 tests that the compactor does not crash when
// compacting 5m downsampled blocks with some overlap.
//
// The test uploads two raw blocks covering the same time range, runs the
// bucket downsampler over them, and then starts a compactor with vertical
// compaction enabled, waiting for at least one compaction iteration.
func TestCompactorIssue6775(t *testing.T) {
	const minTime = 1710374400014
	const maxTime = 1711584000000
	// Upper bound for waiting on the downsampler, so that a downsampler which
	// never reports the expected metric fails the test instead of hanging it.
	const downsampleWaitTimeout = 5 * time.Minute

	logger := log.NewNopLogger()
	e, err := e2e.NewDockerEnvironment("c-issue6775")
	testutil.Ok(t, err)
	t.Cleanup(e2ethanos.CleanScenario(t, e))

	dir := filepath.Join(e.SharedDir(), "tmp")
	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))

	const bucket = "compact-test"
	m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
	testutil.Ok(t, e2e.StartAndWaitReady(m))

	bkt, err := s3.NewBucketWithConfig(logger,
		e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
	testutil.Ok(t, err)

	baseBlockDesc := blockDesc{
		series: []labels.Labels{
			labels.FromStrings("z", "1", "b", "2"),
			labels.FromStrings("z", "1", "b", "5"),
		},
		extLset: labels.FromStrings("case", "downsampled-block-with-overlap"),
		mint:    minTime,
		maxt:    maxTime,
	}

	// Two blocks created from the same descriptor, i.e. with the same time range.
	for i := 0; i < 2; i++ {
		rawBlockID, err := baseBlockDesc.Create(context.Background(), dir, 0, metadata.NoneFunc, 1200+i)
		testutil.Ok(t, err)
		testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, rawBlockID.String()), rawBlockID.String()))
	}

	// Downsample them first.
	bds := e2ethanos.NewToolsBucketDownsample(e, "downsample", client.BucketConfig{
		Type:   client.S3,
		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
	})
	testutil.Ok(t, bds.Start())

	// The deadline serves both as the retry stop channel and as the HTTP
	// request context, so neither the retry loop nor a single request can
	// block forever.
	waitCtx, cancel := context.WithTimeout(context.Background(), downsampleWaitTimeout)
	defer cancel()

	// NOTE(GiedriusS): can't use WaitSumMetrics here because the e2e library doesn't
	// work well with histograms.
	testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stderr), 1*time.Second, waitCtx.Done(), func() (rerr error) {
		req, err := http.NewRequestWithContext(waitCtx, http.MethodGet, fmt.Sprintf("http://%s/metrics", bds.Endpoint("http")), nil)
		if err != nil {
			return fmt.Errorf("creating metrics request: %w", err)
		}

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return fmt.Errorf("getting metrics: %w", err)
		}
		defer runutil.CloseWithErrCapture(&rerr, resp.Body, "close body")

		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("getting metrics: unexpected status code %d", resp.StatusCode)
		}

		b, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("reading metrics: %w", err)
		}

		// Both uploaded blocks must have been downsampled before moving on.
		if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{group="0@14846485652960182170"} 2`)) {
			return fmt.Errorf("failed to find the right downsampling metric")
		}

		return nil
	}))

	testutil.Ok(t, bds.Stop())

	// Run the compactor.
	c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
		Type:   client.S3,
		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
	}, nil, "--compact.enable-vertical-compaction")
	testutil.Ok(t, e2e.StartAndWaitReady(c))
	testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
}
30 changes: 30 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,3 +1217,33 @@ func NewRedis(e e2e.Environment, name string) e2e.Runnable {
},
)
}

// NewToolsBucketDownsample returns an observable runnable named
// "downsampler-<name>" that executes the image's `tools bucket downsample`
// command with the given bucket configuration, serving HTTP on port 8080.
//
// If bucketConfig cannot be marshalled to YAML, a failed runnable carrying the
// marshalling error is returned instead.
func NewToolsBucketDownsample(e e2e.Environment, name string, bucketConfig client.BucketConfig) *e2eobs.Observable {
	runnableName := fmt.Sprintf("downsampler-%s", name)

	f := e.Runnable(runnableName).
		WithPorts(map[string]int{"http": 8080}).
		Future()

	bktConfigBytes, err := yaml.Marshal(bucketConfig)
	if err != nil {
		// Report the failure under the same name the runnable was created with.
		return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(runnableName, errors.Wrapf(err, "generate downsample config file: %v", bucketConfig))}
	}

	args := []string{"bucket", "downsample"}

	args = append(args, e2e.BuildArgs(map[string]string{
		"--http-address":    ":8080",
		"--log.level":       "debug",
		"--objstore.config": string(bktConfigBytes),
		"--data-dir":        f.InternalDir(),
	})...)

	return e2eobs.AsObservable(f.Init(
		e2e.StartOptions{
			Image:            DefaultImage(),
			Command:          e2e.NewCommand("tools", args...),
			User:             strconv.Itoa(os.Getuid()),
			Readiness:        e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
			WaitReadyBackoff: &defaultBackoffConfig,
		},
	), "http")
}

0 comments on commit 23078f0

Please sign in to comment.