Skip to content

Commit

Permalink
feat: conditional writes and evictions
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Feb 6, 2025
1 parent 2351a24 commit b708fd4
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ pub struct VnotifyCache {
fencing_queue_tx: tokio::sync::mpsc::UnboundedSender<oneshot::Sender<()>>,
}

#[derive(Debug, Clone)]
pub enum WriteCondition {
Unconditional,
IfNotPresent,
IfMatchEtag(String),
}

// Configuration for a `VnotifyCache`.
#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -65,6 +72,21 @@ struct Entry {
last_used_ms: AtomicU64,
}

#[derive(Debug, Clone)]
pub struct Item {
pub body: Bytes,
pub object_etag: String,
}

impl<'a> From<&'a Entry> for Item {
fn from(entry: &'a Entry) -> Self {
Self {
body: entry.body.clone(),
object_etag: entry.object_etag.clone(),
}
}
}

impl Config {
pub fn with_bucket_and_prefix(bucket: String, prefix: String) -> Self {
Self {
Expand Down Expand Up @@ -113,18 +135,18 @@ impl VnotifyCache {
}
}

pub async fn try_get(&self, key: &str) -> Option<Bytes> {
pub async fn try_get(&self, key: &str) -> Option<Item> {
let shard_index = get_shard_index_for_key(key, self.config.shards);
let shard = &self.shards[shard_index];
shard
.entries
.get(key)
.await
.flatten()
.map(|x| x.body.clone())
.map(|x| Item::from(&*x))
}

pub async fn get(&self, key: &str) -> anyhow::Result<Option<Bytes>> {
pub async fn get(&self, key: &str) -> anyhow::Result<Option<Item>> {
let shard_index = get_shard_index_for_key(key, self.config.shards);
let shard = &self.shards[shard_index];
shard
Expand Down Expand Up @@ -163,24 +185,33 @@ impl VnotifyCache {
self.config.time_base.elapsed().as_millis() as u64,
Ordering::Relaxed,
);
x.body.clone()
Item::from(&*x)
})
})
.map_err(|e| anyhow::anyhow!("failed to load object from s3: {:?}", e))
.await
}

pub async fn put(&self, key: &str, body: Bytes) -> anyhow::Result<()> {
pub async fn evict(&self, key: &str) {
let shard_index = get_shard_index_for_key(key, self.config.shards);
let shard = &self.shards[shard_index];
shard.entries.invalidate(key).await;
}

pub async fn put(&self, key: &str, body: Bytes, cond: WriteCondition) -> anyhow::Result<()> {
let shard_index = get_shard_index_for_key(key, self.config.shards);
let shard = &self.shards[shard_index];
let res = self
.client
.put_object()
.bucket(&self.config.bucket)
.key(key)
.body(ByteStream::from(body.clone()))
.send()
.await?;
.key(key);
let res = match cond {
WriteCondition::Unconditional => res,
WriteCondition::IfNotPresent => res.if_none_match("*"),
WriteCondition::IfMatchEtag(etag) => res.if_match(etag),
};
let res = res.body(ByteStream::from(body.clone())).send().await?;
let object_etag = res.e_tag.unwrap_or_default();
shard
.entries
Expand Down

0 comments on commit b708fd4

Please sign in to comment.