Skip to content

Commit

Permalink
fix(ha-tracker): set electedAtTime to now when the received replicaDe…
Browse files Browse the repository at this point in the history
…sc ha deletedAt higher than zero (#10443)

* fix(ha-tracker): set electedAtTime to now when the received replicaDesc ha deletedAt higher than zero

Signed-off-by: Nikos Angelopoulos <[email protected]>

* test: add test to validate behaviour

* test: wait until previous entry is removed before updating with the new one to avoid flakyness

* refactor: updateKVStore

* doc: address pr comments , related with the comment

* Update CHANGELOG.md

Co-authored-by: Taylor C <[email protected]>

---------

Signed-off-by: Nikos Angelopoulos <[email protected]>
Co-authored-by: Taylor C <[email protected]>
  • Loading branch information
NickAnge and tacole02 authored Jan 21, 2025
1 parent d55f8d4 commit 9d27b48
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [BUGFIX] MQE: Fix <aggr_over_time> functions with histograms #10400
* [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423
* [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. #10445 #10461
* [BUGFIX] Distributor: Fix edge case at the HA-tracker with memberlist as KVStore, where when a replica in the KVStore is marked as deleted but not yet removed, it fails to update the KVStore. #10443

### Mixin

Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func newHaTracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
}),
}

client, err := kv.NewClient(
cfg.KVStore,
GetReplicaDescCodec(),
Expand Down Expand Up @@ -634,7 +633,10 @@ func (h *defaultHaTracker) updateKVStore(ctx context.Context, userID, cluster, r
electedChanges = desc.ElectedChanges + 1
}
} else {
if desc == nil && electedAtTime == 0 {
if desc == nil || (desc.DeletedAt > 0) {
// if there is no desc in the kvStore , or if the entry in kvStore is marked as deleted but not yet removed,
// set the electedAtTime to avoid being zero
// The second part, (desc.DeletedAt > 0), ensures the replica is elected to "revive" the cluster entry after the previous one was deleted.
electedAtTime = timestamp.FromTime(now)
}
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra
})
}

func waitForHaTrackerCacheEntryRemoval(t require.TestingT, tracker *defaultHaTracker, user string, cluster string, duration time.Duration, tick time.Duration) bool {
condition := assert.Eventually(t, func() bool {
tracker.electedLock.RLock()
defer tracker.electedLock.RUnlock()

info := tracker.clusters[user][cluster]
return info == nil
}, duration, tick)
return condition
}

func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) {
change, err := r1.Merge(r2, false)
if err != nil {
Expand Down Expand Up @@ -319,6 +330,84 @@ func TestHaTrackerWithMemberList(t *testing.T) {
assert.Error(t, err)
}

func TestHaTrackerWithMemberlistWhenReplicaDescIsMarkedDeletedThenKVStoreUpdateIsNotFailing(t *testing.T) {
var config memberlist.KVConfig

const (
cluster = "cluster"
tenant = "tenant"
replica1 = "r1"
replica2 = "r2"
updateTimeout = time.Millisecond * 100
failoverTimeout = 2 * time.Millisecond
failoverTimeoutPlus100ms = failoverTimeout + 100*time.Millisecond
)

flagext.DefaultValues(&config)
ctx := context.Background()

config.Codecs = []codec.Codec{
GetReplicaDescCodec(),
}

memberListSvc := memberlist.NewKVInitService(
&config,
log.NewNopLogger(),
&dnsProviderMock{},
prometheus.NewPedanticRegistry(),
)
require.NoError(t, services.StartAndAwaitRunning(ctx, memberListSvc))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc))
})

tracker, err := newHaTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{
MemberlistKV: memberListSvc.GetMemberlistKV,
}},
UpdateTimeout: updateTimeout,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: failoverTimeout,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, tracker))

t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, tracker))
})

now := time.Now()

// Write the first time.
err = tracker.checkReplica(context.Background(), tenant, cluster, replica1, now)
assert.NoError(t, err)

key := fmt.Sprintf("%s/%s", tenant, cluster)

// Mark the ReplicaDesc as deleted in the KVStore, which will also remove it from the tracker cache.
err = tracker.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
d, ok := in.(*ReplicaDesc)
if !ok || d == nil {
return nil, false, nil
}
d.DeletedAt = timestamp.FromTime(time.Now())
return d, true, nil
})
require.NoError(t, err)

condition := waitForHaTrackerCacheEntryRemoval(t, tracker, tenant, cluster, 2*time.Second, 50*time.Millisecond)
require.True(t, condition)

now = now.Add(failoverTimeoutPlus100ms)
// check replica2
err = tracker.checkReplica(context.Background(), tenant, cluster, replica2, now)
assert.NoError(t, err)

// check replica1
assert.ErrorAs(t, tracker.checkReplica(context.Background(), tenant, cluster, replica1, now), &replicasDidNotMatchError{})
}

func TestHATrackerCacheSyncOnStart(t *testing.T) {
const cluster = "c1"
const replicaOne = "r1"
Expand Down

0 comments on commit 9d27b48

Please sign in to comment.