From 57d02f88918f2d274de37bb31b44fae9377887ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Wed, 8 Jan 2025 18:57:27 +0300 Subject: [PATCH] K8SPXC-1512: Install binlog UDF component for 8.4 --- cmd/pitr/collector/collector.go | 61 ++++++++++++++++----- cmd/pitr/main.go | 10 +++- cmd/pitr/pxc/pxc.go | 96 +++++++++++++++++---------------- 3 files changed, 105 insertions(+), 62 deletions(-) diff --git a/cmd/pitr/collector/collector.go b/cmd/pitr/collector/collector.go index c35cf41a0d..fb7955937a 100644 --- a/cmd/pitr/collector/collector.go +++ b/cmd/pitr/collector/collector.go @@ -21,6 +21,8 @@ import ( "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage" ) +const collectorPasswordPath = "/etc/mysql/mysql-users-secret/xtrabackup" + type Collector struct { db *pxc.PXC storage storage.Storage @@ -93,13 +95,56 @@ func New(ctx context.Context, c Config) (*Collector, error) { return nil, errors.New("unknown STORAGE_TYPE") } + file, err := os.Open(collectorPasswordPath) + if err != nil { + return nil, errors.Wrap(err, "open file") + } + pxcPass, err := io.ReadAll(file) + if err != nil { + return nil, errors.Wrap(err, "read password") + } + return &Collector{ storage: s, pxcUser: c.PXCUser, + pxcPass: string(pxcPass), pxcServiceName: c.PXCServiceName, }, nil } +func (c *Collector) Init(ctx context.Context) error { + host, err := pxc.GetPXCFirstHost(ctx, c.pxcServiceName) + if err != nil { + return errors.Wrap(err, "get first PXC host") + } + + db, err := pxc.NewPXC(host, c.pxcUser, c.pxcPass) + if err != nil { + return errors.Wrapf(err, "new manager with host %s", host) + } + defer db.Close() + + version, err := db.GetVersion(ctx) + if err != nil { + return errors.Wrap(err, "get version") + } + + switch { + case strings.HasPrefix(version, "8.0"): + log.Println("creating collector functions") + if err := db.CreateCollectorFunctions(ctx); err != nil { + return errors.Wrap(err, "init 8.0: create collector functions") + } + case strings.HasPrefix(version, "8.4"): + log.Println("installing binlog UDF component") + if err := db.InstallBinlogUDFComponent(ctx); err != nil { + return errors.Wrap(err, "init 8.4: install component") + } + } + + return nil +} + func (c *Collector) Run(ctx context.Context) error { err := c.newDB(ctx) if err != nil { @@ -136,22 +181,12 @@ func (c *Collector) lastGTIDSet(ctx context.Context, suffix string) (pxc.GTIDSet } func (c *Collector) newDB(ctx context.Context) error { - file, err := os.Open("/etc/mysql/mysql-users-secret/xtrabackup") - if err != nil { - return errors.Wrap(err, "open file") - } - pxcPass, err := io.ReadAll(file) - if err != nil { - return errors.Wrap(err, "read password") - } - c.pxcPass = string(pxcPass) - host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass) if err != nil { return errors.Wrap(err, "get host") } - log.Println("Reading binlogs from pxc with hostname=", host) + log.Println("reading binlogs from pxc with hostname=", host) c.db, err = pxc.NewPXC(host, c.pxcUser, c.pxcPass) if err != nil { @@ -317,7 +352,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error { } if len(lastGTIDSetList) == 0 { - log.Println("No binlogs to upload") + log.Println("no binlogs to upload") return nil } @@ -381,7 +416,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error { } if len(list) == 0 { - log.Println("No binlogs to upload") + log.Println("no binlogs to upload after filter") return nil } diff --git a/cmd/pitr/main.go b/cmd/pitr/main.go index 51e07e9fe7..a373492fd1 100644 --- a/cmd/pitr/main.go +++ b/cmd/pitr/main.go @@ -41,9 +41,15 @@ func runCollector(ctx context.Context) { } c, err := collector.New(ctx, config) if err != nil { - log.Fatalln("ERROR: new controller:", err) + log.Fatalln("ERROR: new collector:", err) } - log.Println("run binlog collector") + + log.Println("initializing collector") + if err := c.Init(ctx); err != nil { + log.Fatalln("ERROR: init collector:", err) + } + + log.Println("running binlog collector") for { timeout, cancel := context.WithTimeout(ctx, time.Duration(config.CollectSpanSec)*time.Second) defer cancel() diff --git a/cmd/pitr/pxc/pxc.go b/cmd/pitr/pxc/pxc.go index 3fd302711e..fe93e442da 100644 --- a/cmd/pitr/pxc/pxc.go +++ b/cmd/pitr/pxc/pxc.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/go-sql-driver/mysql" + v "github.com/hashicorp/go-version" "github.com/pkg/errors" ) @@ -17,8 +18,9 @@ const UsingPassErrorMessage = `mysqlbinlog: [Warning] Using a password on the co // PXC is a type for working with pxc type PXC struct { - db *sql.DB // handle for work with database - host string // host for connection + db *sql.DB // handle for work with database + host string // host for connection + version *v.Version } // NewManager return new manager for work with pxc @@ -31,6 +33,7 @@ func NewPXC(addr string, user, pass string) (*PXC, error) { config.Net = "tcp" config.Addr = addr + ":33062" config.Params = map[string]string{"interpolateParams": "true"} + config.DBName = "mysql" mysqlDB, err := sql.Open("mysql", config.FormatDSN()) if err != nil { @@ -55,23 +58,10 @@ func (p *PXC) GetHost() string { // GetGTIDSet return GTID set by binary log file name func (p *PXC) GetGTIDSet(ctx context.Context, binlogName string) (string, error) { - // select name from mysql.func where name='get_gtid_set_by_binlog' - var existFunc string - nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_gtid_set_by_binlog'") - err := nameRow.Scan(&existFunc) - if err != nil && err != sql.ErrNoRows { - return "", errors.Wrap(err, "get udf name") - } - if len(existFunc) == 0 { - _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'") - if err != nil { - return "", errors.Wrap(err, "create function") - } - } var binlogSet string row := p.db.QueryRowContext(ctx, "SELECT get_gtid_set_by_binlog(?)", binlogName) - err = row.Scan(&binlogSet) - if err != nil && !strings.Contains(err.Error(), "Binary log does not exist") { + + if err := row.Scan(&binlogSet); err != nil && !strings.Contains(err.Error(), "Binary log does not exist") { return "", errors.Wrap(err, "scan set") } @@ -110,6 +100,16 @@ func (s *GTIDSet) List() []string { return list } +func (p *PXC) GetVersion(ctx context.Context) (string, error) { + var version string + + if err := p.db.QueryRowContext(ctx, "select @@VERSION").Scan(&version); err != nil { + return "", errors.Wrap(err, "select @@VERSION") + } + + return version, nil +} + // GetBinLogList return binary log files list func (p *PXC) GetBinLogList(ctx context.Context) ([]Binlog, error) { rows, err := p.db.QueryContext(ctx, "SHOW BINARY LOGS") @@ -166,23 +166,10 @@ func (p *PXC) GTIDSubset(ctx context.Context, set1, set2 string) (bool, error) { // GetBinLogFirstTimestamp return binary log file first timestamp func (p *PXC) GetBinLogFirstTimestamp(ctx context.Context, binlog string) (string, error) { - var existFunc string - nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_first_record_timestamp_by_binlog'") - err := nameRow.Scan(&existFunc) - if err != nil && err != sql.ErrNoRows { - return "", errors.Wrap(err, "get udf name") - } - if len(existFunc) == 0 { - _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") - if err != nil { - return "", errors.Wrap(err, "create function") - } - } var timestamp string row := p.db.QueryRowContext(ctx, "SELECT get_first_record_timestamp_by_binlog(?) DIV 1000000", binlog) - err = row.Scan(×tamp) - if err != nil { + if err := row.Scan(×tamp); err != nil { return "", errors.Wrap(err, "scan binlog timestamp") } @@ -191,23 +178,10 @@ func (p *PXC) GetBinLogFirstTimestamp(ctx context.Context, binlog string) (strin // GetBinLogLastTimestamp return binary log file last timestamp func (p *PXC) GetBinLogLastTimestamp(ctx context.Context, binlog string) (string, error) { - var existFunc string - nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_last_record_timestamp_by_binlog'") - err := nameRow.Scan(&existFunc) - if err != nil && err != sql.ErrNoRows { - return "", errors.Wrap(err, "get udf name") - } - if len(existFunc) == 0 { - _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") - if err != nil { - return "", errors.Wrap(err, "create function") - } - } var timestamp string row := p.db.QueryRowContext(ctx, "SELECT get_last_record_timestamp_by_binlog(?) DIV 1000000", binlog) - err = row.Scan(×tamp) - if err != nil { + if err := row.Scan(×tamp); err != nil { return "", errors.Wrap(err, "scan binlog timestamp") } @@ -217,8 +191,8 @@ func (p *PXC) GetBinLogLastTimestamp(ctx context.Context, binlog string) (string func (p *PXC) SubtractGTIDSet(ctx context.Context, set, subSet string) (string, error) { var result string row := p.db.QueryRowContext(ctx, "SELECT GTID_SUBTRACT(?,?)", set, subSet) - err := row.Scan(&result) - if err != nil { + + if err := row.Scan(&result); err != nil { return "", errors.Wrap(err, "scan gtid subtract result") } @@ -330,6 +304,34 @@ func getBinlogTimeByName(ctx context.Context, db *PXC, binlogName string) (int64 return binlogTime, nil } +func (p *PXC) InstallBinlogUDFComponent(ctx context.Context) error { + _, err := p.db.ExecContext(ctx, "INSTALL COMPONENT 'file://component_binlog_utils_udf'") + if err != nil { + return errors.Wrap(err, "install component") + } + + return nil +} + +func (p *PXC) CreateCollectorFunctions(ctx context.Context) error { + _, err := p.db.ExecContext(ctx, "CREATE FUNCTION IF NOT EXISTS get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") + if err != nil { + return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog") + } + + _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'") + if err != nil { + return errors.Wrap(err, "create function get_gtid_set_by_binlog") + } + + _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") + if err != nil { + return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog") + } + + return nil +} + func (p *PXC) DropCollectorFunctions(ctx context.Context) error { _, err := p.db.ExecContext(ctx, "DROP FUNCTION IF EXISTS get_first_record_timestamp_by_binlog") if err != nil {