Skip to content

Commit

Permalink
fix: rewrite caching API
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian May committed Nov 26, 2024
1 parent 055430a commit aa0ccce
Show file tree
Hide file tree
Showing 9 changed files with 827 additions and 265 deletions.
410 changes: 341 additions & 69 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,5 @@ thiserror = "2.0.3"
sha256 = "1.5.0"
docker_credential = "1.3.1"
oci-client = { version = "0.14.0", default-features = false, features = [ "rustls-tls"] }
tempfile = "3.14.0"
bytes = "1.8.0"
flate2 = "1.0.35"
tar = "0.4.43"
tokio-util = "0.7.12"
futures-util = "0.3.31"
async-tar = "0.5.0"
async-std = "1.13.0"
238 changes: 162 additions & 76 deletions src/repos/cache.rs
Original file line number Diff line number Diff line change
@@ -1,124 +1,210 @@
use std::fmt::Debug;
use std::io::SeekFrom;
use std::path::Path;
use std::path::PathBuf;

use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;
use super::hash::Sha256Hash;
use async_std::fs::File;
use async_std::io::SeekExt;
use async_std::io::WriteExt;
use tap::Pipe;
use thiserror::Error;
use tokio::io::AsyncWriteExt;

use super::charts::Chart;
use super::{hash::Sha256Hash, meta::Meta};

#[derive(Error, Debug)]
pub enum Error {
#[error("IO error {0}: {1}")]
Io(PathBuf, std::io::Error),
#[error("Digest mismatch {0}: {1} != {2}")]
DigestMismatch(PathBuf, Sha256Hash, Sha256Hash),
// #[error("Invalid JSON {0}: {1}")]
// InvalidJson(PathBuf, serde_json::Error),
}

#[derive(Error, Debug)]
pub enum StreamError<E> {
#[error("IO error {0}: {1}")]
Io(PathBuf, std::io::Error),
#[error("Digest mismatch {0}: {1} != {2}")]
DigestMismatch(PathBuf, Sha256Hash, Sha256Hash),
// #[error("Invalid JSON {0}: {1}")]
// InvalidJson(PathBuf, serde_json::Error),
#[error("Stream error: {0}")]
Stream(PathBuf, E),
#[error("SHA256 hash mismatch {0}: {1} != {2}")]
Sha256HashMismatch(PathBuf, Sha256Hash, Sha256Hash),
#[error("File too big error {0}: expected {1} got {2}")]
TooBig(PathBuf, u64, u64),
}

#[derive(Debug)]
pub struct Cache {
pub path: std::path::PathBuf,
}

impl Cache {
pub fn new(path: &Path) -> Self {
#[derive(Clone, Debug)]
pub(super) struct Key {
pub name: String,
pub sha256_hash: Sha256Hash,
}

pub(super) struct CreateCacheEntry {
path: PathBuf,
temp_path: PathBuf,
file: File,
expected_size: Option<u64>,
}

impl CreateCacheEntry {
async fn new(
path: PathBuf,
temp_path: PathBuf,
expected_size: Option<u64>,
) -> Result<Self, Error> {
let parent = temp_path.parent().ok_or_else(|| {
Error::Io(
temp_path.clone(),
std::io::Error::new(std::io::ErrorKind::NotFound, "No parent directory"),
)
})?;

tokio::fs::create_dir_all(&parent)
.await
.map_err(|e| Error::Io(parent.into(), e))?;

let file = File::create(&temp_path)
.await
.map_err(|e| Error::Io(temp_path.clone(), e))?;

Self {
path: path.join(".charts"),
path,
temp_path,
file,
expected_size,
}
.pipe(Ok)
}

fn get_cache_file_path(&self, chart_name: &str, digest: &Sha256Hash) -> PathBuf {
self.path.join(chart_name).join(format!("{digest}.tgz"))
pub(super) async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), Error> {
self.file
.write_all(chunk)
.await
.map_err(|e| Error::Io(self.temp_path.clone(), e))?;

if let Some(expected_size) = self.expected_size {
let file_len = self
.file
.seek(SeekFrom::End(0))
.await
.map_err(|e| Error::Io(self.temp_path.clone(), e))?;

if file_len > expected_size {
return Err(Error::TooBig(
self.temp_path.clone(),
expected_size,
file_len,
));
}
}

Ok(())
}

fn get_cache_temp_path(&self, chart_name: &str, digest: &Sha256Hash) -> PathBuf {
self.path.join(chart_name).join(format!("{digest}.tmp"))
// pub(super) const fn file(&self) -> &File {
// &self.file
// }

pub(super) fn mut_file(&mut self) -> &mut File {
&mut self.file
}

pub(super) async fn get_cache_entry(&self, meta: &Meta) -> Result<Option<Chart>, Error> {
let file_path = self.get_cache_file_path(&meta.name, &meta.sha256_hash);
fn get_cache_file_path(&self, key: &Key) -> PathBuf {
self.path
.join(&key.name)
.join(format!("{}.tgz", key.sha256_hash))
}

if !file_path.exists() {
return Ok(None);
}
pub(super) async fn finalize(mut self, key: &Key) -> Result<PathBuf, Error> {
let final_path = self.get_cache_file_path(key);

let sha256_hash = Sha256Hash::from_file_async(&file_path)
.await
.map_err(|e| Error::Io(file_path.clone(), e))?;
self.flush().await?;

if meta.sha256_hash != sha256_hash {
return Err(Error::DigestMismatch(
file_path.clone(),
meta.sha256_hash.clone(),
let sha256_hash = self.calc_sha256_hash().await?;
if key.sha256_hash != sha256_hash {
return Err(Error::Sha256HashMismatch(
self.temp_path.clone(),
key.sha256_hash.clone(),
sha256_hash,
));
}

Ok(Some(Chart {
file_path,
meta: meta.clone(),
}))
let parent = final_path.parent().ok_or_else(|| {
Error::Io(
final_path.clone(),
std::io::Error::new(std::io::ErrorKind::NotFound, "No parent directory"),
)
})?;

tokio::fs::create_dir_all(&parent)
.await
.map_err(|e| Error::Io(parent.into(), e))?;

tokio::fs::rename(&self.temp_path, &final_path)
.await
.map_err(|e| Error::Io(final_path.clone(), e))?;

Ok(final_path.clone())
}

pub(super) async fn create_cache_entry<E: Send>(
&self,
meta: Meta,
mut stream: impl Stream<Item = Result<Bytes, E>> + Unpin + Send,
) -> Result<Chart, StreamError<E>> {
let file_path = self.get_cache_file_path(&meta.name, &meta.sha256_hash);
let temp_path = self.get_cache_temp_path(&meta.name, &meta.sha256_hash);

if let Some(parent) = temp_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| StreamError::Io(temp_path.clone(), e))?;
pub(super) async fn flush(&mut self) -> Result<(), Error> {
self.file
.flush()
.await
.map_err(|e| Error::Io(self.temp_path.clone(), e))
}

pub(super) async fn calc_sha256_hash(&self) -> Result<Sha256Hash, Error> {
Sha256Hash::from_async_path(&self.temp_path)
.await
.map_err(|e| Error::Io(self.temp_path.clone(), e))
}
}

impl Drop for CreateCacheEntry {
fn drop(&mut self) {
if self.temp_path.exists() {
let _ = std::fs::remove_file(&self.temp_path);
}
}
}

{
let mut file = tokio::fs::File::create(&temp_path)
.await
.map_err(|e| StreamError::Io(temp_path.clone(), e))?;
// Cache is not thread safe as creating entries will have race conditions.
unsafe impl Sync for Cache {}

impl Cache {
pub fn new(path: &Path) -> Self {
Self {
path: path.join(".charts"),
}
}

fn get_cache_file_path(&self, key: &Key) -> PathBuf {
self.path
.join(&key.name)
.join(format!("{}.tgz", key.sha256_hash))
}

while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| StreamError::Stream(temp_path.clone(), e))?;
pub(super) async fn get_cache_entry(&self, key: &Key) -> Result<Option<PathBuf>, Error> {
let file_path = self.get_cache_file_path(key);

file.write_all(&chunk)
.await
.map_err(|e| StreamError::Io(temp_path.clone(), e))?;
}
if !file_path.exists() {
return Ok(None);
}

let sha256_hash = Sha256Hash::from_file_async(&temp_path)
let sha256_hash = Sha256Hash::from_async_path(&file_path)
.await
.map_err(|e| StreamError::Io(temp_path.clone(), e))?;
if meta.sha256_hash != sha256_hash {
return Err(StreamError::DigestMismatch(
temp_path,
meta.sha256_hash.clone(),
.map_err(|e| Error::Io(file_path.clone(), e))?;

if key.sha256_hash != sha256_hash {
return Err(Error::Sha256HashMismatch(
file_path.clone(),
key.sha256_hash.clone(),
sha256_hash,
));
}

tokio::fs::rename(&temp_path, &file_path)
.await
.map_err(|e| StreamError::Io(file_path.clone(), e))?;
Ok(Some(file_path))
}

Ok(Chart { file_path, meta })
pub(super) async fn create_cache_entry(
&self,
extension: &str,
expected_size: Option<u64>,
) -> Result<CreateCacheEntry, Error> {
let temp_path = self.path.join(format!("tmp{extension}"));
CreateCacheEntry::new(self.path.clone(), temp_path, expected_size).await
}
}
2 changes: 1 addition & 1 deletion src/repos/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl Sha256Hash {
// Ok(Self(hash))
// }

pub async fn from_file_async(path: &std::path::Path) -> Result<Self, std::io::Error> {
pub async fn from_async_path(path: &std::path::Path) -> Result<Self, std::io::Error> {
let hash = path.async_digest().await?;
Ok(Self(hash))
}
Expand Down
Loading

0 comments on commit aa0ccce

Please sign in to comment.