kvserver: fix flaky flow token return tests
Epic: none
Release note: none
pav-kv committed Oct 11, 2024
1 parent a82fea1 commit ffd5301
Showing 5 changed files with 121 additions and 455 deletions.
pkg/kv/kvserver/flow_control_integration_test.go (205 changes: 35 additions & 170 deletions)
@@ -3282,139 +3282,10 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) {
})
}

-// TestFlowControlQuiescedRangeV2 tests flow token behavior when ranges are
-// quiesced. It ensures that we have timely returns of flow tokens even when
-// there's no raft traffic to piggyback token returns on top of.
-func TestFlowControlQuiescedRangeV2(t *testing.T) {
-defer leaktest.AfterTest(t)()
-defer log.Scope(t).Close(t)
-
-testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{
-kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
-kvflowcontrol.V2EnabledWhenLeaderV2Encoding,
-}, func(t *testing.T, v2EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel) {
-ctx := context.Background()
-var disableWorkQueueGranting atomic.Bool
-var disableFallbackTokenDispatch atomic.Bool
-disableWorkQueueGranting.Store(true)
-disableFallbackTokenDispatch.Store(true)
-
-settings := cluster.MakeTestingClusterSettings()
-// Override metamorphism to allow range quiescence.
-kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, false)
-tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
-ReplicationMode: base.ReplicationManual,
-ServerArgs: base.TestServerArgs{
-Settings: settings,
-RaftConfig: base.RaftConfig{
-// Suppress timeout-based elections. This test doesn't want to
-// deal with leadership changing hands.
-RaftElectionTimeoutTicks: 1000000,
-},
-Knobs: base.TestingKnobs{
-Store: &kvserver.StoreTestingKnobs{
-FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
-UseOnlyForScratchRanges: true,
-OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
-return v2EnabledWhenLeaderLevel
-},
-OverrideTokenDeduction: func(_ kvflowcontrol.Tokens) kvflowcontrol.Tokens {
-// This test asserts on the exact values of tracked
-// tokens. In non-test code, the tokens deducted are
-// a few bytes off (give or take) from the size of
-// the proposals. We don't care about such
-// differences.
-return kvflowcontrol.Tokens(1 << 20 /* 1MiB */)
-},
-},
-},
-AdmissionControl: &admission.TestingKnobs{
-DisableWorkQueueFastPath: true,
-DisableWorkQueueGranting: func() bool {
-return disableWorkQueueGranting.Load()
-},
-},
-RaftTransport: &kvserver.RaftTransportTestingKnobs{
-DisableFallbackFlowTokenDispatch: func() bool {
-return disableFallbackTokenDispatch.Load()
-},
-DisablePiggyBackedFlowTokenDispatch: func() bool {
-// We'll only test using the fallback token mechanism.
-return true
-},
-},
-},
-},
-})
-defer tc.Stopper().Stop(ctx)
-
-k := tc.ScratchRange(t)
-tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
-h := newFlowControlTestHelperV2(t, tc, v2EnabledWhenLeaderLevel)
-h.init()
-defer h.close(makeV2EnabledTestFileName(v2EnabledWhenLeaderLevel, "quiesced_range"))
-
-desc, err := tc.LookupRange(k)
-require.NoError(t, err)
-h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
-n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
-
-h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
-
-h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
-h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)
-
-h.comment(`
--- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write
--- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with
--- no corresponding returns.
-`)
-h.query(n1, v2FlowTokensQueryStr)
-
-// The range must not quiesce because the leader holds send tokens.
-leader := tc.GetRaftLeader(t, roachpb.RKey(k))
-require.NotNil(t, leader)
-require.False(t, leader.IsQuiescent())
-
-h.comment(`
--- (Allow below-raft admission to proceed. We've disabled the fallback token
--- dispatch mechanism so no tokens are returned yet -- quiesced ranges don't
--- use the piggy-backed token return mechanism since there's no raft traffic.)`)
-disableWorkQueueGranting.Store(false)
-
-h.comment(`
--- Flow token metrics from n1 after work gets admitted but fallback token
--- dispatch mechanism is disabled. Deducted elastic tokens from remote stores
--- are yet to be returned. Tokens for the local store are.
-`)
-h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */)
-h.query(n1, v2FlowTokensQueryStr)
-
-h.comment(`-- (Enable the fallback token dispatch mechanism.)`)
-disableFallbackTokenDispatch.Store(false)
-h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
-
-h.comment(`
--- Flow token metrics from n1 after work gets admitted and all elastic tokens
--- are returned through the fallback mechanism.
-`)
-h.query(n1, v2FlowTokensQueryStr)
-
-// The range eventually quiesces because all the tokens have been returned.
-h.comment(`-- (Wait for range to quiesce.)`)
-testutils.SucceedsSoon(t, func() error {
-if !leader.IsQuiescent() {
-return errors.Errorf("%s not quiescent", leader)
-}
-return nil
-})
-})
-}
-
-// TestFlowControlUnquiescedRangeV2 tests flow token behavior when ranges are
-// unquiesced. It's a sort of roundabout test to ensure that flow tokens are
-// returned through the raft transport piggybacking mechanism, piggybacking on
-// raft heartbeats.
+// TestFlowControlUnquiescedRangeV2 tests that flow tokens are reliably returned
+// via the normal flow of MsgApp and MsgAppResp messages, with MsgApp pings if
+// the admissions are lagging. It also ensures that the range does not quiesce
+// until all deducted flow tokens are returned.
func TestFlowControlUnquiescedRangeV2(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
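
The rewritten doc comment above states the invariant this test now exercises: tokens flow back on ordinary MsgApp/MsgAppResp traffic (with MsgApp pings when admission lags), and a range cannot quiesce while any deducted tokens remain. A toy model of that invariant, assuming a simple per-follower byte count; `leader`, `tracked`, and the store names are invented for illustration and are not the kvflowcontrol API:

```go
package main

import "fmt"

// leader tracks flow tokens deducted per follower store. It may only
// quiesce once every deducted token has been returned, e.g. via a
// MsgAppResp (or ping response) carrying an admission acknowledgement.
type leader struct {
	tracked map[string]int64 // follower -> outstanding flow tokens (bytes)
}

func (l *leader) deduct(follower string, tokens int64) {
	l.tracked[follower] += tokens
}

// onAdmitted models an admission ack returning tokens for one follower.
func (l *leader) onAdmitted(follower string, tokens int64) {
	l.tracked[follower] -= tokens
	if l.tracked[follower] <= 0 {
		delete(l.tracked, follower)
	}
}

// canQuiesce only when no tokens are outstanding anywhere.
func (l *leader) canQuiesce() bool { return len(l.tracked) == 0 }

func main() {
	l := &leader{tracked: map[string]int64{}}
	for _, f := range []string{"s1", "s2", "s3"} {
		l.deduct(f, 1<<20) // 1MiB x 3 replicas = 3MiB deducted
	}
	fmt.Println(l.canQuiesce()) // false: the leader holds send tokens

	l.onAdmitted("s1", 1<<20)   // the local store admits immediately
	fmt.Println(l.canQuiesce()) // false: 2MiB still tracked remotely

	l.onAdmitted("s2", 1<<20)
	l.onAdmitted("s3", 1<<20)
	fmt.Println(l.canQuiesce()) // true: all tokens returned
}
```

This mirrors the numbers asserted in the test: 1MiB*3=3MiB deducted up front, 2MiB still tracked after only the local store admits.
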
@@ -3432,29 +3303,32 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
settings := cluster.MakeTestingClusterSettings()
// Override metamorphism to allow range quiescence.
kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, false)
+pinnedLease := kvserver.NewPinnedLeases()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: settings,
RaftConfig: base.RaftConfig{
-// Suppress timeout-based elections. This test doesn't want to
-// deal with leadership changing hands or followers unquiescing
-// ranges by calling elections.
+// Suppress timeout-based elections. This test doesn't want to deal
+// with leadership changing hands.
RaftElectionTimeoutTicks: 1000000,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
+// Pin the lease to the first store to prevent lease and leader
+// moves which disrupt this test.
+PinnedLeases: pinnedLease,
+
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
return v2EnabledWhenLeaderLevel
},
OverrideTokenDeduction: func(_ kvflowcontrol.Tokens) kvflowcontrol.Tokens {
-// This test asserts on the exact values of tracked
-// tokens. In non-test code, the tokens deducted are
-// a few bytes off (give or take) from the size of
-// the proposals. We don't care about such
-// differences.
+// This test asserts on the exact values of tracked tokens. In
+// non-test code, the tokens deducted are a few bytes off (give
+// or take) from the size of the proposals. We don't care about
+// such differences.
return kvflowcontrol.Tokens(1 << 20 /* 1MiB */)
},
},
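
The `OverrideTokenDeduction` knob above pins every deduction to exactly 1MiB so the test can assert exact tracked-token totals; without it, deductions track the proposal size and are a few bytes off. A rough sketch of how such an override hook behaves; `Tokens`, `tokensToDeduct`, and `deductionOverride` are invented names mirroring the knob's shape, not the real kvflowcontrol code:

```go
package main

import "fmt"

// Tokens counts flow tokens in bytes, mirroring kvflowcontrol.Tokens.
type Tokens int64

// deductionOverride, when non-nil, replaces the computed deduction,
// like the OverrideTokenDeduction(func(Tokens) Tokens) knob above.
var deductionOverride func(Tokens) Tokens

func tokensToDeduct(proposalBytes int64) Tokens {
	t := Tokens(proposalBytes) // real code is a few bytes off the payload size
	if deductionOverride != nil {
		t = deductionOverride(t)
	}
	return t
}

func main() {
	// Without the override, a "1MiB" put deducts roughly, not exactly, 1MiB.
	fmt.Println(tokensToDeduct(1<<20 + 37)) // 1048613

	// With the override, every proposal deducts exactly 1MiB, so 3x
	// replication tracks exactly 3MiB and the metrics assertions are stable.
	deductionOverride = func(Tokens) Tokens { return 1 << 20 }
	total := Tokens(0)
	for replicas := 0; replicas < 3; replicas++ {
		total += tokensToDeduct(1<<20 + 37)
	}
	fmt.Println(total == 3<<20) // true
}
```
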
@@ -3467,8 +3341,7 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
},
RaftTransport: &kvserver.RaftTransportTestingKnobs{
DisableFallbackFlowTokenDispatch: func() bool {
-// We'll only test using the piggy-back token mechanism.
-return true
+return disablePiggybackTokenDispatch.Load()
},
DisablePiggyBackedFlowTokenDispatch: func() bool {
return disablePiggybackTokenDispatch.Load()
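
This hunk keys the fallback-dispatch knob to the same flag as the piggybacked-dispatch knob, so a single store to the atomic silences both out-of-band token-return paths at once, leaving only the normal raft-message flow. A minimal sketch of that single-flag gating, with hypothetical knob names shaped like the `RaftTransportTestingKnobs` fields above:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// transportKnobs holds closures that are re-evaluated on every dispatch
// attempt, so the test can flip behavior mid-run without restarting anything.
type transportKnobs struct {
	disableFallbackDispatch  func() bool
	disablePiggybackDispatch func() bool
}

func main() {
	var disableDispatch atomic.Bool
	disableDispatch.Store(true)

	// Both knobs read the same flag, mirroring the change above.
	knobs := transportKnobs{
		disableFallbackDispatch:  func() bool { return disableDispatch.Load() },
		disablePiggybackDispatch: func() bool { return disableDispatch.Load() },
	}

	fmt.Println(knobs.disableFallbackDispatch(), knobs.disablePiggybackDispatch()) // true true

	disableDispatch.Store(false) // re-enable both paths with a single store
	fmt.Println(knobs.disableFallbackDispatch(), knobs.disablePiggybackDispatch()) // false false
}
```
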
@@ -3480,21 +3353,22 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
+desc, err := tc.LookupRange(k)
+require.NoError(t, err)
+pinnedLease.PinLease(desc.RangeID, tc.GetFirstStoreFromServer(t, 0).StoreID())
+
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
h := newFlowControlTestHelperV2(t, tc, v2EnabledWhenLeaderLevel)
h.init()
defer h.close(makeV2EnabledTestFileName(v2EnabledWhenLeaderLevel, "unquiesced_range"))
-
-desc, err := tc.LookupRange(k)
-require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)

h.comment(`
-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write
-- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with
@@ -3508,37 +3382,28 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
require.False(t, leader.IsQuiescent())

h.comment(`
--- (Allow below-raft admission to proceed. We've disabled the fallback token
--- dispatch mechanism so no tokens are returned yet -- quiesced ranges don't
--- use the piggy-backed token return mechanism since there's no raft traffic.)`)
+-- (Allow below-raft admission to proceed. We've disabled the piggybacked token
+-- return mechanism so no tokens are returned via this path. But the tokens will
+-- be returned anyway because the range is not quiesced and keeps pinging.)`)
disableWorkQueueGranting.Store(false)

-h.comment(`
--- Flow token metrics from n1 after work gets admitted but fallback token
--- dispatch mechanism is disabled. Deducted elastic tokens from remote stores
--- are yet to be returned. Tokens for the local store are.
-`)
-h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */)
+h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.query(n1, v2FlowTokensQueryStr)
-
-h.comment(`-- (Enable the piggyback token dispatch mechanism.)`)
-disablePiggybackTokenDispatch.Store(false)
-
-h.comment(`-- (Unquiesce the range.)`)
-testutils.SucceedsSoon(t, func() error {
-_, err := tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndPropose()
-require.NoError(t, err)
-return h.checkAllTokensReturned(ctx, 3, 0 /* serverIdx */)
-})
+h.comment(`-- (Issuing another 1x1MiB 3x elastic write.)`)
+disableWorkQueueGranting.Store(true)
+h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
--- Flow token metrics from n1 after work gets admitted and all elastic tokens
--- are returned through the piggyback mechanism.
-`)
+-- (Allow below-raft admission to proceed. We've enabled the piggybacked token
+-- return mechanism so tokens are returned either via this path, or the normal
+-- MsgAppResp flow, depending on which is exercised first.)`)
+disablePiggybackTokenDispatch.Store(false)
+disableWorkQueueGranting.Store(false)
+h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
+h.query(n1, v2FlowTokensQueryStr)

-// The range eventually quiesces because all the tokens have been returned.
-h.comment(`-- (Wait for range to quiesce.)`)
+h.comment(`-- (Now the range can quiesce. Wait for it.)`)
testutils.SucceedsSoon(t, func() error {
if !leader.IsQuiescent() {
return errors.Errorf("%s not quiescent", leader)
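
Both versions of the test converge on quiescence with `testutils.SucceedsSoon`, which retries a closure until it returns nil or a timeout elapses; the check has to poll because token return and quiescence are asynchronous. A self-contained approximation of that polling pattern; `succeedsSoon` below is an illustrative stand-in, not the real testutils implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// succeedsSoon polls fn with exponential backoff until it returns nil or
// the deadline passes, similar in spirit to testutils.SucceedsSoon.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	backoff := time.Millisecond
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition never succeeded: %w", err)
		}
		time.Sleep(backoff)
		if backoff < 100*time.Millisecond {
			backoff *= 2 // back off, but keep polling reasonably often
		}
	}
}

func main() {
	var quiescent atomic.Bool
	go func() { // simulate tokens draining, then the range quiescing
		time.Sleep(10 * time.Millisecond)
		quiescent.Store(true)
	}()
	err := succeedsSoon(time.Second, func() error {
		if !quiescent.Load() {
			return errors.New("range not quiescent")
		}
		return nil
	})
	fmt.Println(err) // <nil>
}
```
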
pkg/kv/kvserver/testdata/flow_control_integration_v2/quiesced_range (117 changes: 0 additions & 117 deletions)

This file was deleted.

(The remaining 3 changed files are not rendered here.)
