diff --git a/worker/draft.go b/worker/draft.go
index 3e7f873b2fb..968c3b62c64 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -912,13 +912,19 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
 	var pool *conn.Pool
 	addr := snap.Context.GetAddr()
 	glog.V(2).Infof("Snapshot.RaftContext.Addr: %q", addr)
-	if len(addr) > 0 {
+	if addr == n.MyAddr {
+		// There was a bug here. If the address was my address, then the
+		// snapshot request would loop back to myself and get stuck in an
+		// infinite loop.
+		glog.V(2).Infof("Snapshot.RaftContext address is mine. Skipping that...")
+
+	} else if len(addr) > 0 {
 		p, err := conn.GetPools().Get(addr)
 		if err != nil {
 			glog.V(2).Infof("conn.Get(%q) Error: %v", addr, err)
 		} else {
 			pool = p
-			glog.V(2).Infof("Leader connection picked from RaftContext")
+			glog.V(2).Infof("Leader connection picked from RaftContext: %s", addr)
 		}
 	}
 	if pool == nil {
@@ -928,6 +934,7 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
 			return err
 		}
 		pool = p
+		glog.V(2).Infof("Leader connection picked from membership state: %s", p.Addr)
 	}
 
 	// Need to clear pl's stored in memory for the case when retrieving snapshot with
@@ -1838,6 +1845,7 @@ func (n *node) InitAndStartNode() {
 	_, _ = n.startTask(opRollup)
 	go n.stopAllTasks()
 	go n.Run()
+	go n.checkForFailedSnapshot()
 }
 
 func (n *node) AmLeader() bool {
diff --git a/worker/snapshot.go b/worker/snapshot.go
index f75130ff730..591802fcfcd 100644
--- a/worker/snapshot.go
+++ b/worker/snapshot.go
@@ -70,6 +70,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
 	sw := pstore.NewStreamWriter()
 	defer sw.Cancel()
 
+	glog.Infof("Creating StreamWriter\n")
 	if err := sw.Prepare(); err != nil {
 		return err
 	}
@@ -79,14 +80,40 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
 		writer = pstore.NewManagedWriteBatch()
 	}
 
+	glog.Infof("Starting to receive snapshot...\n")
+	lastReceived := time.Now().Unix()
+	go func() {
+		tick := time.NewTicker(10 * time.Second)
+		defer tick.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-tick.C:
+				last := atomic.LoadInt64(&lastReceived)
+				lastTs := time.Unix(last, 0)
+				if time.Since(lastTs) > 60*time.Second {
+					glog.Warningf("Haven't received anything for over 60s." +
+						" Abandoning snapshot retrieval...\n")
+					cancel()
+				}
+			}
+		}
+	}()
 	// We can use count to check the number of posting lists returned in tests.
 	size := 0
 	var done *pb.KVS
 	for {
 		kvs, err := stream.Recv()
 		if err != nil {
-			return err
+			return errors.Wrapf(err, "stream.Recv")
 		}
+
+		// Track when we last received anything from the leader. If we don't
+		// receive anything for a while, we should abandon this connection.
+		atomic.StoreInt64(&lastReceived, time.Now().Unix())
+
 		if kvs.Done {
 			done = kvs
 			glog.V(1).Infoln("All key-values have been received.")
@@ -260,11 +287,87 @@ func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) err
 	return nil
 }
 
+func (n *node) checkForFailedSnapshot() {
+	tick := time.NewTicker(10 * time.Second)
+	defer tick.Stop()
+
+	loopOverStatus := func() {
+		glog.V(2).Infof("Entering snapshot monitoring state")
+		defer func() {
+			glog.V(2).Infof("Exiting snapshot monitoring state")
+		}()
+
+		var last time.Time
+
+		for range tick.C {
+			if n.closer.Ctx().Err() != nil {
+				return
+			}
+			status := n.Raft().Status()
+			if status.Lead != status.ID {
+				// I'm not the leader, so don't do anything.
+ return + } + var snapshotId uint64 + for id, prog := range status.Progress { + if prog.State != raft.ProgressStateSnapshot { + continue + } + snapshotId = id + break + } + if snapshotId == 0 { + // No Alpha is in pending snapshot state. + glog.Infof("No Alpha in pending snapshot state.\n") + return + } + if last.IsZero() { + last = time.Now() + glog.Infof("Registered a pending snapshot for ID: %#x\n", snapshotId) + } + + var ongoing bool + for _, t := range GetOngoingTasks() { + if t == "opSnapshot" { + glog.V(2).Infof("Snapshot: Ongoing. All good.\n") + ongoing = true + } + } + + if !ongoing && time.Since(last) > 60*time.Second { + // Report snapshot as failed. + glog.Warningf("Reporting snapshot for ID: %#x as failed\n", snapshotId) + n.Raft().ReportSnapshot(snapshotId, raft.SnapshotFailure) + return + } + } + } + + for { + select { + case <-n.closer.HasBeenClosed(): + return + case <-tick.C: + status := n.Raft().Status() + if status.Lead != status.ID { + // I'm not the leader, so don't do anything. + continue + } + for _, prog := range status.Progress { + if prog.State != raft.ProgressStateSnapshot { + continue + } + loopOverStatus() + } + } + } +} + func (w *grpcWorker) StreamSnapshot(stream pb.Worker_StreamSnapshotServer) error { // Pause rollups during snapshot streaming. closer, err := groups().Node.startTask(opSnapshot) if err != nil { - return err + return errors.Wrapf(err, "startTask on StreamSnapshot failed") } defer closer.Done()