Skip to content

Commit

Permalink
VReplication: Disable /debug/vrlog by default (#17832)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Feb 21, 2025
1 parent a9192fd commit 55909dc
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 17 deletions.
4 changes: 1 addition & 3 deletions go/flags/endtoend/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"text/template"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/utils"
)

var (
Expand Down Expand Up @@ -138,7 +136,7 @@ func TestHelpOutput(t *testing.T) {
cmd.Stdout = &output
err = cmd.Run()
require.NoError(t, err)
utils.MustMatch(t, buf.String(), output.String())
require.Equal(t, buf.String(), output.String())
})
}
}
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ Flags:
--v Level log level for V logs
-v, --version print binary version
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
--vreplication-enable-http-log Enable the /debug/vrlog HTTP endpoint, which will produce a log of the events replicated on primary tablets in the target keyspace by all VReplication workflows that are in the running/replicating phase.
--vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1)
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ Flags:
--v Level log level for V logs
-v, --version print binary version
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
--vreplication-enable-http-log Enable the /debug/vrlog HTTP endpoint, which will produce a log of the events replicated on primary tablets in the target keyspace by all VReplication workflows that are in the running/replicating phase.
--vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1)
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type VReplicationConfig struct {
StoreCompressedGTID bool
ParallelInsertWorkers int
TabletTypesStr string
EnableHttpLog bool // Enable the /debug/vrlog endpoint

// Config parameters applicable to the source side (vstreamer)
// The coresponding Override fields are used to determine if the user has provided a value for the parameter so
Expand Down Expand Up @@ -94,6 +95,7 @@ func GetVReplicationConfigDefaults(useCached bool) *VReplicationConfig {
StoreCompressedGTID: vreplicationStoreCompressedGTID,
ParallelInsertWorkers: vreplicationParallelInsertWorkers,
TabletTypesStr: vreplicationTabletTypesStr,
EnableHttpLog: vreplicationEnableHttpLog,

VStreamPacketSizeOverride: false,
VStreamPacketSize: VStreamerDefaultPacketSize,
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ var (
VStreamerBinlogRotationThreshold = int64(64 * 1024 * 1024) // 64MiB
VStreamerDefaultPacketSize = 250000
VStreamerUseDynamicPacketSize = true

// Enable the /debug/vrlog HTTP endpoint.
vreplicationEnableHttpLog = false
)

func GetVReplicationNetReadTimeout() int {
Expand Down Expand Up @@ -96,4 +99,6 @@ func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode.")

fs.BoolVar(&vreplicationEnableHttpLog, "vreplication-enable-http-log", vreplicationEnableHttpLog, "Enable the /debug/vrlog HTTP endpoint, which will produce a log of the events replicated on primary tablets in the target keyspace by all VReplication workflows that are in the running/replicating phase.")
}
35 changes: 26 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,14 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
}
applyFunc := func(sql string) (*sqltypes.Result, error) {
stats := NewVrLogStats("ROWCHANGE")
start := time.Now()
qr, err := vp.query(ctx, sql)
vp.vr.stats.QueryCount.Add(vp.phase, 1)
vp.vr.stats.QueryTimings.Record(vp.phase, start)
stats.Send(sql)
if vp.vr.workflowConfig.EnableHttpLog {
stats := NewVrLogStats("ROWCHANGE", start)
stats.Send(sql)
}
return qr, err
}

Expand Down Expand Up @@ -643,7 +645,10 @@ func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string {
}

func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error {
stats := NewVrLogStats(event.Type.String())
var stats *VrLogStats
if vp.vr.workflowConfig.EnableHttpLog {
stats = NewVrLogStats(event.Type.String(), time.Now())
}
switch event.Type {
case binlogdatapb.VEventType_GTID:
pos, err := binlogplayer.DecodePosition(event.Gtid)
Expand Down Expand Up @@ -689,7 +694,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return err
}
vp.tablePlans[event.FieldEvent.TableName] = tplan
stats.Send(fmt.Sprintf("%v", event.FieldEvent))
if stats != nil {
stats.Send(fmt.Sprintf("%v", event.FieldEvent))
}

case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE,
binlogdatapb.VEventType_REPLACE, binlogdatapb.VEventType_SAVEPOINT:
Expand All @@ -707,7 +714,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.applyStmtEvent(ctx, event); err != nil {
return err
}
stats.Send(sql)
if stats != nil {
stats.Send(sql)
}
}
case binlogdatapb.VEventType_ROW:
// This player is configured for row based replication
Expand All @@ -720,7 +729,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}
// Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed
// time for the Row event.
stats.Send(fmt.Sprintf("%v", event.RowEvent))
if stats != nil {
stats.Send(fmt.Sprintf("%v", event.RowEvent))
}
case binlogdatapb.VEventType_OTHER:
if vp.vr.dbClient.InTransaction {
// Unreachable
Expand Down Expand Up @@ -774,7 +785,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if _, err := vp.query(ctx, event.Statement); err != nil {
return err
}
stats.Send(fmt.Sprintf("%v", event.Statement))
if stats != nil {
stats.Send(fmt.Sprintf("%v", event.Statement))
}
posReached, err := vp.updatePos(ctx, event.Timestamp)
if err != nil {
return err
Expand All @@ -786,7 +799,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if _, err := vp.query(ctx, event.Statement); err != nil {
log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement)
}
stats.Send(fmt.Sprintf("%v", event.Statement))
if stats != nil {
stats.Send(fmt.Sprintf("%v", event.Statement))
}
posReached, err := vp.updatePos(ctx, event.Timestamp)
if err != nil {
return err
Expand Down Expand Up @@ -840,7 +855,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}
return io.EOF
}
stats.Send(fmt.Sprintf("%v", event.Journal))
if stats != nil {
stats.Send(fmt.Sprintf("%v", event.Journal))
}
return io.EOF
case binlogdatapb.VEventType_HEARTBEAT:
if event.Throttled {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,8 @@ func TestPlayerStatementMode(t *testing.T) {
func TestPlayerFilters(t *testing.T) {
defer deleteTablet(addTablet(100))

vttablet.DefaultVReplicationConfig.EnableHttpLog = true

execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), primary key(id))", vrepldb),
Expand Down
12 changes: 8 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vrlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package vreplication
import (
"net/http"
"strconv"
"sync"
"text/template"
"time"

Expand All @@ -35,6 +36,7 @@ var (
vrLogStatsLogger = streamlog.New[*VrLogStats]("VReplication", 50)
vrLogStatsTemplate = template.Must(template.New("vrlog").
Parse("{{.Type}} Event {{.Detail}} {{.LogTime}} {{.DurationNs}}\n"))
addEndpointOnce sync.Once
)

// VrLogStats collects attributes of a vreplication event for logging
Expand All @@ -46,9 +48,11 @@ type VrLogStats struct {
DurationNs int64
}

// NewVrLogStats should be called at the start of the event to be logged
func NewVrLogStats(eventType string) *VrLogStats {
return &VrLogStats{Type: eventType, StartTime: time.Now()}
func NewVrLogStats(eventType string, startTime time.Time) *VrLogStats {
addEndpointOnce.Do(func() {
addHttpEndpoint()
})
return &VrLogStats{Type: eventType, StartTime: startTime}
}

// Send records the log event, should be called on a stats object constructed by NewVrLogStats()
Expand All @@ -63,7 +67,7 @@ func (stats *VrLogStats) Send(detail string) {
vrLogStatsLogger.Send(stats)
}

func init() {
func addHttpEndpoint() {
servenv.HTTPHandleFunc("/debug/vrlog", func(w http.ResponseWriter, r *http.Request) {
ch := vrLogStatsLogger.Subscribe("vrlogstats")
defer vrLogStatsLogger.Unsubscribe(ch)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVrLog(t *testing.T) {
vrlogStatsHandler(ch, w, r)
}()
eventType, detail := "Test", "detail 1"
stats := NewVrLogStats(eventType)
stats := NewVrLogStats(eventType, time.Now())
stats.Send(detail)
var s string
select {
Expand Down

0 comments on commit 55909dc

Please sign in to comment.