From a9c7c1ffb089156a060a99947edaa8c35ad81a08 Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Tue, 14 Nov 2023 16:51:44 -0800 Subject: [PATCH] arroyo-storage: make the StorageProvider's get() and put() calls relative to the full path. (#410) --- arroyo-storage/src/lib.rs | 68 ++++++++++++++++--- .../src/connectors/filesystem/delta.rs | 2 +- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/arroyo-storage/src/lib.rs b/arroyo-storage/src/lib.rs index adacbdb86..a2beb7ee7 100644 --- a/arroyo-storage/src/lib.rs +++ b/arroyo-storage/src/lib.rs @@ -24,6 +24,9 @@ pub struct StorageProvider { config: BackendConfig, object_store: Arc, canonical_url: String, + // A URL that object_store can parse. + // May require storage_options to properly instantiate + object_store_base_url: String, storage_options: HashMap, } @@ -258,6 +261,14 @@ impl BackendConfig { key, })) } + + fn key(&self) -> Option<&String> { + match self { + BackendConfig::S3(s3) => s3.key.as_ref(), + BackendConfig::GCS(gcs) => gcs.key.as_ref(), + BackendConfig::Local(local) => local.key.as_ref(), + } + } } fn last(opts: [Option; COUNT]) -> Option { @@ -310,7 +321,7 @@ impl StorageProvider { } .ok_or_else(|| StorageError::NoKeyInUrl)?; - let result = provider.get(path).await; + let result = provider.get("").await; result } @@ -372,13 +383,26 @@ impl StorageProvider { "true".to_string(), ); } - - let canonical_url = format!("s3://{}", config.bucket); - + let mut canonical_url = match (&config.region, &config.endpoint) { + (_, Some(endpoint)) => { + format!("s3::{}/{}", endpoint, config.bucket) + } + (Some(region), _) => { + format!("https://s3.{}.amazonaws.com/{}", region, config.bucket) + } + _ => { + format!("https://s3.amazonaws.com/{}", config.bucket) + } + }; + if let Some(key) = &config.key { + canonical_url = format!("{}/{}", canonical_url, key); + } + let object_store_base_url = format!("s3://{}", config.bucket); Ok(Self { config: BackendConfig::S3(config), object_store: Arc::new(builder.build().map_err(|e| Into::::into(e))?), canonical_url, + object_store_base_url, storage_options: s3_options .into_iter() .map(|(k, v)| (k.as_ref().to_string(), v)) @@ -391,11 +415,17 @@ impl StorageProvider { .with_bucket_name(&config.bucket) .build()?; - let canonical_url = format!("https://{}.storage.googleapis.com", config.bucket); + let mut canonical_url = format!("https://{}.storage.googleapis.com", config.bucket); + if let Some(key) = &config.key { + canonical_url = format!("{}/{}", canonical_url, key); + } + + let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket); Ok(Self { config: BackendConfig::GCS(config), object_store: Arc::new(gcs), + object_store_base_url, canonical_url, storage_options: HashMap::new(), }) @@ -415,10 +445,12 @@ impl StorageProvider { ); let canonical_url = format!("file://{}", config.path); + let object_store_base_url = canonical_url.clone(); Ok(Self { config: BackendConfig::Local(config), object_store, canonical_url, + object_store_base_url, storage_options: HashMap::new(), }) } @@ -427,7 +459,7 @@ impl StorageProvider { let path: String = path.into(); let bytes = self .object_store - .get(&path.into()) + .get(&self.qualify_path(&path.into())) .await .map_err(|e| Into::::into(e))? .bytes() @@ -441,12 +473,24 @@ impl StorageProvider { path: P, bytes: Vec, ) -> Result { - let path: Path = path.into().into(); - self.object_store.put(&path, bytes.into()).await?; + let path = path.into().into(); + self.object_store + .put(&self.qualify_path(&path), bytes.into()) + .await?; Ok(format!("{}/{}", self.canonical_url, path)) } + fn qualify_path(&self, path: &Path) -> Path { + match self.config.key() { + Some(prefix) => { + let prefix_path: Path = prefix.to_string().into(); + prefix_path.parts().chain(path.parts()).collect() + } + None => path.clone(), + } + } + pub async fn delete_if_present>(&self, path: P) -> Result<(), StorageError> { let path = path.into(); return match self.object_store.delete(&path.into()).await { @@ -501,6 +545,14 @@ impl StorageProvider { pub fn canonical_url(&self) -> &str { &self.canonical_url } + + // Returns a url that will, combined with storage_options, parse to + // the same ObjectStore as self.object_store. + // Needed for systems that build their own ObjectStore, such as delta-rs + pub fn object_store_base_url(&self) -> &str { + &self.object_store_base_url + } + pub fn storage_options(&self) -> &HashMap { &self.storage_options } diff --git a/arroyo-worker/src/connectors/filesystem/delta.rs b/arroyo-worker/src/connectors/filesystem/delta.rs index 16a29a3c9..63946841a 100644 --- a/arroyo-worker/src/connectors/filesystem/delta.rs +++ b/arroyo-worker/src/connectors/filesystem/delta.rs @@ -195,7 +195,7 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec) fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String { format!( "{}/{}", - storage_provider.canonical_url(), + storage_provider.object_store_base_url(), relative_table_path ) }