diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index 1e7d713b60e..2acac12c694 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -18,6 +18,7 @@
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
+ - **[Semi-sync monitor in vttablet](#semi-sync-monitor)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[VTTablet ACL enforcement and reloading](#reloading-vttablet-acl)**
@@ -162,6 +163,14 @@ Users can continue to specify exact keyranges. The new feature is backward compa
The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
+### Semi-sync monitor in vttablet
+
+A new component has been added to the vttablet binary to monitor the semi-sync status of primary vttablets. We've observed cases where a brief network disruption can cause the primary to get stuck indefinitely waiting for semi-sync ACKs. In rare scenarios, this can block reparent operations and render the primary unresponsive. More information can be found in the issues https://github.com/vitessio/vitess/issues/17709 and https://github.com/vitessio/vitess/issues/17749.
+
+To address this, the new component continuously monitors the semi-sync status. If the primary becomes stuck on semi-sync ACKs, it generates writes to unblock it. If this fails, VTOrc is notified of the issue and initiates an emergency reparent operation.
+
+The monitoring interval can be adjusted using the `--semi-sync-monitor-interval` flag, which defaults to 10 seconds.
+
## Minor Changes
#### VTTablet Flags
diff --git a/go/cmd/vtctldclient/command/framework_test.go b/go/cmd/vtctldclient/command/framework_test.go
index 351356ea3a0..b4c06cdd014 100644
--- a/go/cmd/vtctldclient/command/framework_test.go
+++ b/go/cmd/vtctldclient/command/framework_test.go
@@ -30,12 +30,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -158,6 +160,10 @@ func NewFakeTablet(t *testing.T, ts *topo.Server, cell string, uid uint32, table
}
}
+var (
+ exporter = servenv.NewExporter("TestVtctldClientCommand", "")
+)
+
// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
@@ -203,6 +209,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VREngine: vreplication.NewTestEngine(ts, ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil),
+ SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
diff --git a/go/cmd/vttablet/cli/cli.go b/go/cmd/vttablet/cli/cli.go
index e48a11c79dc..a953ed2b448 100644
--- a/go/cmd/vttablet/cli/cli.go
+++ b/go/cmd/vttablet/cli/cli.go
@@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
@@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()),
VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
+ SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()),
VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()),
}
if err := tm.Start(tablet, config); err != nil {
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 04520a8d1ad..9807595b6a6 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -322,6 +322,7 @@ Flags:
--schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
--schema_dir string Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
+ --semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index 727448ce3de..e37d1ccc6af 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -322,6 +322,7 @@ Flags:
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
+ --semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go
index fc5db965847..ca9cbcefcf5 100644
--- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go
+++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -19,15 +19,20 @@ package newfeaturetest
import (
"context"
"fmt"
+ "os"
+ "os/exec"
+ "strings"
"sync"
"testing"
"time"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
+ "vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
)
@@ -234,3 +239,99 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}
+
+// TestSemiSyncBlockDueToDisruption tests that Vitess can recover from a situation
+// where a primary is stuck waiting for semi-sync ACKs due to a network issue,
+// even if no new writes from the user arrives.
+func TestSemiSyncBlockDueToDisruption(t *testing.T) {
+ // This is always set to "true" on GitHub Actions runners:
+ // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
+ ci, ok := os.LookupEnv("CI")
+ if ok && strings.ToLower(ci) == "true" {
+ t.Skip("Test not meant to be run on CI")
+ }
+ clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
+ defer utils.TeardownCluster(clusterInstance)
+ tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
+ utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
+
+ // stop heartbeats on all the replicas
+ for idx, tablet := range tablets {
+ if idx == 0 {
+ continue
+ }
+ utils.RunSQLs(context.Background(), t, []string{
+ "stop slave;",
+ "change master to MASTER_HEARTBEAT_PERIOD = 0;",
+ "start slave;",
+ }, tablet)
+ }
+
+ // Take a backup of the pf.conf file
+ runCommandWithSudo(t, "cp", "/etc/pf.conf", "/etc/pf.conf.backup")
+ defer func() {
+ // Restore the file from backup
+ runCommandWithSudo(t, "mv", "/etc/pf.conf.backup", "/etc/pf.conf")
+ runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
+ }()
+ // Disrupt the network between the primary and the replicas
+ runCommandWithSudo(t, "sh", "-c", fmt.Sprintf("echo 'block in proto tcp from any to any port %d' | sudo tee -a /etc/pf.conf > /dev/null", tablets[0].MySQLPort))
+
+ // This following command is only required if pfctl is not already enabled
+ //runCommandWithSudo(t, "pfctl", "-e")
+ runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
+ rules := runCommandWithSudo(t, "pfctl", "-s", "rules")
+ log.Errorf("Rules enforced - %v", rules)
+
+ // Start a write that will be blocked by the primary waiting for semi-sync ACKs
+ ch := make(chan any)
+ go func() {
+ defer func() {
+ close(ch)
+ }()
+ utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
+ }()
+
+ // Starting VTOrc later now, because we don't want it to fix the heartbeat interval
+ // on the replica's before the disruption has been introduced.
+ err := clusterInstance.StartVTOrc(clusterInstance.Keyspaces[0].Name)
+ require.NoError(t, err)
+ go func() {
+ for {
+ select {
+ case <-ch:
+ return
+ case <-time.After(1 * time.Second):
+ str, isPresent := tablets[0].VttabletProcess.GetVars()["SemiSyncMonitorWritesBlocked"]
+ if isPresent {
+ log.Errorf("SemiSyncMonitorWritesBlocked - %v", str)
+ }
+ }
+ }
+ }()
+ // If the network disruption is too long lived, then we will end up running ERS from VTOrc.
+ networkDisruptionDuration := 43 * time.Second
+ time.Sleep(networkDisruptionDuration)
+
+ // Restore the network
+ runCommandWithSudo(t, "cp", "/etc/pf.conf.backup", "/etc/pf.conf")
+ runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
+
+ // We expect the problem to be resolved in less than 30 seconds.
+ select {
+ case <-time.After(30 * time.Second):
+ t.Errorf("Timed out waiting for semi-sync to be unblocked")
+ case <-ch:
+ log.Errorf("Woohoo, write finished!")
+ }
+}
+
+// runCommandWithSudo runs the provided command with sudo privileges
+// when the command is run, it prompts the user for the password, and it must be
+// entered for the program to resume.
+func runCommandWithSudo(t *testing.T, args ...string) string {
+ cmd := exec.Command("sudo", args...)
+ out, err := cmd.CombinedOutput()
+ assert.NoError(t, err, string(out))
+ return string(out)
+}
diff --git a/go/test/endtoend/vreplication/sidecardb_test.go b/go/test/endtoend/vreplication/sidecardb_test.go
index f908d66a2ec..54c1a10130f 100644
--- a/go/test/endtoend/vreplication/sidecardb_test.go
+++ b/go/test/endtoend/vreplication/sidecardb_test.go
@@ -38,7 +38,7 @@ var ddls1, ddls2 []string
func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
- "redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
+ "redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "semisync_heartbeat",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
index 6a565ac046f..531b069535a 100644
--- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
+++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
@@ -91,6 +91,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
assert.False(t, primaryInstance.SemiSyncReplicaStatus)
+ assert.False(t, primaryInstance.SemiSyncBlocked)
assert.EqualValues(t, 2, primaryInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, primaryInstance.SemiSyncPrimaryWaitForReplicaCount)
assert.EqualValues(t, 1000000000000000000, primaryInstance.SemiSyncPrimaryTimeout)
@@ -142,6 +143,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
+ assert.False(t, replicaInstance.SemiSyncBlocked)
assert.True(t, replicaInstance.SemiSyncReplicaStatus)
assert.EqualValues(t, 0, replicaInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, replicaInstance.SemiSyncPrimaryWaitForReplicaCount)
diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go
index e6afe7917f1..a2b5e66bd49 100644
--- a/go/vt/mysqlctl/fakemysqldaemon.go
+++ b/go/vt/mysqlctl/fakemysqldaemon.go
@@ -217,6 +217,11 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
return result
}
+// DB returns the fakesqldb.DB object.
+func (fmd *FakeMysqlDaemon) DB() *fakesqldb.DB {
+ return fmd.db
+}
+
// Start is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go
index d99881e2cad..2cd8424135a 100644
--- a/go/vt/proto/replicationdata/replicationdata.pb.go
+++ b/go/vt/proto/replicationdata/replicationdata.pb.go
@@ -504,6 +504,7 @@ type FullStatus struct {
SuperReadOnly bool `protobuf:"varint,21,opt,name=super_read_only,json=superReadOnly,proto3" json:"super_read_only,omitempty"`
ReplicationConfiguration *Configuration `protobuf:"bytes,22,opt,name=replication_configuration,json=replicationConfiguration,proto3" json:"replication_configuration,omitempty"`
DiskStalled bool `protobuf:"varint,23,opt,name=disk_stalled,json=diskStalled,proto3" json:"disk_stalled,omitempty"`
+ SemiSyncBlocked bool `protobuf:"varint,24,opt,name=semi_sync_blocked,json=semiSyncBlocked,proto3" json:"semi_sync_blocked,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -699,6 +700,13 @@ func (x *FullStatus) GetDiskStalled() bool {
return false
}
+func (x *FullStatus) GetSemiSyncBlocked() bool {
+ if x != nil {
+ return x.SemiSyncBlocked
+ }
+ return false
+}
+
var File_replicationdata_proto protoreflect.FileDescriptor
var file_replicationdata_proto_rawDesc = string([]byte{
@@ -786,7 +794,7 @@ var file_replicationdata_proto_rawDesc = string([]byte{
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75,
0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
- 0x55, 0x75, 0x69, 0x64, 0x22, 0xeb, 0x08, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61,
+ 0x55, 0x75, 0x69, 0x64, 0x22, 0x97, 0x09, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64,
0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18,
@@ -857,14 +865,17 @@ var file_replicationdata_proto_rawDesc = string([]byte{
0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12,
0x21, 0x0a, 0x0c, 0x64, 0x69, 0x73, 0x6b, 0x5f, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x18,
0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x64, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x6c, 0x6c,
- 0x65, 0x64, 0x2a, 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
- 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41,
- 0x4e, 0x44, 0x53, 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a,
- 0x0c, 0x49, 0x4f, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42,
- 0x2e, 0x5a, 0x2c, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74,
- 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
- 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x65, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f,
+ 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x18, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73,
+ 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x2a, 0x3b,
+ 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
+ 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, 0x4e, 0x44, 0x53, 0x51,
+ 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x4f, 0x54,
+ 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x76,
+ 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f,
+ 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x70, 0x6c,
+ 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x33,
})
var (
diff --git a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go
index 92f8e3074c3..d4e2af4496e 100644
--- a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go
+++ b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go
@@ -143,6 +143,7 @@ func (m *FullStatus) CloneVT() *FullStatus {
r.SuperReadOnly = m.SuperReadOnly
r.ReplicationConfiguration = m.ReplicationConfiguration.CloneVT()
r.DiskStalled = m.DiskStalled
+ r.SemiSyncBlocked = m.SemiSyncBlocked
if len(m.unknownFields) > 0 {
r.unknownFields = make([]byte, len(m.unknownFields))
copy(r.unknownFields, m.unknownFields)
@@ -553,6 +554,18 @@ func (m *FullStatus) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
+ if m.SemiSyncBlocked {
+ i--
+ if m.SemiSyncBlocked {
+ dAtA[i] = 1
+ } else {
+ dAtA[i] = 0
+ }
+ i--
+ dAtA[i] = 0x1
+ i--
+ dAtA[i] = 0xc0
+ }
if m.DiskStalled {
i--
if m.DiskStalled {
@@ -991,6 +1004,9 @@ func (m *FullStatus) SizeVT() (n int) {
if m.DiskStalled {
n += 3
}
+ if m.SemiSyncBlocked {
+ n += 3
+ }
n += len(m.unknownFields)
return n
}
@@ -2587,6 +2603,26 @@ func (m *FullStatus) UnmarshalVT(dAtA []byte) error {
}
}
m.DiskStalled = bool(v != 0)
+ case 24:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field SemiSyncBlocked", wireType)
+ }
+ var v int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.SemiSyncBlocked = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := protohelpers.Skip(dAtA[iNdEx:])
diff --git a/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql b/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql
new file mode 100644
index 00000000000..01b8e5d21c2
--- /dev/null
+++ b/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql
@@ -0,0 +1,21 @@
+/*
+Copyright 2025 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+CREATE TABLE IF NOT EXISTS semisync_heartbeat
+(
+ ts BIGINT UNSIGNED NOT NULL,
+ PRIMARY KEY (`ts`)
+) ENGINE = InnoDB CHARSET = utf8mb4
diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go
index 8baa9a12476..cdbc943690e 100644
--- a/go/vt/vtorc/db/generate_base.go
+++ b/go/vt/vtorc/db/generate_base.go
@@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
+ semi_sync_blocked tinyint NOT NULL DEFAULT 0,
is_disk_stalled TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go
index 6a800e5ee0b..3647134cd54 100644
--- a/go/vt/vtorc/inst/analysis.go
+++ b/go/vt/vtorc/inst/analysis.go
@@ -55,6 +55,7 @@ const (
AllPrimaryReplicasNotReplicatingOrDead AnalysisCode = "AllPrimaryReplicasNotReplicatingOrDead"
LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis"
LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary"
+ PrimarySemiSyncBlocked AnalysisCode = "PrimarySemiSyncBlocked"
ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected"
PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled"
)
@@ -115,6 +116,7 @@ type ReplicationAnalysis struct {
SemiSyncPrimaryWaitForReplicaCount uint
SemiSyncPrimaryClients uint
SemiSyncReplicaEnabled bool
+ SemiSyncBlocked bool
CountSemiSyncReplicasEnabled uint
CountLoggingReplicas uint
CountStatementBasedLoggingReplicas uint
diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go
index d487973b0f0..685d84b8a00 100644
--- a/go/vt/vtorc/inst/analysis_dao.go
+++ b/go/vt/vtorc/inst/analysis_dao.go
@@ -152,6 +152,9 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
MIN(
primary_instance.semi_sync_primary_status
) AS semi_sync_primary_status,
+ MIN(
+ primary_instance.semi_sync_blocked
+ ) AS semi_sync_blocked,
MIN(
primary_instance.semi_sync_replica_enabled
) AS semi_sync_replica_enabled,
@@ -333,6 +336,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.BinlogServerImmediateTopology = countValidBinlogServerReplicas == a.CountValidReplicas && a.CountValidReplicas > 0
a.SemiSyncPrimaryEnabled = m.GetBool("semi_sync_primary_enabled")
a.SemiSyncPrimaryStatus = m.GetBool("semi_sync_primary_status")
+ a.SemiSyncBlocked = m.GetBool("semi_sync_blocked")
a.SemiSyncReplicaEnabled = m.GetBool("semi_sync_replica_enabled")
a.CountSemiSyncReplicasEnabled = m.GetUint("count_semi_sync_replicas")
// countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas")
@@ -458,6 +462,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.Analysis = PrimaryTabletDeleted
a.Description = "Primary tablet has been deleted"
ca.hasClusterwideAction = true
+ } else if a.IsPrimary && a.SemiSyncBlocked && a.CountSemiSyncReplicasEnabled >= a.SemiSyncPrimaryWaitForReplicaCount {
+ // The primary is reporting that semi-sync monitor is blocked on writes.
+ // There are enough replicas configured to send semi-sync ACKs such that the primary shouldn't be blocked.
+ // There is some network diruption in progress. We should run an ERS.
+ a.Analysis = PrimarySemiSyncBlocked
+ a.Description = "Writes seem to be blocked on semi-sync acks on the primary, even though sufficient replicas are configured to send ACKs"
+ ca.hasClusterwideAction = true
} else if topo.IsReplicaType(a.TabletType) && !a.IsReadOnly {
a.Analysis = ReplicaIsWritable
a.Description = "Replica is writable"
diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go
index baa1121b776..a58414af3e8 100644
--- a/go/vt/vtorc/inst/analysis_dao_test.go
+++ b/go/vt/vtorc/inst/analysis_dao_test.go
@@ -34,10 +34,10 @@ var (
// The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here.
// This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica.
initialSQL = []string{
- `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false);`,
- `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false);`,
- `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false);`,
- `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false,false);`,
+ `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false,false);`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
@@ -119,6 +119,62 @@ func TestGetReplicationAnalysisDecision(t *testing.T) {
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimaryDiskStalled,
+ }, {
+ name: "PrimarySemiSyncBlocked",
+ info: []*test.InfoForRecoveryAnalysis{{
+ TabletInfo: &topodatapb.Tablet{
+ Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
+ Hostname: "localhost",
+ Keyspace: "ks",
+ Shard: "0",
+ Type: topodatapb.TabletType_PRIMARY,
+ MysqlHostname: "localhost",
+ MysqlPort: 6709,
+ },
+ DurabilityPolicy: "semi_sync",
+ LastCheckValid: 1,
+ CountReplicas: 4,
+ CountValidReplicas: 4,
+ CountValidReplicatingReplicas: 4,
+ IsPrimary: 1,
+ SemiSyncPrimaryEnabled: 1,
+ SemiSyncPrimaryStatus: 1,
+ SemiSyncPrimaryWaitForReplicaCount: 2,
+ CountSemiSyncReplicasEnabled: 2,
+ SemiSyncPrimaryClients: 0,
+ SemiSyncBlocked: 1,
+ }},
+ keyspaceWanted: "ks",
+ shardWanted: "0",
+ codeWanted: PrimarySemiSyncBlocked,
+ }, {
+ name: "LockedSemiSync",
+ info: []*test.InfoForRecoveryAnalysis{{
+ TabletInfo: &topodatapb.Tablet{
+ Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
+ Hostname: "localhost",
+ Keyspace: "ks",
+ Shard: "0",
+ Type: topodatapb.TabletType_PRIMARY,
+ MysqlHostname: "localhost",
+ MysqlPort: 6709,
+ },
+ DurabilityPolicy: "semi_sync",
+ LastCheckValid: 1,
+ CountReplicas: 4,
+ CountValidReplicas: 4,
+ CountValidReplicatingReplicas: 4,
+ IsPrimary: 1,
+ SemiSyncPrimaryEnabled: 1,
+ SemiSyncPrimaryStatus: 1,
+ SemiSyncPrimaryWaitForReplicaCount: 2,
+ CountSemiSyncReplicasEnabled: 1,
+ SemiSyncPrimaryClients: 1,
+ SemiSyncBlocked: 1,
+ }},
+ keyspaceWanted: "ks",
+ shardWanted: "0",
+ codeWanted: LockedSemiSyncPrimaryHypothesis,
}, {
name: "DeadPrimary",
info: []*test.InfoForRecoveryAnalysis{{
diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go
index b7b097bb14d..f8329e62288 100644
--- a/go/vt/vtorc/inst/instance.go
+++ b/go/vt/vtorc/inst/instance.go
@@ -85,6 +85,7 @@ type Instance struct {
SemiSyncPrimaryStatus bool
SemiSyncPrimaryClients uint
SemiSyncReplicaStatus bool
+ SemiSyncBlocked bool
LastSeenTimestamp string
IsLastCheckValid bool
diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go
index 916f4757722..82a28370069 100644
--- a/go/vt/vtorc/inst/instance_dao.go
+++ b/go/vt/vtorc/inst/instance_dao.go
@@ -240,6 +240,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
instance.SemiSyncPrimaryClients = uint(fs.SemiSyncPrimaryClients)
instance.SemiSyncPrimaryStatus = fs.SemiSyncPrimaryStatus
instance.SemiSyncReplicaStatus = fs.SemiSyncReplicaStatus
+ instance.SemiSyncBlocked = fs.SemiSyncBlocked
if instance.IsOracleMySQL() || instance.IsPercona() {
// Stuff only supported on Oracle / Percona MySQL
@@ -579,6 +580,7 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.SemiSyncPrimaryStatus = m.GetBool("semi_sync_primary_status")
instance.SemiSyncPrimaryClients = m.GetUint("semi_sync_primary_clients")
instance.SemiSyncReplicaStatus = m.GetBool("semi_sync_replica_status")
+ instance.SemiSyncBlocked = m.GetBool("semi_sync_blocked")
instance.ReplicationDepth = m.GetUint("replication_depth")
instance.IsCoPrimary = m.GetBool("is_co_primary")
instance.HasReplicationCredentials = m.GetBool("has_replication_credentials")
@@ -879,6 +881,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
"semi_sync_primary_status",
"semi_sync_primary_clients",
"semi_sync_replica_status",
+ "semi_sync_blocked",
"last_discovery_latency",
"is_disk_stalled",
}
@@ -959,6 +962,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
args = append(args, instance.SemiSyncPrimaryStatus)
args = append(args, instance.SemiSyncPrimaryClients)
args = append(args, instance.SemiSyncReplicaStatus)
+ args = append(args, instance.SemiSyncBlocked)
args = append(args, instance.LastDiscoveryLatency.Nanoseconds())
args = append(args, instance.StalledDisk)
}
diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go
index c3b99455741..e518d563739 100644
--- a/go/vt/vtorc/inst/instance_dao_test.go
+++ b/go/vt/vtorc/inst/instance_dao_test.go
@@ -64,13 +64,13 @@ func TestMkInsertSingle(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, semi_sync_blocked, last_discovery_latency, is_disk_stalled, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT,
FULL, false, false, , 0, , 0, 0, 0,
- false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,`
+ false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false,`
sql1, args1, err := mkInsertForInstances(instances[:1], false, true)
require.NoError(t, err)
@@ -87,16 +87,16 @@ func TestMkInsertThree(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, semi_sync_blocked, last_discovery_latency, is_disk_stalled, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a3 := `
- zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
- zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
- zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
+ zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false ,false, 0, false,
+ zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false,
+ zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false,
`
sql3, args3, err := mkInsertForInstances(instances[:3], true, true)
diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go
index 41c0ca5d398..82894e8b369 100644
--- a/go/vt/vtorc/logic/topology_recovery.go
+++ b/go/vt/vtorc/logic/topology_recovery.go
@@ -293,7 +293,7 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl
func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction {
switch analysisCode {
// primary
- case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled:
+ case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled, inst.PrimarySemiSyncBlocked:
// If ERS is disabled, we have no way of repairing the cluster.
if !config.ERSEnabled() {
log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode)
diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go
index 9df5fc989f0..68579dd05cb 100644
--- a/go/vt/vtorc/logic/topology_recovery_test.go
+++ b/go/vt/vtorc/logic/topology_recovery_test.go
@@ -49,6 +49,11 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) {
prevAnalysisCode: inst.DeadPrimary,
newAnalysisCode: inst.PrimaryDiskStalled,
shouldBeEqual: true,
+ }, {
+ // PrimarySemiSyncBlocked and PrimaryDiskStalled have the same recovery
+ prevAnalysisCode: inst.PrimarySemiSyncBlocked,
+ newAnalysisCode: inst.PrimaryDiskStalled,
+ shouldBeEqual: true,
}, {
// DeadPrimary and PrimaryTabletDeleted are different recoveries.
prevAnalysisCode: inst.DeadPrimary,
@@ -232,6 +237,16 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) {
ersEnabled: false,
analysisCode: inst.PrimaryDiskStalled,
wantRecoveryFunction: noRecoveryFunc,
+ }, {
+ name: "PrimarySemiSyncBlocked with ERS enabled",
+ ersEnabled: true,
+ analysisCode: inst.PrimarySemiSyncBlocked,
+ wantRecoveryFunction: recoverDeadPrimaryFunc,
+ }, {
+ name: "PrimarySemiSyncBlocked with ERS disabled",
+ ersEnabled: false,
+ analysisCode: inst.PrimarySemiSyncBlocked,
+ wantRecoveryFunction: noRecoveryFunc,
}, {
name: "PrimaryTabletDeleted with ERS enabled",
ersEnabled: true,
diff --git a/go/vt/vtorc/test/recovery_analysis.go b/go/vt/vtorc/test/recovery_analysis.go
index bb6e4132243..eaf0dac2258 100644
--- a/go/vt/vtorc/test/recovery_analysis.go
+++ b/go/vt/vtorc/test/recovery_analysis.go
@@ -65,6 +65,7 @@ type InfoForRecoveryAnalysis struct {
CountValidBinlogServerReplicas uint
SemiSyncPrimaryEnabled int
SemiSyncPrimaryStatus int
+ SemiSyncBlocked int
SemiSyncPrimaryWaitForReplicaCount uint
SemiSyncPrimaryClients uint
SemiSyncReplicaEnabled int
@@ -142,6 +143,7 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap {
rowMap["semi_sync_primary_clients"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryClients), Valid: true}
rowMap["semi_sync_primary_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryEnabled), Valid: true}
rowMap["semi_sync_primary_status"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryStatus), Valid: true}
+ rowMap["semi_sync_blocked"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncBlocked), Valid: true}
rowMap["semi_sync_primary_wait_for_replica_count"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryWaitForReplicaCount), Valid: true}
rowMap["semi_sync_replica_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncReplicaEnabled), Valid: true}
res, _ := prototext.Marshal(info.TabletInfo)
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 070eab9a38a..e8536521ede 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -187,6 +187,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
SemiSyncPrimaryClients: semiSyncClients,
SemiSyncPrimaryTimeout: semiSyncTimeout,
SemiSyncWaitForReplicaCount: semiSyncNumReplicas,
+ SemiSyncBlocked: tm.SemiSyncMonitor.AllWritesBlocked(),
SuperReadOnly: superReadOnly,
ReplicationConfiguration: replConfiguration,
}, nil
@@ -592,8 +593,14 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
}()
}
- // Now that we know no writes are in-flight and no new writes can occur,
- // set MySQL to super_read_only mode. If we are already super_read_only because of a
+ // Now we know no writes are in-flight and no new writes can occur.
+ // We just need to wait for no write being blocked on semi-sync ACKs.
+ err = tm.SemiSyncMonitor.WaitUntilSemiSyncUnblocked(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ // We can now set MySQL to super_read_only mode. If we are already super_read_only because of a
// previous demotion, or because we are not primary anyway, this should be
// idempotent.
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, true); err != nil {
@@ -1052,10 +1059,24 @@ func (tm *TabletManager) fixSemiSync(ctx context.Context, tabletType topodatapb.
case SemiSyncActionNone:
return nil
case SemiSyncActionSet:
+ if tm.SemiSyncMonitor != nil {
+ // We want to enable the semi-sync monitor only if the tablet is going to start
+ // expecting semi-sync ACKs.
+ if tabletType == topodatapb.TabletType_PRIMARY {
+ tm.SemiSyncMonitor.Open()
+ } else {
+ tm.SemiSyncMonitor.Close()
+ }
+ }
// Always enable replica-side since it doesn't hurt to keep it on for a primary.
// The primary-side needs to be off for a replica, or else it will get stuck.
return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, tabletType == topodatapb.TabletType_PRIMARY, true)
case SemiSyncActionUnset:
+ // The nil check is required for vtcombo, which doesn't run the semi-sync monitor
+ // but does try to turn off semi-sync.
+ if tm.SemiSyncMonitor != nil {
+ tm.SemiSyncMonitor.Close()
+ }
return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, false, false)
default:
return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync)
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go
index 4efb7b13081..98600b5f9b5 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go
@@ -25,7 +25,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"
+ "vitess.io/vitess/go/sqltypes"
+ "vitess.io/vitess/go/vt/mysqlctl"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/topo/memorytopo"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
)
@@ -77,15 +82,17 @@ func TestDemotePrimaryStalled(t *testing.T) {
qsWaitChan: make(chan any),
}
// Create a tablet manager with a replica type tablet.
+ fakeDb := newTestMysqlDaemon(t, 1)
tm := &TabletManager{
actionSema: semaphore.NewWeighted(1),
- MysqlDaemon: newTestMysqlDaemon(t, 1),
+ MysqlDaemon: fakeDb,
tmState: &tmState{
displayState: displayState{
tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}),
},
},
QueryServiceControl: qsc,
+ SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(fakeDb.DB(), exporter),
}
go func() {
@@ -105,3 +112,60 @@ func TestDemotePrimaryStalled(t *testing.T) {
return !qsc.primaryStalled.Load()
}, 5*time.Second, 100*time.Millisecond)
}
+
+// TestDemotePrimaryWaitingForSemiSyncUnblock tests that demote primary unblocks if the primary is blocked on semi-sync ACKs
+// and doesn't issue the set super read-only query until all writes waiting on semi-sync ACKs have gone through.
+func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ ts := memorytopo.NewServer(ctx, "cell1")
+ tm := newTestTM(t, ts, 1, "ks", "0", nil)
+ // Make the tablet a primary.
+ err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false)
+ require.NoError(t, err)
+ fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon)
+ fakeDb := fakeMysqlDaemon.DB()
+ fakeDb.SetNeverFail(true)
+
+ tm.SemiSyncMonitor.Open()
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ fakeDb.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+ // Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs.
+ fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
+
+ // Verify that in the beginning the tablet is serving.
+ require.True(t, tm.QueryServiceControl.IsServing())
+
+ // Start the demote primary operation in a go routine.
+ var demotePrimaryFinished atomic.Bool
+ go func() {
+ _, err := tm.demotePrimary(ctx, false)
+ require.NoError(t, err)
+ demotePrimaryFinished.Store(true)
+ }()
+
+ // Wait for the demote primary operation to have changed the serving state.
+ // After that point, we can assume that the demote primary gets blocked on writes waiting for semi-sync ACKs.
+ require.Eventually(t, func() bool {
+ return !tm.QueryServiceControl.IsServing()
+ }, 5*time.Second, 100*time.Millisecond)
+
+ // DemotePrimary shouldn't have finished yet.
+ require.False(t, demotePrimaryFinished.Load())
+ // We shouldn't have seen the super-read only query either.
+ require.False(t, fakeMysqlDaemon.SuperReadOnly.Load())
+
+ // Now we unblock the semi-sync monitor.
+ fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+ close(ch)
+
+ // This should unblock the demote primary operation eventually.
+ require.Eventually(t, func() bool {
+ return demotePrimaryFinished.Load()
+ }, 5*time.Second, 100*time.Millisecond)
+ // We should have also seen the super-read only query.
+ require.True(t, fakeMysqlDaemon.SuperReadOnly.Load())
+}
diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go
new file mode 100644
index 00000000000..0ea37e29284
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go
@@ -0,0 +1,406 @@
+/*
+Copyright 2025 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package semisyncmonitor
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "vitess.io/vitess/go/constants/sidecar"
+ "vitess.io/vitess/go/mysql/fakesqldb"
+ "vitess.io/vitess/go/stats"
+ "vitess.io/vitess/go/timer"
+ "vitess.io/vitess/go/vt/dbconfigs"
+ "vitess.io/vitess/go/vt/dbconnpool"
+ "vitess.io/vitess/go/vt/log"
+ "vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/servenv"
+ "vitess.io/vitess/go/vt/sqlparser"
+ "vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+)
+
+const (
+ semiSyncWaitSessionsRead = "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')"
+ semiSyncHeartbeatWrite = "INSERT INTO %s.semisync_heartbeat (ts) VALUES (NOW())"
+ semiSyncHeartbeatClear = "TRUNCATE TABLE %s.semisync_heartbeat"
+ maxWritesPermitted = 15
+ clearTimerDuration = 24 * time.Hour
+)
+
+var (
+ // waitBetweenWrites is the time to wait between consecutive writes.
+ // This is a variable instead of a constant only to be tweaked in tests.
+ waitBetweenWrites = 1 * time.Second
+)
+
+// Monitor is a monitor that checks if the primary tablet
+// is blocked on a semi-sync ack from the replica.
+// If the semi-sync ACK is lost in the network,
+// it is possible that the primary is indefinitely stuck,
+// blocking PRS. The monitor looks for this situation and manufactures a write
+// periodically to unblock the primary.
+type Monitor struct {
+ // config is used to get the connection parameters.
+ config *tabletenv.TabletConfig
+ // ticks is the ticker on which we'll check
+ // if the primary is blocked on semi-sync ACKs or not.
+ ticks *timer.Timer
+ // clearTicks is the ticker to clear the data in
+ // the semisync_heartbeat table.
+ clearTicks *timer.Timer
+
+ // mu protects the fields below.
+ mu sync.Mutex
+ appPool *dbconnpool.ConnectionPool
+ isOpen bool
+ // isWriting stores if the monitor is currently writing to the DB.
+ // We don't want two different threads initiating writes, so we use this
+ // for synchronization.
+ isWriting bool
+ // inProgressWriteCount is the number of writes currently in progress.
+ // The writes from the monitor themselves might get blocked and hence a count for them is required.
+ // After enough writes are blocked, we want to notify VTOrc to run an ERS.
+ inProgressWriteCount int
+ // isBlocked stores if the primary is blocked on semi-sync ack.
+ isBlocked bool
+ // waiters stores the list of waiters that are waiting for the primary to be unblocked.
+ waiters []chan struct{}
+ // writesBlockedGauge is a gauge tracking the number of writes the monitor is blocked on.
+ writesBlockedGauge *stats.Gauge
+ // errorCount is the number of errors that the semi-sync monitor ran into.
+ // We ignore some of the errors, so the counter is a good way to track how many errors we have seen.
+ errorCount *stats.Counter
+}
+
+// NewMonitor creates a new Monitor.
+func NewMonitor(config *tabletenv.TabletConfig, exporter *servenv.Exporter) *Monitor {
+ return &Monitor{
+ config: config,
+ ticks: timer.NewTimer(config.SemiSyncMonitor.Interval),
+ // We clear the data every day. We can make it configurable in the future,
+ // but this seams fine for now.
+ clearTicks: timer.NewTimer(clearTimerDuration),
+ writesBlockedGauge: exporter.NewGauge("SemiSyncMonitorWritesBlocked", "Number of writes blocked in the semi-sync monitor"),
+ errorCount: exporter.NewCounter("SemiSyncMonitorErrorCount", "Number of errors encountered by the semi-sync monitor"),
+ appPool: dbconnpool.NewConnectionPool("SemiSyncMonitorAppPool", exporter, maxWritesPermitted+5, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
+ waiters: make([]chan struct{}, 0),
+ }
+}
+
+// CreateTestSemiSyncMonitor created a monitor for testing.
+// It takes an optional fake db.
+func CreateTestSemiSyncMonitor(db *fakesqldb.DB, exporter *servenv.Exporter) *Monitor {
+ var dbc *dbconfigs.DBConfigs
+ if db != nil {
+ params := db.ConnParams()
+ cp := *params
+ dbc = dbconfigs.NewTestDBConfigs(cp, cp, "")
+ }
+ return NewMonitor(&tabletenv.TabletConfig{
+ DB: dbc,
+ SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{
+ Interval: 1 * time.Second,
+ },
+ }, exporter)
+}
+
+// Open starts the monitor.
+func (m *Monitor) Open() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ // The check for config being nil is only requried for tests.
+ if m.isOpen || m.config == nil || m.config.DB == nil {
+ // If we are already open, then there is nothing to do
+ return
+ }
+ // Set the monitor to be open.
+ m.isOpen = true
+ log.Info("SemiSync Monitor: opening")
+
+ // This function could be running from within a unit test scope, in which case we use
+ // mock pools that are already open. This is why we test for the pool being open.
+ if !m.appPool.IsOpen() {
+ m.appPool.Open(m.config.DB.AppWithDB())
+ }
+ m.clearTicks.Start(m.clearAllData)
+ m.ticks.Start(m.checkAndFixSemiSyncBlocked)
+}
+
+// Close stops the monitor.
+func (m *Monitor) Close() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if !m.isOpen {
+ // If we are already closed, then there is nothing to do
+ return
+ }
+ m.isOpen = false
+ log.Info("SemiSync Monitor: closing")
+ m.clearTicks.Stop()
+ m.ticks.Stop()
+ m.appPool.Close()
+}
+
+// checkAndFixSemiSyncBlocked checks if the primary is blocked on semi-sync ack
+// and manufactures a write to unblock the primary. This function is safe to
+// be called multiple times in parallel.
+func (m *Monitor) checkAndFixSemiSyncBlocked() {
+ // Check if semi-sync is blocked or not
+ ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
+ defer cancel()
+ isBlocked, err := m.isSemiSyncBlocked(ctx)
+ if err != nil {
+ m.errorCount.Add(1)
+ // If we are unable to determine whether the primary is blocked or not,
+ // then we can just abort the function and try again later.
+ log.Errorf("SemiSync Monitor: failed to check if primary is blocked on semi-sync: %v", err)
+ return
+ }
+ // Set the isBlocked state.
+ m.setIsBlocked(isBlocked)
+ if isBlocked {
+ // If we are blocked, then we want to start the writes.
+ // That function is re-entrant. If we are already writing, then it will just return.
+ // We start it in a go-routine, because we want to continue to check for when
+ // we get unblocked.
+ go m.startWrites()
+ }
+}
+
+// isSemiSyncBlocked checks if the primary is blocked on semi-sync.
+func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) {
+ // Get a connection from the pool
+ conn, err := m.appPool.Get(ctx)
+ if err != nil {
+ return false, err
+ }
+ defer conn.Recycle()
+
+ // Execute the query to check if the primary is blocked on semi-sync.
+ res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false)
+ if err != nil {
+ return false, err
+ }
+ // If we have no rows, then the primary doesn't have semi-sync enabled.
+ // It then follows, that the primary isn't blocked :)
+ if len(res.Rows) == 0 {
+ return false, nil
+ }
+
+ // Read the status value and check if it is non-zero.
+ if len(res.Rows) != 1 || len(res.Rows[0]) != 2 {
+ return false, errors.New("unexpected number of rows received")
+ }
+ value, err := res.Rows[0][1].ToCastInt64()
+ return value != 0, err
+}
+
+// isClosed returns if the monitor is currently closed or not.
+func (m *Monitor) isClosed() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return !m.isOpen
+}
+
+// WaitUntilSemiSyncUnblocked waits until the primary is not blocked
+// on semi-sync or until the context expires.
+func (m *Monitor) WaitUntilSemiSyncUnblocked(ctx context.Context) error {
+ // SemiSyncMonitor is closed, which means semi-sync is not enabled.
+ // We don't have anything to wait for.
+ if m.isClosed() {
+ return nil
+ }
+ // run one iteration of checking if semi-sync is blocked or not.
+ m.checkAndFixSemiSyncBlocked()
+ if !m.stillBlocked() {
+ // If we find that the primary isn't blocked, we're good,
+ // we don't need to wait for anything.
+ log.Infof("Primary not blocked on semi-sync ACKs")
+ return nil
+ }
+ log.Infof("Waiting for semi-sync to be unblocked")
+ // The primary is blocked. We need to wait for it to be unblocked
+ // or the context to expire.
+ ch := m.addWaiter()
+ select {
+ case <-ch:
+ log.Infof("Finished waiting for semi-sync to be unblocked")
+ return nil
+ case <-ctx.Done():
+ log.Infof("Error while waiting for semi-sync to be unblocked - %s", ctx.Err().Error())
+ return ctx.Err()
+ }
+}
+
+// stillBlocked returns true if the monitor should continue writing to the DB
+// because the monitor is still open, and the primary is still blocked.
+func (m *Monitor) stillBlocked() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.isOpen && m.isBlocked
+}
+
+// checkAndSetIsWriting checks if the monitor is already writing to the DB.
+// If it is not, then it sets the isWriting field and signals the caller.
+func (m *Monitor) checkAndSetIsWriting() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.isWriting {
+ return false
+ }
+ m.isWriting = true
+ return true
+}
+
+// clearIsWriting clears the isWriting field.
+func (m *Monitor) clearIsWriting() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.isWriting = false
+}
+
+// startWrites starts writing to the DB.
+// It is re-entrant and will return if we are already writing.
+func (m *Monitor) startWrites() {
+ // If we are already writing, then we can just return.
+ if !m.checkAndSetIsWriting() {
+ return
+ }
+ // We defer the clear of the isWriting field.
+ defer m.clearIsWriting()
+
+ // Check if we need to continue writing or not.
+ for m.stillBlocked() {
+ // We do the writes in a go-routine because if the network disruption
+ // is somewhat long-lived, then the writes themselves can also block.
+ // By doing them in a go-routine we give the system more time to recover while
+ // exponentially backing off. We will not do more than maxWritesPermitted writes and once
+ // all maxWritesPermitted writes are blocked, we'll wait for VTOrc to run an ERS.
+ go m.write()
+ time.Sleep(waitBetweenWrites)
+ }
+}
+
+// incrementWriteCount tries to increment the write count. It
+// also checks that the write count value should not exceed
+// the maximum value configured. It returns whether it was able
+// to increment the value or not.
+func (m *Monitor) incrementWriteCount() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.inProgressWriteCount == maxWritesPermitted {
+ return false
+ }
+ m.inProgressWriteCount++
+ m.writesBlockedGauge.Set(int64(m.inProgressWriteCount))
+ return true
+}
+
+// AllWritesBlocked returns if maxWritesPermitted number of writes
+// are already outstanding.
+func (m *Monitor) AllWritesBlocked() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.isOpen && m.inProgressWriteCount == maxWritesPermitted
+}
+
+// decrementWriteCount decrements the write count.
+func (m *Monitor) decrementWriteCount() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.inProgressWriteCount--
+ m.writesBlockedGauge.Set(int64(m.inProgressWriteCount))
+}
+
+// write writes a heartbeat to unblock semi-sync being stuck.
+func (m *Monitor) write() {
+ shouldWrite := m.incrementWriteCount()
+ if !shouldWrite {
+ return
+ }
+ defer m.decrementWriteCount()
+ // Get a connection from the pool
+ ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
+ defer cancel()
+ conn, err := m.appPool.Get(ctx)
+ if err != nil {
+ m.errorCount.Add(1)
+ log.Errorf("SemiSync Monitor: failed to get a connection when writing to semisync_heartbeat table: %v", err)
+ return
+ }
+ defer conn.Recycle()
+ _, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatWrite), 0, false)
+ if err != nil {
+ m.errorCount.Add(1)
+ log.Errorf("SemiSync Monitor: failed to write to semisync_heartbeat table: %v", err)
+ } else {
+ // One of the writes went through without an error.
+ // This means that we aren't blocked on semi-sync anymore.
+ m.setIsBlocked(false)
+ }
+}
+
+// setIsBlocked sets the isBlocked field.
+func (m *Monitor) setIsBlocked(val bool) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.isBlocked = val
+ if !val {
+ // If we are unblocked, then we need to signal all the waiters.
+ for _, ch := range m.waiters {
+ close(ch)
+ }
+ // We also empty the list of current waiters.
+ m.waiters = nil
+ }
+}
+
+// clearAllData clears all the data in the table so that it never
+// consumes too much space on the MySQL instance.
+func (m *Monitor) clearAllData() {
+ // Get a connection from the pool
+ conn, err := m.appPool.Get(context.Background())
+ if err != nil {
+ m.errorCount.Add(1)
+ log.Errorf("SemiSync Monitor: failed get a connection to clear semisync_heartbeat table: %v", err)
+ return
+ }
+ defer conn.Recycle()
+ _, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatClear), 0, false)
+ if err != nil {
+ m.errorCount.Add(1)
+ log.Errorf("SemiSync Monitor: failed to clear semisync_heartbeat table: %v", err)
+ }
+}
+
+// addWaiter adds a waiter to the list of waiters
+// that will be unblocked when the primary is no longer blocked.
+func (m *Monitor) addWaiter() chan struct{} {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ ch := make(chan struct{})
+ m.waiters = append(m.waiters, ch)
+ return ch
+}
+
+// bindSideCarDBName binds the sidecar db name to the query.
+func (m *Monitor) bindSideCarDBName(query string) string {
+ return sqlparser.BuildParsedQuery(query, sidecar.GetIdentifier()).Query
+}
diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go
new file mode 100644
index 00000000000..97716569563
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go
@@ -0,0 +1,728 @@
+/*
+Copyright 2025 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package semisyncmonitor
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/mysql/fakesqldb"
+ "vitess.io/vitess/go/sqltypes"
+ "vitess.io/vitess/go/vt/dbconfigs"
+ "vitess.io/vitess/go/vt/servenv"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+)
+
+var (
+ exporter = servenv.NewExporter("TestSemiSyncMonitor", "")
+)
+
+// createFakeDBAndMonitor created a fake DB and a monitor for testing.
+func createFakeDBAndMonitor(t *testing.T) (*fakesqldb.DB, *Monitor) {
+ db := fakesqldb.New(t)
+ params := db.ConnParams()
+ cp := *params
+ dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
+ config := &tabletenv.TabletConfig{
+ DB: dbc,
+ SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{
+ Interval: 10 * time.Second,
+ },
+ }
+ monitor := NewMonitor(config, exporter)
+ monitor.mu.Lock()
+ defer monitor.mu.Unlock()
+ monitor.isOpen = true
+ monitor.appPool.Open(config.DB.AppWithDB())
+ return db, monitor
+
+}
+
+// TestMonitorIsSemiSyncBlocked tests the functionality of isSemiSyncBlocked.
+func TestMonitorIsSemiSyncBlocked(t *testing.T) {
+ tests := []struct {
+ name string
+ res *sqltypes.Result
+ want bool
+ wantErr string
+ }{
+ {
+ name: "no rows",
+ res: &sqltypes.Result{},
+ want: false,
+ },
+ {
+ name: "incorrect number of rows",
+ res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_master_wait_sessions|1"),
+ wantErr: "Row count exceeded 1",
+ },
+ {
+ name: "incorrect number of fields",
+ res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value|a", "varchar|varchar|int"), "Rpl_semi_sync_source_wait_sessions|1|2"),
+ wantErr: "unexpected number of rows received",
+ },
+ {
+ name: "Unblocked",
+ res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"),
+ want: false,
+ },
+ {
+ name: "Blocked",
+ res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"),
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ db.AddQuery(semiSyncWaitSessionsRead, tt.res)
+ got, err := m.isSemiSyncBlocked(context.Background())
+ if tt.wantErr != "" {
+ require.EqualError(t, err, tt.wantErr)
+ return
+ }
+ require.NoError(t, err)
+ require.EqualValues(t, tt.want, got)
+ })
+ }
+}
+
+func TestMonitorBindSideCarDBName(t *testing.T) {
+ tests := []struct {
+ query string
+ expected string
+ }{
+ {
+ query: semiSyncHeartbeatWrite,
+ expected: "INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())",
+ },
+ {
+ query: semiSyncHeartbeatClear,
+ expected: "TRUNCATE TABLE _vt.semisync_heartbeat",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.query, func(t *testing.T) {
+ m := &Monitor{}
+ require.EqualValues(t, tt.expected, m.bindSideCarDBName(tt.query))
+ })
+ }
+}
+
+func TestMonitorClearAllData(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ db.SetNeverFail(true)
+ m.clearAllData()
+ ql := db.QueryLog()
+ require.EqualValues(t, "truncate table _vt.semisync_heartbeat", ql)
+}
+
+// TestMonitorWaitMechanism tests that the wait mechanism works as intended.
+// Setting the monitor to unblock state should unblock the waiters.
+func TestMonitorWaitMechanism(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ // Add a waiter.
+ ch := m.addWaiter()
+ var waiterUnblocked atomic.Bool
+ go func() {
+ <-ch
+ waiterUnblocked.Store(true)
+ }()
+
+ // Ensure that the waiter is currently blocked.
+ require.False(t, waiterUnblocked.Load())
+
+ // Verify that setting again to being blocked doesn't unblock the waiter.
+ m.setIsBlocked(true)
+ require.False(t, waiterUnblocked.Load())
+ require.False(t, m.isClosed())
+ require.True(t, m.stillBlocked())
+
+ // Verify that setting we are no longer blocked, unblocks the waiter.
+ m.setIsBlocked(false)
+ require.Eventually(t, func() bool {
+ return waiterUnblocked.Load()
+ }, 2*time.Second, time.Millisecond*100)
+ require.False(t, m.stillBlocked())
+ require.False(t, m.isClosed())
+}
+
+func TestMonitorIncrementWriteCount(t *testing.T) {
+ tests := []struct {
+ initVal int
+ finalVal int
+ want bool
+ }{
+ {
+ initVal: maxWritesPermitted - 2,
+ finalVal: maxWritesPermitted - 1,
+ want: true,
+ }, {
+ initVal: maxWritesPermitted - 1,
+ finalVal: maxWritesPermitted,
+ want: true,
+ }, {
+ initVal: maxWritesPermitted,
+ finalVal: maxWritesPermitted,
+ want: false,
+ }, {
+ initVal: 0,
+ finalVal: 1,
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ m.mu.Lock()
+ m.inProgressWriteCount = tt.initVal
+ m.mu.Unlock()
+ got := m.incrementWriteCount()
+ require.EqualValues(t, tt.want, got)
+ m.mu.Lock()
+ require.EqualValues(t, tt.finalVal, m.inProgressWriteCount)
+ require.EqualValues(t, tt.finalVal, m.writesBlockedGauge.Get())
+ m.mu.Unlock()
+ })
+ }
+}
+
+func TestMonitorDecrementWriteCount(t *testing.T) {
+ tests := []struct {
+ initVal int
+ finalVal int
+ }{
+ {
+ initVal: maxWritesPermitted - 1,
+ finalVal: maxWritesPermitted - 2,
+ }, {
+ initVal: maxWritesPermitted,
+ finalVal: maxWritesPermitted - 1,
+ }, {
+ initVal: 1,
+ finalVal: 0,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ m.mu.Lock()
+ m.inProgressWriteCount = tt.initVal
+ m.mu.Unlock()
+ m.decrementWriteCount()
+ m.mu.Lock()
+ require.EqualValues(t, tt.finalVal, m.inProgressWriteCount)
+ require.EqualValues(t, tt.finalVal, m.writesBlockedGauge.Get())
+ m.mu.Unlock()
+ })
+ }
+}
+
+func TestMonitorAllWritesBlocked(t *testing.T) {
+ tests := []struct {
+ initVal int
+ expected bool
+ }{
+ {
+ initVal: maxWritesPermitted - 1,
+ expected: false,
+ }, {
+ initVal: maxWritesPermitted,
+ expected: true,
+ }, {
+ initVal: 1,
+ expected: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ m.mu.Lock()
+ m.inProgressWriteCount = tt.initVal
+ m.mu.Unlock()
+ require.EqualValues(t, tt.expected, m.AllWritesBlocked())
+ })
+ }
+}
+
+func TestMonitorWrite(t *testing.T) {
+ tests := []struct {
+ initVal int
+ queryLog string
+ }{
+ {
+ initVal: maxWritesPermitted - 2,
+ queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())",
+ }, {
+ initVal: maxWritesPermitted - 1,
+ queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())",
+ }, {
+ initVal: maxWritesPermitted,
+ queryLog: "",
+ }, {
+ initVal: 0,
+ queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+ db.SetNeverFail(true)
+ m.mu.Lock()
+ m.inProgressWriteCount = tt.initVal
+ m.writesBlockedGauge.Set(int64(tt.initVal))
+ m.mu.Unlock()
+ m.write()
+ m.mu.Lock()
+ require.EqualValues(t, tt.initVal, m.inProgressWriteCount)
+ require.EqualValues(t, tt.initVal, m.writesBlockedGauge.Get())
+ m.mu.Unlock()
+ require.EqualValues(t, tt.queryLog, db.QueryLog())
+ })
+ }
+}
+
+// TestMonitorWriteBlocked tests the write function when the writes are blocked.
+func TestMonitorWriteBlocked(t *testing.T) {
+ initialVal := waitBetweenWrites
+ waitBetweenWrites = 250 * time.Millisecond
+ defer func() {
+ waitBetweenWrites = initialVal
+ }()
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ // Check the initial value of the inProgressWriteCount.
+ m.mu.Lock()
+ require.EqualValues(t, 0, m.inProgressWriteCount)
+ m.mu.Unlock()
+
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+
+ // Do a write, which we expect to block.
+ var writeFinished atomic.Bool
+ go func() {
+ m.write()
+ writeFinished.Store(true)
+ }()
+ // We should see the number of writes blocked to increase.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount == 1
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Once the writers are unblocked, we expect to see a zero value again.
+ close(ch)
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount == 0
+ }, 2*time.Second, 100*time.Millisecond)
+
+ require.Eventually(t, func() bool {
+ return writeFinished.Load()
+ }, 2*time.Second, 100*time.Millisecond)
+}
+
+// TestIsWriting checks the transitions for the isWriting field.
+func TestIsWriting(t *testing.T) {
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ // Check the initial value of the isWriting field.
+ m.mu.Lock()
+ require.False(t, m.isWriting)
+ m.mu.Unlock()
+
+ // Clearing a false field does nothing.
+ m.clearIsWriting()
+ m.mu.Lock()
+ require.False(t, m.isWriting)
+ m.mu.Unlock()
+
+ // Check and set should set the field.
+ set := m.checkAndSetIsWriting()
+ require.True(t, set)
+ m.mu.Lock()
+ require.True(t, m.isWriting)
+ m.mu.Unlock()
+
+ // Checking and setting shouldn't do anything.
+ set = m.checkAndSetIsWriting()
+ require.False(t, set)
+ m.mu.Lock()
+ require.True(t, m.isWriting)
+ m.mu.Unlock()
+
+ // Clearing should now make the field false.
+ m.clearIsWriting()
+ m.mu.Lock()
+ require.False(t, m.isWriting)
+ m.mu.Unlock()
+}
+
+func TestStartWrites(t *testing.T) {
+ initialVal := waitBetweenWrites
+ waitBetweenWrites = 250 * time.Millisecond
+ defer func() {
+ waitBetweenWrites = initialVal
+ }()
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+
+ // If we aren't blocked, then start writes doesn't do anything.
+ m.startWrites()
+ require.EqualValues(t, "", db.QueryLog())
+
+ // Now we set the monitor to be blocked.
+ m.setIsBlocked(true)
+
+ var writesFinished atomic.Bool
+ go func() {
+ m.startWrites()
+ writesFinished.Store(true)
+ }()
+
+ // We should see the number of writes blocked to increase.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount >= 1
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Once the writes have started, another call to startWrites shouldn't do anything
+ m.startWrites()
+
+ // We should continue to see the number of writes blocked increase.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount >= 2
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Check that the writes are still going.
+ require.False(t, writesFinished.Load())
+
+ // Make the monitor unblocked. This should stop the writes eventually.
+ m.setIsBlocked(false)
+ close(ch)
+
+ require.Eventually(t, func() bool {
+ return writesFinished.Load()
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Check that no writes are in progress anymore.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount == 0
+ }, 2*time.Second, 100*time.Millisecond)
+}
+
+func TestCheckAndFixSemiSyncBlocked(t *testing.T) {
+ initialVal := waitBetweenWrites
+ waitBetweenWrites = 250 * time.Millisecond
+ defer func() {
+ waitBetweenWrites = initialVal
+ }()
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ // Initially everything is unblocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+
+ // Check that the monitor thinks we are unblocked.
+ m.checkAndFixSemiSyncBlocked()
+ m.mu.Lock()
+ require.False(t, m.isBlocked)
+ m.mu.Unlock()
+
+ // Now we set the monitor to be blocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|2"))
+ m.checkAndFixSemiSyncBlocked()
+
+ m.mu.Lock()
+ require.True(t, m.isBlocked)
+ m.mu.Unlock()
+
+ // Checking again shouldn't make a difference.
+ m.checkAndFixSemiSyncBlocked()
+ m.mu.Lock()
+ require.True(t, m.isBlocked)
+ m.mu.Unlock()
+
+ // Meanwhile writes should have started and should be getting blocked.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount >= 2
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Now we set the monitor to be unblocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+ close(ch)
+ m.checkAndFixSemiSyncBlocked()
+
+ // We expect the writes to clear out and also the monitor should think its unblocked.
+ m.mu.Lock()
+ require.False(t, m.isBlocked)
+ m.mu.Unlock()
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.inProgressWriteCount == 0 && m.isWriting == false
+ }, 2*time.Second, 100*time.Millisecond)
+}
+
+func TestWaitUntilSemiSyncUnblocked(t *testing.T) {
+ initialVal := waitBetweenWrites
+ waitBetweenWrites = 250 * time.Millisecond
+ defer func() {
+ waitBetweenWrites = initialVal
+ }()
+ db, m := createFakeDBAndMonitor(t)
+ defer db.Close()
+ defer m.Close()
+
+ db.SetNeverFail(true)
+ // Initially everything is unblocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+
+ // When everything is unblocked, then this should return without blocking.
+ err := m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+ // Now we set the monitor to be blocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|3"))
+
+ // wg is used to keep track of all the go routines.
+ wg := sync.WaitGroup{}
+ // Start a cancellable context and use that to wait.
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ wg.Add(1)
+ var ctxErr error
+ var mu sync.Mutex
+ go func() {
+ defer wg.Done()
+ err := m.WaitUntilSemiSyncUnblocked(ctx)
+ mu.Lock()
+ ctxErr = err
+ mu.Unlock()
+ }()
+
+ // Start another go routine, also waiting for semi-sync being unblocked, but not using the cancellable context.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+ }()
+
+ // Wait until the writes have started.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.isWriting
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Now we cancel the context. This should fail the first wait.
+ cancel()
+ // Since we cancel the context before the semi-sync has been unblocked, we expect a context timeout error.
+ require.Eventually(t, func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return ctxErr != nil
+ }, 2*time.Second, 100*time.Millisecond)
+ mu.Lock()
+ require.EqualError(t, ctxErr, "context canceled")
+ mu.Unlock()
+
+ // Now we set the monitor to be unblocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+ close(ch)
+ err = m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+ // This should unblock the second wait.
+ wg.Wait()
+ // Eventually the writes should also stop.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return !m.isWriting
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // Also verify that if the monitor is closed, we don't wait.
+ m.Close()
+ err = m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+ require.True(t, m.isClosed())
+}
+
+// TestSemiSyncMonitor tests the semi-sync monitor as a black box.
+// It only calls the exported methods to see they work as intended.
+func TestSemiSyncMonitor(t *testing.T) {
+ initialVal := waitBetweenWrites
+ waitBetweenWrites = 250 * time.Millisecond
+ defer func() {
+ waitBetweenWrites = initialVal
+ }()
+ db := fakesqldb.New(t)
+ defer db.Close()
+ params := db.ConnParams()
+ cp := *params
+ dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
+ config := &tabletenv.TabletConfig{
+ DB: dbc,
+ SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{
+ Interval: 1 * time.Second,
+ },
+ }
+ m := NewMonitor(config, exporter)
+ defer m.Close()
+
+ db.SetNeverFail(true)
+ // Initially everything is unblocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+
+ // Open the monitor.
+ m.Open()
+
+ // Initially writes aren't blocked and the wait returns immediately.
+ require.False(t, m.AllWritesBlocked())
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ err := m.WaitUntilSemiSyncUnblocked(ctx)
+ require.NoError(t, err)
+
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch := make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+ // Now we set the monitor to be blocked.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
+
+ // Start a go routine waiting for semi-sync being unblocked.
+ var waitFinished atomic.Bool
+ go func() {
+ err := m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+ waitFinished.Store(true)
+ }()
+
+ // Even if we wait a second, the wait shouldn't be over.
+ time.Sleep(1 * time.Second)
+ require.False(t, waitFinished.Load())
+
+ // If we unblock the semi-sync, then the wait should finish.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+ close(ch)
+ require.Eventually(t, func() bool {
+ return waitFinished.Load()
+ }, 2*time.Second, 100*time.Millisecond)
+ require.False(t, m.AllWritesBlocked())
+
+ // Add a universal insert query pattern that would block until we make it unblock.
+ ch = make(chan int)
+ db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
+ <-ch
+ })
+ // We block the semi-sync again.
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
+
+ // Start another go routine, also waiting for semi-sync being unblocked.
+ waitFinished.Store(false)
+ go func() {
+ err := m.WaitUntilSemiSyncUnblocked(context.Background())
+ require.NoError(t, err)
+ waitFinished.Store(true)
+ }()
+
+ // Since the writes are now blocking, eventually all the writes should block.
+ require.Eventually(t, func() bool {
+ return m.AllWritesBlocked()
+ }, 10*time.Second, 100*time.Millisecond)
+
+ // The wait should still not have ended.
+ require.False(t, waitFinished.Load())
+
+ // Now we unblock the writes and semi-sync.
+ close(ch)
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
+
+ // The wait should now finish.
+ require.Eventually(t, func() bool {
+ return waitFinished.Load()
+ }, 2*time.Second, 100*time.Millisecond)
+ require.False(t, m.AllWritesBlocked())
+
+ // Close the monitor.
+ m.Close()
+ // The test is technically over, but we wait for the writes to have stopped to prevent any data races.
+ require.Eventually(t, func() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return !m.isWriting
+ }, 2*time.Second, 100*time.Millisecond)
+}
diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go
index fbef04de357..4c7b706bc4b 100644
--- a/go/vt/vttablet/tabletmanager/tm_init.go
+++ b/go/vt/vttablet/tabletmanager/tm_init.go
@@ -73,6 +73,7 @@ import (
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
@@ -162,6 +163,7 @@ type TabletManager struct {
QueryServiceControl tabletserver.Controller
UpdateStream binlog.UpdateStreamControl
VREngine *vreplication.Engine
+ SemiSyncMonitor *semisyncmonitor.Monitor
VDiffEngine *vdiff.Engine
Env *vtenv.Environment
diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go
index b8c9c54dcc2..a6c5d33c975 100644
--- a/go/vt/vttablet/tabletmanager/tm_init_test.go
+++ b/go/vt/vttablet/tabletmanager/tm_init_test.go
@@ -39,6 +39,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttest"
@@ -683,6 +684,10 @@ func newTestMysqlDaemon(t *testing.T, port int32) *mysqlctl.FakeMysqlDaemon {
return mysqld
}
+var (
+ exporter = servenv.NewExporter("TestTabletManager", "")
+)
+
func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string, tags map[string]string) *TabletManager {
// reset stats
statsTabletTags.ResetAll()
@@ -691,11 +696,13 @@ func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string, t
t.Helper()
ctx := context.Background()
tablet := newTestTablet(t, uid, keyspace, shard, tags)
+ fakeDb := newTestMysqlDaemon(t, 1)
tm := &TabletManager{
BatchCtx: ctx,
TopoServer: ts,
- MysqlDaemon: newTestMysqlDaemon(t, 1),
+ MysqlDaemon: fakeDb,
DBConfigs: &dbconfigs.DBConfigs{},
+ SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(fakeDb.DB(), exporter),
QueryServiceControl: tabletservermock.NewController(),
}
err := tm.Start(tablet, nil)
diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go
index 9af2e061502..53b0db271a2 100644
--- a/go/vt/vttablet/tabletserver/state_manager_test.go
+++ b/go/vt/vttablet/tabletserver/state_manager_test.go
@@ -79,7 +79,6 @@ func TestStateManagerServePrimary(t *testing.T) {
assert.Equal(t, testNow, sm.ptsTimestamp)
verifySubcomponent(t, 1, sm.watcher, testStateClosed)
-
verifySubcomponent(t, 2, sm.se, testStateOpen)
verifySubcomponent(t, 3, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 4, sm.qe, testStateOpen)
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go
index ddab935d393..47a9acfc930 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config.go
@@ -75,6 +75,7 @@ var (
heartbeatInterval time.Duration
heartbeatOnDemandDuration time.Duration
healthCheckInterval time.Duration
+ semiSyncMonitorInterval time.Duration
degradedThreshold time.Duration
unhealthyThreshold time.Duration
transitionGracePeriod time.Duration
@@ -203,6 +204,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.DurationVar(°radedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
fs.DurationVar(&transitionGracePeriod, "serving_state_grace_period", 0, "how long to pause after broadcasting health to vtgate, before enforcing a new serving state")
+ fs.DurationVar(&semiSyncMonitorInterval, "semi-sync-monitor-interval", defaultConfig.SemiSyncMonitor.Interval, "How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs")
fs.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.")
fs.BoolVar(¤tConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.")
@@ -278,6 +280,7 @@ func Init() {
currentConfig.Healthcheck.DegradedThreshold = degradedThreshold
currentConfig.Healthcheck.UnhealthyThreshold = unhealthyThreshold
currentConfig.GracePeriods.Transition = transitionGracePeriod
+ currentConfig.SemiSyncMonitor.Interval = semiSyncMonitorInterval
logFormat := streamlog.GetQueryLogConfig().Format
switch logFormat {
@@ -317,6 +320,8 @@ type TabletConfig struct {
Healthcheck HealthcheckConfig `json:"healthcheck,omitempty"`
GracePeriods GracePeriodsConfig `json:"gracePeriods,omitempty"`
+ SemiSyncMonitor SemiSyncMonitorConfig `json:"semiSyncMonitor,omitempty"`
+
ReplicationTracker ReplicationTrackerConfig `json:"replicationTracker,omitempty"`
// Consolidator can be enable, disable, or notOnPrimary. Default is enable.
@@ -613,6 +618,42 @@ type HotRowProtectionConfig struct {
MaxConcurrency int `json:"maxConcurrency,omitempty"`
}
+// SemiSyncMonitorConfig contains the config for the semi-sync monitor.
+type SemiSyncMonitorConfig struct {
+ Interval time.Duration
+}
+
+func (cfg *SemiSyncMonitorConfig) MarshalJSON() ([]byte, error) {
+ var tmp struct {
+ IntervalSeconds string `json:"intervalSeconds,omitempty"`
+ }
+
+ if d := cfg.Interval; d != 0 {
+ tmp.IntervalSeconds = d.String()
+ }
+
+ return json.Marshal(&tmp)
+}
+
+func (cfg *SemiSyncMonitorConfig) UnmarshalJSON(data []byte) (err error) {
+ var tmp struct {
+ Interval string `json:"intervalSeconds,omitempty"`
+ }
+
+ if err = json.Unmarshal(data, &tmp); err != nil {
+ return err
+ }
+
+ if tmp.Interval != "" {
+ cfg.Interval, err = time.ParseDuration(tmp.Interval)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
// HealthcheckConfig contains the config for healthcheck.
type HealthcheckConfig struct {
Interval time.Duration
@@ -1008,6 +1049,9 @@ var defaultConfig = TabletConfig{
DegradedThreshold: 30 * time.Second,
UnhealthyThreshold: 2 * time.Hour,
},
+ SemiSyncMonitor: SemiSyncMonitorConfig{
+ Interval: 10 * time.Second,
+ },
ReplicationTracker: ReplicationTrackerConfig{
Mode: Disable,
HeartbeatInterval: 250 * time.Millisecond,
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
index 9ae653bafb9..20f4541cdc8 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
@@ -97,6 +97,7 @@ rowStreamer:
maxMySQLReplLagSecs: 400
schemaChangeReloadTimeout: 30s
schemaReloadIntervalSeconds: 30m0s
+semiSyncMonitor: {}
txPool: {}
`
assert.Equal(t, wantBytes, string(gotBytes))
@@ -164,6 +165,8 @@ rowStreamer:
maxMySQLReplLagSecs: 43200
schemaChangeReloadTimeout: 30s
schemaReloadIntervalSeconds: 30m0s
+semiSyncMonitor:
+ intervalSeconds: 10s
signalWhenSchemaChange: true
streamBufferSize: 32768
txPool:
@@ -300,6 +303,12 @@ func TestFlags(t *testing.T) {
want.Healthcheck.Interval = time.Second
assert.Equal(t, want, currentConfig)
+ semiSyncMonitorInterval = time.Second
+ currentConfig.SemiSyncMonitor.Interval = 0
+ Init()
+ want.SemiSyncMonitor.Interval = time.Second
+ assert.Equal(t, want, currentConfig)
+
degradedThreshold = 2 * time.Second
currentConfig.Healthcheck.DegradedThreshold = 0
Init()
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go
index b70a64d644e..2eec782f301 100644
--- a/go/vt/wrangler/fake_tablet_test.go
+++ b/go/vt/wrangler/fake_tablet_test.go
@@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtenv"
@@ -41,6 +42,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice/fakes"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -158,6 +160,10 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy
}
}
+var (
+ exporter = servenv.NewExporter("TestWrangler", "")
+)
+
// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) {
@@ -199,6 +205,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VDiffEngine: vdiff2.NewEngine(wr.TopoServer(), ft.Tablet, collations.MySQL8(), sqlparser.NewTestParser()),
+ SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go
index 9649d717d73..7881ebf6a2a 100644
--- a/go/vt/wrangler/testlib/fake_tablet.go
+++ b/go/vt/wrangler/testlib/fake_tablet.go
@@ -33,12 +33,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
+ "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -159,6 +161,10 @@ func NewFakeTablet(t *testing.T, wr *wrangler.Wrangler, cell string, uid uint32,
}
}
+var (
+ exporter = servenv.NewExporter("TestWranglerTestLib", "")
+)
+
// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
@@ -202,6 +208,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VREngine: vreplication.NewTestEngine(wr.TopoServer(), ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil),
+ SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto
index eba4d323ee6..0c9ad87e210 100644
--- a/proto/replicationdata.proto
+++ b/proto/replicationdata.proto
@@ -106,4 +106,5 @@ message FullStatus {
bool super_read_only = 21;
replicationdata.Configuration replication_configuration = 22;
bool disk_stalled = 23;
+ bool semi_sync_blocked = 24;
}
diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts
index 527adc01326..55fb86d17e0 100644
--- a/web/vtadmin/src/proto/vtadmin.d.ts
+++ b/web/vtadmin/src/proto/vtadmin.d.ts
@@ -48801,6 +48801,9 @@ export namespace replicationdata {
/** FullStatus disk_stalled */
disk_stalled?: (boolean|null);
+
+ /** FullStatus semi_sync_blocked */
+ semi_sync_blocked?: (boolean|null);
}
/** Represents a FullStatus. */
@@ -48881,6 +48884,9 @@ export namespace replicationdata {
/** FullStatus disk_stalled. */
public disk_stalled: boolean;
+ /** FullStatus semi_sync_blocked. */
+ public semi_sync_blocked: boolean;
+
/**
* Creates a new FullStatus instance using the specified properties.
* @param [properties] Properties to set
diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js
index 1209c59cbe9..4cfc709fa9b 100644
--- a/web/vtadmin/src/proto/vtadmin.js
+++ b/web/vtadmin/src/proto/vtadmin.js
@@ -118503,6 +118503,7 @@ export const replicationdata = $root.replicationdata = (() => {
* @property {boolean|null} [super_read_only] FullStatus super_read_only
* @property {replicationdata.IConfiguration|null} [replication_configuration] FullStatus replication_configuration
* @property {boolean|null} [disk_stalled] FullStatus disk_stalled
+ * @property {boolean|null} [semi_sync_blocked] FullStatus semi_sync_blocked
*/
/**
@@ -118704,6 +118705,14 @@ export const replicationdata = $root.replicationdata = (() => {
*/
FullStatus.prototype.disk_stalled = false;
+ /**
+ * FullStatus semi_sync_blocked.
+ * @member {boolean} semi_sync_blocked
+ * @memberof replicationdata.FullStatus
+ * @instance
+ */
+ FullStatus.prototype.semi_sync_blocked = false;
+
/**
* Creates a new FullStatus instance using the specified properties.
* @function create
@@ -118774,6 +118783,8 @@ export const replicationdata = $root.replicationdata = (() => {
$root.replicationdata.Configuration.encode(message.replication_configuration, writer.uint32(/* id 22, wireType 2 =*/178).fork()).ldelim();
if (message.disk_stalled != null && Object.hasOwnProperty.call(message, "disk_stalled"))
writer.uint32(/* id 23, wireType 0 =*/184).bool(message.disk_stalled);
+ if (message.semi_sync_blocked != null && Object.hasOwnProperty.call(message, "semi_sync_blocked"))
+ writer.uint32(/* id 24, wireType 0 =*/192).bool(message.semi_sync_blocked);
return writer;
};
@@ -118900,6 +118911,10 @@ export const replicationdata = $root.replicationdata = (() => {
message.disk_stalled = reader.bool();
break;
}
+ case 24: {
+ message.semi_sync_blocked = reader.bool();
+ break;
+ }
default:
reader.skipType(tag & 7);
break;
@@ -119010,6 +119025,9 @@ export const replicationdata = $root.replicationdata = (() => {
if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled"))
if (typeof message.disk_stalled !== "boolean")
return "disk_stalled: boolean expected";
+ if (message.semi_sync_blocked != null && message.hasOwnProperty("semi_sync_blocked"))
+ if (typeof message.semi_sync_blocked !== "boolean")
+ return "semi_sync_blocked: boolean expected";
return null;
};
@@ -119087,6 +119105,8 @@ export const replicationdata = $root.replicationdata = (() => {
}
if (object.disk_stalled != null)
message.disk_stalled = Boolean(object.disk_stalled);
+ if (object.semi_sync_blocked != null)
+ message.semi_sync_blocked = Boolean(object.semi_sync_blocked);
return message;
};
@@ -119131,6 +119151,7 @@ export const replicationdata = $root.replicationdata = (() => {
object.super_read_only = false;
object.replication_configuration = null;
object.disk_stalled = false;
+ object.semi_sync_blocked = false;
}
if (message.server_id != null && message.hasOwnProperty("server_id"))
object.server_id = message.server_id;
@@ -119181,6 +119202,8 @@ export const replicationdata = $root.replicationdata = (() => {
object.replication_configuration = $root.replicationdata.Configuration.toObject(message.replication_configuration, options);
if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled"))
object.disk_stalled = message.disk_stalled;
+ if (message.semi_sync_blocked != null && message.hasOwnProperty("semi_sync_blocked"))
+ object.semi_sync_blocked = message.semi_sync_blocked;
return object;
};