diff --git a/pkg/build/version.txt b/pkg/build/version.txt index 264f090166b5..a6f0b11222a8 100644 --- a/pkg/build/version.txt +++ b/pkg/build/version.txt @@ -1 +1 @@ -v24.3.0-alpha.2 +v24.3.0-alpha.3 diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go index eb3758a960e1..7534cb764a81 100644 --- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go @@ -332,13 +332,13 @@ func newKVTableWriter( // TODO(dt): pass these some sort fo flag to have them use versions of CPut // or a new LWW KV API. For now they're not detecting/handling conflicts. - ri, err := row.MakeInserter(ctx, nil, evalCtx.Codec, tableDesc, writeCols, a, &evalCtx.Settings.SV, internal, nil) + ri, err := row.MakeInserter(ctx, nil, evalCtx.Codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, writeCols, a, &evalCtx.Settings.SV, internal, nil) if err != nil { return nil, err } rd := row.MakeDeleter(evalCtx.Codec, tableDesc, readCols, &evalCtx.Settings.SV, internal, nil) ru, err := row.MakeUpdater( - ctx, nil, evalCtx.Codec, tableDesc, readCols, writeCols, row.UpdaterDefault, a, &evalCtx.Settings.SV, internal, nil, + ctx, nil, evalCtx.Codec, tableDesc, nil /* uniqueWithTombstoneIndexes */, readCols, writeCols, row.UpdaterDefault, a, &evalCtx.Settings.SV, internal, nil, ) if err != nil { return nil, err diff --git a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed new file mode 100644 index 000000000000..9e035b0b789a --- /dev/null +++ b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed @@ -0,0 +1,213 @@ +# LogicTest: local + +statement ok +CREATE TYPE part_type AS ENUM ('one', 'two', 'three', 'four', 'five'); + +statement ok +SET experimental_enable_implicit_column_partitioning = true + +statement ok +SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED + +statement ok +CREATE TABLE t_double ( + pk INT PRIMARY KEY, + a part_type, + b part_type, + c INT, + UNIQUE INDEX (c) +) PARTITION ALL BY LIST (a, b) ( + PARTITION one VALUES IN (('one', 'one')), + PARTITION two VALUES IN (('two', 'two')) +) + +# Test that we don't allow writes to tables with multiple partition columns. +statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +INSERT INTO t_double VALUES (1, 'one', 'one', 10), (2, 'two', 'two', 20) + +statement ok +CREATE TABLE t_int ( + pk INT PRIMARY KEY, + a INT NOT NULL, + c INT, + UNIQUE INDEX (c) +) PARTITION ALL BY LIST (a) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +) + +# Test that we don't allow writes to tables with non-enum partition columns. 
+statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +INSERT INTO t_int VALUES (1, 1, 10), (2, 2, 20) + +statement ok +CREATE TABLE t ( + pk INT PRIMARY KEY, + a part_type, + b INT, + c INT, + d INT, + j JSON, + UNIQUE INDEX (c), + FAMILY (pk, a, b, c, d, j) +) PARTITION ALL BY LIST(a) ( + PARTITION one VALUES IN ('one'), + PARTITION two VALUES IN ('two'), + PARTITION three VALUES IN ('three'), + PARTITION four VALUES IN ('four'), + PARTITION five VALUES IN ('five') +) + +query T +SELECT create_statement FROM [SHOW CREATE TABLE t] +---- +CREATE TABLE public.t ( + pk INT8 NOT NULL, + a public.part_type NOT NULL, + b INT8 NULL, + c INT8 NULL, + d INT8 NULL, + j JSONB NULL, + CONSTRAINT t_pkey PRIMARY KEY (pk ASC), + UNIQUE INDEX t_c_key (c ASC), + FAMILY fam_0_pk_a_b_c_d_j (pk, a, b, c, d, j) +) PARTITION ALL BY LIST (a) ( + PARTITION one VALUES IN (('one')), + PARTITION two VALUES IN (('two')), + PARTITION three VALUES IN (('three')), + PARTITION four VALUES IN (('four')), + PARTITION five VALUES IN (('five')) +) +-- Warning: Partitioned table with no zone configurations. + +query T +EXPLAIN (OPT, CATALOG) SELECT * FROM t +---- +TABLE t + ├── pk int not null + ├── a part_type not null + ├── b int + ├── c int + ├── d int + ├── j jsonb + ├── crdb_internal_mvcc_timestamp decimal [hidden] [system] + ├── tableoid oid [hidden] [system] + ├── crdb_internal_origin_id int4 [hidden] [system] + ├── crdb_internal_origin_timestamp decimal [hidden] [system] + ├── FAMILY fam_0_pk_a_b_c_d_j (pk, a, b, c, d, j) + ├── CHECK (a IN (x'20':::@100106, x'40':::@100106, x'80':::@100106, x'a0':::@100106, x'c0':::@100106)) + ├── PRIMARY INDEX t_pkey + │ ├── a part_type not null (implicit) + │ ├── pk int not null + │ └── partitions + │ ├── one + │ │ └── partition by list prefixes + │ │ └── ('one') + │ ├── two + │ │ └── partition by list prefixes + │ │ └── ('two') + │ ├── three + │ │ └── partition by list prefixes + │ │ └── ('three') + │ ├── four + │ │ └── partition by list prefixes + │ │ └── ('four') + │ └── five + │ └── partition by list prefixes + │ └── ('five') + ├── UNIQUE INDEX t_c_key + │ ├── a part_type not null (implicit) + │ ├── c int + │ ├── pk int not null (storing) + │ └── partitions + │ ├── one + │ │ └── partition by list prefixes + │ │ └── ('one') + │ ├── two + │ │ └── partition by list prefixes + │ │ └── ('two') + │ ├── three + │ │ └── partition by list prefixes + │ │ └── ('three') + │ ├── four + │ │ └── partition by list prefixes + │ │ └── ('four') + │ └── five + │ └── partition by list prefixes + │ └── ('five') + ├── UNIQUE WITHOUT INDEX (pk) + └── UNIQUE WITHOUT INDEX (c) +scan t + └── check constraint expressions + └── a IN ('one', 'two', 'three', 'four', 'five') + +statement ok +SET tracing = kv + +statement ok +INSERT INTO t VALUES (1, 'two', 3, 4, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_pkey" +INSERT INTO t VALUES (1, 'one', 3, 6, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" +INSERT INTO t VALUES (2, 'three', 3, 4, 5) + +statement ok +INSERT INTO t VALUES (2, 'four', 3, 6, 5) + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_pkey" +UPDATE t SET pk = 1 WHERE c = 6; + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" +UPDATE t SET c = 4 WHERE pk = 2 + +query T +SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'CPut%' +---- +CPut 
/Table/110/1/"@"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/5 +CPut /Table/110/1/" "/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) +CPut /Table/110/1/" "/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/"@"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/"@"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/6/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/2/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/5 +CPut /Table/110/1/" "/2/0 -> nil (tombstone) +CPut /Table/110/1/"@"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/2/0 -> nil (tombstone) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"@"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/2/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/" "/2/0 -> nil (tombstone) +CPut /Table/110/1/"@"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/2/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/2/0 -> nil (tombstone) +CPut /Table/110/2/" "/6/0 -> nil (tombstone) +CPut /Table/110/2/"@"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/6/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/6/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/6/1:5:Int/5 +CPut /Table/110/1/" "/1/0 -> nil (tombstone) +CPut /Table/110/1/"@"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> /BYTES/0x8a (expecting does not exist) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"@"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed index b70febf8fa06..eac30ae7bf76 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed @@ -78,7 +78,7 @@ CREATE TABLE river ( LOCALITY REGIONAL BY ROW AS region statement ok -GRANT INSERT ON TABLE university TO testuser +GRANT INSERT, UPDATE, SELECT ON TABLE university TO testuser # Test non-conflicting INSERT. 
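
The CPut trace asserted above for the implicit-partitioning test makes the tombstone scheme concrete: the row is written to its own partition, and the same index key in every sibling partition receives a CPut of nil (a "tombstone") whose expected value is "no existing row", so a conflicting write in any partition fails at the KV layer with the duplicate-key error the tests expect. The following sketch is illustrative only, with a toy key scheme and a hypothetical putter interface standing in for the kv batch API used by this PR:

package main

import "fmt"

// putter is a toy stand-in for the kv write interface; CPut writes value only
// if the current value matches expValue, and a nil expValue means "expect no
// existing row".
type putter interface {
	CPut(key string, value, expValue []byte)
}

// tracePutter prints requests in the style of the expected trace above.
type tracePutter struct{}

func (tracePutter) CPut(key string, value, expValue []byte) {
	if value == nil {
		fmt.Printf("CPut %s -> nil (tombstone)\n", key)
		return
	}
	fmt.Printf("CPut %s -> %s\n", key, value)
}

// writeUnique writes a unique index entry under one partition and a tombstone
// under every other partition, so a concurrent writer collides no matter
// which partition its row lands in.
func writeUnique(b putter, partitions []string, rowPartition, keySuffix string, value []byte) {
	b.CPut("/Table/110/2/"+rowPartition+"/"+keySuffix, value, nil)
	for _, p := range partitions {
		if p == rowPartition {
			continue
		}
		b.CPut("/Table/110/2/"+p+"/"+keySuffix, nil, nil)
	}
}

func main() {
	writeUnique(tracePutter{}, []string{`" "`, `"@"`, `"\x80"`}, `"@"`, "4/0", []byte("row"))
}

Because every partition's key is claimed, this emulates a predicate lock without KV-level support for one; the cost is one extra tombstone write per sibling partition, which is why it is only planned when the implicit partition column is a single ENUM (a small, fixed set of partitions).
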
@@ -117,10 +117,15 @@ INSERT INTO university (name, mascot, postal_code) VALUES ('Thompson Rivers', 'wolves', 'V2C 0C8'), ('Evergreen State', 'geoducks', '98505') ON CONFLICT (mascot) DO NOTHING +# TODO(mw5h): Temporary until ON CONFLICT works +statement ok +INSERT INTO university (name, mascot, postal_code) VALUES ('Evergreen State', 'geoducks', '98505') + query TTT SELECT name, mascot, postal_code FROM university ORDER BY name ---- -Western Oregon wolves 97361 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation INSERT INTO volcano VALUES @@ -170,10 +175,10 @@ UPSERT INTO river VALUES ('us-east-1', 'Fraser', 'Salish Sea') # Test conflicting UPDATE. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" UPDATE university SET mascot = 'wolves' WHERE name = 'Evergreen State' -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement ok UPDATE volcano SET origin = 'Fought over Loowit and was transformed by Saghalie.' WHERE name = 'Mount St. Helens' statement error pgcode 23505 pq: duplicate key value violates unique constraint "city_pkey"\nDETAIL: Key \(name, state_or_province\)=\('Vancouver', 'BC'\) already exists\. @@ -232,5 +237,41 @@ awaitstatement conflict query TTT SELECT name, mascot, postal_code FROM university ORDER BY name ---- -CMU Scottie Dog 15213 -Western Oregon wolves 97361 +CMU Scottie Dog 15213 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 + +statement ok +INSERT INTO university VALUES ('Central Michigan University', 'Chippewas', '97858'); + +statement ok +UPDATE university SET name = 'Carnegie Mellon University' WHERE name = 'CMU'; + +statement ok +BEGIN + +statement ok +UPDATE university SET name = 'CMU' WHERE name = 'Carnegie Mellon University'; + +user testuser + +statement async conflict error pgcode 23505 pq: duplicate key value violates unique constraint "university_pkey" +UPDATE university SET name = 'CMU' WHERE name = 'Central Michigan University' + +user root + +statement ok +COMMIT + +awaitstatement conflict + +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" +UPDATE university SET mascot = 'wolves' WHERE name = 'CMU' + +query TTT +SELECT name, mascot, postal_code FROM university ORDER BY name +---- +CMU Scottie Dog 15213 +Central Michigan University Chippewas 97858 +Evergreen State geoducks 98505 +Western Oregon wolves 97361 diff --git a/pkg/ccl/logictestccl/tests/local/BUILD.bazel b/pkg/ccl/logictestccl/tests/local/BUILD.bazel index 035b110f5544..341da4a02656 100644 --- a/pkg/ccl/logictestccl/tests/local/BUILD.bazel +++ b/pkg/ccl/logictestccl/tests/local/BUILD.bazel @@ -9,7 +9,7 @@ go_test( "//pkg/ccl/logictestccl:testdata", # keep ], exec_properties = {"test.Pool": "large"}, - shard_count = 47, + shard_count = 48, tags = ["cpu:1"], deps = [ "//pkg/base", diff --git a/pkg/ccl/logictestccl/tests/local/generated_test.go b/pkg/ccl/logictestccl/tests/local/generated_test.go index 6b277b0617d9..da317842ba43 100644 --- a/pkg/ccl/logictestccl/tests/local/generated_test.go +++ b/pkg/ccl/logictestccl/tests/local/generated_test.go @@ -208,6 +208,13 @@ func TestCCLLogic_partitioning_implicit( runCCLLogicTest(t, "partitioning_implicit") } +func 
TestCCLLogic_partitioning_implicit_read_committed( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runCCLLogicTest(t, "partitioning_implicit_read_committed") +} + func TestCCLLogic_partitioning_index( t *testing.T, ) { diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 1ed5f3b261e2..dab7acb782e8 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -736,7 +736,7 @@ func setupLDR( startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int { options := "" - if mode.String() != "" { + if mode != Default { options = fmt.Sprintf("WITH mode='%s'", mode) } targetDB.Exec(t, fmt.Sprintf("USE %s", dbName)) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 4c9346a1d9b7..ff37488205d4 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -68,9 +67,6 @@ import ( func TestFlowControlBasic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) testutils.RunTrueAndFalse(t, "always-enqueue", func(t *testing.T, alwaysEnqueue bool) { ctx := context.Background() @@ -223,9 +219,6 @@ ORDER BY streams DESC; func TestFlowControlRangeSplitMerge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -341,9 +334,6 @@ ORDER BY streams DESC; func TestFlowControlBlockedAdmission(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -455,9 +445,6 @@ ORDER BY name ASC; func TestFlowControlAdmissionPostSplitMerge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -594,9 +581,6 @@ ORDER BY streams DESC; func TestFlowControlCrashedNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. 
- skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 2 @@ -714,9 +698,6 @@ func TestFlowControlCrashedNode(t *testing.T) { func TestFlowControlRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1014,9 +995,6 @@ SELECT store_id, func TestFlowControlRaftTransportBreak(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1129,9 +1107,6 @@ func TestFlowControlRaftTransportBreak(t *testing.T) { func TestFlowControlRaftTransportCulled(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1268,9 +1243,6 @@ func TestFlowControlRaftTransportCulled(t *testing.T) { func TestFlowControlRaftMembership(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -1401,9 +1373,6 @@ ORDER BY name ASC; func TestFlowControlRaftMembershipRemoveSelf(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) testutils.RunTrueAndFalse(t, "transfer-lease-first", func(t *testing.T, transferLeaseFirst bool) { ctx := context.Background() @@ -1532,9 +1501,6 @@ ORDER BY name ASC; func TestFlowControlClassPrioritization(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -1618,9 +1584,6 @@ func TestFlowControlClassPrioritization(t *testing.T) { func TestFlowControlQuiescedRange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1761,9 +1724,6 @@ ORDER BY name ASC; func TestFlowControlUnquiescedRange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. 
- skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -1917,9 +1877,6 @@ ORDER BY name ASC; func TestFlowControlTransferLease(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -2008,9 +1965,6 @@ ORDER BY name ASC; func TestFlowControlLeaderNotLeaseholder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 5 @@ -2144,9 +2098,6 @@ ORDER BY name ASC; func TestFlowControlGranterAdmitOneByOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TOOD(kvoli,pav-kv,sumeerbhola): Enable this test under all conditions - // after fixing the flakiness introduced by #132121. - skip.UnderDuressWithIssue(t, 132310) ctx := context.Background() const numNodes = 3 @@ -2255,7 +2206,6 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) { func TestFlowControlBasicV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2350,7 +2300,6 @@ ORDER BY streams DESC; func TestFlowControlRangeSplitMergeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2465,7 +2414,6 @@ ORDER BY streams DESC; func TestFlowControlBlockedAdmissionV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2567,7 +2515,6 @@ func TestFlowControlBlockedAdmissionV2(t *testing.T) { func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2718,7 +2665,6 @@ ORDER BY streams DESC; func TestFlowControlCrashedNodeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -2823,7 +2769,6 @@ func TestFlowControlCrashedNodeV2(t *testing.T) { func TestFlowControlRaftSnapshotV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") const numServers int = 5 @@ -3091,7 +3036,6 @@ SELECT store_id, 
func TestFlowControlRaftMembershipV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3224,7 +3168,6 @@ func TestFlowControlRaftMembershipV2(t *testing.T) { func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3361,7 +3304,6 @@ ORDER BY streams DESC; func TestFlowControlClassPrioritizationV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3452,7 +3394,6 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) { func TestFlowControlUnquiescedRangeV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3471,7 +3412,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { settings := cluster.MakeTestingClusterSettings() // Override metamorphism to allow range quiescence. kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, false) - pinnedLease := kvserver.NewPinnedLeases() tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ @@ -3483,10 +3423,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { }, Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - // Pin the lease to the first store to prevent lease and leader - // moves which disrupt this test. - PinnedLeases: pinnedLease, - FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ UseOnlyForScratchRanges: true, OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel { @@ -3523,7 +3459,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { k := tc.ScratchRange(t) desc, err := tc.LookupRange(k) require.NoError(t, err) - pinnedLease.PinLease(desc.RangeID, tc.GetFirstStoreFromServer(t, 0).StoreID()) tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
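
The hunk above also strips the pinned-lease scaffolding from TestFlowControlUnquiescedRangeV2: the kvserver.NewPinnedLeases() registry, the PinnedLeases store testing knob, and the PinLease call that kept the scratch range's lease on the first store. As a rough, toy illustration of the pattern being deleted (the real kvserver implementation differs):

package main

import (
	"fmt"
	"sync"
)

type rangeID int
type storeID int

// pinnedLeases is a toy version of the registry a test would obtain from
// kvserver.NewPinnedLeases and hand to the store via the PinnedLeases knob.
type pinnedLeases struct {
	mu   sync.Mutex
	pins map[rangeID]storeID
}

func newPinnedLeases() *pinnedLeases {
	return &pinnedLeases{pins: make(map[rangeID]storeID)}
}

// pinLease records that the lease for r must stay on s, mirroring the shape
// of the deleted pinnedLease.PinLease(desc.RangeID, ...) call.
func (p *pinnedLeases) pinLease(r rangeID, s storeID) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pins[r] = s
}

// allowsTransfer is what a store would consult before moving a lease.
func (p *pinnedLeases) allowsTransfer(r rangeID, to storeID) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	pinned, ok := p.pins[r]
	return !ok || pinned == to
}

func main() {
	pl := newPinnedLeases()
	pl.pinLease(42, 1)
	fmt.Println(pl.allowsTransfer(42, 1), pl.allowsTransfer(42, 2)) // true false
}
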
h := newFlowControlTestHelperV2(t, tc, v2EnabledWhenLeaderLevel) @@ -3587,7 +3522,6 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { func TestFlowControlTransferLeaseV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3676,7 +3610,6 @@ func TestFlowControlTransferLeaseV2(t *testing.T) { func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, @@ -3789,7 +3722,6 @@ func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderDuressWithIssue(t, 132272, "non-determinism under duress: stress/race/deadlock") testutils.RunValues(t, "v2_enabled_when_leader_level", []kvflowcontrol.V2EnabledWhenLeaderLevel{ kvflowcontrol.V2EnabledWhenLeaderV1Encoding, diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 89986323708c..bf828c384fbf 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -149,6 +149,7 @@ type StoreRebalancer struct { processTimeoutFn func(replica CandidateReplica) time.Duration objectiveProvider RebalanceObjectiveProvider subscribedToSpanConfigs func() bool + disabled func() bool } // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the @@ -191,6 +192,10 @@ func NewStoreRebalancer( } return !rq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() }, + disabled: func() bool { + return LoadBasedRebalancingMode.Get(&st.SV) == LBRebalancingOff || + rq.store.cfg.TestingKnobs.DisableStoreRebalancer + }, } sr.AddLogTag("store-rebalancer", nil) rq.store.metrics.registry.AddMetricStruct(&sr.metrics) @@ -308,15 +313,13 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { timer.Read = true timer.Reset(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&sr.st.SV))) } - + if sr.disabled() { + continue + } // Once the rebalance mode and rebalance objective are defined for // this loop, they are immutable and do not change. This avoids // inconsistency where the rebalance objective changes and very // different or contradicting actions are then taken. 
- mode := sr.RebalanceMode() - if mode == LBRebalancingOff { - continue - } if !sr.subscribedToSpanConfigs() { continue } @@ -326,7 +329,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension()) options := sr.scorerOptions(ctx, objective.ToDimension()) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) sr.rebalanceStore(ctx, rctx) } }) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 7f1379f758f8..7e2d38e30dc5 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -176,6 +176,9 @@ type StoreTestingKnobs struct { DisableReplicaGCQueue bool // DisableReplicateQueue disables the replication queue. DisableReplicateQueue bool + // DisableStoreRebalancer turns off the store rebalancer which moves replicas + // and leases. + DisableStoreRebalancer bool // DisableLoadBasedSplitting turns off LBS so no splits happen because of load. DisableLoadBasedSplitting bool // LoadBasedSplittingOverrideKey returns a key which should be used for load diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 97ce8c228b05..e9ac63607c47 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -297,6 +297,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( txn, cb.evalCtx.Codec, tableDesc, + nil, /* uniqueWithTombstoneIndexes */ cb.updateCols, requestedCols, row.UpdaterOnlyColumns, diff --git a/pkg/sql/colenc/encode.go b/pkg/sql/colenc/encode.go index be827546ca04..4fb14c579814 100644 --- a/pkg/sql/colenc/encode.go +++ b/pkg/sql/colenc/encode.go @@ -82,7 +82,7 @@ func MakeEncoder( partialIndexes map[descpb.IndexID][]bool, memoryUsageCheck func() error, ) BatchEncoder { - rh := row.NewRowHelper(codec, desc, desc.WritableNonPrimaryIndexes(), sv, false /*internal*/, metrics) + rh := row.NewRowHelper(codec, desc, desc.WritableNonPrimaryIndexes(), nil /* uniqueWithTombstoneIndexes */, sv, false /*internal*/, metrics) rh.Init() colMap := row.ColIDtoRowIndexFromCols(insCols) return BatchEncoder{rh: &rh, b: b, colMap: colMap, diff --git a/pkg/sql/colenc/encode_test.go b/pkg/sql/colenc/encode_test.go index 3fe0998df858..c6dee5ea04f4 100644 --- a/pkg/sql/colenc/encode_test.go +++ b/pkg/sql/colenc/encode_test.go @@ -600,7 +600,7 @@ func buildRowKVs( sv *settings.Values, codec keys.SQLCodec, ) (kvs, error) { - inserter, err := row.MakeInserter(context.Background(), nil /*txn*/, codec, desc, cols, nil, sv, false, nil) + inserter, err := row.MakeInserter(context.Background(), nil /*txn*/, codec, desc, nil /* uniqueWithTombstoneIndexes */, cols, nil, sv, false, nil) if err != nil { return kvs{}, err } diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index f9c69079c065..9e4afd5d1a11 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -540,6 +540,7 @@ func (n *createTableNode) startExec(params runParams) error { params.p.txn, params.ExecCfg().Codec, desc.ImmutableCopy().(catalog.TableDescriptor), + nil, /* uniqueWithTombstoneIndexes */ desc.PublicColumns(), &tree.DatumAlloc{}, ¶ms.ExecCfg().Settings.SV, diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 1416edd5ae55..4cd18e6e8369 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -969,6 +969,7 @@ func (e *distSQLSpecExecFactory) ConstructInsert( 
insertCols exec.TableColumnOrdinalSet, returnCols exec.TableColumnOrdinalSet, checkCols exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: insert") @@ -982,6 +983,7 @@ func (e *distSQLSpecExecFactory) ConstructInsertFastPath( checkCols exec.CheckOrdinalSet, fkChecks []exec.InsertFastPathCheck, uniqChecks []exec.InsertFastPathCheck, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: insert fast path") @@ -995,6 +997,7 @@ func (e *distSQLSpecExecFactory) ConstructUpdate( returnCols exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, passthrough colinfo.ResultColumns, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: update") diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 8265999c9cfb..9e781c515f3d 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -193,16 +193,12 @@ func (r *insertFastPathRun) addUniqChecks( combinedRow = make(tree.Datums, len(templateRow)) } copy(combinedRow, templateRow) - isInputRow := true for j := 0; j < len(c.InsertCols); j++ { // Datums from single-table constraints are already present in // DatumsFromConstraint. Fill in other values from the input row. if combinedRow[c.InsertCols[j]] == nil { combinedRow[c.InsertCols[j]] = inputRow[c.InsertCols[j]] } - if combinedRow[c.InsertCols[j]] != inputRow[c.InsertCols[j]] { - isInputRow = false - } } if !forTesting { span, err := c.generateSpan(combinedRow) @@ -210,29 +206,14 @@ func (r *insertFastPathRun) addUniqChecks( return nil, err } reqIdx := len(r.uniqBatch.Requests) - // Since predicate locks are not yet supported by the KV layer, we - // emulate them by writing a tombstone to the other partitions instead - // of scanning and locking. These tombstones are added to the insert - // batch instead of the uniqueness check batch because they do not - // require any post-processing (the KV generated conflict message is - // what we want). - if c.Locking.Form == tree.LockPredicate { - if !isInputRow { - if r.traceKV { - log.VEventf(ctx, 2, "CPut %s (LockPredicate)", span) - } - r.ti.putter.CPut(span.Key, nil, nil) - } - } else { - if r.traceKV { - log.VEventf(ctx, 2, "UniqScan %s", span) - } - r.uniqBatch.Requests = append(r.uniqBatch.Requests, kvpb.RequestUnion{}) - // TODO(msirek): Batch-allocate the kvpb.ScanRequests outside the loop. - r.uniqBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span), - }) + if r.traceKV { + log.VEventf(ctx, 2, "UniqScan %s", span) } + r.uniqBatch.Requests = append(r.uniqBatch.Requests, kvpb.RequestUnion{}) + // TODO(msirek): Batch-allocate the kvpb.ScanRequests outside the loop. + r.uniqBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeaderFromSpan(span), + }) r.uniqSpanInfo = append(r.uniqSpanInfo, insertFastPathFKUniqSpanInfo{ check: c, rowIdx: rowIdx, diff --git a/pkg/sql/opt/cat/table.go b/pkg/sql/opt/cat/table.go index a9fa530745c1..ffa6fb80cecd 100644 --- a/pkg/sql/opt/cat/table.go +++ b/pkg/sql/opt/cat/table.go @@ -363,6 +363,10 @@ type UniqueConstraint interface { // WithoutIndex is true if this unique constraint is not enforced by an index. 
WithoutIndex() bool + // CanUseTombstones is true if this unique constraint can be enforced by + // writing tombstones to all partitions. + CanUseTombstones() bool + // Validated is true if the constraint is validated (i.e. we know that the // existing data satisfies the constraint). It is possible to set up a unique // constraint on existing tables without validating it, in which case we diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 782cff220141..b46bb271f41e 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -110,6 +110,7 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (_ execPlan, outputCols colO insertOrds, returnOrds, checkOrds, + ins.UniqueWithTombstoneIndexes, b.allowAutoCommit && len(ins.UniqueChecks) == 0 && len(ins.FKChecks) == 0 && len(ins.FKCascades) == 0, ) @@ -184,8 +185,7 @@ func (b *Builder) tryBuildFastPathInsert( // If there is a unique index with implicit partitioning columns, the fast // path can write tombstones to lock the row in all partitions. - allowPredicateLocks := execFastPathCheck.ReferencedIndex.ImplicitPartitioningColumnCount() > 0 - locking, err := b.buildLockingImpl(ins.Table, c.Locking, allowPredicateLocks) + locking, err := b.buildLocking(ins.Table, c.Locking) if err != nil { return execPlan{}, colOrdMap{}, false, err } @@ -339,6 +339,7 @@ func (b *Builder) tryBuildFastPathInsert( checkOrds, fkChecks, uniqChecks, + ins.UniqueWithTombstoneIndexes, b.allowAutoCommit, ) if err != nil { @@ -440,6 +441,7 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (_ execPlan, outputCols colO returnColOrds, checkOrds, passthroughCols, + upd.UniqueWithTombstoneIndexes, b.allowAutoCommit && len(upd.UniqueChecks) == 0 && len(upd.FKChecks) == 0 && len(upd.FKCascades) == 0, ) diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index db6e17ca335c..ecdc5b305c1a 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -3107,9 +3107,7 @@ func (b *Builder) buildZigzagJoin( return b.applySimpleProject(res, outputCols, join, join.Cols, join.ProvidedPhysical().Ordering) } -func (b *Builder) buildLockingImpl( - toLock opt.TableID, locking opt.Locking, allowPredicateLocks bool, -) (opt.Locking, error) { +func (b *Builder) buildLocking(toLock opt.TableID, locking opt.Locking) (opt.Locking, error) { if b.forceForUpdateLocking.Contains(int(toLock)) { locking = locking.Max(forUpdateLocking) } @@ -3120,16 +3118,11 @@ func (b *Builder) buildLockingImpl( "cannot execute SELECT %s in a read-only transaction", locking.Strength.String(), ) } - if !allowPredicateLocks && locking.Form == tree.LockPredicate { + if locking.Form == tree.LockPredicate { return opt.Locking{}, unimplemented.NewWithIssuef( 110873, "explicit unique checks are not yet supported under read committed isolation", ) } - if locking.Form == tree.LockPredicate && locking.WaitPolicy != tree.LockWaitBlock { - return opt.Locking{}, unimplemented.NewWithIssuef( - 126592, "non-blocking predicate locks are not yet supported", - ) - } // Check if we can actually use shared locks here, or we need to use // non-locking reads instead. if locking.Strength == tree.ForShare || locking.Strength == tree.ForKeyShare { @@ -3147,11 +3140,6 @@ func (b *Builder) buildLockingImpl( return locking, nil } -// TODO (#126592): Delete this function once predicate locks are universally supported. 
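
With tombstone-based uniqueness now planned in the optbuilder (see mutation_builder_unique.go below), the execbuilder no longer needs a per-callsite allowPredicateLocks escape hatch: the buildLockingImpl wrapper is folded back into buildLocking, and any remaining LockPredicate request simply fails with the issue-110873 unimplemented error. A toy sketch of the post-refactor shape (names here are illustrative, not the builder's actual types):

package main

import (
	"errors"
	"fmt"
)

type lockForm int

const (
	lockRecord lockForm = iota
	lockPredicate
)

// errExplicitChecks reuses the error text from the hunk above.
var errExplicitChecks = errors.New(
	"explicit unique checks are not yet supported under read committed isolation")

// buildLocking no longer takes an allowPredicateLocks parameter; the one
// caller that passed true (the insert fast path) now routes those cases
// through tombstone writes instead, so predicate locks are always rejected.
func buildLocking(form lockForm) error {
	if form == lockPredicate {
		return errExplicitChecks
	}
	return nil
}

func main() {
	fmt.Println(buildLocking(lockRecord), buildLocking(lockPredicate))
}
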
-func (b *Builder) buildLocking(toLock opt.TableID, locking opt.Locking) (opt.Locking, error) { - return b.buildLockingImpl(toLock, locking, false /* allowPredicateLocks */) -} - func (b *Builder) buildMax1Row( max1Row *memo.Max1RowExpr, ) (_ execPlan, outputCols colOrdMap, err error) { diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index b8c371fa508a..6763ff4f0332 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -32,6 +32,20 @@ func Emit(ctx context.Context, plan *Plan, ob *OutputBuilder, spanFormatFn SpanF return emitInternal(ctx, plan, ob, spanFormatFn, nil /* visitedFKsByCascades */) } +// joinIndexNames emits a string of index names on table 'table' as specified in +// 'ords', with each name separated by 'sep'. +func joinIndexNames(table cat.Table, ords cat.IndexOrdinals, sep string) string { + var sb strings.Builder + for i, idx := range ords { + index := table.Index(idx) + if i > 0 { + sb.WriteString(sep) + } + sb.WriteString(string(index.Name())) + } + return sb.String() +} + // - visitedFKsByCascades is updated on recursive calls for each cascade plan. // Can be nil if the plan doesn't have any cascades. In this map the key is the // "id" of the FK constraint that we construct as OriginTableID || Name. @@ -872,16 +886,8 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if a.AutoCommit { ob.Attr("auto commit", "") } - if len(a.ArbiterIndexes) > 0 { - var sb strings.Builder - for i, idx := range a.ArbiterIndexes { - index := a.Table.Index(idx) - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(string(index.Name())) - } - ob.Attr("arbiter indexes", sb.String()) + if arbind := joinIndexNames(a.Table, a.ArbiterIndexes, ", "); arbind != "" { + ob.Attr("arbiter indexes", arbind) } if len(a.ArbiterConstraints) > 0 { var sb strings.Builder @@ -894,6 +900,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { } ob.Attr("arbiter constraints", sb.String()) } + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", uniqWithTombstoneIndexes) + } case insertFastPathOp: a := n.args.(*insertFastPathArgs) @@ -917,6 +926,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { ) e.emitLockingPolicyWithPrefix("uniqueness check ", uniq.Locking) } + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", uniqWithTombstoneIndexes) + } if len(a.Rows) > 0 { e.emitTuples(tree.RawRows(a.Rows), len(a.Rows[0])) } @@ -931,16 +943,8 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if a.AutoCommit { ob.Attr("auto commit", "") } - if len(a.ArbiterIndexes) > 0 { - var sb strings.Builder - for i, idx := range a.ArbiterIndexes { - index := a.Table.Index(idx) - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(string(index.Name())) - } - ob.Attr("arbiter indexes", sb.String()) + if arbind := joinIndexNames(a.Table, a.ArbiterIndexes, ", "); arbind != "" { + ob.Attr("arbiter indexes", arbind) } if len(a.ArbiterConstraints) > 0 { var sb strings.Builder @@ -957,6 +961,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { case updateOp: a := n.args.(*updateArgs) ob.Attrf("table", "%s", a.Table.Name()) + if uniqWithTombstoneIndexes := joinIndexNames(a.Table, a.UniqueWithTombstonesIndexes, ", "); uniqWithTombstoneIndexes != "" { + ob.Attr("uniqueness checks (tombstones)", 
uniqWithTombstoneIndexes) + } ob.Attr("set", printColumns(tableColumns(a.Table, a.UpdateCols))) if a.AutoCommit { ob.Attr("auto commit", "") diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index bbe989992bbc..5c1210a07cd7 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -452,6 +452,7 @@ define Insert { InsertCols exec.TableColumnOrdinalSet ReturnCols exec.TableColumnOrdinalSet CheckCols exec.CheckOrdinalSet + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. # This is false when executing inside an explicit transaction, or there are @@ -483,6 +484,7 @@ define InsertFastPath { CheckCols exec.CheckOrdinalSet FkChecks []exec.InsertFastPathCheck UniqChecks []exec.InsertFastPathCheck + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. # This is false when executing inside an explicit transaction. @@ -518,6 +520,7 @@ define Update { ReturnCols exec.TableColumnOrdinalSet Checks exec.CheckOrdinalSet Passthrough colinfo.ResultColumns + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. AutoCommit bool diff --git a/pkg/sql/opt/ops/mutation.opt b/pkg/sql/opt/ops/mutation.opt index 2b24a3133db3..90bdb34a9c22 100644 --- a/pkg/sql/opt/ops/mutation.opt +++ b/pkg/sql/opt/ops/mutation.opt @@ -176,6 +176,10 @@ define MutationPrivate { # FKCascades stores metadata necessary for building cascading queries. FKCascades FKCascades + + # Unique indexes where uniqueness will be ensured by writing tombstones to + # all partitions. + UniqueWithTombstoneIndexes IndexOrdinals } # Update evaluates a relational input expression that fetches existing rows from diff --git a/pkg/sql/opt/optbuilder/mutation_builder.go b/pkg/sql/opt/optbuilder/mutation_builder.go index 51890bac8ab3..1181344b2d49 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder.go +++ b/pkg/sql/opt/optbuilder/mutation_builder.go @@ -209,6 +209,10 @@ type mutationBuilder struct { // inputForInsertExpr stores the result of outscope.expr from the most // recent call to buildInputForInsert. 
inputForInsertExpr memo.RelExpr + + // uniqueWithTombstoneIndexes is the set of unique indexes that ensure uniqueness + // by writing tombstones to all partitions + uniqueWithTombstoneIndexes intsets.Fast } func (mb *mutationBuilder) init(b *Builder, opName string, tab cat.Table, alias tree.TableName) { @@ -1057,17 +1061,18 @@ func (mb *mutationBuilder) makeMutationPrivate(needResults bool) *memo.MutationP } private := &memo.MutationPrivate{ - Table: mb.tabID, - InsertCols: checkEmptyList(mb.insertColIDs), - FetchCols: checkEmptyList(mb.fetchColIDs), - UpdateCols: checkEmptyList(mb.updateColIDs), - CanaryCol: mb.canaryColID, - ArbiterIndexes: mb.arbiters.IndexOrdinals(), - ArbiterConstraints: mb.arbiters.UniqueConstraintOrdinals(), - CheckCols: checkEmptyList(mb.checkColIDs), - PartialIndexPutCols: checkEmptyList(mb.partialIndexPutColIDs), - PartialIndexDelCols: checkEmptyList(mb.partialIndexDelColIDs), - FKCascades: mb.cascades, + Table: mb.tabID, + InsertCols: checkEmptyList(mb.insertColIDs), + FetchCols: checkEmptyList(mb.fetchColIDs), + UpdateCols: checkEmptyList(mb.updateColIDs), + CanaryCol: mb.canaryColID, + ArbiterIndexes: mb.arbiters.IndexOrdinals(), + ArbiterConstraints: mb.arbiters.UniqueConstraintOrdinals(), + CheckCols: checkEmptyList(mb.checkColIDs), + PartialIndexPutCols: checkEmptyList(mb.partialIndexPutColIDs), + PartialIndexDelCols: checkEmptyList(mb.partialIndexDelColIDs), + FKCascades: mb.cascades, + UniqueWithTombstoneIndexes: mb.uniqueWithTombstoneIndexes.Ordered(), } // If we didn't actually plan any checks or cascades, don't buffer the input. diff --git a/pkg/sql/opt/optbuilder/mutation_builder_unique.go b/pkg/sql/opt/optbuilder/mutation_builder_unique.go index 0d76b4f9ad00..f7fde1d64df4 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder_unique.go +++ b/pkg/sql/opt/optbuilder/mutation_builder_unique.go @@ -54,6 +54,14 @@ func (mb *mutationBuilder) buildUniqueChecksForInsert() { if mb.uniqueConstraintIsArbiter(i) { continue } + + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } + if h.init(mb, i) { uniqueChecksItem, fastPathUniqueChecksItem := h.buildInsertionCheck(buildFastPathCheck) if fastPathUniqueChecksItem == nil { @@ -94,6 +102,12 @@ func (mb *mutationBuilder) buildUniqueChecksForUpdate() { if !mb.uniqueColsUpdated(i) { continue } + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } if h.init(mb, i) { // The insertion check works for updates too since it simply checks that // the unique columns in the newly inserted or updated rows do not match diff --git a/pkg/sql/opt/testutils/testcat/test_catalog.go b/pkg/sql/opt/testutils/testcat/test_catalog.go index 0c573b2949d0..44f400856ed4 100644 --- a/pkg/sql/opt/testutils/testcat/test_catalog.go +++ b/pkg/sql/opt/testutils/testcat/test_catalog.go @@ -1548,12 +1548,13 @@ func (fk *ForeignKeyConstraint) UpdateReferenceAction() tree.ReferenceAction { // UniqueConstraint implements cat.UniqueConstraint. See that interface // for more information on the fields. 
type UniqueConstraint struct { - name string - tabID cat.StableID - columnOrdinals []int - predicate string - withoutIndex bool - validated bool + name string + tabID cat.StableID + columnOrdinals []int + predicate string + withoutIndex bool + canUseTombstones bool + validated bool } var _ cat.UniqueConstraint = &UniqueConstraint{} @@ -1594,6 +1595,8 @@ func (u *UniqueConstraint) WithoutIndex() bool { return u.withoutIndex } +func (u *UniqueConstraint) CanUseTombstones() bool { return u.canUseTombstones } + // Validated is part of the cat.UniqueConstraint interface. func (u *UniqueConstraint) Validated() bool { return u.validated diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 7044ad712958..e6d7e26dbe6b 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -1006,13 +1006,21 @@ func newOptTable( if idx.IsUnique() { if idx.ImplicitPartitioningColumnCount() > 0 { - // Add unique constraints for implicitly partitioned unique indexes. + // Add unique constraints for implicitly partitioned unique indexes. If + // there is a single implicit column of ENUM type (e.g. an implicit RBR + // column), then we can ensure uniqueness under non-Serializable + // isolation levels by writing tombstones. We assume that the partition + // column is the first column of the index. + partitionColumn := catalog.FindColumnByID(desc, idx.GetKeyColumnID(0 /* columnOrdinal */)) + canUseTombstones := idx.ImplicitPartitioningColumnCount() == 1 && + partitionColumn.GetType().Family() == types.EnumFamily ot.uniqueConstraints = append(ot.uniqueConstraints, optUniqueConstraint{ - name: idx.GetName(), - table: ot.ID(), - columns: idx.IndexDesc().KeyColumnIDs[idx.IndexDesc().ExplicitColumnStartIdx():], - withoutIndex: true, - predicate: idx.GetPredicate(), + name: idx.GetName(), + table: ot.ID(), + columns: idx.IndexDesc().KeyColumnIDs[idx.IndexDesc().ExplicitColumnStartIdx():], + withoutIndex: true, + canUseTombstones: canUseTombstones, + predicate: idx.GetPredicate(), // TODO(rytaft): will we ever support an unvalidated unique constraint // here? validity: descpb.ConstraintValidity_Validated, @@ -1999,8 +2007,9 @@ type optUniqueConstraint struct { columns []descpb.ColumnID predicate string - withoutIndex bool - validity descpb.ConstraintValidity + withoutIndex bool + canUseTombstones bool + validity descpb.ConstraintValidity uniquenessGuaranteedByAnotherIndex bool } @@ -2045,6 +2054,10 @@ func (u *optUniqueConstraint) WithoutIndex() bool { return u.withoutIndex } +func (u *optUniqueConstraint) CanUseTombstones() bool { + return u.canUseTombstones +} + // Validated is part of the cat.UniqueConstraint interface. 
func (u *optUniqueConstraint) Validated() bool { return u.validity == descpb.ConstraintValidity_Validated diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index af638dc31bc3..9ab2e30c0bde 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1352,6 +1352,18 @@ func (ef *execFactory) ConstructShowTrace(typ tree.ShowTraceType, compact bool) return node, nil } +func ordinalsToIndexes(table cat.Table, ords cat.IndexOrdinals) []catalog.Index { + if ords == nil { + return nil + } + + retval := make([]catalog.Index, len(ords)) + for i, idx := range ords { + retval[i] = table.Index(idx).(*optIndex).idx + } + return retval +} + func (ef *execFactory) ConstructInsert( input exec.Node, table cat.Table, @@ -1360,6 +1372,7 @@ func (ef *execFactory) ConstructInsert( insertColOrdSet exec.TableColumnOrdinalSet, returnColOrdSet exec.TableColumnOrdinalSet, checkOrdSet exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // Derive insert table and column descriptors. @@ -1374,6 +1387,7 @@ func (ef *execFactory) ConstructInsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), cols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1432,6 +1446,7 @@ func (ef *execFactory) ConstructInsertFastPath( checkOrdSet exec.CheckOrdinalSet, fkChecks []exec.InsertFastPathCheck, uniqChecks []exec.InsertFastPathCheck, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // Derive insert table and column descriptors. @@ -1446,6 +1461,7 @@ func (ef *execFactory) ConstructInsertFastPath( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), cols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1524,6 +1540,7 @@ func (ef *execFactory) ConstructUpdate( returnColOrdSet exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, passthrough colinfo.ResultColumns, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // TODO(radu): the execution code has an annoying limitation that the fetch @@ -1551,6 +1568,7 @@ func (ef *execFactory) ConstructUpdate( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), updateCols, fetchCols, row.UpdaterDefault, @@ -1638,6 +1656,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + nil, /* uniqueWithTombstoneIndexes */ insertCols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1654,6 +1673,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, + nil, /* uniqueWithTombstoneIndexes */ updateCols, fetchCols, row.UpdaterDefault, diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 0028302e63a8..14339ecd45e9 100644 --- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -91,7 +91,7 @@ func MakeDeleter( } rd := Deleter{ - Helper: NewRowHelper(codec, tableDesc, indexes, sv, internal, metrics), + Helper: NewRowHelper(codec, tableDesc, indexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics), FetchCols: fetchCols, FetchColIDtoRowIndex: fetchColIDtoRowIndex, } @@ -140,7 +140,7 @@ func (rd *Deleter) DeleteRow( } } - primaryIndexKey, err := rd.Helper.encodePrimaryIndex(rd.FetchColIDtoRowIndex, values) + primaryIndexKey, err := rd.Helper.encodePrimaryIndexKey(rd.FetchColIDtoRowIndex, values) if err != nil { return err } diff 
--git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index abde9f86b68d..633c17ce7887 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -6,6 +6,7 @@ package row import ( + "bytes" "context" "sort" @@ -66,14 +67,28 @@ var maxRowSizeErr = settings.RegisterByteSizeSetting( settings.WithPublic, ) +// Per-index data for writing tombstones to enforce a uniqueness constraint. +type uniqueWithTombstoneEntry struct { + // implicitPartitionKeyValues contains the potential values for the + // partitioning column. + implicitPartitionKeyVals []tree.Datum + + // tmpTombstones contains the tombstones generated for this index by the last + // call to encodeTombstonesForIndex. + tmpTombstones [][]byte +} + // RowHelper has the common methods for table row manipulations. type RowHelper struct { Codec keys.SQLCodec TableDesc catalog.TableDescriptor // Secondary indexes. - Indexes []catalog.Index - indexEntries map[catalog.Index][]rowenc.IndexEntry + Indexes []catalog.Index + + // Unique indexes that can be enforced with tombstones. + UniqueWithTombstoneIndexes intsets.Fast + indexEntries map[catalog.Index][]rowenc.IndexEntry // Computed during initialization for pretty-printing. primIndexValDirs []encoding.Direction @@ -85,6 +100,11 @@ type RowHelper struct { primaryIndexValueCols catalog.TableColSet sortedColumnFamilies map[descpb.FamilyID][]descpb.ColumnID + // Used to build tmpTombstones for non-Serializable uniqueness checks. + index2UniqueWithTombstoneEntry map[catalog.Index]*uniqueWithTombstoneEntry + // Used to hold the row being written while writing tombstones. + tmpRow []tree.Datum + // Used to check row size. maxRowSizeLog, maxRowSizeErr uint32 internal bool @@ -95,16 +115,22 @@ func NewRowHelper( codec keys.SQLCodec, desc catalog.TableDescriptor, indexes []catalog.Index, + uniqueWithTombstoneIndexes []catalog.Index, sv *settings.Values, internal bool, metrics *rowinfra.Metrics, ) RowHelper { + var uniqueWithTombstoneIndexesSet intsets.Fast + for _, index := range uniqueWithTombstoneIndexes { + uniqueWithTombstoneIndexesSet.Add(index.Ordinal()) + } rh := RowHelper{ - Codec: codec, - TableDesc: desc, - Indexes: indexes, - internal: internal, - metrics: metrics, + Codec: codec, + TableDesc: desc, + Indexes: indexes, + UniqueWithTombstoneIndexes: uniqueWithTombstoneIndexesSet, + internal: internal, + metrics: metrics, } // Pre-compute the encoding directions of the index key values for @@ -128,7 +154,7 @@ func NewRowHelper( // include empty secondary index k/v pairs. func (rh *RowHelper) encodeIndexes( ctx context.Context, - colIDtoRowIndex catalog.TableColMap, + colIDtoRowPosition catalog.TableColMap, values []tree.Datum, ignoreIndexes intsets.Fast, includeEmpty bool, @@ -137,11 +163,11 @@ func (rh *RowHelper) encodeIndexes( secondaryIndexEntries map[catalog.Index][]rowenc.IndexEntry, err error, ) { - primaryIndexKey, err = rh.encodePrimaryIndex(colIDtoRowIndex, values) + primaryIndexKey, err = rh.encodePrimaryIndexKey(colIDtoRowPosition, values) if err != nil { return nil, nil, err } - secondaryIndexEntries, err = rh.encodeSecondaryIndexes(ctx, colIDtoRowIndex, values, ignoreIndexes, includeEmpty) + secondaryIndexEntries, err = rh.encodeSecondaryIndexes(ctx, colIDtoRowPosition, values, ignoreIndexes, includeEmpty) if err != nil { return nil, nil, err } @@ -154,23 +180,113 @@ func (rh *RowHelper) Init() { ) } -// encodePrimaryIndex encodes the primary index key. 
@@ -154,23 +180,113 @@ func (rh *RowHelper) Init() {
 	)
 }
 
-// encodePrimaryIndex encodes the primary index key.
-func (rh *RowHelper) encodePrimaryIndex(
-	colIDtoRowIndex catalog.TableColMap, values []tree.Datum,
+// encodePrimaryIndexKey encodes the primary index key.
+func (rh *RowHelper) encodePrimaryIndexKey(
+	colIDtoRowPosition catalog.TableColMap, values []tree.Datum,
 ) (primaryIndexKey []byte, err error) {
 	if rh.PrimaryIndexKeyPrefix == nil {
 		rh.Init()
 	}
 	idx := rh.TableDesc.GetPrimaryIndex()
 	primaryIndexKey, containsNull, err := rowenc.EncodeIndexKey(
-		rh.TableDesc, idx, colIDtoRowIndex, values, rh.PrimaryIndexKeyPrefix,
+		rh.TableDesc, idx, colIDtoRowPosition, values, rh.PrimaryIndexKeyPrefix,
 	)
 	if containsNull {
-		return nil, rowenc.MakeNullPKError(rh.TableDesc, idx, colIDtoRowIndex, values)
+		return nil, rowenc.MakeNullPKError(rh.TableDesc, idx, colIDtoRowPosition, values)
 	}
 	return primaryIndexKey, err
 }
 
+// initRowTmp creates a copy of the row that we can modify while trying to be
+// smart about allocations.
+func (rh *RowHelper) initRowTmp(values []tree.Datum) []tree.Datum {
+	if rh.tmpRow == nil {
+		rh.tmpRow = make([]tree.Datum, len(values))
+	}
+	copy(rh.tmpRow, values)
+	return rh.tmpRow
+}
+
+// getTombstoneTmpForIndex initializes and returns the tombstone scratch entry
+// for the provided index.
+func (rh *RowHelper) getTombstoneTmpForIndex(
+	index catalog.Index, partitionColValue *tree.DEnum,
+) *uniqueWithTombstoneEntry {
+	if rh.index2UniqueWithTombstoneEntry == nil {
+		rh.index2UniqueWithTombstoneEntry = make(map[catalog.Index]*uniqueWithTombstoneEntry, len(rh.TableDesc.WritableNonPrimaryIndexes())+1)
+	}
+	tombstoneTmp, ok := rh.index2UniqueWithTombstoneEntry[index]
+	if !ok {
+		implicitKeys := tree.MakeAllDEnumsInType(partitionColValue.ResolvedType())
+		tombstoneTmp = &uniqueWithTombstoneEntry{implicitPartitionKeyVals: implicitKeys, tmpTombstones: make([][]byte, len(implicitKeys)-1)}
+		rh.index2UniqueWithTombstoneEntry[index] = tombstoneTmp
+	}
+	tombstoneTmp.tmpTombstones = tombstoneTmp.tmpTombstones[:0]
+	return tombstoneTmp
+}
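getTombstoneTmpForIndex is an allocation-reuse pattern: the scratch entry is created once per index, and its tombstone slice is re-sliced to length zero on each subsequent call so the backing array is recycled across rows. An illustrative sketch of just that pattern, with hypothetical names standing in for the real types:

package main

import "fmt"

// scratch is a hypothetical analogue of uniqueWithTombstoneEntry.
type scratch struct{ tombstones [][]byte }

var cache = map[string]*scratch{}

// get allocates the per-index scratch once, then re-slices to length zero on
// every later call so the backing array is reused across rows.
func get(indexName string, numPartitions int) *scratch {
	s, ok := cache[indexName]
	if !ok {
		s = &scratch{tombstones: make([][]byte, 0, numPartitions-1)}
		cache[indexName] = s
	}
	s.tombstones = s.tombstones[:0] // keep capacity, drop stale keys
	return s
}

func main() {
	s := get("t_c_key", 5)
	s.tombstones = append(s.tombstones, []byte("stale"))
	fmt.Println(len(get("t_c_key", 5).tombstones)) // 0: reset, same backing array
}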
+
+// encodeTombstonesForIndex creates a set of keys that can be used to write
+// tombstones for the provided index. These values remain valid for the index
+// until this function is called again for that index.
+func (rh *RowHelper) encodeTombstonesForIndex(
+	ctx context.Context,
+	index catalog.Index,
+	colIDtoRowPosition catalog.TableColMap,
+	values []tree.Datum,
+) ([][]byte, error) {
+	if !rh.UniqueWithTombstoneIndexes.Contains(index.Ordinal()) {
+		return nil, nil
+	}
+
+	if !index.IsUnique() {
+		return nil, errors.AssertionFailedf("Expected index %s to be unique", index.GetName())
+	}
+	if index.GetType() != descpb.IndexDescriptor_FORWARD {
+		return nil, errors.AssertionFailedf("Expected index %s to be a forward index", index.GetName())
+	}
+
+	// Get the position and value of the partition column in this index.
+	partitionColPosition, ok := colIDtoRowPosition.Get(index.GetKeyColumnID(0 /* columnOrdinal */))
+	if !ok {
+		return nil, nil
+	}
+	partitionColValue, ok := values[partitionColPosition].(*tree.DEnum)
+	if !ok {
+		return nil, errors.AssertionFailedf("Expected partition column value to be enum, but got %T", values[partitionColPosition])
+	}
+
+	// Intentionally shadowing values here to avoid accidentally overwriting the tuple.
+	values = rh.initRowTmp(values)
+	tombstoneTmpForIndex := rh.getTombstoneTmpForIndex(index, partitionColValue)
+
+	for _, partVal := range tombstoneTmpForIndex.implicitPartitionKeyVals {
+		if bytes.Equal(partitionColValue.PhysicalRep, partVal.(*tree.DEnum).PhysicalRep) {
+			continue
+		}
+		values[partitionColPosition] = partVal
+
+		if index.Primary() {
+			key, err := rh.encodePrimaryIndexKey(colIDtoRowPosition, values)
+			if err != nil {
+				return nil, err
+			}
+			tombstoneTmpForIndex.tmpTombstones = append(tombstoneTmpForIndex.tmpTombstones, key)
+		} else {
+			keys, containsNull, err := rowenc.EncodeSecondaryIndexKey(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowPosition, values)
+			if err != nil {
+				return nil, err
+			}
+			// If this key contains a NULL value, it can't violate a uniqueness constraint.
+			if containsNull {
+				tombstoneTmpForIndex.tmpTombstones = tombstoneTmpForIndex.tmpTombstones[:0]
+				break
+			}
+			tombstoneTmpForIndex.tmpTombstones = append(tombstoneTmpForIndex.tmpTombstones, keys...)
+		}
+	}
+
+	return tombstoneTmpForIndex.tmpTombstones, nil
+}
+
 // encodeSecondaryIndexes encodes the secondary index keys based on a row's
 // values.
 //
@@ -184,7 +300,7 @@ func (rh *RowHelper) encodePrimaryIndex(
 // k/v pairs.
 func (rh *RowHelper) encodeSecondaryIndexes(
 	ctx context.Context,
-	colIDtoRowIndex catalog.TableColMap,
+	colIDtoRowPosition catalog.TableColMap,
 	values []tree.Datum,
 	ignoreIndexes intsets.Fast,
 	includeEmpty bool,
@@ -201,7 +317,7 @@ func (rh *RowHelper) encodeSecondaryIndexes(
 	for i := range rh.Indexes {
 		index := rh.Indexes[i]
 		if !ignoreIndexes.Contains(int(index.GetID())) {
-			entries, err := rowenc.EncodeSecondaryIndex(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowIndex, values, includeEmpty)
+			entries, err := rowenc.EncodeSecondaryIndex(ctx, rh.Codec, rh.TableDesc, index, colIDtoRowPosition, values, includeEmpty)
 			if err != nil {
 				return nil, err
 			}
diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go
index 0560174c4595..e9a5a557892a 100644
--- a/pkg/sql/row/inserter.go
+++ b/pkg/sql/row/inserter.go
@@ -41,6 +41,7 @@ func MakeInserter(
 	txn *kv.Txn,
 	codec keys.SQLCodec,
 	tableDesc catalog.TableDescriptor,
+	uniqueWithTombstoneIndexes []catalog.Index,
 	insertCols []catalog.Column,
 	alloc *tree.DatumAlloc,
 	sv *settings.Values,
@@ -49,7 +50,7 @@ func MakeInserter(
 ) (Inserter, error) {
 	ri := Inserter{
 		Helper: NewRowHelper(
-			codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), sv, internal, metrics,
+			codec, tableDesc, tableDesc.WritableNonPrimaryIndexes(), uniqueWithTombstoneIndexes, sv, internal, metrics,
 		),
 
 		InsertCols: insertCols,
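To make the control flow of encodeTombstonesForIndex above concrete: for a row that lands in one enum partition, it produces the keys the same row would occupy in every other partition, and those keys are later claimed with tombstones. A toy sketch of that enumeration, with strings standing in for encoded index keys (names and key format are illustrative only):

package main

import "fmt"

// tombstoneKeys returns the key the row would have in each partition other
// than its own; the real write to rowPart's partition is handled separately.
func tombstoneKeys(partitions []string, rowPart, rowKey string) []string {
	var keys []string
	for _, p := range partitions {
		if p == rowPart {
			continue // the live row itself goes here
		}
		keys = append(keys, p+"/"+rowKey)
	}
	return keys
}

func main() {
	// A unique index on c, implicitly partitioned by an enum with five values.
	fmt.Println(tombstoneKeys([]string{"one", "two", "three", "four", "five"}, "two", "c=4"))
	// [one/c=4 three/c=4 four/c=4 five/c=4]
}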
@@ -114,6 +115,29 @@ func insertInvertedPutFn(
 	b.InitPut(key, value, false)
 }
 
+func writeTombstones(
+	ctx context.Context,
+	helper *RowHelper,
+	index catalog.Index,
+	b Putter,
+	insertColIDtoRowIndex catalog.TableColMap,
+	values []tree.Datum,
+	traceKV bool,
+) error {
+	tombstones, err := helper.encodeTombstonesForIndex(ctx, index, insertColIDtoRowIndex, values)
+	if err != nil {
+		return err
+	}
+	for _, tombstone := range tombstones {
+		k := roachpb.Key(keys.MakeFamilyKey(tombstone, 0 /* famID */))
+		if traceKV {
+			log.VEventfDepth(ctx, 1, 2, "CPut %s -> nil (tombstone)", k)
+		}
+		b.CPut(k, nil, nil /* expValue */)
+	}
+	return nil
+}
+
 // InsertRow adds to the batch the kv operations necessary to insert a table row
 // with the given values.
 func (ri *Inserter) InsertRow(
@@ -163,11 +187,15 @@ func (ri *Inserter) InsertRow(
 		return err
 	}
 
+	if err := writeTombstones(ctx, &ri.Helper, ri.Helper.TableDesc.GetPrimaryIndex(), b, ri.InsertColIDtoRowIndex, values, traceKV); err != nil {
+		return err
+	}
+
 	putFn = insertInvertedPutFn
 
 	// For determinism, add the entries for the secondary indexes in the same
 	// order as they appear in the helper.
-	for idx := range ri.Helper.Indexes {
+	for idx, index := range ri.Helper.Indexes {
 		entries, ok := secondaryIndexEntries[ri.Helper.Indexes[idx]]
 		if ok {
 			for i := range entries {
@@ -180,8 +208,15 @@ func (ri *Inserter) InsertRow(
 				putFn(ctx, b, &e.Key, &e.Value, traceKV)
 			}
 		}
+
+		// If a row does not satisfy a partial index predicate, it will have no
+		// entries, implying that we should also not write tombstones.
+		if len(entries) > 0 {
+			if err := writeTombstones(ctx, &ri.Helper, index, b, ri.InsertColIDtoRowIndex, values, traceKV); err != nil {
+				return err
+			}
+		}
 	}
-
 	return nil
 }
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index f27e7b764ca6..e874fc2379a6 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -365,6 +365,7 @@ func NewDatumRowConverter(
 		nil, /* txn */
 		evalCtx.Codec,
 		tableDesc,
+		nil, /* uniqueWithTombstoneIndexes */
 		cols,
 		&tree.DatumAlloc{},
 		&evalCtx.Settings.SV,
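The core of writeTombstones is the CPut with a nil value and nil expected value: write a deletion tombstone only if the key does not already exist, so a live row in any other partition makes the batch fail and surfaces a duplicate-key error. A toy model of those semantics, with a map as a hypothetical stand-in for the KV layer (the error text is illustrative, not CockroachDB's):

package main

import (
	"errors"
	"fmt"
)

type kvStore map[string]string

// cPutTombstone mimics CPut(key, nil /* value */, nil /* expValue */): it
// claims the key with a tombstone, failing if any value is already present.
func (kv kvStore) cPutTombstone(key string) error {
	if _, exists := kv[key]; exists {
		return errors.New("condition failed: key exists")
	}
	kv[key] = "<tombstone>" // claims the key without a visible row
	return nil
}

func main() {
	kv := kvStore{"three/c=4": "row"}
	fmt.Println(kv.cPutTombstone("one/c=4"))   // <nil>
	fmt.Println(kv.cPutTombstone("three/c=4")) // condition failed: key exists
}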
diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go
index bf8e3a59bf46..55cb1e782c0b 100644
--- a/pkg/sql/row/updater.go
+++ b/pkg/sql/row/updater.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/util/intsets"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/unique"
 	"github.com/cockroachdb/errors"
@@ -75,6 +76,7 @@ func MakeUpdater(
 	txn *kv.Txn,
 	codec keys.SQLCodec,
 	tableDesc catalog.TableDescriptor,
+	uniqueWithTombstoneIndexes []catalog.Index,
 	updateCols []catalog.Column,
 	requestedCols []catalog.Column,
 	updateType rowUpdaterType,
@@ -156,12 +158,12 @@ func MakeUpdater(
 
 	var deleteOnlyHelper *RowHelper
 	if len(deleteOnlyIndexes) > 0 {
-		rh := NewRowHelper(codec, tableDesc, deleteOnlyIndexes, sv, internal, metrics)
+		rh := NewRowHelper(codec, tableDesc, deleteOnlyIndexes, nil /* uniqueWithTombstoneIndexes */, sv, internal, metrics)
 		deleteOnlyHelper = &rh
 	}
 
 	ru := Updater{
-		Helper:               NewRowHelper(codec, tableDesc, includeIndexes, sv, internal, metrics),
+		Helper:               NewRowHelper(codec, tableDesc, includeIndexes, uniqueWithTombstoneIndexes, sv, internal, metrics),
 		DeleteHelper:         deleteOnlyHelper,
 		FetchCols:            requestedCols,
 		FetchColIDtoRowIndex: ColIDtoRowIndexFromCols(requestedCols),
@@ -177,7 +179,7 @@ func MakeUpdater(
 		var err error
 		ru.rd = MakeDeleter(codec, tableDesc, requestedCols, sv, internal, metrics)
 		if ru.ri, err = MakeInserter(
-			ctx, txn, codec, tableDesc, requestedCols, alloc, sv, internal, metrics,
+			ctx, txn, codec, tableDesc, uniqueWithTombstoneIndexes, requestedCols, alloc, sv, internal, metrics,
 		); err != nil {
 			return Updater{}, err
 		}
@@ -214,7 +216,7 @@ func (ru *Updater) UpdateRow(
 		return nil, errors.Errorf("got %d values but expected %d", len(updateValues), len(ru.UpdateCols))
 	}
 
-	primaryIndexKey, err := ru.Helper.encodePrimaryIndex(ru.FetchColIDtoRowIndex, oldValues)
+	primaryIndexKey, err := ru.Helper.encodePrimaryIndexKey(ru.FetchColIDtoRowIndex, oldValues)
 	if err != nil {
 		return nil, err
 	}
@@ -250,7 +252,7 @@ func (ru *Updater) UpdateRow(
 	if ru.primaryKeyColChange {
 		var newPrimaryIndexKey []byte
 		newPrimaryIndexKey, err =
-			ru.Helper.encodePrimaryIndex(ru.FetchColIDtoRowIndex, ru.newValues)
+			ru.Helper.encodePrimaryIndexKey(ru.FetchColIDtoRowIndex, ru.newValues)
 		if err != nil {
 			return nil, err
 		}
@@ -375,6 +377,7 @@ func (ru *Updater) UpdateRow(
 	// Update secondary indexes.
 	// We're iterating through all of the indexes, which should have corresponding entries
 	// in the new and old values.
+	var writtenIndexes intsets.Fast
 	for i, index := range ru.Helper.Indexes {
 		if index.GetType() == descpb.IndexDescriptor_FORWARD {
 			oldIdx, newIdx := 0, 0
@@ -433,6 +436,7 @@ func (ru *Updater) UpdateRow(
 					}
 					batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue)
 				}
+				writtenIndexes.Add(i)
 			} else if oldEntry.Family < newEntry.Family {
 				if oldEntry.Family == descpb.FamilyID(0) {
 					return nil, errors.AssertionFailedf(
@@ -468,6 +472,7 @@ func (ru *Updater) UpdateRow(
 					}
 					batch.CPut(newEntry.Key, &newEntry.Value, nil)
 				}
+				writtenIndexes.Add(i)
 				newIdx++
 			}
 		}
@@ -501,6 +506,7 @@ func (ru *Updater) UpdateRow(
 				}
 				batch.CPut(newEntry.Key, &newEntry.Value, nil)
 			}
+			writtenIndexes.Add(i)
 			newIdx++
 		}
 	} else {
@@ -522,10 +528,18 @@ func (ru *Updater) UpdateRow(
 		}
 	}
 
+	writtenIndexes.ForEach(func(idx int) {
+		if err == nil {
+			err = writeTombstones(ctx, &ru.Helper, ru.Helper.Indexes[idx], putter, ru.FetchColIDtoRowIndex, ru.newValues, traceKV)
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+
 	// We're deleting indexes in a delete only state. We're bounding this by the number of indexes because inverted
 	// indexes will be handled separately.
 	if ru.DeleteHelper != nil {
-
 		// For determinism, add the entries for the secondary indexes in the same
 		// order as they appear in the helper.
 		for idx := range ru.DeleteHelper.Indexes {
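A note on the writtenIndexes bookkeeping in UpdateRow above: tombstones are written only for indexes that actually received a new entry, so a row that a partial-index predicate filters out gets no tombstones for that index. A simplified sketch of the same filter (types and counts are hypothetical):

package main

import "fmt"

func main() {
	written := map[int]bool{}
	// Entries produced per index; index 1's partial-index predicate was not
	// satisfied, so it produced none.
	entriesPerIndex := map[int]int{0: 1, 1: 0, 2: 2}
	for idx, n := range entriesPerIndex {
		if n > 0 {
			written[idx] = true
		}
	}
	for idx := 0; idx < 3; idx++ {
		if written[idx] {
			fmt.Printf("index %d: write tombstones\n", idx)
		}
	}
	// index 0: write tombstones
	// index 2: write tombstones
}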
diff --git a/pkg/sql/rowenc/index_encoding.go b/pkg/sql/rowenc/index_encoding.go
index 5fee2780021a..59ed0134682e 100644
--- a/pkg/sql/rowenc/index_encoding.go
+++ b/pkg/sql/rowenc/index_encoding.go
@@ -1283,28 +1283,19 @@ func MakeNullPKError(
 	return errors.AssertionFailedf("NULL value in unknown key column")
 }
 
-// EncodeSecondaryIndex encodes key/values for a secondary
-// index. colMap maps descpb.ColumnIDs to indices in `values`. This returns a
-// slice of IndexEntry. includeEmpty controls whether or not
-// EncodeSecondaryIndex should return k/v's that contain
-// empty values. For forward indexes the returned list of
-// index entries is in family sorted order.
-func EncodeSecondaryIndex(
+// EncodeSecondaryIndexKey encodes the key for a secondary index. The 'colMap'
+// maps descpb.ColumnIDs to positions in 'values'. This function returns a slice
+// of byte arrays representing the key values.
+func EncodeSecondaryIndexKey(
 	ctx context.Context,
 	codec keys.SQLCodec,
 	tableDesc catalog.TableDescriptor,
 	secondaryIndex catalog.Index,
 	colMap catalog.TableColMap,
 	values []tree.Datum,
-	includeEmpty bool,
-) ([]IndexEntry, error) {
+) ([][]byte, bool, error) {
 	secondaryIndexKeyPrefix := MakeIndexKeyPrefix(codec, tableDesc.GetID(), secondaryIndex.GetID())
 
-	// Use the primary key encoding for covering indexes.
-	if secondaryIndex.GetEncodingType() == catenumpb.PrimaryIndexEncoding {
-		return EncodePrimaryIndex(codec, tableDesc, secondaryIndex, colMap, values, includeEmpty)
-	}
-
 	var containsNull = false
 	var secondaryKeys [][]byte
 	var err error
@@ -1317,6 +1308,30 @@ func EncodeSecondaryIndex(
 		secondaryKeys = [][]byte{secondaryIndexKey}
 	}
 
+	return secondaryKeys, containsNull, err
+}
+
+// EncodeSecondaryIndex encodes key/values for a secondary
+// index. colMap maps descpb.ColumnIDs to indices in `values`. This returns a
+// slice of IndexEntry. includeEmpty controls whether or not
+// EncodeSecondaryIndex should return k/v's that contain
+// empty values. For forward indexes the returned list of
+// index entries is in family sorted order.
+func EncodeSecondaryIndex(
+	ctx context.Context,
+	codec keys.SQLCodec,
+	tableDesc catalog.TableDescriptor,
+	secondaryIndex catalog.Index,
+	colMap catalog.TableColMap,
+	values []tree.Datum,
+	includeEmpty bool,
+) ([]IndexEntry, error) {
+	// Use the primary key encoding for covering indexes.
+	if secondaryIndex.GetEncodingType() == catenumpb.PrimaryIndexEncoding {
+		return EncodePrimaryIndex(codec, tableDesc, secondaryIndex, colMap, values, includeEmpty)
+	}
+
+	secondaryKeys, containsNull, err := EncodeSecondaryIndexKey(ctx, codec, tableDesc, secondaryIndex, colMap, values)
 	if err != nil {
 		return []IndexEntry{}, err
 	}
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index ac49505dcfc9..cb88ed0a88ba 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -581,6 +581,7 @@ func (tc *TestCluster) AddServer(
 		stkCopy.DisableSplitQueue = true
 		stkCopy.DisableMergeQueue = true
 		stkCopy.DisableReplicateQueue = true
+		stkCopy.DisableStoreRebalancer = true
 		serverArgs.Knobs.Store = &stkCopy
 	}
 
diff --git a/pkg/ui/workspaces/cluster-ui/package.json b/pkg/ui/workspaces/cluster-ui/package.json
index 40988c27d0a2..b670099d0e26 100644
--- a/pkg/ui/workspaces/cluster-ui/package.json
+++ b/pkg/ui/workspaces/cluster-ui/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@cockroachlabs/cluster-ui",
-  "version": "24.3.0-prerelease.0",
+  "version": "24.3.0-prerelease.1",
   "description": "Cluster UI is a library of large features shared between CockroachDB and CockroachCloud",
   "repository": {
     "type": "git",
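Stepping back to the index_encoding.go change above: the refactor splits key encoding out of entry building so the tombstone path can obtain bare keys without constructing values. A simplified sketch of that layering, with toy types in place of the real rowenc signatures (string concatenation stands in for key encoding):

package main

import "fmt"

// entry is a toy stand-in for rowenc.IndexEntry.
type entry struct{ key, value string }

// encodeKeys models EncodeSecondaryIndexKey: it builds only the index keys
// and reports whether a key column was NULL (modeled as missing from the row).
func encodeKeys(row map[string]string, keyCols []string) (keys []string, containsNull bool) {
	k := ""
	for _, c := range keyCols {
		v, ok := row[c]
		if !ok {
			return nil, true
		}
		k += "/" + v
	}
	return []string{k}, false
}

// encodeEntries models EncodeSecondaryIndex: it layers value construction on
// top of the extracted key encoder.
func encodeEntries(row map[string]string, keyCols []string) []entry {
	keys, containsNull := encodeKeys(row, keyCols)
	if containsNull {
		return nil
	}
	out := make([]entry, len(keys))
	for i, k := range keys {
		out[i] = entry{key: k, value: "v"}
	}
	return out
}

func main() {
	fmt.Println(encodeEntries(map[string]string{"a": "two", "c": "4"}, []string{"a", "c"}))
	// [{/two/4 v}]
}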