Skip to content

Commit

Permalink
Better handle inactivity.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain authored and mangalaman93 committed Jul 28, 2023
1 parent 6fdb2ec commit 6624e52
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
return
case <-tick.C:
last := atomic.LoadInt64(&lastReceived)
if last == 0 {
continue
}
lastTs := time.Unix(last, 0)
if time.Since(lastTs) > 60*time.Second {
glog.Warningf("Haven't received anything for over 60s." +
if time.Since(lastTs) > 300*time.Second {
glog.Warningf("Haven't received anything for over 300s." +
" Abandoning snapshot retrieval...\n")
cancel()
}
Expand All @@ -105,14 +108,14 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
size := 0
var done *pb.KVS
for {
// Track when we last received anything from the leader. If we don't
// receive anything for a while, we should abandon this connection.
atomic.StoreInt64(&lastReceived, time.Now().Unix())
kvs, err := stream.Recv()
if err != nil {
return errors.Wrapf(err, "stream.Recv")
}

// Track when we last received anything from the leader. If we don't
// receive anything for a while, we should abandon this connection.
atomic.StoreInt64(&lastReceived, time.Now().Unix())
atomic.StoreInt64(&lastReceived, 0) // We got something.

if kvs.Done {
done = kvs
Expand Down

0 comments on commit 6624e52

Please sign in to comment.