From cdd155dc06d42fa569ade72b6781db845d452053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 3 Feb 2025 13:38:37 +0200 Subject: [PATCH] store: lock around iterating over s.blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hold a lock around s.blocks when iterating over it. Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 73 +++++++++++++++++++--------------- pkg/store/bucket_test.go | 5 ++- test/e2e/store_gateway_test.go | 41 ++++++++++++------- 3 files changed, 71 insertions(+), 48 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index eec1de1005..acd91dd431 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -670,6 +670,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { go func() { for meta := range blockc { if err := s.addBlock(ctx, meta); err != nil { + level.Warn(s.logger).Log("msg", "adding block failed", "err", err, "id", meta.ULID.String()) continue } } @@ -694,17 +695,32 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { return metaFetchErr } + var cleanupBlocks []*bucketBlock + s.mtx.RLock() + keys := make([]ulid.ULID, 0, len(s.blocks)) + for k := range s.blocks { + keys = append(keys, k) + } + s.mtx.RUnlock() + // Drop all blocks that are no longer present in the bucket. - for id := range s.blocks { + for _, id := range keys { if _, ok := metas[id]; ok { continue } - if err := s.removeBlock(id); err != nil { - level.Warn(s.logger).Log("msg", "drop of outdated block failed", "block", id, "err", err) - s.metrics.blockDropFailures.Inc() - } - level.Info(s.logger).Log("msg", "dropped outdated block", "block", id) + + s.mtx.Lock() + b := s.blocks[id] + lset := labels.FromMap(b.meta.Thanos.Labels) + s.blockSets[lset.Hash()].remove(id) + delete(s.blocks, id) + s.mtx.Unlock() + + s.metrics.blocksLoaded.Dec() s.metrics.blockDrops.Inc() + cleanupBlocks = append(cleanupBlocks, b) + + level.Info(s.logger).Log("msg", "dropped outdated block", "block", id) } // Sync advertise labels. @@ -717,6 +733,25 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0 }) s.mtx.Unlock() + + go func() { + for _, b := range cleanupBlocks { + var errs prometheus.MultiError + + errs.Append(b.Close()) + + if b.dir != "" { + errs.Append(os.RemoveAll(b.dir)) + } + + if len(errs) == 0 { + return + } + + level.Warn(s.logger).Log("msg", "close of outdated block failed", "block", b.meta.ULID.String(), "err", errs.Error()) + s.metrics.blockDropFailures.Inc() + } + }() return nil } @@ -849,32 +884,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er return nil } -func (s *BucketStore) removeBlock(id ulid.ULID) error { - s.mtx.Lock() - b, ok := s.blocks[id] - if ok { - lset := labels.FromMap(b.meta.Thanos.Labels) - s.blockSets[lset.Hash()].remove(id) - delete(s.blocks, id) - } - s.mtx.Unlock() - - if !ok { - return nil - } - - s.metrics.blocksLoaded.Dec() - if err := b.Close(); err != nil { - return errors.Wrap(err, "close block") - } - - if b.dir == "" { - return nil - } - - return os.RemoveAll(b.dir) -} - // TimeRange returns the minimum and maximum timestamp of data available in the store. func (s *BucketStore) TimeRange() (mint, maxt int64) { s.mtx.RLock() diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8c5b5c61ee..9bdf365807 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1780,7 +1780,10 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { testutil.Equals(t, numSeries, len(srv.SeriesSet)) }) t.Run("remove second block. Cache stays. Ask for first again.", func(t *testing.T) { - testutil.Ok(t, store.removeBlock(b2.meta.ULID)) + b, _ := store.blocks[b2.meta.ULID] + lset := labels.FromMap(b.meta.Thanos.Labels) + store.blockSets[lset.Hash()].remove(b2.meta.ULID) + delete(store.blocks, b2.meta.ULID) srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, store.Series(&storepb.SeriesRequest{ diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 912712f0a8..2185f6b0c1 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cortexproject/promqlsmith" + "github.com/efficientgo/core/backoff" "github.com/efficientgo/core/testutil" "github.com/efficientgo/e2e" e2edb "github.com/efficientgo/e2e/db" @@ -816,7 +817,13 @@ metafile_content_ttl: 0s` // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta. for _, st := range []*e2eobs.Observable{store1, store2, store3} { t.Run(st.Name(), func(t *testing.T) { - testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_blocks_meta_synced"}, e2emon.WaitMissingMetrics(), e2emon.WithWaitBackoff( + &backoff.Config{ + Min: 1 * time.Second, + Max: 10 * time.Second, + MaxRetries: 30, + }, + ))) testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) @@ -826,23 +833,27 @@ metafile_content_ttl: 0s` } t.Run("query with groupcache loading from object storage", func(t *testing.T) { - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, - time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, - []model.Metric{ - { - "a": "1", - "b": "2", - "ext1": "value1", - "replica": "1", + for i := 0; i < 3; i++ { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, }, - }, - ) + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + } for _, st := range []*e2eobs.Observable{store1, store2, store3} { - testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`})) - testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks")))) + t.Run(st.Name(), func(t *testing.T) { + testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`})) + testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks")))) + }) } })