Skip to content

Commit

Permalink
arroyo-storage: make the StorageProvider's get() and put() calls rela…
Browse files Browse the repository at this point in the history
…tive to the full path. (#410)
  • Loading branch information
Jackson Newhouse authored Nov 15, 2023
1 parent a47e63e commit a9c7c1f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 9 deletions.
68 changes: 60 additions & 8 deletions arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct StorageProvider {
config: BackendConfig,
object_store: Arc<dyn ObjectStore>,
canonical_url: String,
// A URL that object_store can parse.
// May require storage_options to properly instantiate
object_store_base_url: String,
storage_options: HashMap<String, String>,
}

Expand Down Expand Up @@ -258,6 +261,14 @@ impl BackendConfig {
key,
}))
}

fn key(&self) -> Option<&String> {
match self {
BackendConfig::S3(s3) => s3.key.as_ref(),
BackendConfig::GCS(gcs) => gcs.key.as_ref(),
BackendConfig::Local(local) => local.key.as_ref(),
}
}
}

fn last<I: Sized, const COUNT: usize>(opts: [Option<I>; COUNT]) -> Option<I> {
Expand Down Expand Up @@ -310,7 +321,7 @@ impl StorageProvider {
}
.ok_or_else(|| StorageError::NoKeyInUrl)?;

let result = provider.get(path).await;
let result = provider.get("").await;

result
}
Expand Down Expand Up @@ -372,13 +383,26 @@ impl StorageProvider {
"true".to_string(),
);
}

let canonical_url = format!("s3://{}", config.bucket);

let mut canonical_url = match (&config.region, &config.endpoint) {
(_, Some(endpoint)) => {
format!("s3::{}/{}", endpoint, config.bucket)
}
(Some(region), _) => {
format!("https://s3.{}.amazonaws.com/{}", region, config.bucket)
}
_ => {
format!("https://s3.amazonaws.com/{}", config.bucket)
}
};
if let Some(key) = &config.key {
canonical_url = format!("{}/{}", canonical_url, key);
}
let object_store_base_url = format!("s3://{}", config.bucket);
Ok(Self {
config: BackendConfig::S3(config),
object_store: Arc::new(builder.build().map_err(|e| Into::<StorageError>::into(e))?),
canonical_url,
object_store_base_url,
storage_options: s3_options
.into_iter()
.map(|(k, v)| (k.as_ref().to_string(), v))
Expand All @@ -391,11 +415,17 @@ impl StorageProvider {
.with_bucket_name(&config.bucket)
.build()?;

let canonical_url = format!("https://{}.storage.googleapis.com", config.bucket);
let mut canonical_url = format!("https://{}.storage.googleapis.com", config.bucket);
if let Some(key) = &config.key {
canonical_url = format!("{}/{}", canonical_url, key);
}

let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket);

Ok(Self {
config: BackendConfig::GCS(config),
object_store: Arc::new(gcs),
object_store_base_url,
canonical_url,
storage_options: HashMap::new(),
})
Expand All @@ -415,10 +445,12 @@ impl StorageProvider {
);

let canonical_url = format!("file://{}", config.path);
let object_store_base_url = canonical_url.clone();
Ok(Self {
config: BackendConfig::Local(config),
object_store,
canonical_url,
object_store_base_url,
storage_options: HashMap::new(),
})
}
Expand All @@ -427,7 +459,7 @@ impl StorageProvider {
let path: String = path.into();
let bytes = self
.object_store
.get(&path.into())
.get(&self.qualify_path(&path.into()))
.await
.map_err(|e| Into::<StorageError>::into(e))?
.bytes()
Expand All @@ -441,12 +473,24 @@ impl StorageProvider {
path: P,
bytes: Vec<u8>,
) -> Result<String, StorageError> {
let path: Path = path.into().into();
self.object_store.put(&path, bytes.into()).await?;
let path = path.into().into();
self.object_store
.put(&self.qualify_path(&path), bytes.into())
.await?;

Ok(format!("{}/{}", self.canonical_url, path))
}

fn qualify_path(&self, path: &Path) -> Path {
match self.config.key() {
Some(prefix) => {
let prefix_path: Path = prefix.to_string().into();
prefix_path.parts().chain(path.parts()).collect()
}
None => path.clone(),
}
}

pub async fn delete_if_present<P: Into<String>>(&self, path: P) -> Result<(), StorageError> {
let path = path.into();
return match self.object_store.delete(&path.into()).await {
Expand Down Expand Up @@ -501,6 +545,14 @@ impl StorageProvider {
pub fn canonical_url(&self) -> &str {
&self.canonical_url
}

// Returns a url that will, combined with storage_options, parse to
// the same ObjectStore as self.object_store.
// Needed for systems that build their own ObjectStore, such as delta-rs
pub fn object_store_base_url(&self) -> &str {
&self.object_store_base_url
}

pub fn storage_options(&self) -> &HashMap<String, String> {
&self.storage_options
}
Expand Down
2 changes: 1 addition & 1 deletion arroyo-worker/src/connectors/filesystem/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>)
fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String {
format!(
"{}/{}",
storage_provider.canonical_url(),
storage_provider.object_store_base_url(),
relative_table_path
)
}

0 comments on commit a9c7c1f

Please sign in to comment.