Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add TestFlowControlSendQueueManyInflight test #135331

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4917,6 +4917,157 @@ ORDER BY streams DESC;
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

// TODO(kvoli): Add the following tests which exercise interesting events while
// send tokens are exhausted on a partial number, or on all streams:
// - TestFlowControlSendQueueRangeSplitMerge
// - TestFlowControlSendQueueTransferLease
// - TestFlowControlSendQueueRaftMembershipRemoveSelf
// - TestFlowControlSendQueueRaftMembership
// - TestFlowControlSendQueueRaftSnapshot
// - TestFlowControlSendQueueLeaderNotLeaseholder
// - TestFlowControlSendQueueGranterAdmitOneByOne

// TestFlowControlSendQueueManyInflight exercises send queue formation,
// prevention and quickly draining 1k+ entries tracked in the send queue, in
// order to exercise raft in-flight tracking with a large number of in-flight
// entries.
func TestFlowControlSendQueueManyInflight(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	// While true, below-raft admission is withheld, so deducted flow tokens
	// are not returned and the streams can be driven to exhaustion.
	var disableWorkQueueGranting atomic.Bool
	disableWorkQueueGranting.Store(false)
	// Overrides the tokens deducted per write, letting tiny puts stand in for
	// arbitrarily large writes (16 MiB, then 4 KiB below).
	var tokenDeduction atomic.Int64
	tokenDeduction.Store(1 /* 1b */)
	// While true, wait-for-eval is bypassed (reported as bypass=true,
	// waited=true), allowing writes to be issued despite exhausted tokens.
	var noopWaitForEval atomic.Bool

	settings := cluster.MakeTestingClusterSettings()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			Settings: settings,
			Knobs: base.TestingKnobs{
				Store: &kvserver.StoreTestingKnobs{
					FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
						UseOnlyForScratchRanges: true,
						OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
							return kvflowcontrol.Tokens(tokenDeduction.Load())
						},
						OverrideBypassAdmitWaitForEval: func(ctx context.Context) (bypass bool, waited bool) {
							bypassAndWaited := noopWaitForEval.Load()
							if bypassAndWaited {
								return true, true
							}
							// Puts not issued by this test bypass without
							// counting as having waited, keeping them out of
							// the test's token accounting.
							if !isTestGeneratedPut(ctx) {
								return true, false
							}
							return false, false
						},
					},
				},
				AdmissionControl: &admission.TestingKnobs{
					DisableWorkQueueFastPath: true,
					DisableWorkQueueGranting: func() bool {
						return disableWorkQueueGranting.Load()
					},
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	h := newFlowControlTestHelper(
		t, tc, "flow_control_integration_v2", /* testdata */
		kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
	)
	h.init(kvflowcontrol.ApplyToAll)
	defer h.close("send_queue_many_inflight")

	k := tc.ScratchRange(t)
	tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
	desc, err := tc.LookupRange(k)
	require.NoError(t, err)

	n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
	// Reset the token metrics, since a send queue may have instantly
	// formed when adding one of the replicas, before being quickly
	// drained.
	h.resetV2TokenMetrics(ctx)
	h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
	h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked,
-- using a single 16 MiB (deduction, the write itself is small) write. Then,
-- we will write a thousand or so entries (@ 4KiB deduction) which should be
-- queued towards one of the replica send streams, while the other has a send
-- queue prevented from forming. Lastly, we will unblock admission and stress
-- the raft in-flights tracker as the queue is drained.`)
	h.comment(`
-- Initial per-store tokens available from n1.
`)
	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
	h.comment(`-- (Blocking below-raft admission on [n1,n2,n3].)`)
	disableWorkQueueGranting.Store(true)

	h.comment(`-- (Issuing 16MiB regular write that's not admitted.)`)
	tokenDeduction.Store(16 << 20)
	h.put(contextWithTestGeneratedPut(ctx), k, 1 /* 16 MiB deducted */, admissionpb.NormalPri)
	// 16 MiB tracked on each of the 3 streams (leader + 2 followers).
	h.waitForTotalTrackedTokens(ctx, desc.RangeID, 48<<20 /* 48MiB */, 0 /* serverIdx */)

	h.comment(`
-- Per-store tokens available from n1, these should reflect the prior
-- large write.`)
	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
	h.comment(`
-- Observe the total tracked tokens per-stream on n1, these should also reflect the
-- large write.`)
	h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

	h.comment(`-- (Enabling wait-for-eval bypass.)`)
	noopWaitForEval.Store(true)
	h.comment(`-- (Issuing 1024x4KiB(=4MiB) regular writes that are not admitted.)`)
	// We issue exactly 4 MiB worth of writes by issuing 1024 writes of 4KiB
	// each.
	tokenDeduction.Store(4 << 10 /* 4KiB */)
	for i := 0; i < 1024; i++ {
		h.put(contextWithTestGeneratedPut(ctx), k, 1, admissionpb.NormalPri)
	}
	h.comment(`-- (Disabling wait-for-eval bypass.)`)
	noopWaitForEval.Store(false)

	// Two streams track the extra 4 MiB each (the third queues instead).
	h.waitForTotalTrackedTokens(ctx, desc.RangeID, 56<<20 /* 48+2*4=56MiB */, 0 /* serverIdx */)
	h.waitForSendQueueSize(ctx, desc.RangeID, 4<<20 /* 1024*4KiB=4MiB expSize */, 0 /* serverIdx */)
	h.comment(`
-- Per-store tokens available from n1, these should reflect the deducted
-- tokens from preventing send queue formation.`)
	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
	h.comment(`
-- Send queue metrics from n1, a send queue should have formed for one of the
-- replica send streams, while the other (non-leader stream) should have been
-- prevented from forming. It should be 1024*4KiB=4MiB in size.`)
	h.query(n1, flowSendQueueQueryStr)
	h.comment(`
-- Observe the total tracked tokens per-stream on n1, one of the three
-- streams will only be tracking the 16 MiB write, while the other two will
-- track the 1024x4KiB writes as well.`)
	h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

	h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`)
	disableWorkQueueGranting.Store(false)
	// Draining the 1k+ queued entries here is what stresses raft's in-flight
	// tracking; all deducted tokens should eventually be returned.
	h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
	h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
	h.query(n1, flowSendQueueQueryStr)
	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
echo
----
----
-- We will exhaust the tokens across all streams while admission is blocked,
-- using a single 16 MiB (deduction, the write itself is small) write. Then,
-- we will write a thousand or so entries (@ 4KiB deduction) which should be
-- queued towards one of the replica send streams, while the other has a send
-- queue prevented from forming. Lastly, we will unblock admission and stress
-- the raft in-flights tracker as the queue is drained.


-- Initial per-store tokens available from n1.
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB
3 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB


-- (Blocking below-raft admission on [n1,n2,n3].)


-- (Issuing 16MiB regular write that's not admitted.)


-- Per-store tokens available from n1, these should reflect the prior
-- large write.
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | 0 B | -8.0 MiB | 0 B | -8.0 MiB
2 | 0 B | -8.0 MiB | 0 B | -8.0 MiB
3 | 0 B | -8.0 MiB | 0 B | -8.0 MiB


-- Observe the total tracked tokens per-stream on n1, these should also reflect the
-- large write.
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2

range_id | store_id | total_tracked_tokens
-----------+----------+-----------------------
70 | 1 | 16 MiB
70 | 2 | 16 MiB
70 | 3 | 16 MiB


-- (Enabling wait-for-eval bypass.)


-- (Issuing 1024x4KiB(=4MiB) regular writes that are not admitted.)


-- (Disabling wait-for-eval bypass.)


-- Per-store tokens available from n1, these should reflect the deducted
-- tokens from preventing send queue formation.
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | -4.0 MiB | -12 MiB | -4.0 MiB | -12 MiB
2 | 0 B | -12 MiB | 0 B | -8.0 MiB
3 | -4.0 MiB | -12 MiB | -4.0 MiB | -12 MiB


-- Send queue metrics from n1, a send queue should have formed for one of the
-- replica send streams, while the other (non-leader stream) should have been
-- prevented from forming. It should be 1024*4KiB=4MiB in size.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%send_queue%'
AND name != 'kvflowcontrol.send_queue.count'
ORDER BY name ASC;

kvflowcontrol.send_queue.bytes | 4.0 MiB
kvflowcontrol.send_queue.prevent.count | 1.0 KiB
kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
kvflowcontrol.send_queue.scheduled.force_flush | 0 B
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 4.0 MiB
kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 4.0 MiB


-- Observe the total tracked tokens per-stream on n1, one of the three
-- streams will only be tracking the 16 MiB write, while the other two will
-- track the 1024x4KiB writes as well.
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2

range_id | store_id | total_tracked_tokens
-----------+----------+-----------------------
70 | 1 | 20 MiB
70 | 2 | 16 MiB
70 | 3 | 20 MiB


-- (Allowing below-raft admission to proceed on [n1,n2,n3].)


-- Send queue and flow token metrics from n1. All tokens should be returned.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%send_queue%'
AND name != 'kvflowcontrol.send_queue.count'
ORDER BY name ASC;

kvflowcontrol.send_queue.bytes | 4.0 MiB
kvflowcontrol.send_queue.prevent.count | 1.0 KiB
kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
kvflowcontrol.send_queue.scheduled.force_flush | 0 B
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 4.0 MiB
kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 4.0 MiB
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB
3 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB
----
----


# vim:ft=sql
Loading