diff --git a/datafile/datafile.go b/datafile/datafile.go index 91d508b..82055e8 100644 --- a/datafile/datafile.go +++ b/datafile/datafile.go @@ -137,6 +137,9 @@ func (df *defaultDatafile) ReadAtHeader(index int64) (*Header, EOFType, error) { h, err := d.DecodeHeader() if err != nil { + if errors.Is(err, io.EOF) { + return nil, IsEOF, nil + } return nil, IsEOF, errors.WithStack(err) } diff --git a/repli.go b/repli.go index a1ea1d0..dc96906 100644 --- a/repli.go +++ b/repli.go @@ -46,7 +46,11 @@ func (s *repliSource) LastIndex(fileID datafile.FileID) int64 { if s.b.curr.FileID().Equal(fileID) { return s.b.curr.Size() } - return s.b.datafiles[fileID].Size() + df, ok := s.b.datafiles[fileID] + if ok != true { + return -1 + } + return df.Size() } func (s *repliSource) Header(fileID datafile.FileID, index int64) (*datafile.Header, datafile.EOFType, error) { diff --git a/repli/stream.go b/repli/stream.go index 787c21d..52b2767 100644 --- a/repli/stream.go +++ b/repli/stream.go @@ -550,7 +550,7 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand }) return } - header, isEOF, err := src.Header(req.FileID, req.Index) + header, eofType, err := src.Header(req.FileID, req.Index) if err != nil { e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ Size: -1, @@ -560,9 +560,19 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand return } + // file found but empty + if header == nil && eofType == datafile.IsEOF { + e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ + Size: 0, + EOF: bool(eofType), + Err: "", + }) + return + } + e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ Size: header.TotalSize, - EOF: bool(isEOF), + EOF: bool(eofType), Err: "", }) } @@ -1088,6 +1098,11 @@ func (r *streamReciver) reqDiffData(conn *nats.Conn, dst Destination, fileID dat return errors.WithStack(err) } + // file found but empty + if size == 0 && isEOF { + return nil + } + entry, err := r.reqFetchData(conn, fileID, lastIndex, size) if err != nil { return errors.WithStack(err) @@ -1116,8 +1131,13 @@ func (r *streamReciver) requestBehindData(client *nats.Conn, dst Destination, re if err != nil { return errors.Wrapf(err, "failed request current file index: fileID=%d", f.FileID) } + requestedFileIds[f.FileID] = struct{}{} // mark + if lastIndex < 0 { + continue // not found + } + if f.Index < lastIndex { // server has new index if err := r.reqDiffData(client, dst, f.FileID, f.Index); err != nil {