Skip to content

Commit

Permalink
empty database replication
Browse files Browse the repository at this point in the history
  • Loading branch information
octu0 committed Jul 13, 2022
1 parent 42e5dde commit 198a45f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
3 changes: 3 additions & 0 deletions datafile/datafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (df *defaultDatafile) ReadAtHeader(index int64) (*Header, EOFType, error) {

h, err := d.DecodeHeader()
if err != nil {
if errors.Is(err, io.EOF) {
return nil, IsEOF, nil
}
return nil, IsEOF, errors.WithStack(err)
}

Expand Down
6 changes: 5 additions & 1 deletion repli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (s *repliSource) LastIndex(fileID datafile.FileID) int64 {
if s.b.curr.FileID().Equal(fileID) {
return s.b.curr.Size()
}
return s.b.datafiles[fileID].Size()
df, ok := s.b.datafiles[fileID]
if ok != true {
return -1
}
return df.Size()
}

func (s *repliSource) Header(fileID datafile.FileID, index int64) (*datafile.Header, datafile.EOFType, error) {
Expand Down
24 changes: 22 additions & 2 deletions repli/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand
})
return
}
header, isEOF, err := src.Header(req.FileID, req.Index)
header, eofType, err := src.Header(req.FileID, req.Index)
if err != nil {
e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: -1,
Expand All @@ -560,9 +560,19 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand
return
}

// file found but empty
if header == nil && eofType == datafile.IsEOF {
e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: 0,
EOF: bool(eofType),
Err: "",
})
return
}

e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: header.TotalSize,
EOF: bool(isEOF),
EOF: bool(eofType),
Err: "",
})
}
Expand Down Expand Up @@ -1088,6 +1098,11 @@ func (r *streamReciver) reqDiffData(conn *nats.Conn, dst Destination, fileID dat
return errors.WithStack(err)
}

// file found but empty
if size == 0 && isEOF {
return nil
}

entry, err := r.reqFetchData(conn, fileID, lastIndex, size)
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -1116,8 +1131,13 @@ func (r *streamReciver) requestBehindData(client *nats.Conn, dst Destination, re
if err != nil {
return errors.Wrapf(err, "failed request current file index: fileID=%d", f.FileID)
}

requestedFileIds[f.FileID] = struct{}{} // mark

if lastIndex < 0 {
continue // not found
}

if f.Index < lastIndex {
// server has new index
if err := r.reqDiffData(client, dst, f.FileID, f.Index); err != nil {
Expand Down

0 comments on commit 198a45f

Please sign in to comment.