diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index a45248af6bbd3..64f89fc696b9f 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -124,8 +124,8 @@ func (cfg *Config) Validate() error {
 		return fmt.Errorf("compactor.delete-request-store should be configured when retention is enabled")
 	}
 
-	if cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
-		return fmt.Errorf("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
+	if cfg.ApplyRetentionInterval == 0 {
+		cfg.ApplyRetentionInterval = cfg.CompactionInterval
 	}
 
 	if err := config.ValidatePathPrefix(cfg.DeleteRequestStoreKeyPrefix); err != nil {
@@ -153,6 +153,7 @@ type Compactor struct {
 	wg                 sync.WaitGroup
 	indexCompactors    map[string]IndexCompactor
 	schemaConfig       config.SchemaConfig
+	tableLocker        *tableLocker
 
 	// Ring used for running a single compactor
 	ringLifecycler *ring.BasicLifecycler
@@ -193,6 +194,7 @@ func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.Objec
 		ringPollPeriod:  5 * time.Second,
 		indexCompactors: map[string]IndexCompactor{},
 		schemaConfig:    schemaConfig,
+		tableLocker:     newTableLocker(),
 	}
 
 	ringStore, err := kv.NewClient(
@@ -503,41 +505,52 @@ func (c *Compactor) runCompactions(ctx context.Context) {
 		}
 	}()
 
-	lastRetentionRunAt := time.Unix(0, 0)
-	runCompaction := func() {
-		applyRetention := false
-		if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
-			level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
-			applyRetention = true
-		}
+	// do the initial compaction
+	if err := c.RunCompaction(ctx, false); err != nil {
+		level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
+	}
 
-		err := c.RunCompaction(ctx, applyRetention)
-		if err != nil {
-			level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
-		}
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
 
-		if applyRetention {
-			lastRetentionRunAt = time.Now()
+		ticker := time.NewTicker(c.cfg.CompactionInterval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				if err := c.RunCompaction(ctx, false); err != nil {
+					level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
+				}
+			case <-ctx.Done():
+				return
+			}
 		}
-	}
+	}()
 
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		runCompaction()
+		if err := c.RunCompaction(ctx, true); err != nil {
+			level.Error(util_log.Logger).Log("msg", "failed to apply retention", "err", err)
+		}
 
-		ticker := time.NewTicker(c.cfg.CompactionInterval)
+		ticker := time.NewTicker(c.cfg.ApplyRetentionInterval)
 		defer ticker.Stop()
 
 		for {
 			select {
 			case <-ticker.C:
-				runCompaction()
+				if err := c.RunCompaction(ctx, true); err != nil {
+					level.Error(util_log.Logger).Log("msg", "failed to apply retention", "err", err)
+				}
 			case <-ctx.Done():
 				return
 			}
 		}
 	}()
+
 	if c.cfg.RetentionEnabled {
 		for _, container := range c.storeContainers {
 			c.wg.Add(1)
@@ -576,6 +589,37 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRet
 		return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String())
 	}
 
+	for {
+		locked, lockWaiterChan := c.tableLocker.lockTable(tableName)
+		if locked {
+			break
+		}
+		// do not wait for the lock to be released if we are only compacting the table, since
+		// compaction should happen more frequently than retention and retention compacts uncompacted files anyway.
+		if !applyRetention {
+			hasUncompactedIndex, err := tableHasUncompactedIndex(ctx, tableName, sc.indexStorageClient)
+			if err != nil {
+				level.Error(util_log.Logger).Log("msg", "failed to check if table has uncompacted index", "table_name", tableName, "err", err)
+				hasUncompactedIndex = true
+			}
+
+			if hasUncompactedIndex {
+				c.metrics.skippedCompactingLockedTables.Inc()
+				level.Warn(util_log.Logger).Log("msg", "skipped compacting table which likely has uncompacted index since it is locked by retention", "table_name", tableName)
+			}
+			return nil
+		}
+
+		// we are applying retention and processing delete requests, so
+		// wait for the lock to be released since we can't mark delete requests as processed without checking all the tables
+		select {
+		case <-lockWaiterChan:
+		case <-ctx.Done():
+			return nil
+		}
+	}
+	defer c.tableLocker.unlockTable(tableName)
+
 	table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), sc.indexStorageClient, indexCompactor,
 		schemaCfg, sc.tableMarker, c.expirationChecker, c.cfg.UploadParallelism)
 	if err != nil {
@@ -601,7 +645,7 @@ func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor Inde
 	c.indexCompactors[indexType] = indexCompactor
 }
 
-func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
+func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) (err error) {
 	status := statusSuccess
 	start := time.Now()
 
@@ -610,11 +654,15 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
 	}
 
 	defer func() {
-		c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
+		if err != nil {
+			status = statusFailure
+		}
+		withRetentionLabelValue := fmt.Sprintf("%v", applyRetention)
+		c.metrics.compactTablesOperationTotal.WithLabelValues(status, withRetentionLabelValue).Inc()
 		runtime := time.Since(start)
 		if status == statusSuccess {
-			c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
-			c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
+			c.metrics.compactTablesOperationDurationSeconds.WithLabelValues(withRetentionLabelValue).Set(runtime.Seconds())
+			c.metrics.compactTablesOperationLastSuccess.WithLabelValues(withRetentionLabelValue).SetToCurrentTime()
 			if applyRetention {
 				c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
 			}
@@ -627,7 +675,7 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
 				c.expirationChecker.MarkPhaseFailed()
 			}
 		}
-		if runtime > c.cfg.CompactionInterval {
+		if !applyRetention && runtime > c.cfg.CompactionInterval {
 			level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("last compaction took %s which is longer than the compaction interval of %s, this can lead to duplicate compactors running if not running a standalone compactor instance.", runtime, c.cfg.CompactionInterval))
 		}
 	}()
@@ -644,7 +692,6 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
 	sc.indexStorageClient.RefreshIndexTableNamesCache(ctx)
 	tbls, err := sc.indexStorageClient.ListTables(ctx)
 	if err != nil {
-		status = statusFailure
 		return fmt.Errorf("failed to list tables: %w", err)
 	}
 
@@ -721,12 +768,15 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
 	for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
 		err := <-errChan
 		if err != nil && firstErr == nil {
-			status = statusFailure
 			firstErr = err
 		}
 	}
 
-	return firstErr
+	if firstErr != nil {
+		return firstErr
+	}
+
+	return ctx.Err()
 }
 
 type expirationChecker struct {
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 854339ca6ecaf..6913956aaa70e 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -10,6 +10,8 @@ import (
 	"time"
 
 	"github.com/grafana/dskit/flagext"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
@@ -18,6 +20,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/util/constants"
 	loki_net "github.com/grafana/loki/pkg/util/net"
+	"github.com/grafana/loki/pkg/validation"
 )
 
 const indexTablePrefix = "table_"
@@ -41,7 +44,8 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob
 	cfg := Config{}
 	flagext.DefaultValues(&cfg)
 	cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName)
-	cfg.RetentionEnabled = false
+	cfg.RetentionEnabled = true
+	cfg.DeleteRequestStore = periodConfigs[len(periodConfigs)-1].ObjectType
 	cfg.CompactorRing.InstanceAddr = localhost
 
 	if loopbackIFace, err := loki_net.LoopbackInterfaceName(); err == nil {
@@ -50,9 +54,16 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob
 	require.NoError(t, cfg.Validate())
 
-	c, err := NewCompactor(cfg, objectClients, nil, config.SchemaConfig{
+	defaultLimits := validation.Limits{}
+	flagext.DefaultValues(&defaultLimits)
+	require.NoError(t, defaultLimits.RetentionPeriod.Set("30d"))
+
+	overrides, err := validation.NewOverrides(defaultLimits, nil)
+	require.NoError(t, err)
+
+	c, err := NewCompactor(cfg, objectClients, objectClients[periodConfigs[len(periodConfigs)-1].From], config.SchemaConfig{
 		Configs: periodConfigs,
-	}, nil, nil, constants.Loki)
+	}, overrides, prometheus.NewPedanticRegistry(), constants.Loki)
 	require.NoError(t, err)
 
 	c.RegisterIndexCompactor("dummy", testIndexCompactor{})
@@ -292,3 +303,136 @@ func Test_tableSort(t *testing.T) {
 	sortTablesByRange(intervals)
 	require.Equal(t, []string{"index_19195", "index_19192", "index_19191"}, intervals)
 }
+
+func TestCompactor_TableLocking(t *testing.T) {
+	commonDBsConfig := IndexesConfig{NumUnCompactedFiles: 5}
+	perUserDBsConfig := PerUserIndexesConfig{}
+
+	daySeconds := int64(24 * time.Hour / time.Second)
+	tableNumEnd := time.Now().Unix() / daySeconds
+	tableNumStart := tableNumEnd - 5
+
+	setupCompactorAndIndex := func(tempDir string) *Compactor {
+		tablesPath := filepath.Join(tempDir, "index")
+
+		periodConfigs := []config.PeriodConfig{
+			{
+				From:       config.DayTime{Time: model.Time(0)},
+				IndexType:  "dummy",
+				ObjectType: "fs_01",
+				IndexTables: config.IndexPeriodicTableConfig{
+					PathPrefix: "index/",
+					PeriodicTableConfig: config.PeriodicTableConfig{
+						Prefix: indexTablePrefix,
+						Period: config.ObjectStorageIndexRequiredPeriod,
+					}},
+			},
+		}
+
+		for i := tableNumStart; i <= tableNumEnd; i++ {
+			SetupTable(t, filepath.Join(tablesPath, fmt.Sprintf("%s%d", indexTablePrefix, i)), IndexesConfig{NumUnCompactedFiles: 5}, PerUserIndexesConfig{})
+		}
+
+		var (
+			objectClients = map[config.DayTime]client.ObjectClient{}
+			err           error
+		)
+		objectClients[periodConfigs[0].From], err = local.NewFSObjectClient(local.FSConfig{Directory: tempDir})
+		require.NoError(t, err)
+
+		return setupTestCompactor(t, objectClients, periodConfigs, tempDir)
+	}
+
+	for _, tc := range []struct {
+		name           string
+		lockTable      string
+		applyRetention bool
+
+		compactionShouldTimeout bool
+	}{
+		{
+			name: "no table locked - not applying retention",
+		},
+		{
+			name:           "no table locked - applying retention",
+			applyRetention: true,
+		},
+		{
+			name:      "first table locked - not applying retention",
+			lockTable: fmt.Sprintf("%s%d", indexTablePrefix, tableNumEnd),
+		},
+		{
+			name:                    "first table locked - applying retention",
+			lockTable:               fmt.Sprintf("%s%d", indexTablePrefix, tableNumEnd),
+			applyRetention:          true,
+			compactionShouldTimeout: true,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			tempDir := t.TempDir()
+			tablesPath := filepath.Join(tempDir, "index")
+			compactor := setupCompactorAndIndex(tempDir)
+
+			// run the compaction twice, 2nd time without any table locking
+			for n := 1; n <= 2; n++ {
+				t.Run(fmt.Sprintf("%d", n), func(t *testing.T) {
+					// lock table only for the first run
+					if n == 1 && tc.lockTable != "" {
+						locked, _ := compactor.tableLocker.lockTable(tc.lockTable)
+						require.True(t, locked)
+
+						defer compactor.tableLocker.unlockTable(tc.lockTable)
+					}
+
+					// set a timeout so that retention does not get blocked forever on acquiring the table lock.
+					ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+					defer cancel()
+
+					err := compactor.RunCompaction(ctx, tc.applyRetention)
+					// compaction should not timeout after the first run since we won't be locking the table
+					if n == 1 && tc.compactionShouldTimeout {
+						require.ErrorIs(t, err, context.DeadlineExceeded)
+						require.Equal(t, float64(1), testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(statusFailure, "true")))
+						require.Equal(t, float64(0), testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(statusFailure, "false")))
+						return
+					}
+					require.NoError(t, err)
+
+					if n > 1 && tc.compactionShouldTimeout {
+						// this should be the first successful run if compaction was expected to time out during the first run
+						require.Equal(t, float64(1), testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(statusSuccess, fmt.Sprintf("%v", tc.applyRetention))))
+					} else {
+						// else it should have succeeded during all the n runs
+						require.Equal(t, float64(n), testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(statusSuccess, fmt.Sprintf("%v", tc.applyRetention))))
+					}
+					require.Equal(t, float64(0), testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(statusSuccess, fmt.Sprintf("%v", !tc.applyRetention))))
+
+					// if the table was locked and compaction ran without retention then only the locked table should have been skipped
+					if tc.lockTable != "" {
+						if tc.applyRetention {
+							require.Equal(t, float64(0), testutil.ToFloat64(compactor.metrics.skippedCompactingLockedTables))
+						} else {
+							require.Equal(t, float64(1), testutil.ToFloat64(compactor.metrics.skippedCompactingLockedTables))
+						}
+					}
+
+					for tableNum := tableNumStart; tableNum <= tableNumEnd; tableNum++ {
+						name := fmt.Sprintf("%s%d", indexTablePrefix, tableNum)
+						files, err := os.ReadDir(filepath.Join(tablesPath, name))
+						require.NoError(t, err)
+
+						if n == 1 && name == tc.lockTable {
+							// locked table should not be compacted during the first run
+							require.Len(t, files, 5)
+						} else {
+							require.Len(t, files, 1)
+							require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))
+
+							verifyCompactedIndexTable(t, commonDBsConfig, perUserDBsConfig, filepath.Join(tablesPath, name))
+						}
+					}
+				})
+			}
+		})
+	}
+}
diff --git a/pkg/compactor/metrics.go b/pkg/compactor/metrics.go
index b81ae2ab51da4..7cbf404c81848 100644
--- a/pkg/compactor/metrics.go
+++ b/pkg/compactor/metrics.go
@@ -8,14 +8,17 @@ import (
 const (
 	statusFailure = "failure"
 	statusSuccess = "success"
+
+	lblWithRetention = "with_retention"
 )
 
 type metrics struct {
 	compactTablesOperationTotal           *prometheus.CounterVec
-	compactTablesOperationDurationSeconds prometheus.Gauge
-	compactTablesOperationLastSuccess     prometheus.Gauge
+	compactTablesOperationDurationSeconds *prometheus.GaugeVec
+	compactTablesOperationLastSuccess     *prometheus.GaugeVec
 	applyRetentionLastSuccess             prometheus.Gauge
 	compactorRunning                      prometheus.Gauge
+	skippedCompactingLockedTables         prometheus.Counter
 }
 
 func newMetrics(r prometheus.Registerer) *metrics {
@@ -23,18 +26,18 @@ func newMetrics(r prometheus.Registerer) *metrics {
 		compactTablesOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "compact_tables_operation_total",
-			Help:      "Total number of tables compaction done by status",
-		}, []string{"status"}),
-		compactTablesOperationDurationSeconds: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Help:      "Total number of tables compaction done by status and with/without retention",
+		}, []string{"status", lblWithRetention}),
+		compactTablesOperationDurationSeconds: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "compact_tables_operation_duration_seconds",
-			Help:      "Time (in seconds) spent in compacting all the tables",
-		}),
-		compactTablesOperationLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Help:      "Time (in seconds) spent in compacting all the tables with/without retention",
+		}, []string{lblWithRetention}),
+		compactTablesOperationLastSuccess: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
 			Help:      "Unix timestamp of the last successful compaction run",
-		}),
+		}, []string{lblWithRetention}),
 		applyRetentionLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "apply_retention_last_successful_run_timestamp_seconds",
@@ -45,6 +48,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			Name:      "compactor_running",
 			Help:      "Value will be 1 if compactor is currently running on this instance",
 		}),
+		skippedCompactingLockedTables: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Namespace: "loki_compactor",
+			Name:      "skipped_compacting_locked_tables_total",
+			Help:      "Count of uncompacted tables being skipped due to them being locked by retention",
+		}),
 	}
 
 	return &m
diff --git a/pkg/compactor/table.go b/pkg/compactor/table.go
index 92059a7c15e29..b7b94627c7415 100644
--- a/pkg/compactor/table.go
+++ b/pkg/compactor/table.go
@@ -265,3 +265,11 @@ func (t *table) openCompactedIndexForRetention(idxSet *indexSet) error {
 
 	return nil
 }
+
+// tableHasUncompactedIndex returns true if we have more than one common index file.
+// We check for more than one because the earlier boltdb-shipper index type did not have a per-tenant index, so there would only be common index files.
+// In the case of a per-tenant index, it is okay to consider the table compacted, since having just one uncompacted index file for a while should be fine.
+func tableHasUncompactedIndex(ctx context.Context, tableName string, indexStorageClient storage.Client) (bool, error) {
+	commonIndexFiles, _, err := indexStorageClient.ListFiles(ctx, tableName, false)
+	return len(commonIndexFiles) > 1, err
+}
diff --git a/pkg/compactor/table_locker.go b/pkg/compactor/table_locker.go
new file mode 100644
index 0000000000000..bce818a5d2b62
--- /dev/null
+++ b/pkg/compactor/table_locker.go
@@ -0,0 +1,52 @@
+package compactor
+
+import "sync"
+
+type lockWaiterChan chan struct{}
+
+type tableLocker struct {
+	lockedTables    map[string]lockWaiterChan
+	lockedTablesMtx sync.RWMutex
+}
+
+func newTableLocker() *tableLocker {
+	return &tableLocker{
+		lockedTables: map[string]lockWaiterChan{},
+	}
+}
+
+// lockTable attempts to lock a table. It returns true if the lock gets acquired for the caller.
+// It also returns a channel which the caller can watch to detect unlocking of the table if it was already locked by some other caller.
+func (t *tableLocker) lockTable(tableName string) (bool, <-chan struct{}) {
+	locked := false
+
+	t.lockedTablesMtx.RLock()
+	c, ok := t.lockedTables[tableName]
+	t.lockedTablesMtx.RUnlock()
+	if ok {
+		return false, c
+	}
+
+	t.lockedTablesMtx.Lock()
+	defer t.lockedTablesMtx.Unlock()
+
+	c, ok = t.lockedTables[tableName]
+	if !ok {
+		t.lockedTables[tableName] = make(chan struct{})
+		c = t.lockedTables[tableName]
+		locked = true
+	}
+
+	return locked, c
+}
+
+func (t *tableLocker) unlockTable(tableName string) {
+	t.lockedTablesMtx.Lock()
+	defer t.lockedTablesMtx.Unlock()
+
+	c, ok := t.lockedTables[tableName]
+	if ok {
+		close(c)
+	}
+	delete(t.lockedTables, tableName)
+}
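Editor's note on the locking pattern introduced above (illustrative sketch only, not part of the patch): CompactTable uses lockTable/unlockTable to coordinate the now-independent compaction and retention loops. A plain compaction caller gives up when a table is already locked, while the retention path waits on the returned channel, which unlockTable closes for every waiter. The names processTable and doTableWork below are hypothetical placeholders; the real logic lives in Compactor.CompactTable in the diff above.

package compactor

import "context"

// doTableWork is a hypothetical stand-in for the real per-table compaction/marking logic.
func doTableWork(ctx context.Context, tableName string) error { return nil }

// processTable mirrors the intended tableLocker usage from Compactor.CompactTable.
func processTable(ctx context.Context, locker *tableLocker, tableName string, mustWait bool) error {
	for {
		locked, lockWaiterChan := locker.lockTable(tableName)
		if locked {
			break
		}
		if !mustWait {
			// Opportunistic caller (plain compaction): skip the locked table instead of
			// blocking; it will be picked up again on the next compaction tick.
			return nil
		}
		// Mandatory caller (retention): wait for the current holder to call unlockTable,
		// which closes the channel handed out to waiters, then retry acquiring the lock.
		select {
		case <-lockWaiterChan:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	defer locker.unlockTable(tableName)

	return doTableWork(ctx, tableName)
}

Because lockTable re-checks the map after upgrading from the read lock to the write lock, two concurrent callers cannot both be told they acquired the lock; the loser receives the winner's channel and either skips the table or waits on it.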