Tidying up journal writer code comments and error handling
fulghum committed Jan 10, 2025
1 parent 071cd77 commit 5458ae9
Showing 3 changed files with 74 additions and 31 deletions.
80 changes: 58 additions & 22 deletions go/store/nbs/journal_record.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"time"

"github.com/dolthub/dolt/go/store/d"
@@ -101,14 +102,23 @@ var journalRecordTimestampGenerator = func() uint64 {
}

func chunkRecordSize(c CompressedChunk) (recordSz, payloadOff uint32) {
recordSz += journalRecLenSz
recordSz += journalRecTagSz + journalRecKindSz
recordSz += journalRecTagSz + journalRecAddrSz
recordSz += journalRecTagSz // payload tag
payloadOff = recordSz
payloadOff += journalRecLenSz
payloadOff += journalRecTagSz + journalRecKindSz
payloadOff += journalRecTagSz + journalRecAddrSz
payloadOff += journalRecTagSz // payload tag

// Make sure the size of the chunk wouldn't overflow the uint32 record length
maxCompressedChunkSize := math.MaxUint32 - int(payloadOff+journalRecChecksumSz)
if len(c.FullCompressedChunk) > maxCompressedChunkSize {
panic(fmt.Sprintf("compressed chunk size (%d) is larger than max size allowed "+
"for chunk record (%d)", len(c.FullCompressedChunk), maxCompressedChunkSize))
}

recordSz = payloadOff
recordSz += uint32(len(c.FullCompressedChunk))
recordSz += journalRecChecksumSz
return

return recordSz, payloadOff
}

func rootHashRecordSize() (recordSz int) {
@@ -121,7 +131,9 @@ func rootHashRecordSize() (recordSz int) {
}

func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) {
// length
// length: comes back as an unsigned 32-bit int, which aligns with the four bytes used
// in the journal storage protocol to store the total record length, ensuring that we
// can't produce a record length that is too large to safely write.
l, _ := chunkRecordSize(c)
writeUint32(buf[:journalRecLenSz], l)
n += journalRecLenSz
Expand Down Expand Up @@ -211,16 +223,27 @@ func readJournalRecord(buf []byte) (rec journalRec, err error) {
return
}

func validateJournalRecord(buf []byte) bool {
// validateJournalRecord performs some sanity checks on the buffer |buf| containing a journal
// record, such as checking that the length of the record is not too short, and verifying the
// checksum. If any problems are detected, an error is returned; otherwise nil is returned.
func validateJournalRecord(buf []byte) error {
if len(buf) < (journalRecLenSz + journalRecChecksumSz) {
return false
return fmt.Errorf("invalid journal record: buffer length too small (%d < %d)", len(buf), (journalRecLenSz + journalRecChecksumSz))
}

off := readUint32(buf)
if int(off) > len(buf) {
return false
return fmt.Errorf("invalid journal record: offset is greater than length of buffer (%d > %d)",
off, len(buf))
}

off -= indexRecChecksumSz
return crc(buf[:off]) == readUint32(buf[off:])
crcMatches := crc(buf[:off]) == readUint32(buf[off:])
if !crcMatches {
return fmt.Errorf("invalid journal record: CRC checksum does not match")
}

return nil
}

// processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
@@ -241,36 +264,49 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
for {
// peek to read next record size
if buf, err = rdr.Peek(uint32Size); err != nil {
break
if err == io.EOF {
break
} else {
return 0, err
}
}

// The first 4 bytes in the journal record are the total length of the record (including
// these first four bytes)
l := readUint32(buf)
if buf, err = rdr.Peek(int(l)); err != nil {

// The journal file data is initialized to a block of zero bytes, so if we read a record
// length of 0, we know we've reached the end of the journal records and are starting to
// read the zero padding.
if l == 0 {
break
}

if !validateJournalRecord(buf) {
// We read the journal file until we run into an invalid record.
break // stop if we can't validate |rec|
if buf, err = rdr.Peek(int(l)); err != nil {
return 0, err
}

if err = validateJournalRecord(buf); err != nil {
// TODO: Should we add an emergency valve here to allow customers to skip invalid records?
// i.e. let people opt-in to the old behavior in case anyone is relying on it?
return 0, err
}

var rec journalRec
if rec, err = readJournalRecord(buf); err != nil {
break // failed to read valid record
return 0, err
}
if err = cb(off, rec); err != nil {
break
return 0, err
}

// advance |rdr| state by |l| bytes
if _, err = io.ReadFull(rdr, buf); err != nil {
break
return 0, err
}
off += int64(len(buf))
}
if err != nil && err != io.EOF {
return 0, err
}

// reset the file pointer to end of the last
// successfully processed journal record
if _, err = r.Seek(off, 0); err != nil {
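Aside: to make the record framing concrete, here is a minimal, self-contained sketch of the scan-and-validate loop described above. It is illustrative only: the four-byte field sizes, big-endian byte order, and crc32.ChecksumIEEE are stand-in assumptions for the journal's actual helpers (journalRecLenSz, readUint32, crc), which do not appear in this diff.

package journalsketch

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

const (
	recLenSz = 4 // uint32 length prefix; counts the entire record, itself included
	recCrcSz = 4 // uint32 checksum suffix
)

// validateRecord mirrors the invariants validateJournalRecord enforces: a sane
// minimum length, a length prefix that fits in the buffer, and a trailing
// checksum computed over everything before it.
func validateRecord(buf []byte) error {
	if len(buf) < recLenSz+recCrcSz {
		return fmt.Errorf("invalid record: buffer too small (%d bytes)", len(buf))
	}
	l := binary.BigEndian.Uint32(buf)
	if l < recLenSz+recCrcSz || int(l) > len(buf) {
		return fmt.Errorf("invalid record: bad length prefix (%d)", l)
	}
	off := l - recCrcSz
	if crc32.ChecksumIEEE(buf[:off]) != binary.BigEndian.Uint32(buf[off:]) {
		return fmt.Errorf("invalid record: CRC checksum does not match")
	}
	return nil
}

// scanRecords mirrors the loop in processJournalRecords: peek at the four-byte
// length prefix, treat a zero length as the start of the file's zero-initialized
// padding, validate the record, hand it to the callback, then advance past it.
func scanRecords(r *bufio.Reader, cb func(rec []byte) error) error {
	for {
		hdr, err := r.Peek(recLenSz)
		if err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		l := binary.BigEndian.Uint32(hdr)
		if l == 0 {
			return nil // reached the zero padding at the end of the journal
		}
		rec, err := r.Peek(int(l))
		if err != nil {
			return err
		}
		if err = validateRecord(rec); err != nil {
			return err
		}
		if err = cb(rec); err != nil {
			return err
		}
		if _, err = r.Discard(int(l)); err != nil {
			return err
		}
	}
}

Like processJournalRecords after this change, the sketch surfaces errors to the caller rather than silently stopping at the first invalid record.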
6 changes: 3 additions & 3 deletions go/store/nbs/journal_record_test.go
@@ -71,10 +71,10 @@ func TestUnknownJournalRecordTag(t *testing.T) {
// test behavior encountering unknown tag
buf := makeUnknownTagJournalRecord()
// checksum is ok
ok := validateJournalRecord(buf)
assert.True(t, ok)
err := validateJournalRecord(buf)
assert.NoError(t, err)
// reading record fails
_, err := readJournalRecord(buf)
_, err = readJournalRecord(buf)
assert.Error(t, err)
}

19 changes: 13 additions & 6 deletions go/store/nbs/journal_writer.go
@@ -35,13 +35,17 @@ import (
)

const (
// chunkJournalFileSize is the size we initialize the journal file to when it is first created. We
// create a 16KB block of zero-initialized data and then sync the file to disk. We do this to
// ensure that we can write to the journal file and that we have some space for initial records.
// This probably isn't strictly necessary, but it also doesn't hurt.
chunkJournalFileSize = 16 * 1024

// todo(andy): buffer must be able to hold an entire record,
// but we don't have a hard limit on record size right now.
// JSON data has cases where it won't chunk down as small as other data,
// so we have increased this to 5MB. If/when JSON chunking handles those
// cases, we could decrease this size to 1MB again.
// journalWriterBuffSize is the size of the statically allocated buffer where journal records are
// built before being written to the journal file on disk. There is not a hard limit on the size
// of records – specifically, some newer data chunking formats (i.e. optimized JSON storage) can
// produce chunks (and therefore chunk records) that are megabytes in size. The current limit of
// 5MB should be large enough to cover all but the most extreme cases.
journalWriterBuffSize = 5 * 1024 * 1024

chunkJournalAddr = chunks.JournalFileID
@@ -118,6 +122,9 @@ func createJournalWriter(ctx context.Context, path string) (wr *journalWriter, e
return nil, err
}

// Open the journal file and initialize it with 16KB of zero bytes. This is intended to
// ensure that we can write to the journal and to allocate space for the first set of
// records, but probably isn't strictly necessary.
if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,7 +190,7 @@ var _ io.Closer = &journalWriter{}
// The journal index will be truncated to the last valid batch of lookups. Lookups with offsets
// larger than the position of the last valid lookup metadata are rewritten to the index as they
// are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
// extend the jounral index with one metadata flush before existing this function to save indexing
// extend the journal index with one metadata flush before exiting this function to save indexing
// progress.
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer) (last hash.Hash, err error) {
wr.lock.Lock()
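Aside: for readers outside the repo, here is a sketch of the journal-file initialization that the chunkJournalFileSize comment describes: create the file, reserve a 16KB block of zero-initialized space, and sync it to disk. The function name and the seek-back step are illustrative assumptions; the real logic lives in createJournalWriter.

package journalsketch

import (
	"io"
	"os"
)

const chunkJournalFileSize = 16 * 1024 // zero-initialized space reserved at creation

// initJournalFile creates the journal file, writes a 16KB block of zeros,
// syncs it to disk, and seeks back to the first byte so that the first
// record written overwrites the zero padding.
func initJournalFile(path string) (f *os.File, err error) {
	if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			_ = f.Close() // don't leak the handle on a partial initialization
		}
	}()
	if _, err = f.Write(make([]byte, chunkJournalFileSize)); err != nil {
		return nil, err
	}
	if err = f.Sync(); err != nil {
		return nil, err
	}
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	return f, nil
}

Zero-filling the file up front also means that a torn write at the tail shows up as a zero length prefix, which the record scan in processJournalRecords treats as the end of the journal.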
