From 55a471984ead441ed14ef85e5afe99d085cefb94 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 14 Oct 2024 12:48:52 -0400 Subject: [PATCH 01/13] testutils,kvserver: disable store rebalancer when replication manual `ReplicationManual` can be specified in tests that use a test cluster to prevent lease/replica movements. This proves vital if the test is asserting exactly on the placement of a range, or its leaseholder. Also disable the store rebalancer, which previously could move both leases and replicas even under `ReplicationManual`. Informs: #132310 Informs: #132272 Release note: None --- pkg/kv/kvserver/store_rebalancer.go | 15 +++++++++------ pkg/kv/kvserver/testing_knobs.go | 3 +++ pkg/testutils/testcluster/testcluster.go | 1 + 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 89986323708c..bf828c384fbf 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -149,6 +149,7 @@ type StoreRebalancer struct { processTimeoutFn func(replica CandidateReplica) time.Duration objectiveProvider RebalanceObjectiveProvider subscribedToSpanConfigs func() bool + disabled func() bool } // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the @@ -191,6 +192,10 @@ func NewStoreRebalancer( } return !rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() }, + disabled: func() bool { + return LoadBasedRebalancingMode.Get(&st.SV) == LBRebalancingOff || + rq.store.cfg.TestingKnobs.DisableStoreRebalancer + }, } sr.AddLogTag("store-rebalancer", nil) rq.store.metrics.registry.AddMetricStruct(&sr.metrics) @@ -308,15 +313,13 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { timer.Read = true timer.Reset(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&sr.st.SV))) } - + if sr.disabled() { + continue + } // Once the rebalance mode and rebalance objective are defined for // this loop, they are immutable and do not change. This avoids // inconsistency where the rebalance objective changes and very // different or contradicting actions are then taken. - mode := sr.RebalanceMode() - if mode == LBRebalancingOff { - continue - } if !sr.subscribedToSpanConfigs() { continue } @@ -326,7 +329,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension()) options := sr.scorerOptions(ctx, objective.ToDimension()) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) sr.rebalanceStore(ctx, rctx) } }) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 7f1379f758f8..7e2d38e30dc5 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -176,6 +176,9 @@ type StoreTestingKnobs struct { DisableReplicaGCQueue bool // DisableReplicateQueue disables the replication queue. DisableReplicateQueue bool + // DisableStoreRebalancer turns off the store rebalancer which moves replicas + // and leases. + DisableStoreRebalancer bool // DisableLoadBasedSplitting turns off LBS so no splits happen because of load.
DisableLoadBasedSplitting bool // LoadBasedSplittingOverrideKey returns a key which should be used for load diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index ac49505dcfc9..cb88ed0a88ba 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -581,6 +581,7 @@ func (tc *TestCluster) AddServer( stkCopy.DisableSplitQueue = true stkCopy.DisableMergeQueue = true stkCopy.DisableReplicateQueue = true + stkCopy.DisableStoreRebalancer = true serverArgs.Knobs.Store = &stkCopy } From 30ba549772d7cbb3dacc47df1bcc48207131978b Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 14 Oct 2024 17:24:21 +0000 Subject: [PATCH 02/13] roachtest: elide mode=default in ldr tests Release note: none. Epic: none. --- pkg/cmd/roachtest/tests/logical_data_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 1ed5f3b261e2..dab7acb782e8 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -736,7 +736,7 @@ func setupLDR( startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int { options := "" - if mode.String() != "" { + if mode != Default { options = fmt.Sprintf("WITH mode='%s'", mode) } targetDB.Exec(t, fmt.Sprintf("USE %s", dbName)) From 32f51219cd6ed51a7790416fceb3f75ba753ab64 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Mon, 14 Oct 2024 13:27:52 -0400 Subject: [PATCH 03/13] cluster-ui: bump version to 24.3.0-prerelease.1 Bump the cluster-ui pkg version. Epic: none Release note: None --- pkg/ui/workspaces/cluster-ui/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ui/workspaces/cluster-ui/package.json b/pkg/ui/workspaces/cluster-ui/package.json index 40988c27d0a2..b670099d0e26 100644 --- a/pkg/ui/workspaces/cluster-ui/package.json +++ b/pkg/ui/workspaces/cluster-ui/package.json @@ -1,6 +1,6 @@ { "name": "@cockroachlabs/cluster-ui", - "version": "24.3.0-prerelease.0", + "version": "24.3.0-prerelease.1", "description": "Cluster UI is a library of large features shared between CockroachDB and CockroachCloud", "repository": { "type": "git",
From 43f50d1022f8b202c43cfda00228e487b0394761 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 14 Oct 2024 10:46:47 -0400 Subject: [PATCH 05/13] kvserver: unskip v1 flow control integration tests under duress These were skipped to unblock merging #132125 and later (presumed to be) fixed by #132563. Un-skip all `TestFlowControl.*` v1 integration tests under duress. Resolves: #132310 Release note: None --- .../kvserver/flow_control_integration_test.go | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 4c9346a1d9b7..9057c3549df1 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -68,9 +68,6 @@ import ( func TestFlowControlBasic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) testutils.RunTrueAndFalse(t, "always-enqueue", func(t *testing.T, alwaysEnqueue bool) { ctx := context.Background() @@ -223,9 +220,6 @@ ORDER BY streams DESC; func TestFlowControlRangeSplitMerge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -341,9 +335,6 @@ ORDER BY streams DESC; func TestFlowControlBlockedAdmission(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -455,9 +446,6 @@ ORDER BY name ASC; func TestFlowControlAdmissionPostSplitMerge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -594,9 +582,6 @@ ORDER BY streams DESC; func TestFlowControlCrashedNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 2 @@ -714,9 +699,6 @@ func TestFlowControlCrashedNode(t *testing.T) { func TestFlowControlRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121.
- skip.UnderDuressWithIssue(t, 132310) const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1014,9 +996,6 @@ SELECT store_id, func TestFlowControlRaftTransportBreak(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1129,9 +1108,6 @@ func TestFlowControlRaftTransportBreak(t *testing.T) { func TestFlowControlRaftTransportCulled(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1268,9 +1244,6 @@ func TestFlowControlRaftTransportCulled(t *testing.T) { func TestFlowControlRaftMembership(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -1401,9 +1374,6 @@ ORDER BY name ASC; func TestFlowControlRaftMembershipRemoveSelf(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) testutils.RunTrueAndFalse(t, "transfer-lease-first", func(t *testing.T, transferLeaseFirst bool) { ctx := context.Background() @@ -1532,9 +1502,6 @@ ORDER BY name ASC; func TestFlowControlClassPrioritization(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -1618,9 +1585,6 @@ func TestFlowControlClassPrioritization(t *testing.T) { func TestFlowControlQuiescedRange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1761,9 +1725,6 @@ ORDER BY name ASC; func TestFlowControlUnquiescedRange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1917,9 +1878,6 @@ ORDER BY name ASC; func TestFlowControlTransferLease(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. 
- skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -2008,9 +1966,6 @@ ORDER BY name ASC; func TestFlowControlLeaderNotLeaseholder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -2144,9 +2099,6 @@ ORDER BY name ASC; func TestFlowControlGranterAdmitOneByOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 From efffda9132ec18b397d86cbc1ac75d2c6035c5fc Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 14 Oct 2024 10:50:18 -0400 Subject: [PATCH 06/13] kvserver: unskip v2 flow control integration tests under duress Similar to the prior commit, un-skip the v2 flow control integration tests under duress. These were disabled due to flakiness. Resolves: #132272 Release note: None --- .../kvserver/flow_control_integration_test.go | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 9057c3549df1..ff37488205d4 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -2207,7 +2206,6 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) { func TestFlowControlBasicV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2302,7 +2300,6 @@ ORDER BY streams DESC; func TestFlowControlRangeSplitMergeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2417,7 +2414,6 @@ ORDER BY streams DESC; func TestFlowControlBlockedAdmissionV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2519,7 +2515,6 @@ func TestFlowControlBlockedAdmissionV2(t *testing.T) { func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", 
[]kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2670,7 +2665,6 @@ ORDER BY streams DESC; func TestFlowControlCrashedNodeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2775,7 +2769,6 @@ func TestFlowControlCrashedNodeV2(t *testing.T) { func TestFlowControlRaftSnapshotV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") const numServers int = 5 @@ -3043,7 +3036,6 @@ SELECT store_id, func TestFlowControlRaftMembershipV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3176,7 +3168,6 @@ func TestFlowControlRaftMembershipV2(t *testing.T) { func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3313,7 +3304,6 @@ ORDER BY streams DESC; func TestFlowControlClassPrioritizationV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3404,7 +3394,6 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) { func TestFlowControlUnquiescedRangeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3423,7 +3412,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { settings := cluster.MakeTestingClusterSettings() // Override metamorphism to allow range quiescence. kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, false) - pinnedLease := kvserver.NewPinnedLeases() tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ @@ -3435,10 +3423,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { }, Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - // Pin the lease to the first store to prevent lease and leader - // moves which disrupt this test. 
- PinnedLeases: pinnedLease, - FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ UseOnlyForScratchRanges: true, OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel { @@ -3475,7 +3459,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { k := tc.ScratchRange(t) desc, err := tc.LookupRange(k) require.NoError(t, err) - pinnedLease.PinLease(desc.RangeID, tc.GetFirstStoreFromServer(t, 0).StoreID()) tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) h := newFlowControlTestHelperV2(t, tc, v2EnabledWhenLeaderLevel) @@ -3539,7 +3522,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { func TestFlowControlTransferLeaseV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3628,7 +3610,6 @@ func TestFlowControlTransferLeaseV2(t *testing.T) { func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3741,7 +3722,6 @@ func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
From 1f6c4a5d42b8dd01211d1a9db2fb37514b6673d4 Mon Sep 17 00:00:00 2001 From: Matt White Date: Fri, 4 Oct 2024 07:34:32 -0700 Subject: [PATCH 09/13] row: refactor some confusing function & variable names The name colIDtoRowIdx was creating confusion in another patch. 'Idx' is overloaded in the context of a database, so change that to the more descriptive 'Position'. We also change 'encodePrimaryIndex()' to 'encodePrimaryIndexKey()' to better describe what we're doing and make code more obvious. Part of: #129837 Release note: None --- pkg/sql/row/deleter.go | 2 +- pkg/sql/row/helper.go | 20 ++++++++++---------- pkg/sql/row/updater.go | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 0028302e63a8..9c9adb9e927f 100644 --- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -140,7 +140,7 @@ func (rd *Deleter) DeleteRow( } } - primaryIndexKey, err := rd.Helper.encodePrimaryIndex(rd.FetchColIDtoRowIndex, values) + primaryIndexKey, err := rd.Helper.encodePrimaryIndexKey(rd.FetchColIDtoRowIndex, values) if err != nil { return err } diff --git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index abde9f86b68d..30eb2fd92796 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -128,7 +128,7 @@ func NewRowHelper( // include empty secondary index k/v pairs. func (rh *RowHelper) encodeIndexes( ctx context.Context, - colIDtoRowIndex catalog.TableColMap, + colIDtoRowPosition catalog.TableColMap, values []tree.Datum, ignoreIndexes intsets.Fast, includeEmpty bool, @@ -137,11 +137,11 @@ func (rh *RowHelper) encodeIndexes( secondaryIndexEntries map[catalog.Index][]rowenc.IndexEntry, err error, ) { - primaryIndexKey, err = rh.encodePrimaryIndex(colIDtoRowIndex, values) + primaryIndexKey, err = rh.encodePrimaryIndexKey(colIDtoRowPosition, values) if err != nil { return nil, nil, err } - secondaryIndexEntries, err = rh.encodeSecondaryIndexes(ctx, colIDtoRowIndex, values, ignoreIndexes, includeEmpty) + secondaryIndexEntries, err = rh.encodeSecondaryIndexes(ctx, colIDtoRowPosition, values, ignoreIndexes, includeEmpty) if err != nil { return nil, nil, err } @@ -154,19 +154,19 @@ func (rh *RowHelper) Init() { ) } -// encodePrimaryIndex encodes the primary index key. -func (rh *RowHelper) encodePrimaryIndex( - colIDtoRowIndex catalog.TableColMap, values []tree.Datum, +// encodePrimaryIndexKey encodes the primary index key.
+func (rh *RowHelper) encodePrimaryIndexKey( + colIDtoRowPosition catalog.TableColMap, values []tree.Datum, ) (primaryIndexKey []byte, err error) { if rh.PrimaryIndexKeyPrefix == nil { rh.Init() } idx := rh.TableDesc.GetPrimaryIndex() primaryIndexKey, containsNull, err := rowenc.EncodeIndexKey( - rh.TableDesc, idx, colIDtoRowIndex, values, rh.PrimaryIndexKeyPrefix, + rh.TableDesc, idx, colIDtoRowPosition, values, rh.PrimaryIndexKeyPrefix, ) if containsNull { - return nil, rowenc.MakeNullPKError(rh.TableDesc, idx, colIDtoRowIndex, values) + return nil, rowenc.MakeNullPKError(rh.TableDesc, idx, colIDtoRowPosition, values) } return primaryIndexKey, err } @@ -184,7 +184,7 @@ func (rh *RowHelper) encodePrimaryIndex( // k/v pairs. func (rh *RowHelper) encodeSecondaryIndexes( ctx context.Context, - colIDtoRowIndex catalog.TableColMap, + colIDtoRowPosition catalog.TableColMap, values []tree.Datum, ignoreIndexes intsets.Fast, includeEmpty bool, @@ -201,7 +201,7 @@ func (rh *RowHelper) encodeSecondaryIndexes( for i := range rh.Indexes { index := rh.Indexes[i] if !ignoreIndexes.Contains(int(index.GetID())) { - entries, err := rowenc.EncodeSecondaryIndex(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowIndex, values, includeEmpty) + entries, err := rowenc.EncodeSecondaryIndex(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowPosition, values, includeEmpty) if err != nil { return nil, err } diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index bf8e3a59bf46..02902c231b3c 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -214,7 +214,7 @@ func (ru *Updater) UpdateRow( return nil, errors.Errorf("got %d values but expected %d", len(updateValues), len(ru.UpdateCols)) } - primaryIndexKey, err := ru.Helper.encodePrimaryIndex(ru.FetchColIDtoRowIndex, oldValues) + primaryIndexKey, err := ru.Helper.encodePrimaryIndexKey(ru.FetchColIDtoRowIndex, oldValues) if err != nil { return nil, err } @@ -250,7 +250,7 @@ func (ru *Updater) UpdateRow( if ru.primaryKeyColChange { var newPrimaryIndexKey []byte newPrimaryIndexKey, err = - ru.Helper.encodePrimaryIndex(ru.FetchColIDtoRowIndex, ru.newValues) + ru.Helper.encodePrimaryIndexKey(ru.FetchColIDtoRowIndex, ru.newValues) if err != nil { return nil, err } From 8de4fd0d2993b27996fdb7f2932dca65e10dbed1 Mon Sep 17 00:00:00 2001 From: Matt White Date: Fri, 4 Oct 2024 07:37:36 -0700 Subject: [PATCH 10/13] row: add RowHelper support for writing tombstones on implicitly partitioned indexes Under non-Serializable isolations, we need to lock the keys for unique indexes with implicit partitioning in all partitions to ensure a racing write doesn't insert a conflicting row into a different partition. Since there is no support for predicate locks at the KV layer, we fake it by writing tombstone values to the partitions not being inserted into. This causes a bit of a write explosion, but it will keep conflicting rows from being written. In this patch, we create the infrastructure for writing these tombstones by modifying the RowHelper to generate a list of keys to lock. 
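To make the mechanism concrete, here is a small, self-contained sketch (illustrative only: `partition`, `encodeKey`, and `tombstoneKeys` are hypothetical stand-ins, not CockroachDB's actual key encoding). For a unique index whose leading column is an implicit ENUM partition column, each enum value yields one candidate key for the row's unique value, and every partition other than the row's own needs a tombstone:

    package main

    import "fmt"

    // partition stands in for the implicit partitioning ENUM.
    type partition string

    var allPartitions = []partition{"one", "two", "three", "four", "five"}

    // encodeKey stands in for index key encoding: the partition column is the
    // leading key column, followed by the value of the unique column.
    func encodeKey(p partition, uniqueVal int) string {
    	return fmt.Sprintf("/idx/%s/%d", p, uniqueVal)
    }

    // tombstoneKeys lists the keys that get empty values (tombstones), so that
    // a conflicting insert into any other partition fails its conditional put.
    func tombstoneKeys(rowPartition partition, uniqueVal int) []string {
    	var keys []string
    	for _, p := range allPartitions {
    		if p == rowPartition {
    			continue // the real row is written to this partition
    		}
    		keys = append(keys, encodeKey(p, uniqueVal))
    	}
    	return keys
    }

    func main() {
    	fmt.Println(tombstoneKeys("two", 4))
    	// Output: [/idx/one/4 /idx/three/4 /idx/four/4 /idx/five/4]
    }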
Part of: #110873 Release note: None --- pkg/sql/colenc/encode.go | 2 +- pkg/sql/row/deleter.go | 2 +- pkg/sql/row/helper.go | 130 +++++++++++++++++++++++++++++-- pkg/sql/row/inserter.go | 2 +- pkg/sql/row/updater.go | 4 +- pkg/sql/rowenc/index_encoding.go | 43 ++++++---- 6 files changed, 157 insertions(+), 26 deletions(-) diff --git a/pkg/sql/colenc/encode.go b/pkg/sql/colenc/encode.go index be827546ca04..4fb14c579814 100644 --- a/pkg/sql/colenc/encode.go +++ b/pkg/sql/colenc/encode.go @@ -82,7 +82,7 @@ func MakeEncoder( partialIndexes map[descpb.IndexID][]bool, memoryUsageCheck func() error, ) BatchEncoder { - rh := row.NewRowHelper(codec, desc, desc.WritableNonPrimaryIndexes(), sv, false /*internal*/, metrics) + rh := row.NewRowHelper(codec, desc, desc.WritableNonPrimaryIndexes(), nil /* uniqueWithTombstoneIndexes */, sv, false /*internal*/, metrics) rh.Init() colMap := row.ColIDtoRowIndexFromCols(insCols) return BatchEncoder{rh: &rh, b: b, colMap: colMap, diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 9c9adb9e927f..14339ecd45e9 100644 --- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -91,7 +91,7 @@ func MakeDeleter( } rd := Deleter{ - Helper: NewRowHelper(codec, tableDesc, indexes, sv, internal, metrics), + Helper: NewRowHelper(codec, tableDesc, indexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics), FetchCols: fetchCols, FetchColIDtoRowIndex: fetchColIDtoRowIndex, } diff --git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index 30eb2fd92796..1f68013399f5 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -6,6 +6,7 @@ package row import ( + "bytes" "context" "sort" @@ -66,14 +67,28 @@ var maxRowSizeErr = settings.RegisterByteSizeSetting( settings.WithPublic, ) +// uniqueWithTombstoneEntry holds per-index data for writing tombstones to +// enforce a uniqueness constraint. +type uniqueWithTombstoneEntry struct { + // implicitPartitionKeyVals contains the potential values for the + // partitioning column. + implicitPartitionKeyVals []tree.Datum + + // tmpTombstones contains the tombstones generated for this index by the last + // call to encodeTombstonesForIndex. + tmpTombstones [][]byte +} + // RowHelper has the common methods for table row manipulations. type RowHelper struct { Codec keys.SQLCodec TableDesc catalog.TableDescriptor // Secondary indexes. - Indexes []catalog.Index - indexEntries map[catalog.Index][]rowenc.IndexEntry + Indexes []catalog.Index + + // Unique indexes that can be enforced with tombstones. + UniqueWithTombstoneIndexes intsets.Fast + indexEntries map[catalog.Index][]rowenc.IndexEntry // Computed during initialization for pretty-printing. primIndexValDirs []encoding.Direction @@ -85,6 +100,11 @@ type RowHelper struct { primaryIndexValueCols catalog.TableColSet sortedColumnFamilies map[descpb.FamilyID][]descpb.ColumnID + // Used to build tmpTombstones for non-Serializable uniqueness checks. + index2UniqueWithTombstoneEntry map[catalog.Index]*uniqueWithTombstoneEntry + // Used to hold the row being written while writing tombstones. + tmpRow []tree.Datum + // Used to check row size.
maxRowSizeLog, maxRowSizeErr uint32 internal bool @@ -95,16 +115,22 @@ func NewRowHelper( codec keys.SQLCodec, desc catalog.TableDescriptor, indexes []catalog.Index, + uniqueWithTombstoneIndexes []catalog.Index, sv *settings.Values, internal bool, metrics *rowinfra.Metrics, ) RowHelper { + var uniqueWithTombstoneIndexesSet intsets.Fast + for _, index := range uniqueWithTombstoneIndexes { + uniqueWithTombstoneIndexesSet.Add(index.Ordinal()) + } rh := RowHelper{ - Codec: codec, - TableDesc: desc, - Indexes: indexes, - internal: internal, - metrics: metrics, + Codec: codec, + TableDesc: desc, + Indexes: indexes, + UniqueWithTombstoneIndexes: uniqueWithTombstoneIndexesSet, + internal: internal, + metrics: metrics, } // Pre-compute the encoding directions of the index key values for @@ -171,6 +197,96 @@ func (rh *RowHelper) encodePrimaryIndexKey( return primaryIndexKey, err } +// initRowTmp creates a copy of the row that we can modify while trying to be +// smart about allocations. +func (rh *RowHelper) initRowTmp(values []tree.Datum) []tree.Datum { + if rh.tmpRow == nil { + rh.tmpRow = make([]tree.Datum, len(values)) + } + copy(rh.tmpRow, values) + return rh.tmpRow +} + +// getTombstoneTmpForIndex initializes and returns the uniqueWithTombstoneEntry +// for the provided index. +func (rh *RowHelper) getTombstoneTmpForIndex( + index catalog.Index, partitionColValue *tree.DEnum, +) *uniqueWithTombstoneEntry { + if rh.index2UniqueWithTombstoneEntry == nil { + rh.index2UniqueWithTombstoneEntry = make(map[catalog.Index]*uniqueWithTombstoneEntry, len(rh.TableDesc.WritableNonPrimaryIndexes())+1) + } + tombstoneTmp, ok := rh.index2UniqueWithTombstoneEntry[index] + if !ok { + implicitKeys := tree.MakeAllDEnumsInType(partitionColValue.ResolvedType()) + tombstoneTmp = &uniqueWithTombstoneEntry{implicitPartitionKeyVals: implicitKeys, tmpTombstones: make([][]byte, len(implicitKeys)-1)} + rh.index2UniqueWithTombstoneEntry[index] = tombstoneTmp + } + tombstoneTmp.tmpTombstones = tombstoneTmp.tmpTombstones[:0] + return tombstoneTmp +} + +// encodeTombstonesForIndex creates a set of keys that can be used to write +// tombstones for the provided index. These values remain valid for the index +// until this function is called again for that index. +func (rh *RowHelper) encodeTombstonesForIndex( + ctx context.Context, + index catalog.Index, + colIDtoRowPosition catalog.TableColMap, + values []tree.Datum, +) ([][]byte, error) { + if !rh.UniqueWithTombstoneIndexes.Contains(index.Ordinal()) { + return nil, nil + } + + if !index.IsUnique() { + return nil, errors.AssertionFailedf("Expected index %s to be unique", index.GetName()) + } + if index.GetType() != descpb.IndexDescriptor_FORWARD { + return nil, errors.AssertionFailedf("Expected index %s to be a forward index", index.GetName()) + } + + // Get the position and value of the partition column in this index.
+ partitionColPosition, ok := colIDtoRowPosition.Get(index.GetKeyColumnID(0 /* columnOrdinal */)) + if !ok { + return nil, nil + } + partitionColValue, ok := values[partitionColPosition].(*tree.DEnum) + if !ok { + return nil, errors.AssertionFailedf("Expected partition column value to be enum, but got %T", values[partitionColPosition]) + } + + // Intentionally shadowing values here to avoid accidentally overwriting the tuple + values = rh.initRowTmp(values) + tombstoneTmpForIndex := rh.getTombstoneTmpForIndex(index, partitionColValue) + + for _, partVal := range tombstoneTmpForIndex.implicitPartitionKeyVals { + if bytes.Equal(partitionColValue.PhysicalRep, partVal.(*tree.DEnum).PhysicalRep) { + continue + } + values[partitionColPosition] = partVal + + if index.Primary() { + key, err := rh.encodePrimaryIndexKey(colIDtoRowPosition, values) + if err != nil { + return nil, err + } + tombstoneTmpForIndex.tmpTombstones = append(tombstoneTmpForIndex.tmpTombstones, key) + } else { + keys, containsNull, err := rowenc.EncodeSecondaryIndexKey(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowPosition, values) + if err != nil { + return nil, err + } + // If this key contains a NULL value, it can't violate a NULL constraint. + if containsNull { + tombstoneTmpForIndex.tmpTombstones = tombstoneTmpForIndex.tmpTombstones[:0] + break + } + tombstoneTmpForIndex.tmpTombstones = append(tombstoneTmpForIndex.tmpTombstones, keys...) + } + } + + return tombstoneTmpForIndex.tmpTombstones, nil +} + // encodeSecondaryIndexes encodes the secondary index keys based on a row's // values. // diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 0560174c4595..edda591c848e 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -49,7 +49,7 @@ func MakeInserter( ) (Inserter, error) { ri := Inserter{ Helper: NewRowHelper( - codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), sv, internal, metrics, + codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics, ), InsertCols: insertCols, diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 02902c231b3c..756e1a2d2810 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -156,12 +156,12 @@ func MakeUpdater( var deleteOnlyHelper *RowHelper if len(deleteOnlyIndexes) > 0 { - rh := NewRowHelper(codec, tableDesc, deleteOnlyIndexes, sv, internal, metrics) + rh := NewRowHelper(codec, tableDesc, deleteOnlyIndexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics) deleteOnlyHelper = &rh } ru := Updater{ - Helper: NewRowHelper(codec, tableDesc, includeIndexes, sv, internal, metrics), + Helper: NewRowHelper(codec, tableDesc, includeIndexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics), DeleteHelper: deleteOnlyHelper, FetchCols: requestedCols, FetchColIDtoRowIndex: ColIDtoRowIndexFromCols(requestedCols), diff --git a/pkg/sql/rowenc/index_encoding.go b/pkg/sql/rowenc/index_encoding.go index 5fee2780021a..59ed0134682e 100644 --- a/pkg/sql/rowenc/index_encoding.go +++ b/pkg/sql/rowenc/index_encoding.go @@ -1283,28 +1283,19 @@ func MakeNullPKError( return errors.AssertionFailedf("NULL value in unknown key column") } -// EncodeSecondaryIndex encodes key/values for a secondary -// index. colMap maps descpb.ColumnIDs to indices in `values`. This returns a -// slice of IndexEntry. includeEmpty controls whether or not -// EncodeSecondaryIndex should return k/v's that contain -// empty values. 
For forward indexes the returned list of -// index entries is in family sorted order. -func EncodeSecondaryIndex( +// EncodeSecondaryIndexKey encodes the key for a secondary index. The 'colMap' +// maps descpb.ColumnIDs to positions in 'values'. This function returns a slice +// of byte arrays representing the key values. +func EncodeSecondaryIndexKey( ctx context.Context, codec keys.SQLCodec, tableDesc catalog.TableDescriptor, secondaryIndex catalog.Index, colMap catalog.TableColMap, values []tree.Datum, - includeEmpty bool, -) ([]IndexEntry, error) { +) ([][]byte, bool, error) { secondaryIndexKeyPrefix := MakeIndexKeyPrefix(codec, tableDesc.GetID(), secondaryIndex.GetID()) - // Use the primary key encoding for covering indexes. - if secondaryIndex.GetEncodingType() == catenumpb.PrimaryIndexEncoding { - return EncodePrimaryIndex(codec, tableDesc, secondaryIndex, colMap, values, includeEmpty) - } - var containsNull = false var secondaryKeys [][]byte var err error @@ -1317,6 +1308,30 @@ func EncodeSecondaryIndex( secondaryKeys = [][]byte{secondaryIndexKey} } + return secondaryKeys, containsNull, err +} + +// EncodeSecondaryIndex encodes key/values for a secondary +// index. colMap maps descpb.ColumnIDs to indices in `values`. This returns a +// slice of IndexEntry. includeEmpty controls whether or not +// EncodeSecondaryIndex should return k/v's that contain +// empty values. For forward indexes the returned list of +// index entries is in family sorted order. +func EncodeSecondaryIndex( + ctx context.Context, + codec keys.SQLCodec, + tableDesc catalog.TableDescriptor, + secondaryIndex catalog.Index, + colMap catalog.TableColMap, + values []tree.Datum, + includeEmpty bool, +) ([]IndexEntry, error) { + // Use the primary key encoding for covering indexes. + if secondaryIndex.GetEncodingType() == catenumpb.PrimaryIndexEncoding { + return EncodePrimaryIndex(codec, tableDesc, secondaryIndex, colMap, values, includeEmpty) + } + + secondaryKeys, containsNull, err := EncodeSecondaryIndexKey(ctx, codec, tableDesc, secondaryIndex, colMap, values) if err != nil { return []IndexEntry{}, err } From 5fb741d1f8e627986c1c5d65182800ededd24627 Mon Sep 17 00:00:00 2001 From: Matt White Date: Wed, 28 Aug 2024 11:29:05 -0700 Subject: [PATCH 11/13] sql: support INSERT w/o arbiters under non-serializable isolations For unique indexes with a single ENUM implicit partition column (e.g. regional by row), write a tombstone to each partition to ensure uniqueness when isolation level is not serializable. These tombstones get write intents, which we use as row locks because the KV layer does not currently support locking non-existent rows. This patch also reverts previous work on the insert fast path to write tombstones because those tombstones are now being written by the row inserter. Part of: #110873 Release note (sql change): REGIONAL BY ROW and PARTITION BY ALL tables can now be inserted into under non-serializable isolation levels so long as there is no ON CONFLICT clause on the statement.
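As a rough sketch of the resulting write pattern (assuming a CPut-style conditional put that succeeds only when the key holds no value; `putter` and `writeRowWithTombstones` are hypothetical names, not the actual inserter code), each insert writes the row into its home partition and an empty value into every other partition's key:

    package main

    import "fmt"

    // putter models a write batch with conditional-put semantics: each CPut
    // succeeds only if the key currently holds no value.
    type putter interface {
    	CPut(key string, value []byte)
    }

    // loggingBatch prints the writes instead of performing them.
    type loggingBatch struct{}

    func (loggingBatch) CPut(key string, value []byte) {
    	if value == nil {
    		fmt.Printf("CPut %s -> nil (tombstone)\n", key)
    		return
    	}
    	fmt.Printf("CPut %s -> %s\n", key, value)
    }

    // writeRowWithTombstones writes the row itself, then a tombstone into every
    // other partition's key. Any pre-existing value (row or tombstone) fails the
    // CPut with a uniqueness violation, and the write intents left behind act as
    // row locks under weak isolation levels.
    func writeRowWithTombstones(b putter, rowKey string, rowValue []byte, tombstoneKeys []string) {
    	b.CPut(rowKey, rowValue)
    	for _, k := range tombstoneKeys {
    		b.CPut(k, nil)
    	}
    }

    func main() {
    	writeRowWithTombstones(loggingBatch{}, `/t/1/"two"/1`, []byte("row"),
    		[]string{`/t/1/"one"/1`, `/t/1/"three"/1`})
    }

This mirrors the CPut trace asserted in the new logic test below.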
--- .../crosscluster/logical/lww_kv_processor.go | 2 +- .../partitioning_implicit_read_committed | 185 ++++++++++++++++++ pkg/ccl/logictestccl/tests/local/BUILD.bazel | 2 +- .../tests/local/generated_test.go | 7 + pkg/sql/colenc/encode_test.go | 2 +- pkg/sql/create_table.go | 1 + pkg/sql/distsql_spec_exec_factory.go | 2 + pkg/sql/insert_fast_path.go | 33 +--- pkg/sql/opt/cat/table.go | 4 + pkg/sql/opt/exec/execbuilder/mutation.go | 5 +- pkg/sql/opt/exec/execbuilder/relational.go | 16 +- pkg/sql/opt/exec/explain/emit.go | 44 +++-- pkg/sql/opt/exec/factory.opt | 2 + pkg/sql/opt/ops/mutation.opt | 4 + pkg/sql/opt/optbuilder/mutation_builder.go | 27 +-- .../opt/optbuilder/mutation_builder_unique.go | 8 + pkg/sql/opt/testutils/testcat/test_catalog.go | 15 +- pkg/sql/opt_catalog.go | 29 ++- pkg/sql/opt_exec_factory.go | 17 ++ pkg/sql/row/helper.go | 6 +- pkg/sql/row/inserter.go | 41 +++- pkg/sql/row/row_converter.go | 1 + pkg/sql/row/updater.go | 2 +- 23 files changed, 358 insertions(+), 97 deletions(-) create mode 100644 pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go index eb3758a960e1..3da95396aaca 100644 --- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go @@ -332,7 +332,7 @@ func newKVTableWriter( // TODO(dt): pass these some sort fo flag to have them use versions of CPut // or a new LWW KV API. For now they're not detecting/handling conflicts. - ri, err := row.MakeInserter(ctx, nil, evalCtx.Codec, tableDesc, writeCols, a, &evalCtx.Settings.SV, internal, nil) + ri, err := row.MakeInserter(ctx, nil, evalCtx.Codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, writeCols, a, &evalCtx.Settings.SV, internal, nil) if err != nil { return nil, err } diff --git a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed new file mode 100644 index 000000000000..60be771d48b1 --- /dev/null +++ b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed @@ -0,0 +1,185 @@ +# LogicTest: local + +statement ok +CREATE TYPE part_type AS ENUM ('one', 'two', 'three', 'four', 'five'); + +statement ok +SET experimental_enable_implicit_column_partitioning = true + +statement ok +SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED + +statement ok +CREATE TABLE t_double ( + pk INT PRIMARY KEY, + a part_type, + b part_type, + c INT, + UNIQUE INDEX (c) +) PARTITION ALL BY LIST (a, b) ( + PARTITION one VALUES IN (('one', 'one')), + PARTITION two VALUES IN (('two', 'two')) +) + +# Test that we don't allow writes to tables with multiple partition columns. +statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +INSERT INTO t_double VALUES (1, 'one', 'one', 10), (2, 'two', 'two', 20) + +statement ok +CREATE TABLE t_int ( + pk INT PRIMARY KEY, + a INT NOT NULL, + c INT, + UNIQUE INDEX (c) +) PARTITION ALL BY LIST (a) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +) + +# Test that we don't allow writes to tables with non-enum partition columns. 
+statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +INSERT INTO t_int VALUES (1, 1, 10), (2, 2, 20) + +statement ok +CREATE TABLE t ( + pk INT PRIMARY KEY, + a part_type, + b INT, + c INT, + d INT, + j JSON, + UNIQUE INDEX (c), + FAMILY (pk, a, b, c, d, j) +) PARTITION ALL BY LIST(a) ( + PARTITION one VALUES IN ('one'), + PARTITION two VALUES IN ('two'), + PARTITION three VALUES IN ('three'), + PARTITION four VALUES IN ('four'), + PARTITION five VALUES IN ('five') +) + +query T +SELECT create_statement FROM [SHOW CREATE TABLE t] +---- +CREATE TABLE public.t ( + pk INT8 NOT NULL, + a public.part_type NOT NULL, + b INT8 NULL, + c INT8 NULL, + d INT8 NULL, + j JSONB NULL, + CONSTRAINT t_pkey PRIMARY KEY (pk ASC), + UNIQUE INDEX t_c_key (c ASC), + FAMILY fam_0_pk_a_b_c_d_j (pk, a, b, c, d, j) +) PARTITION ALL BY LIST (a) ( + PARTITION one VALUES IN (('one')), + PARTITION two VALUES IN (('two')), + PARTITION three VALUES IN (('three')), + PARTITION four VALUES IN (('four')), + PARTITION five VALUES IN (('five')) +) +-- Warning: Partitioned table with no zone configurations. + +query T +EXPLAIN (OPT, CATALOG) SELECT * FROM t +---- +TABLE t + ├── pk int not null + ├── a part_type not null + ├── b int + ├── c int + ├── d int + ├── j jsonb + ├── crdb_internal_mvcc_timestamp decimal [hidden] [system] + ├── tableoid oid [hidden] [system] + ├── crdb_internal_origin_id int4 [hidden] [system] + ├── crdb_internal_origin_timestamp decimal [hidden] [system] + ├── FAMILY fam_0_pk_a_b_c_d_j (pk, a, b, c, d, j) + ├── CHECK (a IN (x'20':::@100106, x'40':::@100106, x'80':::@100106, x'a0':::@100106, x'c0':::@100106)) + ├── PRIMARY INDEX t_pkey + │ ├── a part_type not null (implicit) + │ ├── pk int not null + │ └── partitions + │ ├── one + │ │ └── partition by list prefixes + │ │ └── ('one') + │ ├── two + │ │ └── partition by list prefixes + │ │ └── ('two') + │ ├── three + │ │ └── partition by list prefixes + │ │ └── ('three') + │ ├── four + │ │ └── partition by list prefixes + │ │ └── ('four') + │ └── five + │ └── partition by list prefixes + │ └── ('five') + ├── UNIQUE INDEX t_c_key + │ ├── a part_type not null (implicit) + │ ├── c int + │ ├── pk int not null (storing) + │ └── partitions + │ ├── one + │ │ └── partition by list prefixes + │ │ └── ('one') + │ ├── two + │ │ └── partition by list prefixes + │ │ └── ('two') + │ ├── three + │ │ └── partition by list prefixes + │ │ └── ('three') + │ ├── four + │ │ └── partition by list prefixes + │ │ └── ('four') + │ └── five + │ └── partition by list prefixes + │ └── ('five') + ├── UNIQUE WITHOUT INDEX (pk) + └── UNIQUE WITHOUT INDEX (c) +scan t + └── check constraint expressions + └── a IN ('one', 'two', 'three', 'four', 'five') + +statement ok +SET tracing = kv + +statement ok +INSERT INTO t VALUES (1, 'two', 3, 4, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_pkey" +INSERT INTO t VALUES (1, 'one', 3, 6, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" +INSERT INTO t VALUES (2, 'three', 3, 4, 5) + +query T +SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'CPut%' +---- +CPut /Table/110/1/"@"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/5 +CPut /Table/110/1/" "/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut 
/Table/110/2/"\x80"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) +CPut /Table/110/1/" "/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/"@"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/"@"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/6/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/2/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/5 +CPut /Table/110/1/" "/2/0 -> nil (tombstone) +CPut /Table/110/1/"@"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/2/0 -> nil (tombstone) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"@"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) diff --git a/pkg/ccl/logictestccl/tests/local/BUILD.bazel b/pkg/ccl/logictestccl/tests/local/BUILD.bazel index 035b110f5544..341da4a02656 100644 --- a/pkg/ccl/logictestccl/tests/local/BUILD.bazel +++ b/pkg/ccl/logictestccl/tests/local/BUILD.bazel @@ -9,7 +9,7 @@ go_test( "//pkg/ccl/logictestccl:testdata", # keep ], exec_properties = {"test.Pool": "large"}, - shard_count = 47, + shard_count = 48, tags = ["cpu:1"], deps = [ "//pkg/base", diff --git a/pkg/ccl/logictestccl/tests/local/generated_test.go b/pkg/ccl/logictestccl/tests/local/generated_test.go index 6b277b0617d9..da317842ba43 100644 --- a/pkg/ccl/logictestccl/tests/local/generated_test.go +++ b/pkg/ccl/logictestccl/tests/local/generated_test.go @@ -208,6 +208,13 @@ func TestCCLLogic_partitioning_implicit( runCCLLogicTest(t, "partitioning_implicit") } +func TestCCLLogic_partitioning_implicit_read_committed( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runCCLLogicTest(t, "partitioning_implicit_read_committed") +} + func TestCCLLogic_partitioning_index( t *testing.T, ) { diff --git a/pkg/sql/colenc/encode_test.go b/pkg/sql/colenc/encode_test.go index 3fe0998df858..c6dee5ea04f4 100644 --- a/pkg/sql/colenc/encode_test.go +++ b/pkg/sql/colenc/encode_test.go @@ -600,7 +600,7 @@ func buildRowKVs( sv *settings.Values, codec keys.SQLCodec, ) (kvs, error) { - inserter, err := row.MakeInserter(context.Background(), nil /*txn*/, codec, desc, cols, nil, sv, false, nil) + inserter, err := row.MakeInserter(context.Background(), nil /*txn*/, codec, desc, nil /* uniqueWithTombstoneIndexes */, cols, nil, sv, false, nil) if err != nil { return kvs{}, err } diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index f9c69079c065..9e4afd5d1a11 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -540,6 +540,7 @@ func (n *createTableNode) startExec(params runParams) error { params.p.txn, params.ExecCfg().Codec, desc.ImmutableCopy().(catalog.TableDescriptor), + nil, /* uniqueWithTombstoneIndexes */ desc.PublicColumns(), &tree.DatumAlloc{}, ¶ms.ExecCfg().Settings.SV, diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 1416edd5ae55..4a69d09dcafc 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -969,6 +969,7 @@ func (e *distSQLSpecExecFactory) ConstructInsert( insertCols exec.TableColumnOrdinalSet, returnCols exec.TableColumnOrdinalSet, checkCols exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes 
cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: insert") @@ -982,6 +983,7 @@ func (e *distSQLSpecExecFactory) ConstructInsertFastPath( checkCols exec.CheckOrdinalSet, fkChecks []exec.InsertFastPathCheck, uniqChecks []exec.InsertFastPathCheck, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: insert fast path") diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 8265999c9cfb..9e781c515f3d 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -193,16 +193,12 @@ func (r *insertFastPathRun) addUniqChecks( combinedRow = make(tree.Datums, len(templateRow)) } copy(combinedRow, templateRow) - isInputRow := true for j := 0; j < len(c.InsertCols); j++ { // Datums from single-table constraints are already present in // DatumsFromConstraint. Fill in other values from the input row. if combinedRow[c.InsertCols[j]] == nil { combinedRow[c.InsertCols[j]] = inputRow[c.InsertCols[j]] } - if combinedRow[c.InsertCols[j]] != inputRow[c.InsertCols[j]] { - isInputRow = false - } } if !forTesting { span, err := c.generateSpan(combinedRow) @@ -210,29 +206,14 @@ func (r *insertFastPathRun) addUniqChecks( return nil, err } reqIdx := len(r.uniqBatch.Requests) - // Since predicate locks are not yet supported by the KV layer, we - // emulate them by writing a tombstone to the other partitions instead - // of scanning and locking. These tombstones are added to the insert - // batch instead of the uniqueness check batch because they do not - // require any post-processing (the KV generated conflict message is - // what we want). - if c.Locking.Form == tree.LockPredicate { - if !isInputRow { - if r.traceKV { - log.VEventf(ctx, 2, "CPut %s (LockPredicate)", span) - } - r.ti.putter.CPut(span.Key, nil, nil) - } - } else { - if r.traceKV { - log.VEventf(ctx, 2, "UniqScan %s", span) - } - r.uniqBatch.Requests = append(r.uniqBatch.Requests, kvpb.RequestUnion{}) - // TODO(msirek): Batch-allocate the kvpb.ScanRequests outside the loop. - r.uniqBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span), - }) + if r.traceKV { + log.VEventf(ctx, 2, "UniqScan %s", span) } + r.uniqBatch.Requests = append(r.uniqBatch.Requests, kvpb.RequestUnion{}) + // TODO(msirek): Batch-allocate the kvpb.ScanRequests outside the loop. + r.uniqBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeaderFromSpan(span), + }) r.uniqSpanInfo = append(r.uniqSpanInfo, insertFastPathFKUniqSpanInfo{ check: c, rowIdx: rowIdx, diff --git a/pkg/sql/opt/cat/table.go b/pkg/sql/opt/cat/table.go index a9fa530745c1..ffa6fb80cecd 100644 --- a/pkg/sql/opt/cat/table.go +++ b/pkg/sql/opt/cat/table.go @@ -363,6 +363,10 @@ type UniqueConstraint interface { // WithoutIndex is true if this unique constraint is not enforced by an index. WithoutIndex() bool + // CanUseTombstones is true if this unique constraint can be enforced by + // writing tombstones to all partitions. + CanUseTombstones() bool + // Validated is true if the constraint is validated (i.e. we know that the // existing data satisfies the constraint). 
It is possible to set up a unique // constraint on existing tables without validating it, in which case we diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 782cff220141..77ce3e4ba98d 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -110,6 +110,7 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (_ execPlan, outputCols colO insertOrds, returnOrds, checkOrds, + ins.UniqueWithTombstoneIndexes, b.allowAutoCommit && len(ins.UniqueChecks) == 0 && len(ins.FKChecks) == 0 && len(ins.FKCascades) == 0, ) @@ -184,8 +185,7 @@ func (b *Builder) tryBuildFastPathInsert( // If there is a unique index with implicit partitioning columns, the fast // path can write tombstones to lock the row in all partitions. - allowPredicateLocks := execFastPathCheck.ReferencedIndex.ImplicitPartitioningColumnCount() > 0 - locking, err := b.buildLockingImpl(ins.Table, c.Locking, allowPredicateLocks) + locking, err := b.buildLocking(ins.Table, c.Locking) if err != nil { return execPlan{}, colOrdMap{}, false, err } @@ -339,6 +339,7 @@ func (b *Builder) tryBuildFastPathInsert( checkOrds, fkChecks, uniqChecks, + ins.UniqueWithTombstoneIndexes, b.allowAutoCommit, ) if err != nil { diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index db6e17ca335c..ecdc5b305c1a 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -3107,9 +3107,7 @@ func (b *Builder) buildZigzagJoin( return b.applySimpleProject(res, outputCols, join, join.Cols, join.ProvidedPhysical().Ordering) } -func (b *Builder) buildLockingImpl( - toLock opt.TableID, locking opt.Locking, allowPredicateLocks bool, -) (opt.Locking, error) { +func (b *Builder) buildLocking(toLock opt.TableID, locking opt.Locking) (opt.Locking, error) { if b.forceForUpdateLocking.Contains(int(toLock)) { locking = locking.Max(forUpdateLocking) } @@ -3120,16 +3118,11 @@ func (b *Builder) buildLockingImpl( "cannot execute SELECT %s in a read-only transaction", locking.Strength.String(), ) } - if !allowPredicateLocks && locking.Form == tree.LockPredicate { + if locking.Form == tree.LockPredicate { return opt.Locking{}, unimplemented.NewWithIssuef( 110873, "explicit unique checks are not yet supported under read committed isolation", ) } - if locking.Form == tree.LockPredicate && locking.WaitPolicy != tree.LockWaitBlock { - return opt.Locking{}, unimplemented.NewWithIssuef( - 126592, "non-blocking predicate locks are not yet supported", - ) - } // Check if we can actually use shared locks here, or we need to use // non-locking reads instead. if locking.Strength == tree.ForShare || locking.Strength == tree.ForKeyShare { @@ -3147,11 +3140,6 @@ func (b *Builder) buildLockingImpl( return locking, nil } -// TODO (#126592): Delete this function once predicate locks are universally supported. 
-func (b *Builder) buildLocking(toLock opt.TableID, locking opt.Locking) (opt.Locking, error) { - return b.buildLockingImpl(toLock, locking, false /* allowPredicateLocks */) -} - func (b *Builder) buildMax1Row( max1Row *memo.Max1RowExpr, ) (_ execPlan, outputCols colOrdMap, err error) { diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index b8c371fa508a..b9593be68d7c 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -32,6 +32,20 @@ func Emit(ctx context.Context, plan *Plan, ob *OutputBuilder, spanFormatFn SpanF return emitInternal(ctx, plan, ob, spanFormatFn, nil /* visitedFKsByCascades */) } +// joinIndexNames emits a string of index names on table 'table' as specified in +// 'ords', with each name separated by 'sep'. +func joinIndexNames(table cat.Table, ords cat.IndexOrdinals, sep string) string { + var sb strings.Builder + for i, idx := range ords { + index := table.Index(idx) + if i > 0 { + sb.WriteString(sep) + } + sb.WriteString(string(index.Name())) + } + return sb.String() +} + // - visitedFKsByCascades is updated on recursive calls for each cascade plan. // Can be nil if the plan doesn't have any cascades. In this map the key is the // "id" of the FK constraint that we construct as OriginTableID || Name. @@ -872,16 +886,8 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if a.AutoCommit { ob.Attr("auto commit", "") } - if len(a.ArbiterIndexes) > 0 { - var sb strings.Builder - for i, idx := range a.ArbiterIndexes { - index := a.Table.Index(idx) - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(string(index.Name())) - } - ob.Attr("arbiter indexes", sb.String()) + if arbind := joinIndexNames(a.Table, a.ArbiterIndexes, ", "); arbind != "" { + ob.Attr("arbiter indexes", arbind) } if len(a.ArbiterConstraints) > 0 { var sb strings.Builder @@ -894,6 +900,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { } ob.Attr("arbiter constraints", sb.String()) } + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", uniqWithTombstoneIndexes) + } case insertFastPathOp: a := n.args.(*insertFastPathArgs) @@ -917,6 +926,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { ) e.emitLockingPolicyWithPrefix("uniqueness check ", uniq.Locking) } + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", uniqWithTombstoneIndexes) + } if len(a.Rows) > 0 { e.emitTuples(tree.RawRows(a.Rows), len(a.Rows[0])) } @@ -931,16 +943,8 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if a.AutoCommit { ob.Attr("auto commit", "") } - if len(a.ArbiterIndexes) > 0 { - var sb strings.Builder - for i, idx := range a.ArbiterIndexes { - index := a.Table.Index(idx) - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(string(index.Name())) - } - ob.Attr("arbiter indexes", sb.String()) + if arbind := joinIndexNames(a.Table, a.ArbiterIndexes, ", "); arbind != "" { + ob.Attr("arbiter indexes", arbind) } if len(a.ArbiterConstraints) > 0 { var sb strings.Builder diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index bbe989992bbc..d92d6a2af7a5 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -452,6 +452,7 @@ define Insert { InsertCols exec.TableColumnOrdinalSet ReturnCols exec.TableColumnOrdinalSet CheckCols exec.CheckOrdinalSet + 
UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. # This is false when executing inside an explicit transaction, or there are @@ -483,6 +484,7 @@ define InsertFastPath { CheckCols exec.CheckOrdinalSet FkChecks []exec.InsertFastPathCheck UniqChecks []exec.InsertFastPathCheck + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. # This is false when executing inside an explicit transaction. diff --git a/pkg/sql/opt/ops/mutation.opt b/pkg/sql/opt/ops/mutation.opt index 2b24a3133db3..90bdb34a9c22 100644 --- a/pkg/sql/opt/ops/mutation.opt +++ b/pkg/sql/opt/ops/mutation.opt @@ -176,6 +176,10 @@ define MutationPrivate { # FKCascades stores metadata necessary for building cascading queries. FKCascades FKCascades + + # Unique indexes where uniqueness will be ensured by writing tombstones to + # all partitions. + UniqueWithTombstoneIndexes IndexOrdinals } # Update evaluates a relational input expression that fetches existing rows from diff --git a/pkg/sql/opt/optbuilder/mutation_builder.go b/pkg/sql/opt/optbuilder/mutation_builder.go index 51890bac8ab3..1181344b2d49 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder.go +++ b/pkg/sql/opt/optbuilder/mutation_builder.go @@ -209,6 +209,10 @@ type mutationBuilder struct { // inputForInsertExpr stores the result of outscope.expr from the most // recent call to buildInputForInsert. inputForInsertExpr memo.RelExpr + + // uniqueWithTombstoneIndexes is the set of unique indexes that ensure uniqueness + // by writing tombstones to all partitions + uniqueWithTombstoneIndexes intsets.Fast } func (mb *mutationBuilder) init(b *Builder, opName string, tab cat.Table, alias tree.TableName) { @@ -1057,17 +1061,18 @@ func (mb *mutationBuilder) makeMutationPrivate(needResults bool) *memo.MutationP } private := &memo.MutationPrivate{ - Table: mb.tabID, - InsertCols: checkEmptyList(mb.insertColIDs), - FetchCols: checkEmptyList(mb.fetchColIDs), - UpdateCols: checkEmptyList(mb.updateColIDs), - CanaryCol: mb.canaryColID, - ArbiterIndexes: mb.arbiters.IndexOrdinals(), - ArbiterConstraints: mb.arbiters.UniqueConstraintOrdinals(), - CheckCols: checkEmptyList(mb.checkColIDs), - PartialIndexPutCols: checkEmptyList(mb.partialIndexPutColIDs), - PartialIndexDelCols: checkEmptyList(mb.partialIndexDelColIDs), - FKCascades: mb.cascades, + Table: mb.tabID, + InsertCols: checkEmptyList(mb.insertColIDs), + FetchCols: checkEmptyList(mb.fetchColIDs), + UpdateCols: checkEmptyList(mb.updateColIDs), + CanaryCol: mb.canaryColID, + ArbiterIndexes: mb.arbiters.IndexOrdinals(), + ArbiterConstraints: mb.arbiters.UniqueConstraintOrdinals(), + CheckCols: checkEmptyList(mb.checkColIDs), + PartialIndexPutCols: checkEmptyList(mb.partialIndexPutColIDs), + PartialIndexDelCols: checkEmptyList(mb.partialIndexDelColIDs), + FKCascades: mb.cascades, + UniqueWithTombstoneIndexes: mb.uniqueWithTombstoneIndexes.Ordered(), } // If we didn't actually plan any checks or cascades, don't buffer the input. 
diff --git a/pkg/sql/opt/optbuilder/mutation_builder_unique.go b/pkg/sql/opt/optbuilder/mutation_builder_unique.go index 0d76b4f9ad00..9003975d432c 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder_unique.go +++ b/pkg/sql/opt/optbuilder/mutation_builder_unique.go @@ -54,6 +54,14 @@ func (mb *mutationBuilder) buildUniqueChecksForInsert() { if mb.uniqueConstraintIsArbiter(i) { continue } + + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } + if h.init(mb, i) { uniqueChecksItem, fastPathUniqueChecksItem := h.buildInsertionCheck(buildFastPathCheck) if fastPathUniqueChecksItem == nil { diff --git a/pkg/sql/opt/testutils/testcat/test_catalog.go b/pkg/sql/opt/testutils/testcat/test_catalog.go index 0c573b2949d0..44f400856ed4 100644 --- a/pkg/sql/opt/testutils/testcat/test_catalog.go +++ b/pkg/sql/opt/testutils/testcat/test_catalog.go @@ -1548,12 +1548,13 @@ func (fk *ForeignKeyConstraint) UpdateReferenceAction() tree.ReferenceAction { // UniqueConstraint implements cat.UniqueConstraint. See that interface // for more information on the fields. type UniqueConstraint struct { - name string - tabID cat.StableID - columnOrdinals []int - predicate string - withoutIndex bool - validated bool + name string + tabID cat.StableID + columnOrdinals []int + predicate string + withoutIndex bool + canUseTombstones bool + validated bool } var _ cat.UniqueConstraint = &UniqueConstraint{} @@ -1594,6 +1595,8 @@ func (u *UniqueConstraint) WithoutIndex() bool { return u.withoutIndex } +func (u *UniqueConstraint) CanUseTombstones() bool { return u.canUseTombstones } + // Validated is part of the cat.UniqueConstraint interface. func (u *UniqueConstraint) Validated() bool { return u.validated diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 7044ad712958..e6d7e26dbe6b 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -1006,13 +1006,21 @@ func newOptTable( if idx.IsUnique() { if idx.ImplicitPartitioningColumnCount() > 0 { - // Add unique constraints for implicitly partitioned unique indexes. + // Add unique constraints for implicitly partitioned unique indexes. If + // there is a single implicit column of ENUM type (e.g. an implicit RBR + // column), then we can ensure uniqueness under non-Serializable + // isolation levels by writing tombstones. We assume that the partition + // column is the first column of the index. + partitionColumn := catalog.FindColumnByID(desc, idx.GetKeyColumnID(0 /* columnOrdinal */)) + canUseTombstones := idx.ImplicitPartitioningColumnCount() == 1 && + partitionColumn.GetType().Family() == types.EnumFamily ot.uniqueConstraints = append(ot.uniqueConstraints, optUniqueConstraint{ - name: idx.GetName(), - table: ot.ID(), - columns: idx.IndexDesc().KeyColumnIDs[idx.IndexDesc().ExplicitColumnStartIdx():], - withoutIndex: true, - predicate: idx.GetPredicate(), + name: idx.GetName(), + table: ot.ID(), + columns: idx.IndexDesc().KeyColumnIDs[idx.IndexDesc().ExplicitColumnStartIdx():], + withoutIndex: true, + canUseTombstones: canUseTombstones, + predicate: idx.GetPredicate(), // TODO(rytaft): will we ever support an unvalidated unique constraint // here? 
validity: descpb.ConstraintValidity_Validated, @@ -1999,8 +2007,9 @@ type optUniqueConstraint struct { columns []descpb.ColumnID predicate string - withoutIndex bool - validity descpb.ConstraintValidity + withoutIndex bool + canUseTombstones bool + validity descpb.ConstraintValidity uniquenessGuaranteedByAnotherIndex bool } @@ -2045,6 +2054,10 @@ func (u *optUniqueConstraint) WithoutIndex() bool { return u.withoutIndex } +func (u *optUniqueConstraint) CanUseTombstones() bool { + return u.canUseTombstones +} + // Validated is part of the cat.UniqueConstraint interface. func (u *optUniqueConstraint) Validated() bool { return u.validity == descpb.ConstraintValidity_Validated diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index af638dc31bc3..993e84249ff3 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1352,6 +1352,18 @@ func (ef *execFactory) ConstructShowTrace(typ tree.ShowTraceType, compact bool) return node, nil } +func ordinalsToIndexes(table cat.Table, ords cat.IndexOrdinals) []catalog.Index { + if ords == nil { + return nil + } + + retval := make([]catalog.Index, len(ords)) + for i, idx := range ords { + retval[i] = table.Index(idx).(*optIndex).idx + } + return retval +} + func (ef *execFactory) ConstructInsert( input exec.Node, table cat.Table, @@ -1360,6 +1372,7 @@ func (ef *execFactory) ConstructInsert( insertColOrdSet exec.TableColumnOrdinalSet, returnColOrdSet exec.TableColumnOrdinalSet, checkOrdSet exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // Derive insert table and column descriptors. @@ -1374,6 +1387,7 @@ func (ef *execFactory) ConstructInsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), cols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1432,6 +1446,7 @@ func (ef *execFactory) ConstructInsertFastPath( checkOrdSet exec.CheckOrdinalSet, fkChecks []exec.InsertFastPathCheck, uniqChecks []exec.InsertFastPathCheck, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // Derive insert table and column descriptors. @@ -1446,6 +1461,7 @@ func (ef *execFactory) ConstructInsertFastPath( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), cols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1638,6 +1654,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + nil, /* uniqueWithTombstoneIndexes */ insertCols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, diff --git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index 1f68013399f5..633c17ce7887 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -67,14 +67,14 @@ var maxRowSizeErr = settings.RegisterByteSizeSetting( settings.WithPublic, ) -// Per-index data for writing tombstones to enforce a uniqueness constraint +// Per-index data for writing tombstones to enforce a uniqueness constraint. type uniqueWithTombstoneEntry struct { // implicitPartitionKeyValues contains the potential values for the - // partitioning column + // partitioning column. implicitPartitionKeyVals []tree.Datum // tmpTombstones contains the tombstones generated for this index by the last - // call to encodeTombstonesForIndex + // call to encodeTombstonesForIndex. 
tmpTombstones [][]byte } diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index edda591c848e..e9a5a557892a 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -41,6 +41,7 @@ func MakeInserter( txn *kv.Txn, codec keys.SQLCodec, tableDesc catalog.TableDescriptor, + uniqueWithTombstoneIndexes []catalog.Index, insertCols []catalog.Column, alloc *tree.DatumAlloc, sv *settings.Values, @@ -49,7 +50,7 @@ func MakeInserter( ) (Inserter, error) { ri := Inserter{ Helper: NewRowHelper( - codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics, + codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), uniqueWithTombstoneIndexes, sv, internal, metrics, ), InsertCols: insertCols, @@ -114,6 +115,29 @@ func insertInvertedPutFn( b.InitPut(key, value, false) } +func writeTombstones( + ctx context.Context, + helper *RowHelper, + index catalog.Index, + b Putter, + insertColIDtoRowIndex catalog.TableColMap, + values []tree.Datum, + traceKV bool, +) error { + tombstones, err := helper.encodeTombstonesForIndex(ctx, index, insertColIDtoRowIndex, values) + if err != nil { + return err + } + for _, tombstone := range tombstones { + k := roachpb.Key(keys.MakeFamilyKey(tombstone, 0 /* famID */)) + if traceKV { + log.VEventfDepth(ctx, 1, 2, "CPut %s -> nil (tombstone)", k) + } + b.CPut(k, nil, nil /* expValue */) + } + return nil +} + // InsertRow adds to the batch the kv operations necessary to insert a table row // with the given values. func (ri *Inserter) InsertRow( @@ -163,11 +187,15 @@ func (ri *Inserter) InsertRow( return err } + if err := writeTombstones(ctx, &ri.Helper, ri.Helper.TableDesc.GetPrimaryIndex(), b, ri.InsertColIDtoRowIndex, values, traceKV); err != nil { + return err + } + putFn = insertInvertedPutFn // For determinism, add the entries for the secondary indexes in the same // order as they appear in the helper. - for idx := range ri.Helper.Indexes { + for idx, index := range ri.Helper.Indexes { entries, ok := secondaryIndexEntries[ri.Helper.Indexes[idx]] if ok { for i := range entries { @@ -180,8 +208,15 @@ func (ri *Inserter) InsertRow( putFn(ctx, b, &e.Key, &e.Value, traceKV) } } + + // If a row does not satisfy a partial index predicate, it will have no + // entries, implying that we should also not write tombstones. 
+ if len(entries) > 0 { + if err := writeTombstones(ctx, &ri.Helper, index, b, ri.InsertColIDtoRowIndex, values, traceKV); err != nil { + return err + } + } } } - return nil }
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index f27e7b764ca6..e874fc2379a6 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -365,6 +365,7 @@ func NewDatumRowConverter( nil, /* txn */ evalCtx.Codec, tableDesc, + nil, /* uniqueWithTombstoneIndexes */ cols, &tree.DatumAlloc{}, &evalCtx.Settings.SV,
diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 756e1a2d2810..80f28e17f83e 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -177,7 +177,7 @@ func MakeUpdater( var err error ru.rd = MakeDeleter(codec, tableDesc, requestedCols, sv, internal, metrics) if ru.ri, err = MakeInserter( - ctx, txn, codec, tableDesc, requestedCols, alloc, sv, internal, metrics, + ctx, txn, codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, requestedCols, alloc, sv, internal, metrics, ); err != nil { return Updater{}, err }

From 9b07464bec3e056a2075d3166ef8c48199ac71e6 Mon Sep 17 00:00:00 2001
From: Matt White
Date: Wed, 28 Aug 2024 12:53:11 -0700
Subject: [PATCH 12/13] sql: support UPDATE on RBR tables under
 non-serializable isolation levels

For unique indexes with a single ENUM implicit partition column (e.g.
regional by row), write a tombstone to each partition to ensure
uniqueness when the isolation is not serializable. These tombstones lay
down write intents, which we use as row locks because the KV layer
cannot currently lock non-existent keys.

Fixes: #110873

Release note (sql change): REGIONAL BY ROW and PARTITION ALL BY tables
can now be updated under non-SERIALIZABLE isolation levels.
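An illustrative sketch (not part of the patch, and using a hypothetical
batchPutter interface with plain string keys in place of the real
Putter, roachpb.Key, and index-encoding machinery): the writer places
the actual index entry in the row's own partition and a nil tombstone
CPut in every sibling partition. Each CPut expects the key to be
absent, so a duplicate unique key in any partition fails the batch with
the KV duplicate-key error, and the intents laid down by the tombstones
double as the row locks.

    package main

    import "fmt"

    // batchPutter is a hypothetical stand-in for the KV batch:
    // CPut(key, value, expValue) fails unless key currently holds
    // expValue, and a nil expValue means "the key must not exist".
    type batchPutter interface {
    	CPut(key string, value []byte, expValue []byte)
    }

    // printBatch records what a real batch would send to KV.
    type printBatch struct{}

    func (printBatch) CPut(key string, value []byte, expValue []byte) {
    	if value == nil {
    		fmt.Printf("CPut %s -> nil (tombstone)\n", key)
    		return
    	}
    	fmt.Printf("CPut %s -> %x\n", key, value)
    }

    // writeUniqueWithTombstones writes the row's entry in its own
    // partition and a nil tombstone in every sibling partition of a
    // unique index whose first column is the implicit partition column.
    func writeUniqueWithTombstones(
    	b batchPutter,
    	indexPrefix, rowPartition, uniqueKey string,
    	allPartitions []string,
    	value []byte,
    ) {
    	for _, part := range allPartitions {
    		key := fmt.Sprintf("%s/%q/%s", indexPrefix, part, uniqueKey)
    		if part == rowPartition {
    			// The actual entry; fails if the key already exists.
    			b.CPut(key, value, nil)
    		} else {
    			// Tombstone: locks the same unique key in the sibling
    			// partition and fails if a row already lives there.
    			b.CPut(key, nil, nil)
    		}
    	}
    }

    func main() {
    	writeUniqueWithTombstones(
    		printBatch{},
    		"/Table/110/2", "two", "4",
    		[]string{"one", "two", "three", "four", "five"},
    		[]byte{0x8a},
    	)
    }

The sketch prints CPut lines of the same shape as the trace output
asserted in the partitioning_implicit_read_committed logic test below.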
--- .../crosscluster/logical/lww_kv_processor.go | 2 +- .../partitioning_implicit_read_committed | 28 ++++++++++ .../logic_test/regional_by_row_read_committed | 53 ++++++++++++++++--- pkg/sql/backfill/backfill.go | 1 + pkg/sql/distsql_spec_exec_factory.go | 1 + pkg/sql/opt/exec/execbuilder/mutation.go | 1 + pkg/sql/opt/exec/explain/emit.go | 3 ++ pkg/sql/opt/exec/factory.opt | 1 + .../opt/optbuilder/mutation_builder_unique.go | 6 +++ pkg/sql/opt_exec_factory.go | 3 ++ pkg/sql/row/updater.go | 20 +++++-- 11 files changed, 109 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go index 3da95396aaca..7534cb764a81 100644 --- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go @@ -338,7 +338,7 @@ func newKVTableWriter( } rd := row.MakeDeleter(evalCtx.Codec, tableDesc, readCols, &evalCtx.Settings.SV, internal, nil) ru, err := row.MakeUpdater( - ctx, nil, evalCtx.Codec, tableDesc, readCols, writeCols, row.UpdaterDefault, a, &evalCtx.Settings.SV, internal, nil, + ctx, nil, evalCtx.Codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, readCols, writeCols, row.UpdaterDefault, a, &evalCtx.Settings.SV, internal, nil, ) if err != nil { return nil, err diff --git a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed index 60be771d48b1..9e035b0b789a 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed @@ -153,6 +153,15 @@ INSERT INTO t VALUES (1, 'one', 3, 6, 5) statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" INSERT INTO t VALUES (2, 'three', 3, 4, 5) +statement ok +INSERT INTO t VALUES (2, 'four', 3, 6, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_pkey" +UPDATE t SET pk = 1 WHERE c = 6; + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" +UPDATE t SET c = 4 WHERE pk = 2 + query T SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'CPut%' ---- @@ -183,3 +192,22 @@ CPut /Table/110/2/" "/4/0 -> nil (tombstone) CPut /Table/110/2/"@"/4/0 -> nil (tombstone) CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/2/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/" "/2/0 -> nil (tombstone) +CPut /Table/110/1/"@"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/2/0 -> nil (tombstone) +CPut /Table/110/2/" "/6/0 -> nil (tombstone) +CPut /Table/110/2/"@"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/6/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/" "/1/0 -> nil (tombstone) +CPut /Table/110/1/"@"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a (expecting does not exist) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"@"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed 
b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed index b70febf8fa06..eac30ae7bf76 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed @@ -78,7 +78,7 @@ CREATE TABLE river ( LOCALITY REGIONAL BY ROW AS region statement ok -GRANT INSERT ON TABLE university TO testuser +GRANT INSERT, UPDATE, SELECT ON TABLE university TO testuser # Test non-conflicting INSERT. @@ -117,10 +117,15 @@ INSERT INTO university (name, mascot, postal_code) VALUES ('Thompson Rivers', 'wolves', 'V2C 0C8'), ('Evergreen State', 'geoducks', '98505') ON CONFLICT (mascot) DO NOTHING +# TODO(mw5h): Temporary until ON CONFLICT works +statement ok +INSERT INTO university (name, mascot, postal_code) VALUES ('Evergreen State', 'geoducks', '98505') + query TTT SELECT name, mascot, postal_code FROM university ORDER BY name ---- -Western Oregon wolves 97361 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation INSERT INTO volcano VALUES @@ -170,10 +175,10 @@ UPSERT INTO river VALUES ('us-east-1', 'Fraser', 'Salish Sea') # Test conflicting UPDATE. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" UPDATE university SET mascot = 'wolves' WHERE name = 'Evergreen State' -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement ok UPDATE volcano SET origin = 'Fought over Loowit and was transformed by Saghalie.' WHERE name = 'Mount St. Helens' statement error pgcode 23505 pq: duplicate key value violates unique constraint "city_pkey"\nDETAIL: Key \(name, state_or_province\)=\('Vancouver', 'BC'\) already exists\. 
@@ -232,5 +237,41 @@ awaitstatement conflict query TTT SELECT name, mascot, postal_code FROM university ORDER BY name ---- -CMU Scottie Dog 15213 -Western Oregon wolves 97361 +CMU Scottie Dog 15213 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 + +statement ok +INSERT INTO university VALUES ('Central Michigan University', 'Chippewas', '97858'); + +statement ok +UPDATE university SET name = 'Carnegie Mellon University' WHERE name = 'CMU'; + +statement ok +BEGIN + +statement ok +UPDATE university SET name = 'CMU' WHERE name = 'Carnegie Mellon University'; + +user testuser + +statement async conflict error pgcode 23505 pq: duplicate key value violates unique constraint "university_pkey" +UPDATE university SET name = 'CMU' WHERE name = 'Central Michigan University' + +user root + +statement ok +COMMIT + +awaitstatement conflict + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" +UPDATE university SET mascot = 'wolves' WHERE name = 'CMU' + +query TTT +SELECT name, mascot, postal_code FROM university ORDER BY name +---- +CMU Scottie Dog 15213 +Central Michigan University Chippewas 97858 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 97ce8c228b05..e9ac63607c47 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -297,6 +297,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( txn, cb.evalCtx.Codec, tableDesc, + nil, /* uniqueWithTombstoneIndexes */ cb.updateCols, requestedCols, row.UpdaterOnlyColumns, diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4a69d09dcafc..4cd18e6e8369 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -997,6 +997,7 @@ func (e *distSQLSpecExecFactory) ConstructUpdate( returnCols exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, passthrough colinfo.ResultColumns, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: update") diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 77ce3e4ba98d..b46bb271f41e 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -441,6 +441,7 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (_ execPlan, outputCols colO returnColOrds, checkOrds, passthroughCols, + upd.UniqueWithTombstoneIndexes, b.allowAutoCommit && len(upd.UniqueChecks) == 0 && len(upd.FKChecks) == 0 && len(upd.FKCascades) == 0, ) diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index b9593be68d7c..6763ff4f0332 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -961,6 +961,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { case updateOp: a := n.args.(*updateArgs) ob.Attrf("table", "%s", a.Table.Name()) + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", uniqWithTombstoneIndexes) + } ob.Attr("set", printColumns(tableColumns(a.Table, a.UpdateCols))) if a.AutoCommit { ob.Attr("auto commit", "") diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index d92d6a2af7a5..5c1210a07cd7 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ 
-520,6 +520,7 @@ define Update { ReturnCols exec.TableColumnOrdinalSet Checks exec.CheckOrdinalSet Passthrough colinfo.ResultColumns + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. AutoCommit bool diff --git a/pkg/sql/opt/optbuilder/mutation_builder_unique.go b/pkg/sql/opt/optbuilder/mutation_builder_unique.go index 9003975d432c..f7fde1d64df4 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder_unique.go +++ b/pkg/sql/opt/optbuilder/mutation_builder_unique.go @@ -102,6 +102,12 @@ func (mb *mutationBuilder) buildUniqueChecksForUpdate() { if !mb.uniqueColsUpdated(i) { continue } + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } if h.init(mb, i) { // The insertion check works for updates too since it simply checks that // the unique columns in the newly inserted or updated rows do not match diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 993e84249ff3..9ab2e30c0bde 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1540,6 +1540,7 @@ func (ef *execFactory) ConstructUpdate( returnColOrdSet exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, passthrough colinfo.ResultColumns, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // TODO(radu): the execution code has an annoying limitation that the fetch @@ -1567,6 +1568,7 @@ func (ef *execFactory) ConstructUpdate( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), updateCols, fetchCols, row.UpdaterDefault, @@ -1671,6 +1673,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + nil, /* uniqueWithTombstoneIndexes */ updateCols, fetchCols, row.UpdaterDefault, diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 80f28e17f83e..55cb1e782c0b 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/intsets" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/unique" "github.com/cockroachdb/errors" @@ -75,6 +76,7 @@ func MakeUpdater( txn *kv.Txn, codec keys.SQLCodec, tableDesc catalog.TableDescriptor, + uniqueWithTombstoneIndexes []catalog.Index, updateCols []catalog.Column, requestedCols []catalog.Column, updateType rowUpdaterType, @@ -161,7 +163,7 @@ func MakeUpdater( } ru := Updater{ - Helper: NewRowHelper(codec, tableDesc, includeIndexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics), + Helper: NewRowHelper(codec, tableDesc, includeIndexes, uniqueWithTombstoneIndexes, sv, internal, metrics), DeleteHelper: deleteOnlyHelper, FetchCols: requestedCols, FetchColIDtoRowIndex: ColIDtoRowIndexFromCols(requestedCols), @@ -177,7 +179,7 @@ func MakeUpdater( var err error ru.rd = MakeDeleter(codec, tableDesc, requestedCols, sv, internal, metrics) if ru.ri, err = MakeInserter( - ctx, txn, codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, requestedCols, alloc, sv, internal, metrics, + ctx, txn, codec, tableDesc, 
uniqueWithTombstoneIndexes, requestedCols, alloc, sv, internal, metrics, ); err != nil { return Updater{}, err } @@ -375,6 +377,7 @@ func (ru *Updater) UpdateRow( // Update secondary indexes. // We're iterating through all of the indexes, which should have corresponding entries // in the new and old values. + var writtenIndexes intsets.Fast for i, index := range ru.Helper.Indexes { if index.GetType() == descpb.IndexDescriptor_FORWARD { oldIdx, newIdx := 0, 0 @@ -433,6 +436,7 @@ func (ru *Updater) UpdateRow( } batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue) } + writtenIndexes.Add(i) } else if oldEntry.Family < newEntry.Family { if oldEntry.Family == descpb.FamilyID(0) { return nil, errors.AssertionFailedf( @@ -468,6 +472,7 @@ func (ru *Updater) UpdateRow( } batch.CPut(newEntry.Key, &newEntry.Value, nil) } + writtenIndexes.Add(i) newIdx++ } } @@ -501,6 +506,7 @@ func (ru *Updater) UpdateRow( } batch.CPut(newEntry.Key, &newEntry.Value, nil) } + writtenIndexes.Add(i) newIdx++ } } else { @@ -522,10 +528,18 @@ func (ru *Updater) UpdateRow( } } + writtenIndexes.ForEach(func(idx int) { + if err == nil { + err = writeTombstones(ctx, &ru.Helper, ru.Helper.Indexes[idx], putter, ru.FetchColIDtoRowIndex, ru.newValues, traceKV) + } + }) + if err != nil { + return nil, err + } + // We're deleting indexes in a delete only state. We're bounding this by the number of indexes because inverted // indexed will be handled separately. if ru.DeleteHelper != nil { - // For determinism, add the entries for the secondary indexes in the same // order as they appear in the helper. for idx := range ru.DeleteHelper.Indexes { From 3c1433e6f9730f8726debfc1ea35783559ce632f Mon Sep 17 00:00:00 2001 From: Justin Beaver Date: Mon, 14 Oct 2024 20:43:24 +0000 Subject: [PATCH 13/13] release: released CockroachDB version 24.3.0-alpha.2. Next version: 24.3.0-alpha.3 Release note: None Epic: None Release justification: non-production (release infra) change. --- pkg/build/version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/build/version.txt b/pkg/build/version.txt index 264f090166b5..a6f0b11222a8 100644 --- a/pkg/build/version.txt +++ b/pkg/build/version.txt @@ -1 +1 @@ -v24.3.0-alpha.2 +v24.3.0-alpha.3