Skip to content

Commit

Permalink
Introduce RegistryClientCache stub
Browse files Browse the repository at this point in the history
This PR lays groundwork for implementing caching of `RegistryClient` instances.
It just sets up all the types and refactors, no caching is done yet.

There is also a small functional change: the local registry client no longer prints
the `Unpacking` status in normal mode. This wasn't a big deal, and it allowed removing
the `is_offline` method from `RegistryClient`.

commit-id:afcc7328
  • Loading branch information
mkaput committed Oct 31, 2023
1 parent 7fd7bcc commit 5f110a7
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 90 deletions.
75 changes: 75 additions & 0 deletions scarb/src/core/registry/client/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::path::PathBuf;

use anyhow::{bail, Result};
use tracing::trace;

use crate::core::registry::client::{BeforeNetworkCallback, RegistryClient, RegistryResource};
use crate::core::registry::index::IndexRecords;
use crate::core::{Config, ManifestDependency, PackageId};

/// Layer that sits on top of a [`RegistryClient`] and is meant to cache its results.
///
/// The methods of this type currently forward every request to the wrapped client;
/// reading from a cache is not implemented yet.
pub struct RegistryClientCache<'c> {
/// The wrapped client that performs the actual record lookups and archive downloads.
client: Box<dyn RegistryClient + 'c>,
/// Stored at construction time; the underscore prefix marks it as not read yet.
_config: &'c Config,
}

impl<'c> RegistryClientCache<'c> {
    /// Builds a cache layer around `client`.
    ///
    /// `config` is only stored for now. The constructor cannot currently fail, but it
    /// returns a [`Result`] so that fallible setup can be added without changing callers.
    pub fn new(client: Box<dyn RegistryClient + 'c>, config: &'c Config) -> Result<Self> {
        let cache = Self {
            client,
            _config: config,
        };
        Ok(cache)
    }

    /// Fetches index records for `dependency` through [`RegistryClient::get_records`].
    ///
    /// The whole [`ManifestDependency`] is accepted rather than just its
    /// [`PackageName`](crate::core::PackageName), so that the cache layer can later
    /// pre-filter index records.
    ///
    /// # Errors
    ///
    /// Fails when the underlying client fails, or when the client reports that the
    /// package is not present in the registry.
    ///
    /// # Panics
    ///
    /// Panics if the client answers with [`RegistryResource::InCache`], because reading
    /// records back from the cache is not implemented yet.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn get_records_with_cache(
        &self,
        dependency: &ManifestDependency,
        before_network: BeforeNetworkCallback,
    ) -> Result<IndexRecords> {
        let package_name = dependency.name.clone();
        let response = self
            .client
            .get_records(package_name, before_network)
            .await?;

        match response {
            RegistryResource::Download {
                resource: records, ..
            } => {
                // The accompanying cache key is ignored until caching is implemented.
                trace!("got new records, invalidating cache");
                Ok(records)
            }
            RegistryResource::InCache => {
                trace!("getting records from cache");
                todo!()
            }
            RegistryResource::NotFound => {
                trace!("package not found in registry, pruning cache");
                bail!("package not found in registry: {dependency}")
            }
        }
    }

    /// Obtains the archive of `package` through [`RegistryClient::download`] and returns
    /// the path reported by the client.
    ///
    /// # Errors
    ///
    /// Fails when the underlying client fails, or when the client reports that no
    /// archive exists for this package.
    ///
    /// # Panics
    ///
    /// Panics if the client answers with [`RegistryResource::InCache`], because serving
    /// archives from the cache is not implemented yet.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn download_with_cache(
        &self,
        package: PackageId,
        before_network: BeforeNetworkCallback,
    ) -> Result<PathBuf> {
        let response = self.client.download(package, before_network).await?;

        match response {
            RegistryResource::Download {
                resource: archive_path,
                ..
            } => {
                // The accompanying cache key is ignored until caching is implemented.
                trace!("got new archive, invalidating cache");
                Ok(archive_path)
            }
            RegistryResource::InCache => {
                trace!("using cached archive");
                todo!()
            }
            RegistryResource::NotFound => {
                trace!("archive not found in registry, pruning cache");
                bail!("could not find downloadable archive for package indexed in registry: {package}")
            }
        }
    }
}
55 changes: 37 additions & 18 deletions scarb/src/core/registry/client/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{Context, Result};
use async_trait::async_trait;
Expand All @@ -10,9 +9,9 @@ use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::BufWriter;
use tokio::sync::OnceCell;
use tracing::{debug, trace};
use tracing::{debug, error, trace};

use crate::core::registry::client::RegistryClient;
use crate::core::registry::client::{BeforeNetworkCallback, RegistryClient, RegistryResource};
use crate::core::registry::index::{IndexConfig, IndexRecords};
use crate::core::{Config, Package, PackageId, PackageName, SourceId};
use crate::flock::{FileLockGuard, Filesystem};
Expand Down Expand Up @@ -78,14 +77,16 @@ impl<'c> HttpRegistryClient<'c> {

#[async_trait]
impl<'c> RegistryClient for HttpRegistryClient<'c> {
fn is_offline(&self) -> bool {
false
}

async fn get_records(&self, package: PackageName) -> Result<Option<Arc<IndexRecords>>> {
async fn get_records(
&self,
package: PackageName,
before_network: BeforeNetworkCallback,
) -> Result<RegistryResource<IndexRecords>> {
let index_config = self.index_config().await?;
let records_url = index_config.index.expand(package.into())?;

before_network()?;

let response = self
.config
.http()?
Expand All @@ -97,7 +98,7 @@ impl<'c> RegistryClient for HttpRegistryClient<'c> {
if let Err(err) = &response {
if let Some(status) = err.status() {
if status == StatusCode::NOT_FOUND {
return Ok(None);
return Ok(RegistryResource::NotFound);
}
}
}
Expand All @@ -107,24 +108,39 @@ impl<'c> RegistryClient for HttpRegistryClient<'c> {
.await
.context("failed to deserialize index records")?;

Ok(Some(Arc::new(records)))
}

async fn is_downloaded(&self, _package: PackageId) -> bool {
// TODO(mkaput): Cache downloaded packages.
false
Ok(RegistryResource::Download {
resource: records,
cache_key: None,
})
}

async fn download(&self, package: PackageId) -> Result<PathBuf> {
async fn download(
&self,
package: PackageId,
before_network: BeforeNetworkCallback,
) -> Result<RegistryResource<PathBuf>> {
let dl_url = self.index_config().await?.dl.expand(package.into())?;

before_network()?;

let response = self
.config
.http()?
.get(dl_url)
.send()
.await?
.error_for_status()?;
.error_for_status();

if let Err(err) = &response {
if let Some(status) = err.status() {
if status == StatusCode::NOT_FOUND {
error!("package `{package}` not found in registry: {err:?}");
return Ok(RegistryResource::NotFound);
}
}
}

let response = response?;

let output_path = self.dl_fs.path_existent()?.join(package.tarball_name());
let output_file = OpenOptions::new()
Expand All @@ -149,7 +165,10 @@ impl<'c> RegistryClient for HttpRegistryClient<'c> {
.context("failed to save response chunk on disk")?;
}

Ok(output_path.into_std_path_buf())
Ok(RegistryResource::Download {
resource: output_path.into_std_path_buf(),
cache_key: None,
})
}

async fn supports_publish(&self) -> Result<bool> {
Expand Down
38 changes: 23 additions & 15 deletions scarb/src/core/registry/client/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::io;
use std::io::{BufReader, BufWriter, Seek, SeekFrom};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{ensure, Context, Error, Result};
use async_trait::async_trait;
use fs4::FileExt;
use tokio::task::spawn_blocking;
use tracing::trace;
use url::Url;

use crate::core::registry::client::RegistryClient;
use crate::core::registry::client::{BeforeNetworkCallback, RegistryClient, RegistryResource};
use crate::core::registry::index::{IndexDependency, IndexRecord, IndexRecords, TemplateUrl};
use crate::core::{Checksum, Digest, Package, PackageId, PackageName, Summary};
use crate::flock::FileLockGuard;
Expand Down Expand Up @@ -93,12 +93,14 @@ impl LocalRegistryClient {

#[async_trait]
impl RegistryClient for LocalRegistryClient {
fn is_offline(&self) -> bool {
true
}
#[tracing::instrument(level = "trace", skip_all)]
async fn get_records(
&self,
package: PackageName,
_: BeforeNetworkCallback,
) -> Result<RegistryResource<IndexRecords>> {
trace!(?package);

#[tracing::instrument(level = "trace", skip(self))]
async fn get_records(&self, package: PackageName) -> Result<Option<Arc<IndexRecords>>> {
let records_path = self.records_path(&package);

spawn_blocking(move || {
Expand All @@ -107,22 +109,28 @@ impl RegistryClient for LocalRegistryClient {
if e.downcast_ref::<io::Error>()
.map_or(false, |ioe| ioe.kind() == io::ErrorKind::NotFound) =>
{
return Ok(None);
return Ok(RegistryResource::NotFound);
}
r => r?,
};
let records = serde_json::from_slice(&records)?;
Ok(Some(Arc::new(records)))
Ok(RegistryResource::Download {
resource: records,
cache_key: None,
})
})
.await?
}

async fn is_downloaded(&self, _package: PackageId) -> bool {
true
}

async fn download(&self, package: PackageId) -> Result<PathBuf> {
Ok(self.dl_path(package))
async fn download(
&self,
package: PackageId,
_: BeforeNetworkCallback,
) -> Result<RegistryResource<PathBuf>> {
Ok(RegistryResource::Download {
resource: self.dl_path(package),
cache_key: None,
})
}

async fn supports_publish(&self) -> Result<bool> {
Expand Down
62 changes: 44 additions & 18 deletions scarb/src/core/registry/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -8,43 +7,70 @@ use crate::core::registry::index::IndexRecords;
use crate::core::{Package, PackageId, PackageName};
use crate::flock::FileLockGuard;

pub mod cache;
pub mod http;
pub mod local;

/// Result from loading data from a registry.
///
/// Implements [`Debug`](std::fmt::Debug) (when `T` does) so that values can be logged
/// and used in assertions.
#[derive(Debug)]
pub enum RegistryResource<T> {
    /// The requested resource was not found.
    NotFound,
    /// The cache is valid and the cached data should be used.
    // Not constructed anywhere yet (caching is still a stub), hence the lint allowance.
    #[allow(dead_code)]
    InCache,
    /// The cache is out of date, new data was downloaded and should be used from now on.
    Download {
        /// The freshly obtained data.
        resource: T,
        /// Client-dependent opaque value used to determine whether resource is out of date.
        ///
        /// Returning `None` means that this client/resource is not cacheable.
        cache_key: Option<String>,
    },
}

/// Callback passed to [`RegistryClient`] methods, to be invoked right before an actual
/// network request is made.
///
/// Being an `FnOnce`, it can run at most once. It may return an error, which the client
/// is expected to propagate immediately.
pub type BeforeNetworkCallback = Box<dyn FnOnce() -> Result<()> + Send>;

#[async_trait]
pub trait RegistryClient: Send + Sync {
/// State whether this registry works in offline mode.
///
/// Local registries are expected to perform immediate file operations, while remote registries
/// can take some IO-bound time. This flag also influences appearance of various UI elements.
fn is_offline(&self) -> bool;

/// Get the index record for a specific named package from this index.
///
/// Returns [`RegistryResource::NotFound`] if the package is not present in the index.
///
/// ## Callbacks
///
/// The `before_network` callback **must** be called right before doing actual network requests.
/// It might return an error which **must** be immediately bubbled out. If this client does not
/// perform network requests, this callback **must** not be called at all.
///
/// ## Caching
///
/// This method is not expected to internally cache the result, but it is not prohibited either.
/// Scarb applies specialized caching layers on top of clients.
async fn get_records(&self, package: PackageName) -> Result<Option<Arc<IndexRecords>>>;

/// Check if the package `.tar.zst` file has already been downloaded and is stored on disk.
///
/// On internal errors, this method should return `false`. This method must not perform any
/// network operations (it can be called before offline mode check).
async fn is_downloaded(&self, package: PackageId) -> bool;
async fn get_records(
&self,
package: PackageName,
before_network: BeforeNetworkCallback,
) -> Result<RegistryResource<IndexRecords>>;

/// Download the package `.tar.zst` file.
///
/// Returns a [`PathBuf`] to the downloaded `.tar.zst` file.
///
/// ## Callbacks
///
/// The `before_network` callback **must** be called right before doing actual network requests.
/// It might return an error which **must** be immediately bubbled out. If this client does not
/// perform network requests, this callback **must** not be called at all.
///
/// ## Caching
///
/// If the registry is remote, i.e. actually downloads files and writes them to disk,
/// it should write downloaded files to Scarb cache directory. If the file has already been
/// downloaded, it should avoid downloading it again, and read it from this cache instead.
async fn download(&self, package: PackageId) -> Result<PathBuf>;
/// This method is not expected to internally cache the result, but it is not prohibited either.
/// Scarb applies specialized caching layers on top of clients.
async fn download(
&self,
package: PackageId,
before_network: BeforeNetworkCallback,
) -> Result<RegistryResource<PathBuf>>;

/// State whether packages can be published to this registry.
///
Expand Down
Loading

0 comments on commit 5f110a7

Please sign in to comment.