diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 7f190d08babc..8ba612825c31 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -4917,6 +4917,157 @@ ORDER BY streams DESC; h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) } +// TODO(kvoli): Add the following tests, which exercise interesting events while +// send tokens are exhausted on a subset of, or on all, streams: +// - TestFlowControlSendQueueRangeSplitMerge +// - TestFlowControlSendQueueTransferLease +// - TestFlowControlSendQueueRaftMembershipRemoveSelf +// - TestFlowControlSendQueueRaftMembership +// - TestFlowControlSendQueueRaftSnapshot +// - TestFlowControlSendQueueLeaderNotLeaseholder +// - TestFlowControlSendQueueGranterAdmitOneByOne + +// TestFlowControlSendQueueManyInflight exercises send queue formation, +// prevention, and the quick draining of 1k+ entries tracked in the send queue, +// in order to exercise raft inflight tracking with a large number of inflight +// entries. 
+func TestFlowControlSendQueueManyInflight(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(false) + var tokenDeduction atomic.Int64 + tokenDeduction.Store(1 /* 1 byte */) + var noopWaitForEval atomic.Bool + + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens { + return kvflowcontrol.Tokens(tokenDeduction.Load()) + }, + OverrideBypassAdmitWaitForEval: func(ctx context.Context) (bypass bool, waited bool) { + bypassAndWaited := noopWaitForEval.Load() + if bypassAndWaited { + return true, true + } + if !isTestGeneratedPut(ctx) { + return true, false + } + return false, false + }, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + h := newFlowControlTestHelper( + t, tc, "flow_control_integration_v2", /* testdata */ + kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */ + ) + h.init(kvflowcontrol.ApplyToAll) + defer h.close("send_queue_many_inflight") + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics: a send queue may have formed momentarily + // while one of the replicas was being added, before being quickly + // drained. NOTE(review): presumably this keeps it out of the metrics below. 
+ h.resetV2TokenMetrics(ctx) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) + h.comment(` +-- We will exhaust the tokens across all streams while admission is blocked, +-- using a single 16 MiB (deduction, the write itself is small) write. Then, +-- we will write a thousand or so entries (@ 4KiB deduction) which should be +-- queued towards one of the replica send streams, while the other has a send +-- queue prevented from forming. Lastly, we will unblock admission and stress +-- the raft in-flights tracker as the queue is drained.`) + h.comment(` +-- Initial per-store tokens available from n1. +`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + h.comment(`-- (Blocking below-raft admission on [n1,n2,n3].)`) + disableWorkQueueGranting.Store(true) + + h.comment(`-- (Issuing 16MiB regular write that's not admitted.)`) + tokenDeduction.Store(16 << 20) + h.put(contextWithTestGeneratedPut(ctx), k, 1 /* 16 MiB deducted */, admissionpb.NormalPri) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 48<<20 /* 3 streams x 16MiB = 48MiB */, 0 /* serverIdx */) + + h.comment(` +-- Per-store tokens available from n1, these should reflect the prior +-- large write.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, these should also reflect the +-- large write.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Enabling wait-for-eval bypass.)`) + noopWaitForEval.Store(true) + h.comment(`-- (Issuing 1024x4KiB(=4MiB) regular writes that are not admitted.)`) + // Issue 1024 writes, each overridden to deduct 4KiB of tokens, for + // exactly 4 MiB of deductions in total. 
+ tokenDeduction.Store(4 << 10 /* 4KiB */) + for i := 0; i < 1024; i++ { + h.put(contextWithTestGeneratedPut(ctx), k, 1, admissionpb.NormalPri) + } + h.comment(`-- (Disabling wait-for-eval bypass.)`) + noopWaitForEval.Store(false) + + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 56<<20 /* 48MiB + 2 streams x 4MiB = 56MiB */, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, desc.RangeID, 4<<20 /* 1024*4KiB=4MiB expSize */, 0 /* serverIdx */) + h.comment(` +-- Per-store tokens available from n1, these should reflect the deducted +-- tokens from preventing send queue formation.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + h.comment(` +-- Send queue metrics from n1, a send queue should have formed for one of the +-- replica send streams, while the other (non-leader stream) should have been +-- prevented from forming. It should be 1024*4KiB=4MiB in size.`) + h.query(n1, flowSendQueueQueryStr) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, one of the three +-- streams will only be tracking the 16 MiB write, while the other two will +-- track the 1024x4KiB writes as well.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) + h.comment(` +-- Send queue and flow token metrics from n1. All tokens should be returned.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) 
+} + type flowControlTestHelper struct { t testing.TB tc *testcluster.TestCluster diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_many_inflight b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_many_inflight new file mode 100644 index 000000000000..ba6785e26e85 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_many_inflight @@ -0,0 +1,154 @@ +echo +---- +---- +-- We will exhaust the tokens across all streams while admission is blocked, +-- using a single 16 MiB (deduction, the write itself is small) write. Then, +-- we will write a thousand or so entries (@ 4KiB deduction) which should be +-- queued towards one of the replica send streams, while the other has a send +-- queue prevented from forming. Lastly, we will unblock admission and stress +-- the raft in-flights tracker as the queue is drained. + + +-- Initial per-store tokens available from n1. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB + 2 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB + 3 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB + + +-- (Blocking below-raft admission on [n1,n2,n3].) + + +-- (Issuing 16MiB regular write that's not admitted.) + + +-- Per-store tokens available from n1, these should reflect the prior +-- large write. 
+SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 0 B | -8.0 MiB | 0 B | -8.0 MiB + 2 | 0 B | -8.0 MiB | 0 B | -8.0 MiB + 3 | 0 B | -8.0 MiB | 0 B | -8.0 MiB + + +-- Observe the total tracked tokens per-stream on n1, these should also reflect the +-- large write. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 16 MiB + 70 | 2 | 16 MiB + 70 | 3 | 16 MiB + + +-- (Enabling wait-for-eval bypass.) + + +-- (Issuing 1024x4KiB(=4MiB) regular writes that are not admitted.) + + +-- (Disabling wait-for-eval bypass.) + + +-- Per-store tokens available from n1, these should reflect the deducted +-- tokens from preventing send queue formation. 
+SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | -4.0 MiB | -12 MiB | -4.0 MiB | -12 MiB + 2 | 0 B | -12 MiB | 0 B | -8.0 MiB + 3 | -4.0 MiB | -12 MiB | -4.0 MiB | -12 MiB + + +-- Send queue metrics from n1, a send queue should have formed for one of the +-- replica send streams, while the other (non-leader stream) should have been +-- prevented from forming. It should be 1024*4KiB=4MiB in size. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 4.0 MiB + kvflowcontrol.send_queue.prevent.count | 1.0 KiB + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 4.0 MiB + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 4.0 MiB + + +-- Observe the total tracked tokens per-stream on n1, one of the three +-- streams will only be tracking the 16 MiB write, while the other two will +-- track the 1024x4KiB writes as well. 
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 20 MiB + 70 | 2 | 16 MiB + 70 | 3 | 20 MiB + + +-- (Allowing below-raft admission to proceed on [n1,n2,n3].) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 4.0 MiB + kvflowcontrol.send_queue.prevent.count | 1.0 KiB + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 4.0 MiB + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 4.0 MiB +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB + 2 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB + 3 | 16 MiB | 8.0 MiB | 16 MiB | 8.0 MiB +---- +---- + + +# vim:ft=sql