Skip to content

Commit

Permalink
added download as a stream option; validate stream digest
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinrp committed May 9, 2024
1 parent dba2fa9 commit 301f50b
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 4 deletions.
38 changes: 36 additions & 2 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_util::{Stream, TryStreamExt};
use futures_util::{future::ready, stream::once, Stream, StreamExt, TryStreamExt};
use indexmap::IndexMap;
use reqwest::{
header::{HeaderMap, HeaderValue},
Expand Down Expand Up @@ -441,7 +441,10 @@ impl Client {
continue;
}

return Ok(response.bytes_stream().map_err(|e| anyhow!(e)));
return Ok(validate_stream(
digest,
response.bytes_stream().map_err(|e| anyhow!(e)),
));
}

Err(ClientError::AllSourcesFailed(digest.clone()))
Expand Down Expand Up @@ -627,3 +630,34 @@ impl Client {
Ok(())
}
}

fn validate_stream(
digest: &AnyHash,
stream: impl Stream<Item = Result<Bytes>>,
) -> impl Stream<Item = Result<Bytes>> {
let hasher = Some(digest.algorithm().hasher());
let expected = digest.clone();
stream
.map_ok(Some)
.chain(once(async { Ok(None) }))
.scan(hasher, move |hasher, res| {
ready(match res {
Ok(Some(bytes)) => {
hasher.as_mut().unwrap().update(&bytes);
Some(Ok(bytes))
}
Ok(None) => {
let hasher = std::mem::take(hasher).unwrap();
let computed = hasher.finalize();
if expected == computed {
None
} else {
Some(Err(anyhow!(
"expected digest `{expected}` but computed digest `{computed}`"
)))
}
}
Err(err) => Some(Err(err)),
})
})
}
135 changes: 133 additions & 2 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use crate::storage::PackageInfo;

use anyhow::{anyhow, Context, Result};

use bytes::Bytes;
use futures_util::{Stream, StreamExt, TryStreamExt};
use indexmap::IndexMap;
use reqwest::{Body, IntoUrl};
use secrecy::Secret;
Expand All @@ -18,6 +19,7 @@ use storage::{
FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
};
use thiserror::Error;
use tokio_util::io::ReaderStream;
use warg_api::v1::{
fetch::{FetchError, FetchLogsRequest},
package::{
Expand Down Expand Up @@ -624,13 +626,59 @@ Attempt to create `{package_name}` and publish the release y/N\n",
}
}

/// Downloads the specified version of a package into client storage.
/// Downloads the latest version of a package.
///
/// If the requested package log is not present in client storage, it
/// will be fetched from the registry first.
///
/// An error is returned if the package does not exist.
///
/// If a version satisfying the requirement does not exist, `None` is
/// returned.
pub async fn download_as_stream(
&self,
package: &PackageName,
requirement: &VersionReq,
) -> Result<Option<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>)>, ClientError> {
let info = self.package(package).await?;

let registry_domain = self.get_warg_registry(package.namespace()).await?;

tracing::debug!(
package = package.as_ref(),
version_requirement = requirement.to_string(),
registry_header = ?registry_domain,
"downloading",
);

match info.state.find_latest_release(requirement) {
Some(release) => {
let digest = release
.content()
.context("invalid state: not yanked but missing content")?
.clone();
let stream = self
.download_content_stream(registry_domain.as_ref(), &digest)
.await?;
Ok(Some((
PackageDownloadInfo {
version: release.version.clone(),
digest,
},
stream,
)))
}
None => Ok(None),
}
}

/// Downloads the specified version of a package into client storage.
///
/// If the requested package log is not present in client storage, it
/// will be fetched from the registry first.
///
/// An error is returned if the package or version does not exist.
///
/// Returns the path within client storage of the package contents for
/// the specified version.
pub async fn download_exact(
Expand Down Expand Up @@ -673,6 +721,53 @@ Attempt to create `{package_name}` and publish the release y/N\n",
})
}

/// Downloads the specified version of a package.
///
/// If the requested package log is not present in client storage, it
/// will be fetched from the registry first.
///
/// An error is returned if the package or version does not exist.
pub async fn download_exact_as_stream(
&self,
package: &PackageName,
version: &Version,
) -> Result<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>), ClientError> {
let info = self.package(package).await?;

let registry_domain = self.get_warg_registry(package.namespace()).await?;

tracing::debug!(
package = package.as_ref(),
version = version.to_string(),
registry_header = ?registry_domain,
"downloading exact version",
);

let release =
info.state
.release(version)
.ok_or_else(|| ClientError::PackageVersionDoesNotExist {
version: version.clone(),
name: package.clone(),
})?;

let digest = release
.content()
.ok_or_else(|| ClientError::PackageVersionDoesNotExist {
version: version.clone(),
name: package.clone(),
})?;

Ok((
PackageDownloadInfo {
version: version.clone(),
digest: digest.clone(),
},
self.download_content_stream(registry_domain.as_ref(), digest)
.await?,
))
}

async fn update_packages_and_return_federated_packages<'a>(
&self,
registry_domain: Option<&RegistryDomain>,
Expand Down Expand Up @@ -1177,6 +1272,30 @@ current_registry = registry_domain.map(|d| d.as_str()).unwrap_or(&self.url().saf
}
}
}

/// Downloads the content for the specified digest as a stream.
///
/// If the content already exists in client storage, it is read from the client storage.
///
/// The download is not stored in client storage.
async fn download_content_stream(
&self,
registry_domain: Option<&RegistryDomain>,
digest: &AnyHash,
) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
match self.content.content_location(digest) {
Some(path) => {
tracing::info!("content for digest `{digest}` already exists in storage");
let file = tokio::fs::File::open(path)
.await
.map_err(ClientError::IoError)?;
Ok(ReaderStream::new(file).map_err(Into::into).boxed())
}
None => Ok(Box::pin(
self.api.download_content(registry_domain, digest).await?,
)),
}
}
}
/// A Warg registry client that uses the local file system to store
/// package logs and content.
Expand Down Expand Up @@ -1322,6 +1441,14 @@ pub struct PackageDownload {
pub path: PathBuf,
}

/// Represents information about a downloaded package.
pub struct PackageDownloadInfo {
/// The package version that was downloaded.
pub version: Version,
/// The digest of the package contents.
pub digest: AnyHash,
}

/// Represents an error returned by Warg registry clients.
#[derive(Debug, Error)]
pub enum ClientError {
Expand Down Expand Up @@ -1515,6 +1642,10 @@ pub enum ClientError {
/// An error occurred while performing a client operation.
#[error("{0:?}")]
Other(#[from] anyhow::Error),

/// An error occurred while performing a IO.
#[error("error: {0:?}")]
IoError(#[from] std::io::Error),
}

impl ClientError {
Expand Down

0 comments on commit 301f50b

Please sign in to comment.