
Merge pull request #1031 from percona/release-2.7.0
Release 2.7.0
defbin authored Oct 9, 2024
2 parents f2cf881 + 957ac50 commit 94ad374
Showing 55 changed files with 1,070 additions and 601 deletions.
218 changes: 149 additions & 69 deletions cmd/pbm-agent/agent.go
@@ -34,7 +34,7 @@ type Agent struct {

brief topo.NodeBrief

dumpConns int
numParallelColls int

closeCMD chan struct{}
pauseHB int32
@@ -44,7 +44,12 @@ type Agent struct {
monStopSig chan struct{}
}

func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConns int) (*Agent, error) {
func newAgent(
ctx context.Context,
leadConn connect.Client,
uri string,
numParallelColls int,
) (*Agent, error) {
nodeConn, err := connect.MongoConnect(ctx, uri, connect.Direct(true))
if err != nil {
return nil, err
@@ -72,7 +77,7 @@ func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConn
ConfigSvr: info.IsConfigSrv(),
Version: mongoVersion,
},
dumpConns: dumpConns,
numParallelColls: numParallelColls,
}
return a, nil
}
@@ -104,16 +109,16 @@ func (a *Agent) CanStart(ctx context.Context) error {
return ErrDelayedNode
}

ver, err := version.GetMongoVersion(ctx, a.leadConn.MongoClient())
if err != nil {
return errors.Wrap(err, "get mongo version")
}
if err := version.FeatureSupport(ver).PBMSupport(); err != nil {
return nil
}

func (a *Agent) showIncompatibilityWarning(ctx context.Context) {
if err := version.FeatureSupport(a.brief.Version).PBMSupport(); err != nil {
log.FromContext(ctx).
Warning("", "", "", primitive.Timestamp{}, "WARNING: %v", err)
}

if ver.IsShardedTimeseriesSupported() {
if a.brief.Sharded && a.brief.Version.IsShardedTimeseriesSupported() {
tss, err := topo.ListShardedTimeseries(ctx, a.leadConn)
if err != nil {
log.FromContext(ctx).
@@ -127,7 +132,18 @@ func (a *Agent) CanStart(ctx context.Context) error {
}
}

return nil
if a.brief.Sharded && a.brief.Version.IsConfigShardSupported() {
hasConfigShard, err := topo.HasConfigShard(ctx, a.leadConn)
if err != nil {
log.FromContext(ctx).
Error("", "", "", primitive.Timestamp{},
"failed to check for Config Shard: %v", err)
} else if hasConfigShard {
log.FromContext(ctx).
Warning("", "", "", primitive.Timestamp{},
"WARNING: selective backup and restore is not supported with Config Shard")
}
}
}

// Start starts listening to the commands stream.
@@ -151,7 +167,7 @@ func (a *Agent) Start(ctx context.Context) error {
return nil
}

logger.Printf("got command %s", cmd)
logger.Printf("got command %s, opid: %s", cmd, cmd.OPID)

ep, err := config.GetEpoch(ctx, a.leadConn)
if err != nil {
@@ -266,8 +282,17 @@ func (a *Agent) HbStatus(ctx context.Context) {
MongoVer: nodeVersion.VersionString,
PerconaVer: nodeVersion.PSMDBVersion,
}

updateAgentStat(ctx, a, l, true, &hb)
err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
if err != nil {
l.Error("set status: %v", err)
}

defer func() {
if err := topo.RemoveAgentStatus(ctx, a.leadConn, hb); err != nil {
l.Debug("deleting agent status")
err := topo.RemoveAgentStatus(context.Background(), a.leadConn, hb)
if err != nil {
logger := logger.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
logger.Error("remove agent heartbeat: %v", err)
}
@@ -276,74 +301,128 @@ func (a *Agent) HbStatus(ctx context.Context) {
tk := time.NewTicker(defs.AgentsStatCheckRange)
defer tk.Stop()

storageCheckTime := time.Now()
parallelAgentCheckTime := time.Now()

// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
const checkStoreIn = int(60 / (defs.AgentsStatCheckRange / time.Second))
cc := 0
for range tk.C {
// don't check if on pause (e.g. physical restore)
if !a.HbIsRun() {
continue
}
const storageCheckInterval = 15 * time.Second
const parallelAgentCheckInterval = time.Minute

hb.PBMStatus = a.pbmStatus(ctx)
logHbStatus("PBM connection", hb.PBMStatus, l)
for {
select {
case <-ctx.Done():
return
case <-tk.C:
// don't check if on pause (e.g. physical restore)
if !a.HbIsRun() {
continue
}

hb.NodeStatus = a.nodeStatus(ctx)
logHbStatus("node connection", hb.NodeStatus, l)
now := time.Now()
if now.Sub(parallelAgentCheckTime) >= parallelAgentCheckInterval {
a.warnIfParallelAgentDetected(ctx, l, hb.Heartbeat)
parallelAgentCheckTime = now
}

cc++
hb.StorageStatus = a.storStatus(ctx, l, cc == checkStoreIn)
logHbStatus("storage connection", hb.StorageStatus, l)
if cc == checkStoreIn {
cc = 0
if now.Sub(storageCheckTime) >= storageCheckInterval {
updateAgentStat(ctx, a, l, true, &hb)
err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
if err == nil {
storageCheckTime = now
}
} else {
updateAgentStat(ctx, a, l, false, &hb)
err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
}
if err != nil {
l.Error("set status: %v", err)
}
}
}
}

hb.Err = ""
hb.Hidden = false
hb.Passive = false
func updateAgentStat(
ctx context.Context,
agent *Agent,
l log.LogEvent,
checkStore bool,
hb *topo.AgentStat,
) {
hb.PBMStatus = agent.pbmStatus(ctx)
logHbStatus("PBM connection", hb.PBMStatus, l)

inf, err := topo.GetNodeInfo(ctx, a.nodeConn)
if err != nil {
l.Error("get NodeInfo: %v", err)
hb.Err += fmt.Sprintf("get NodeInfo: %v", err)
hb.NodeStatus = agent.nodeStatus(ctx)
logHbStatus("node connection", hb.NodeStatus, l)

hb.StorageStatus = agent.storStatus(ctx, l, checkStore, hb)
logHbStatus("storage connection", hb.StorageStatus, l)

hb.Err = ""
hb.Hidden = false
hb.Passive = false

inf, err := topo.GetNodeInfo(ctx, agent.nodeConn)
if err != nil {
l.Error("get NodeInfo: %v", err)
hb.Err += fmt.Sprintf("get NodeInfo: %v", err)
} else {
hb.Hidden = inf.Hidden
hb.Passive = inf.Passive
hb.Arbiter = inf.ArbiterOnly
if inf.SecondaryDelayOld != 0 {
hb.DelaySecs = inf.SecondaryDelayOld
} else {
hb.Hidden = inf.Hidden
hb.Passive = inf.Passive
hb.Arbiter = inf.ArbiterOnly
if inf.SecondaryDelayOld != 0 {
hb.DelaySecs = inf.SecondaryDelayOld
} else {
hb.DelaySecs = inf.SecondaryDelaySecs
}
hb.DelaySecs = inf.SecondaryDelaySecs
}

if inf != nil && inf.ArbiterOnly {
hb.State = defs.NodeStateArbiter
hb.StateStr = "ARBITER"
hb.Heartbeat, err = topo.ClusterTimeFromNodeInfo(inf)
if err != nil {
hb.Err += fmt.Sprintf("get cluster time: %v", err)
}
}

if inf != nil && inf.ArbiterOnly {
hb.State = defs.NodeStateArbiter
hb.StateStr = "ARBITER"
} else {
n, err := topo.GetNodeStatus(ctx, agent.nodeConn, agent.brief.Me)
if err != nil {
l.Error("get replSetGetStatus: %v", err)
hb.Err += fmt.Sprintf("get replSetGetStatus: %v", err)
hb.State = defs.NodeStateUnknown
hb.StateStr = "UNKNOWN"
} else {
n, err := topo.GetNodeStatus(ctx, a.nodeConn, a.brief.Me)
if err != nil {
l.Error("get replSetGetStatus: %v", err)
hb.Err += fmt.Sprintf("get replSetGetStatus: %v", err)
hb.State = defs.NodeStateUnknown
hb.StateStr = "UNKNOWN"
} else {
hb.State = n.State
hb.StateStr = n.StateStr
hb.State = n.State
hb.StateStr = n.StateStr

rLag, err := topo.ReplicationLag(ctx, a.nodeConn, a.brief.Me)
if err != nil {
l.Error("get replication lag: %v", err)
hb.Err += fmt.Sprintf("get replication lag: %v", err)
}
hb.ReplicationLag = rLag
rLag, err := topo.ReplicationLag(ctx, agent.nodeConn, agent.brief.Me)
if err != nil {
l.Error("get replication lag: %v", err)
hb.Err += fmt.Sprintf("get replication lag: %v", err)
}
hb.ReplicationLag = rLag
}
}
}

err = topo.SetAgentStatus(ctx, a.leadConn, hb)
if err != nil {
l.Error("set status: %v", err)
func (a *Agent) warnIfParallelAgentDetected(
ctx context.Context,
l log.LogEvent,
lastHeartbeat primitive.Timestamp,
) {
s, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return
}
l.Error("detecting parallel agent: get status: %v", err)
return
}
if !s.Heartbeat.Equal(lastHeartbeat) {
l.Warning("detected possible parallel agent for the node: "+
"expected last heartbeat to be %d.%d, actual is %d.%d",
lastHeartbeat.T, lastHeartbeat.I, s.Heartbeat.T, s.Heartbeat.I)
return
}
}

@@ -365,13 +444,14 @@ func (a *Agent) nodeStatus(ctx context.Context) topo.SubsysStatus {
return topo.SubsysStatus{OK: true}
}

func (a *Agent) storStatus(ctx context.Context, log log.LogEvent, forceCheckStorage bool) topo.SubsysStatus {
func (a *Agent) storStatus(
ctx context.Context,
log log.LogEvent,
forceCheckStorage bool,
stat *topo.AgentStat,
) topo.SubsysStatus {
// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
// but if storage was(is) failed, check it always
stat, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
if err != nil {
log.Warning("get current storage status: %v", err)
}
if !forceCheckStorage && stat.StorageStatus.OK {
return topo.SubsysStatus{OK: true}
}
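
The reworked HbStatus loop above replaces the counter-based storage check with timestamp gating: a single ticker drives the loop, and each slower check runs only once its own interval has elapsed. A minimal, self-contained sketch of that gating pattern follows, with made-up intervals and placeholder print statements instead of the actual PBM calls:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stop the sketch after a few seconds; the real agent runs until its context is canceled.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const tickEvery = 500 * time.Millisecond           // base heartbeat rate (hypothetical)
	const storageCheckInterval = 2 * time.Second       // slower, more expensive check
	const parallelAgentCheckInterval = 3 * time.Second // even rarer sanity check

	tk := time.NewTicker(tickEvery)
	defer tk.Stop()

	storageCheckTime := time.Now()
	parallelAgentCheckTime := time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tk.C:
			now := time.Now()
			if now.Sub(parallelAgentCheckTime) >= parallelAgentCheckInterval {
				fmt.Println("rare check: look for a parallel agent")
				parallelAgentCheckTime = now
			}
			if now.Sub(storageCheckTime) >= storageCheckInterval {
				fmt.Println("full heartbeat: include storage check")
				storageCheckTime = now // the real code advances this only when the status update succeeds
			} else {
				fmt.Println("cheap heartbeat: skip storage check")
			}
		}
	}
}

The real loop additionally skips a tick entirely while heartbeats are paused (e.g. during physical restore), as the HbIsRun guard in the diff shows.
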
6 changes: 5 additions & 1 deletion cmd/pbm-agent/backup.go
@@ -114,7 +114,11 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
case defs.LogicalBackup:
fallthrough
default:
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
numParallelColls := a.numParallelColls
if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
numParallelColls = cfg.Backup.NumParallelCollections
}
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
}

bcp.SetConfig(cfg)
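
The Backup hunk above lets a per-backup NumParallelCollections value from the PBM config override the agent-wide --dump-parallel-collections default. A small sketch of that precedence rule, using trimmed-down stand-in types rather than the real config structs:

package main

import "fmt"

// Hypothetical, simplified stand-ins for the real config types.
type BackupConf struct {
	NumParallelCollections int
}

type Config struct {
	Backup *BackupConf
}

// resolveParallelColls mirrors the override order in the diff: a positive
// per-backup setting wins over the agent-level default.
func resolveParallelColls(agentDefault int, cfg *Config) int {
	n := agentDefault
	if cfg != nil && cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
		n = cfg.Backup.NumParallelCollections
	}
	return n
}

func main() {
	cfg := &Config{Backup: &BackupConf{NumParallelCollections: 4}}
	fmt.Println(resolveParallelColls(2, nil)) // 2: falls back to the agent default
	fmt.Println(resolveParallelColls(2, cfg)) // 4: per-backup setting wins
}
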
2 changes: 1 addition & 1 deletion cmd/pbm-agent/delete.go
@@ -290,7 +290,7 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
bcp := &cr.Backups[i]

eg.Go(func() error {
err := backup.DeleteBackupFiles(bcp, stg)
err := backup.DeleteBackupFiles(stg, bcp.Name)
return errors.Wrapf(err, "delete backup files %q", bcp.Name)
})
}
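
For context, the Cleanup hunk sits inside an errgroup fan-out: each backup's files are deleted in its own goroutine and the first error is reported. A generic sketch of that pattern with a hypothetical delete function (using golang.org/x/sync/errgroup, not the PBM storage API):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// deleteBackupFiles is a stand-in for the real storage call.
func deleteBackupFiles(name string) error {
	fmt.Println("deleting files of backup", name)
	return nil
}

func main() {
	backups := []string{"2024-10-01T10:00:00Z", "2024-10-02T10:00:00Z"}

	eg := errgroup.Group{}
	for i := range backups {
		name := backups[i] // capture the per-iteration value (required before Go 1.22)
		eg.Go(func() error {
			return deleteBackupFiles(name)
		})
	}
	// Wait returns the first non-nil error from any goroutine.
	if err := eg.Wait(); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}
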
8 changes: 6 additions & 2 deletions cmd/pbm-agent/main.go
@@ -5,6 +5,7 @@ import (
"fmt"
stdlog "log"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
@@ -32,7 +33,8 @@ func main() {
Envar("PBM_MONGODB_URI").
Required().
String()
dumpConns = pbmAgentCmd.Flag("dump-parallel-collections", "Number of collections to dump in parallel").
dumpConns = pbmAgentCmd.
Flag("dump-parallel-collections", "Number of collections to dump in parallel").
Envar("PBM_DUMP_PARALLEL_COLLECTIONS").
Default(strconv.Itoa(runtime.NumCPU() / 2)).
Int()
@@ -85,7 +87,7 @@ func runAgent(mongoURI string, dumpConns int) error {
mtLog.SetDateFormat(log.LogTimeFormat)
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

leadConn, err := connect.Connect(ctx, mongoURI, "pbm-agent")
@@ -117,6 +119,8 @@ func runAgent(mongoURI string, dumpConns int) error {
return errors.Wrap(err, "setup pbm collections")
}

agent.showIncompatibilityWarning(ctx)

if canRunSlicer {
go agent.PITR(ctx)
}
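
The main.go change swaps a bare context.WithCancel for signal.NotifyContext, so the agent's root context is canceled on an interrupt and everything selecting on ctx.Done() shuts down. A minimal sketch of that shutdown wiring (note that SIGKILL cannot actually be trapped, so in practice only the interrupt signal triggers cancellation):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
)

func main() {
	// Cancel the root context on the first interrupt delivered to the process.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()

	fmt.Println("running; press Ctrl+C to stop")
	<-ctx.Done() // blocks until a signal arrives (or a parent cancels)
	fmt.Println("shutting down:", ctx.Err())
}
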
10 changes: 9 additions & 1 deletion cmd/pbm-agent/oplog.go
@@ -70,8 +70,16 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
}
}()

cfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
l.Error("get PBM config: %v", err)
return
}

l.Info("oplog replay started")
if err := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).ReplayOplog(ctx, r, opID, l); err != nil {
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
err = rr.ReplayOplog(ctx, r, opID, l)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no oplog for the shard, skipping")
} else {