From 266de6bae1a4fa4079eda1462140a6c50e2730fa Mon Sep 17 00:00:00 2001 From: Marek Kaput Date: Fri, 20 Oct 2023 20:12:50 +0200 Subject: [PATCH] Implement caching of records returned from registry clients NOTE: Nothing is cached actually yet, because no registry client uses hooks provided by this code. This will come in subsequent PRs. commit-id:97fc578f --- Cargo.lock | 27 +++ Cargo.toml | 1 + scarb/Cargo.toml | 1 + scarb/src/core/registry/client/cache.rs | 281 ++++++++++++++++++++++-- scarb/src/core/registry/client/http.rs | 6 +- scarb/src/core/registry/client/local.rs | 8 +- scarb/src/core/registry/client/mod.rs | 6 +- scarb/src/internal/fsx.rs | 10 + scarb/src/sources/registry.rs | 2 +- utils/scarb-ui/src/lib.rs | 8 + 10 files changed, 331 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cc481324..475894fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4010,6 +4010,16 @@ dependencies = [ "human_format", ] +[[package]] +name = "pyo3-build-config" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a96fe70b176a89cff78f2fa7b3c930081e163d5379b4dcdf993e3ae29ca662e5" +dependencies = [ + "once_cell", + "target-lexicon", +] + [[package]] name = "quote" version = "1.0.33" @@ -4081,6 +4091,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58f6da33e3b54de2ef82201ce2b465e67f337deb15d45f54355e0f77202bb4" +dependencies = [ + "libc", + "pyo3-build-config", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4414,6 +4434,7 @@ dependencies = [ "pathdiff", "petgraph", "predicates", + "redb", "reqwest", "scarb-build-metadata", "scarb-metadata 1.9.0", @@ -5110,6 +5131,12 @@ dependencies = [ "xattr", ] +[[package]] +name = "target-lexicon" +version = "0.12.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" + [[package]] name = "tempfile" version = "3.8.1" diff --git a/Cargo.toml b/Cargo.toml index 4e4ceb000..94b53787b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ petgraph = "0.6" predicates = "3" proc-macro2 = "1" quote = "1" +redb = "1.3" reqwest = { version = "0.11", features = ["gzip", "brotli", "deflate", "json", "stream"], default-features = false } semver = { version = "1", features = ["serde"] } serde = { version = "1", features = ["serde_derive"] } diff --git a/scarb/Cargo.toml b/scarb/Cargo.toml index babf4152d..02d790d87 100644 --- a/scarb/Cargo.toml +++ b/scarb/Cargo.toml @@ -46,6 +46,7 @@ itertools.workspace = true once_cell.workspace = true pathdiff.workspace = true petgraph.workspace = true +redb.workspace = true reqwest.workspace = true scarb-build-metadata = { path = "../utils/scarb-build-metadata" } scarb-metadata = { path = "../scarb-metadata", default-features = false, features = ["builder"] } diff --git a/scarb/src/core/registry/client/cache.rs b/scarb/src/core/registry/client/cache.rs index b4dea39b2..9a55e8310 100644 --- a/scarb/src/core/registry/client/cache.rs +++ b/scarb/src/core/registry/client/cache.rs @@ -1,22 +1,86 @@ use std::path::PathBuf; -use anyhow::{bail, Result}; +use anyhow::{bail, Context, Result}; +use camino::Utf8Path; +use redb::{MultimapTableDefinition, ReadableMultimapTable, ReadableTable, TableDefinition}; +use semver::Version; +use tokio::sync::OnceCell; +use tokio::task::block_in_place; use tracing::trace; +use scarb_ui::Ui; + use crate::core::registry::client::{RegistryClient, RegistryResource}; -use crate::core::registry::index::IndexRecords; -use crate::core::{Config, ManifestDependency, PackageId}; +use crate::core::registry::index::{IndexRecord, IndexRecords}; +use crate::core::{Config, ManifestDependency, PackageId, SourceId}; +use crate::internal::fsx; + +// TODO(mkaput): Implement cache downloading. +// FIXME(mkaput): Avoid creating database if inner client does not trigger cache writes. +// FIXME(mkaput): We probably have to call db.compact() after all write txs we run in Scarb run. + +/// Multimap: `package name -> (version, index records)`. +const RECORDS: MultimapTableDefinition<'_, &str, (&str, &[u8])> = + MultimapTableDefinition::new("records"); + +/// Map: `package name -> index records cache key`. +/// +/// Cache key as returned by wrapped [`RegistryClient`]. +const RECORDS_CACHE_KEYS: TableDefinition<'_, &str, &str> = + TableDefinition::new("records_cache_keys"); +/// A caching layer on top of a [`RegistryClient`]. +/// +/// ## Database +/// +/// It uses [`redb`] as a local key-value database, where this object stores the following: +/// 1. Multimap table `records`: mapping from _package name_ to all index records that came from +/// the registry to date. +/// +/// On the disk, each record are stored as a pair of _package version_ and the record itself +/// serialized as minified JSON. This allows the cache to filter out records that do not match +/// requested dependency specification before deserializing the record itself, saving some +/// execution time (exact numbers are unknown, but Cargo suffered from the same problem, and it +/// implemented identical measures). +/// 2. Table `records_cache_keys`: which maps _package name_ to the last known _cache key_ returned +/// from the [`RegistryClient::get_records`] method call. +/// +/// Database files are stored in the `$SCARB_GLOBAL_CACHE/registry/cache` directory. For each +/// `SourceId` a separate database file is maintained, named `{source_id.ident()}.v1.redb`. +/// In case a new database format is used, it should be saved in a `*.v2.redb` file and so on. +/// Old versions should be simply deleted, without using sophisticated migration logic (remember, +/// this is just a cache!) Also, if the database file appears to be corrupted, it is simply deleted +/// and recreated from scratch. +/// +/// ## Workflow +/// +/// Each wrapper method of this struct performs more or less the same flow of steps: +/// 1. Get existing cache key from the database if exists. +/// 2. Call actual [`RegistryClient`] method with found cache key (or `None`). +/// 3. If the method returned [`RegistryResource::NotFound`], then everything related to queried +/// resource is removed from the cache. +/// 4. Or, if the method returned [`RegistryResource::InCache`], then cached value is deserialized +/// and returned. +/// 5. Or, if the method returned [`RegistryResource::Download`], then new resource data is saved +/// in cache (replacing existing items) along with new cache key and returned to the caller. pub struct RegistryClientCache<'c> { + source_id: SourceId, client: Box, - _config: &'c Config, + db_cell: OnceCell, + config: &'c Config, } impl<'c> RegistryClientCache<'c> { - pub fn new(client: Box, config: &'c Config) -> Result { + pub fn new( + source_id: SourceId, + client: Box, + config: &'c Config, + ) -> Result { Ok(Self { + source_id, client, - _config: config, + db_cell: OnceCell::new(), + config, }) } @@ -29,18 +93,31 @@ impl<'c> RegistryClientCache<'c> { &self, dependency: &ManifestDependency, ) -> Result { - match self.client.get_records(dependency.name.clone()).await? { + let package_name = dependency.name.as_str(); + let db = self.db().await?; + + let cache_key = db.get_records_cache_key(package_name).await?; + + match self + .client + .get_records(dependency.name.clone(), cache_key.as_deref()) + .await? + { RegistryResource::NotFound => { - trace!("package not found in registry, pruning cache"); + db.prune_records(package_name).await?; bail!("package not found in registry: {dependency}") } - RegistryResource::InCache => { - trace!("getting records from cache"); - todo!() - } - RegistryResource::Download { resource, .. } => { - trace!("got new records, invalidating cache"); - Ok(resource) + + RegistryResource::InCache => db.get_records(dependency).await, + + RegistryResource::Download { + resource: records, + cache_key, + } => { + if let Some(cache_key) = cache_key { + db.upsert_records(package_name, cache_key, &records).await?; + } + Ok(records) } } } @@ -63,4 +140,178 @@ impl<'c> RegistryClientCache<'c> { } } } + + async fn db(&self) -> Result<&CacheDatabase> { + self.db_cell + .get_or_try_init(|| async { + let ui = self.config.ui(); + let fs = self.config.dirs().registry_dir().into_child("cache"); + let db_path = fs + .path_existent()? + .join(format!("{}.v1.redb", self.source_id.ident())); + + CacheDatabase::create(&db_path, ui) + }) + .await + } +} + +struct CacheDatabase { + db: redb::Database, + ui: Ui, +} + +impl CacheDatabase { + #[tracing::instrument(level = "trace", skip(ui))] + fn create(path: &Utf8Path, ui: Ui) -> Result { + fn create(path: &Utf8Path, ui: &Ui) -> Result { + redb::Database::create(path) + .context("failed to open local registry cache, trying to recreate it") + .or_else(|error| { + ui.warn_anyhow(&error); + fsx::remove_file(path).context("failed to remove local registry cache")?; + redb::Database::create(path) + .with_context(|| db_fatal("failed to open local registry cache")) + }) + } + + fn init_tables(db: &redb::Database) -> Result<()> { + let tx = db.begin_write()?; + { + tx.open_multimap_table(RECORDS)?; + tx.open_table(RECORDS_CACHE_KEYS)?; + } + tx.commit()?; + Ok(()) + } + + trace!("opening local registry cache: {path}"); + let db = block_in_place(|| -> Result<_> { + let db = create(path, &ui)?; + trace!("database opened/created successfully"); + init_tables(&db).context("failed to initialize local registry cache database")?; + trace!("created all tables in local registry cache database"); + Ok(db) + })?; + + Ok(Self { db, ui }) + } + + #[tracing::instrument(level = "trace", skip_all)] + async fn get_records_cache_key(&self, package_name: &str) -> Result> { + trace!("looking up cache key"); + block_in_place(|| -> Result<_> { + let tx = self.db.begin_read()?; + let table = tx.open_table(RECORDS_CACHE_KEYS)?; + let cache_key = table.get(package_name)?.map(|g| g.value().to_owned()); + trace!(?cache_key); + Ok(cache_key) + }) + .with_context(|| db_error("failed to lookup cache key in registry cache")) + .or_else(|err| -> Result<_> { + self.ui.warn_anyhow(&err); + Ok(None) + }) + } + + #[tracing::instrument(level = "trace", skip_all)] + async fn get_records(&self, dependency: &ManifestDependency) -> Result { + trace!("getting records from cache"); + block_in_place(|| -> Result<_> { + let tx = self.db.begin_read()?; + let table = tx.open_multimap_table(RECORDS)?; + + let mut records = IndexRecords::new(); + for g in table.get(dependency.name.as_str())? { + let g = g?; + let (raw_version, raw_record) = g.value(); + + let version = Version::parse(raw_version) + .with_context(|| db_fatal("failed to parse version from cache"))?; + if !dependency.matches_name_and_version(&dependency.name, &version) { + continue; + } + + let record = serde_json::from_slice::(raw_record) + .with_context(|| db_fatal("failed to deserialize index record from cache"))?; + + records.push(record); + } + trace!("records read successfully"); + Ok(records) + }) + } + + #[tracing::instrument(level = "trace", skip_all)] + async fn prune_records(&self, package_name: &str) -> Result<()> { + trace!("package not found in registry, pruning cache"); + block_in_place(|| -> Result<_> { + let tx = self.db.begin_write()?; + { + let mut table = tx.open_multimap_table(RECORDS)?; + table.remove_all(package_name)?; + } + tx.commit()?; + trace!("cache pruned successfully"); + Ok(()) + }) + .with_context(|| db_error("failed to purge cache from now non-existent entries")) + .or_else(|err| -> Result<_> { + self.ui.warn_anyhow(&err); + Ok(()) + })?; + Ok(()) + } + + #[tracing::instrument(level = "trace", skip_all)] + async fn upsert_records( + &self, + package_name: &str, + cache_key: String, + records: &IndexRecords, + ) -> Result<()> { + trace!("got new records, invalidating cache"); + trace!(?cache_key); + block_in_place(|| -> Result<_> { + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(RECORDS_CACHE_KEYS)?; + table.insert(package_name, cache_key.as_str())?; + } + { + let mut table = tx.open_multimap_table(RECORDS)?; + table.remove_all(package_name)?; + + for record in records { + let raw_version = record.version.to_string(); + let raw_record = serde_json::to_vec(&record)?; + table.insert(package_name, (raw_version.as_str(), raw_record.as_slice()))?; + } + } + tx.commit()?; + trace!("cache updated successfully"); + Ok(()) + }) + .with_context(|| db_error("failed to cache registry index records")) + .or_else(|err| -> Result<_> { + self.ui.warn_anyhow(&err); + Ok(()) + }) + } +} + +fn db_error(message: &str) -> String { + format!( + "{message}\n\ + note: perhaps cache is corrupted\n\ + help: try restarting scarb to recreate it" + ) +} + +fn db_fatal(message: &str) -> String { + format!( + "{message}\n\ + note: cache is corrupted and is in unrecoverable state\n\ + help: run the following to wipe entire cache: scarb cache clean" + ) } diff --git a/scarb/src/core/registry/client/http.rs b/scarb/src/core/registry/client/http.rs index 821d95795..f1f5fb850 100644 --- a/scarb/src/core/registry/client/http.rs +++ b/scarb/src/core/registry/client/http.rs @@ -78,7 +78,11 @@ impl<'c> HttpRegistryClient<'c> { #[async_trait] impl<'c> RegistryClient for HttpRegistryClient<'c> { - async fn get_records(&self, package: PackageName) -> Result> { + async fn get_records( + &self, + package: PackageName, + _cache_key: Option<&str>, + ) -> Result> { let index_config = self.index_config().await?; let records_url = index_config.index.expand(package.into())?; diff --git a/scarb/src/core/registry/client/local.rs b/scarb/src/core/registry/client/local.rs index 979d3d63e..5241e370b 100644 --- a/scarb/src/core/registry/client/local.rs +++ b/scarb/src/core/registry/client/local.rs @@ -94,9 +94,15 @@ impl LocalRegistryClient { #[async_trait] impl RegistryClient for LocalRegistryClient { #[tracing::instrument(level = "trace", skip_all)] - async fn get_records(&self, package: PackageName) -> Result> { + async fn get_records( + &self, + package: PackageName, + cache_key: Option<&str>, + ) -> Result> { trace!(?package); + assert!(cache_key.is_none()); + let records_path = self.records_path(&package); spawn_blocking(move || { diff --git a/scarb/src/core/registry/client/mod.rs b/scarb/src/core/registry/client/mod.rs index 5d2ceb15e..db5163692 100644 --- a/scarb/src/core/registry/client/mod.rs +++ b/scarb/src/core/registry/client/mod.rs @@ -38,7 +38,11 @@ pub trait RegistryClient: Send + Sync { /// /// This method is not expected to internally cache the result, but it is not prohibited either. /// Scarb applies specialized caching layers on top of clients. - async fn get_records(&self, package: PackageName) -> Result>; + async fn get_records( + &self, + package: PackageName, + cache_key: Option<&str>, + ) -> Result>; /// Download the package `.tar.zst` file. /// diff --git a/scarb/src/internal/fsx.rs b/scarb/src/internal/fsx.rs index df3ed405c..3530c8c62 100644 --- a/scarb/src/internal/fsx.rs +++ b/scarb/src/internal/fsx.rs @@ -35,6 +35,16 @@ pub fn create_dir_all(p: impl AsRef) -> Result<()> { } } +/// Equivalent to [`fs::remove_file`] with better error messages. +pub fn remove_file(p: impl AsRef) -> Result<()> { + return inner(p.as_ref()); + + fn inner(p: &Path) -> Result<()> { + fs::remove_file(p).with_context(|| format!("failed to remove file `{}`", p.display()))?; + Ok(()) + } +} + /// Equivalent to [`fs::remove_dir_all`] with better error messages. pub fn remove_dir_all(p: impl AsRef) -> Result<()> { return inner(p.as_ref()); diff --git a/scarb/src/sources/registry.rs b/scarb/src/sources/registry.rs index 274907587..ea492ea50 100644 --- a/scarb/src/sources/registry.rs +++ b/scarb/src/sources/registry.rs @@ -31,7 +31,7 @@ pub struct RegistrySource<'c> { impl<'c> RegistrySource<'c> { pub fn new(source_id: SourceId, config: &'c Config) -> Result { let client = Self::create_client(source_id, config)?; - let client = RegistryClientCache::new(client, config)?; + let client = RegistryClientCache::new(source_id, client, config)?; let package_sources = PackageSourceStore::new(source_id, config); diff --git a/utils/scarb-ui/src/lib.rs b/utils/scarb-ui/src/lib.rs index a7f8e0279..78a2fb8b5 100644 --- a/utils/scarb-ui/src/lib.rs +++ b/utils/scarb-ui/src/lib.rs @@ -122,6 +122,14 @@ impl Ui { self.error(format!("{error:?}").trim()) } + /// Nicely format an [`anyhow::Error`] for display to the user, and print it with [`Ui::warn`]. + pub fn warn_anyhow(&self, error: &anyhow::Error) { + // NOTE: Some errors, particularly ones from `toml_edit` like to add trailing newlines. + // This isn't a big problem for users, but it's causing issues in tests, where trailing + // whitespace collides with `indoc`. + self.warn(format!("{error:?}").trim()) + } + fn do_print(&self, message: T) { match self.output_format { OutputFormat::Text => message.print_text(),