add Record::new
ckampfe committed May 24, 2024
1 parent 6d32c0a commit ca67c84
Showing 3 changed files with 82 additions and 72 deletions.
8 changes: 7 additions & 1 deletion README.md
@@ -42,4 +42,10 @@ I have known about Bitcask for a while, and I wanted to learn it by building a w
 - [ ] clean up datamodel around records/entrypointers/mergepointers
 - [ ] more research into how async drop interacts with disk writes/buffer flushes
 - [x] investigate a better, less ambiguous tombstone value
-- [ ] move record write_insert and write_delete into Record
+- [x] move more of write_insert and write_delete into Record
+- [ ] improve error contexts reported to callers (e.g. with `snafu` or improving use of `thiserror`)
+- [ ] error handling and reporting in the event of a corrupt record
+- [ ] investigate allowing the access of old values
+- [ ] optimize layout of EntryPointer size, file_id to u32, value_position to u32?, tx_id to `time::Time`
+- [ ] investigate restricting key size to u16
+- [ ] use crc32 instead of blake3
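
The last TODO points at swapping the 32-byte blake3 digest for a 4-byte crc32, which would shrink the record header from 32 + 16 + 4 + 4 = 56 bytes to 4 + 16 + 4 + 4 = 28. A rough sketch of the two checksums side by side, assuming the `crc32fast` crate as one possible choice (nothing in this commit uses it):

// Sketch only: the current 32-byte blake3 digest next to a 4-byte crc32.
// `crc32fast` is an assumed crate choice; this commit does not use it.
fn checksums(payload: &[u8]) -> ([u8; 32], [u8; 4]) {
    let blake = *blake3::hash(payload).as_bytes(); // what Record stores today
    let crc = crc32fast::hash(payload).to_be_bytes(); // what the TODO suggests
    (blake, crc)
}

fn main() {
    let (blake, crc) = checksums(b"tx_id + sizes + key + value");
    assert_eq!((blake.len(), crc.len()), (32, 4));
}

crc32 is not collision-resistant, but for detecting torn or corrupted writes it matches the `crc` field the Bitcask paper describes.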
69 changes: 13 additions & 56 deletions src/base.rs
@@ -314,50 +314,26 @@ where
         v: V,
     ) -> crate::Result<()> {
         self.tx_id += 1;
-        let encoded_tx_id = self.tx_id.to_be_bytes();
-        let encoded_key = bincode::serialize(&k).map_err(|e| error::SerializeError {
-            msg: "unable to serialize to bincode".to_string(),
-            source: e,
-        })?;

-        let encoded_value = bincode::serialize(&v).map_err(|e| error::SerializeError {
-            msg: "unable to serialize to bincode".to_string(),
-            source: e,
-        })?;
+        let record = Record::new(&k, &v, self.tx_id)?;

-        let key_size = encoded_key.len();
-
-        let value_size = encoded_value.len();
-
-        let encoded_key_size = (key_size as u32).to_be_bytes();
-        let encoded_value_size = (value_size as u32).to_be_bytes();
-
-        let mut payload = vec![];
-        payload.extend_from_slice(&encoded_tx_id);
-        payload.extend_from_slice(&encoded_key_size);
-        payload.extend_from_slice(&encoded_value_size);
-        payload.extend_from_slice(&encoded_key);
-        payload.extend_from_slice(&encoded_value);
-
-        let hash = blake3::hash(&payload);
-        let hash = hash.as_bytes();
-
-        self.active_file.write_all(hash).await?;
-        self.active_file.write_all(&payload).await?;
+        self.active_file.write_all(&record).await?;

         let value_position =
-            self.offset + crate::record::Record::HEADER_SIZE as u64 + key_size as u64;
+            self.offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size() as u64;

         let entry = EntryPointer {
             file_id: self.active_file_id,
             value_position,
-            value_size: value_size.try_into().unwrap(),
+            value_size: record.value_size(),
             tx_id: self.tx_id,
         };

         self.keydir.insert(k, entry);

-        let entry_size = crate::record::Record::HEADER_SIZE + key_size + value_size;
+        let entry_size = crate::record::Record::HEADER_SIZE
+            + record.key_size() as usize
+            + record.value_size() as usize;

         self.offset += entry_size as u64;

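Worked numbers for the offset bookkeeping above, a sketch assuming the 56-byte header documented in src/record.rs (the key and value sizes are invented for illustration):

// Illustrative numbers only, assuming the header layout in src/record.rs.
const HEADER_SIZE: u64 = 32 + 16 + 4 + 4; // hash + tx_id + key_size + value_size

fn main() {
    let (offset, key_size, value_size) = (0u64, 8u64, 13u64);
    // the value starts right after the header and the encoded key...
    let value_position = offset + HEADER_SIZE + key_size;
    // ...and the next record starts after the whole entry
    let next_offset = offset + HEADER_SIZE + key_size + value_size;
    assert_eq!(value_position, 64);
    assert_eq!(next_offset, 77);
}
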
@@ -389,37 +389,18 @@ where

     async fn write_delete(&mut self, k: K) -> crate::Result<()> {
         self.tx_id += 1;
-        let encoded_tx_id = self.tx_id.to_be_bytes();
-        let encoded_key = bincode::serialize(&k).map_err(|e| error::SerializeError {
-            msg: "unable to serialize to bincode".to_string(),
-            source: e,
-        })?;

-        let encoded_value = Record::tombstone();
-
-        let key_size = encoded_key.len();
-
-        let value_size = encoded_value.len();
-
-        let encoded_key_size = (key_size as u32).to_be_bytes();
-        let encoded_value_size = (value_size as u32).to_be_bytes();
-
-        let mut payload = vec![];
-        payload.extend_from_slice(&encoded_tx_id);
-        payload.extend_from_slice(&encoded_key_size);
-        payload.extend_from_slice(&encoded_value_size);
-        payload.extend_from_slice(&encoded_key);
-        payload.extend_from_slice(encoded_value);
+        let v = Record::tombstone();

-        let hash = blake3::hash(&payload);
-        let hash = hash.as_bytes();
+        let record = Record::new(&k, &v, self.tx_id)?;

-        self.active_file.write_all(hash).await?;
-        self.active_file.write_all(&payload).await?;
+        self.active_file.write_all(&record).await?;

         self.keydir.remove(&k);

-        let entry_size = crate::record::Record::HEADER_SIZE + key_size + value_size;
+        let entry_size = crate::record::Record::HEADER_SIZE
+            + record.key_size() as usize
+            + record.value_size() as usize;

         self.offset += entry_size as u64;

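
One subtlety in the new write_delete: tombstone() now returns the raw TOMBSTONE_BYTES, and Record::new runs that value through bincode like any other, so the bytes on disk are the serialized form that liveness() compares against. A small check of that reasoning, assuming bincode 1.x's default encoding (a fixed 8-byte length prefix):

// Sketch, assuming bincode 1.x's default fixed-width length prefix.
const TOMBSTONE_BYTES: &[u8] = b"bitcask_tombstone";

fn main() {
    // write_delete hands the raw tombstone to Record::new, which
    // bincode-serializes it like any other value...
    let stored = bincode::serialize(&TOMBSTONE_BYTES).unwrap();
    // ...so the on-disk value is the length-prefixed form,
    // exactly what liveness() compares value_bytes() against
    assert_eq!(stored.len(), 8 + TOMBSTONE_BYTES.len());
    assert_eq!(&stored[8..], TOMBSTONE_BYTES);
}
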
77 changes: 62 additions & 15 deletions src/record.rs
@@ -1,19 +1,18 @@
-use std::sync::OnceLock;
-
-use crate::keydir::Liveness;
-use serde::de::DeserializeOwned;
+use crate::{error, keydir::Liveness};
+use serde::{de::DeserializeOwned, Serialize};
+use std::{ops::Deref, sync::OnceLock};
 use tokio::io::{AsyncRead, AsyncReadExt};

 const TOMBSTONE_BYTES: &[u8] = b"bitcask_tombstone";

-static TOMBSTONE: OnceLock<Vec<u8>> = OnceLock::new();
+static SERIALIZED_TOMBSTONE: OnceLock<Vec<u8>> = OnceLock::new();

 /// A record is a "header" and a "body"
 /// The header is (in on-disk and in-memory order):
-/// - hash (the paper calls this `crc`)
-/// - tx_id (the paper calls this `tstamp`)
-/// - key_size
-/// - value_size
+/// - hash (the paper calls this `crc`) (32 bytes)
+/// - tx_id (the paper calls this `tstamp`) (16 bytes)
+/// - key_size (4 bytes)
+/// - value_size (4 bytes)
 ///
 /// The body is (also in on-disk and in-memory order):
 /// - key
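
With the byte widths now spelled out in the doc comment, the header occupies 32 + 16 + 4 + 4 = 56 bytes. A sketch of slicing those fields back out of a raw buffer, assuming the big-endian encodings used elsewhere in this file (`header_fields` is a hypothetical helper, not part of the crate):

// Sketch: slicing the documented header fields out of a raw record buffer.
// Offsets follow the 32/16/4/4 layout above; the function name is mine.
fn header_fields(buf: &[u8]) -> (&[u8], u128, u32, u32) {
    let hash = &buf[0..32];
    let tx_id = u128::from_be_bytes(buf[32..48].try_into().unwrap());
    let key_size = u32::from_be_bytes(buf[48..52].try_into().unwrap());
    let value_size = u32::from_be_bytes(buf[52..56].try_into().unwrap());
    (hash, tx_id, key_size, value_size)
}

fn main() {
    let mut buf = vec![0u8; 56];
    buf[32..48].copy_from_slice(&7u128.to_be_bytes());
    buf[48..52].copy_from_slice(&3u32.to_be_bytes());
    buf[52..56].copy_from_slice(&5u32.to_be_bytes());
    let (_hash, tx_id, key_size, value_size) = header_fields(&buf);
    assert_eq!((tx_id, key_size, value_size), (7, 3, 5));
}
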
@@ -22,11 +21,63 @@ pub(crate) struct Record {
     buf: Vec<u8>,
 }

+impl Deref for Record {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.buf
+    }
+}
+
 // crate-public impls
 impl Record {
     pub(crate) const HEADER_SIZE: usize =
         Record::HASH_SIZE + Record::TX_ID_SIZE + Record::KEY_SIZE_SIZE + Record::VALUE_SIZE_SIZE;

+    pub(crate) fn new<K: Serialize, V: Serialize>(
+        k: &K,
+        v: &V,
+        tx_id: u128,
+    ) -> crate::Result<Self> {
+        let encoded_tx_id = tx_id.to_be_bytes();
+
+        let encoded_key = bincode::serialize(k).map_err(|e| error::SerializeError {
+            msg: "unable to serialize to bincode".to_string(),
+            source: e,
+        })?;
+
+        let encoded_value = bincode::serialize(v).map_err(|e| error::SerializeError {
+            msg: "unable to serialize to bincode".to_string(),
+            source: e,
+        })?;
+
+        let key_size = encoded_key.len();
+        let value_size = encoded_value.len();
+        let body_size = key_size + value_size;
+
+        let encoded_key_size = (key_size as u32).to_be_bytes();
+        let encoded_value_size = (value_size as u32).to_be_bytes();
+
+        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + body_size);
+        // header
+        // dummy hash bytes, added back in at the end...
+        buf.extend_from_slice(&[0u8; blake3::OUT_LEN]);
+        // rest of header
+        buf.extend_from_slice(&encoded_tx_id);
+        buf.extend_from_slice(&encoded_key_size);
+        buf.extend_from_slice(&encoded_value_size);
+        // body
+        buf.extend_from_slice(&encoded_key);
+        buf.extend_from_slice(&encoded_value);
+
+        let hash = blake3::hash(&buf[32..]);
+        let hash = hash.as_bytes();
+        // ...and finally set the first 32 bytes to the hash
+        buf[..32].copy_from_slice(hash);
+
+        Ok(Record { buf })
+    }
+
     pub(crate) async fn read_from<R: AsyncRead + Unpin>(
         reader: &mut tokio::io::BufReader<R>,
     ) -> std::io::Result<Record> {
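
Record::new reserves 32 zero bytes, hashes everything after them, then patches the real hash into the front, which avoids assembling the payload twice. The read side would invert that check; a minimal sketch (`verify` is a hypothetical helper, not part of this commit):

// Read-side counterpart to the hash patching above: the first 32 bytes
// must equal the blake3 hash of everything after them.
fn verify(buf: &[u8]) -> bool {
    buf.len() >= 32 && blake3::hash(&buf[32..]).as_bytes()[..] == buf[..32]
}

fn main() {
    // mimic Record::new: dummy hash bytes, a body, then patch the real hash in
    let mut buf = vec![0u8; 32];
    buf.extend_from_slice(b"tx_id + sizes + key + value");
    let hash = *blake3::hash(&buf[32..]).as_bytes();
    buf[..32].copy_from_slice(&hash);
    assert!(verify(&buf));
}
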
@@ -62,7 +113,7 @@ impl Record

     pub(crate) fn liveness(&self) -> Liveness {
         if self.value_bytes()
-            == TOMBSTONE.get_or_init(|| bincode::serialize(&TOMBSTONE_BYTES).unwrap())
+            == SERIALIZED_TOMBSTONE.get_or_init(|| bincode::serialize(&TOMBSTONE_BYTES).unwrap())
         {
             Liveness::Deleted
         } else {
@@ -71,7 +122,7 @@ }
     }

     pub(crate) fn tombstone() -> &'static [u8] {
-        TOMBSTONE.get_or_init(|| bincode::serialize(&TOMBSTONE_BYTES).unwrap())
+        TOMBSTONE_BYTES
     }

     pub(crate) fn key_bytes(&self) -> &[u8] {
@@ -90,10 +141,6 @@ impl Record {
         self.buf.len()
     }

-    pub(crate) fn body_len(&self) -> usize {
-        self.body().len()
-    }
-
     pub(crate) fn tx_id(&self) -> u128 {
         u128::from_be_bytes(self.tx_id_bytes().try_into().unwrap())
    }
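
The Deref impl added at the top of this file is what lets base.rs pass &record straight to write_all: &Record coerces to &Vec<u8> and then to &[u8]. A standalone illustration of the same newtype pattern:

use std::ops::Deref;

// A newtype over Vec<u8> that derefs to its buffer, so callers can
// treat it as a byte slice; Record does the same thing.
struct Buf(Vec<u8>);

impl Deref for Buf {
    type Target = Vec<u8>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

fn main() {
    let b = Buf(vec![1, 2, 3]);
    // &b coerces through Deref to &Vec<u8> and then to &[u8],
    // which is why `self.active_file.write_all(&record)` typechecks
    let as_slice: &[u8] = &b;
    assert_eq!(as_slice.len(), 3);
}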
