Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arroyo-storage: make the StorageProvider's get() and put() calls relative to the full path. #410

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct StorageProvider {
// Backend-specific configuration (S3, GCS or local). Also the source of the
// optional key that get()/put() paths are qualified with.
config: BackendConfig,
// Store handle that get()/put() requests are issued against.
object_store: Arc<dyn ObjectStore>,
// URL reported by canonical_url() and used as the prefix of the string
// returned from put(). For S3 and GCS the configured key, if any, is appended.
canonical_url: String,
// A URL that the object_store crate can parse on its own.
// Parsing it may additionally require storage_options in order to
// instantiate the store properly.
object_store_base_url: String,
// Options exposed through storage_options(). Filled from the S3 options
// for S3 providers; empty for GCS and local providers.
storage_options: HashMap<String, String>,
}

Expand Down Expand Up @@ -258,6 +261,14 @@ impl BackendConfig {
key,
}))
}

/// Returns the key configured for this backend, if there is one.
///
/// Every backend variant stores its own optional `key`; the value is
/// borrowed, never cloned.
fn key(&self) -> Option<&String> {
    let configured = match self {
        BackendConfig::S3(cfg) => &cfg.key,
        BackendConfig::GCS(cfg) => &cfg.key,
        BackendConfig::Local(cfg) => &cfg.key,
    };
    configured.as_ref()
}
}

fn last<I: Sized, const COUNT: usize>(opts: [Option<I>; COUNT]) -> Option<I> {
Expand Down Expand Up @@ -310,7 +321,7 @@ impl StorageProvider {
}
.ok_or_else(|| StorageError::NoKeyInUrl)?;

let result = provider.get(path).await;
let result = provider.get("").await;

result
}
Expand Down Expand Up @@ -372,13 +383,26 @@ impl StorageProvider {
"true".to_string(),
);
}

let canonical_url = format!("s3://{}", config.bucket);

let mut canonical_url = match (&config.region, &config.endpoint) {
(_, Some(endpoint)) => {
format!("s3::{}/{}", endpoint, config.bucket)
}
(Some(region), _) => {
format!("https://s3.{}.amazonaws.com/{}", region, config.bucket)
}
_ => {
format!("https://s3.amazonaws.com/{}", config.bucket)
}
};
if let Some(key) = &config.key {
canonical_url = format!("{}/{}", canonical_url, key);
}
let object_store_base_url = format!("s3://{}", config.bucket);
Ok(Self {
config: BackendConfig::S3(config),
object_store: Arc::new(builder.build().map_err(|e| Into::<StorageError>::into(e))?),
canonical_url,
object_store_base_url,
storage_options: s3_options
.into_iter()
.map(|(k, v)| (k.as_ref().to_string(), v))
Expand All @@ -391,11 +415,17 @@ impl StorageProvider {
.with_bucket_name(&config.bucket)
.build()?;

let canonical_url = format!("https://{}.storage.googleapis.com", config.bucket);
let mut canonical_url = format!("https://{}.storage.googleapis.com", config.bucket);
if let Some(key) = &config.key {
canonical_url = format!("{}/{}", canonical_url, key);
}

let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket);

Ok(Self {
config: BackendConfig::GCS(config),
object_store: Arc::new(gcs),
object_store_base_url,
canonical_url,
storage_options: HashMap::new(),
})
Expand All @@ -415,10 +445,12 @@ impl StorageProvider {
);

let canonical_url = format!("file://{}", config.path);
let object_store_base_url = canonical_url.clone();
Ok(Self {
config: BackendConfig::Local(config),
object_store,
canonical_url,
object_store_base_url,
storage_options: HashMap::new(),
})
}
Expand All @@ -427,7 +459,7 @@ impl StorageProvider {
let path: String = path.into();
let bytes = self
.object_store
.get(&path.into())
.get(&self.qualify_path(&path.into()))
.await
.map_err(|e| Into::<StorageError>::into(e))?
.bytes()
Expand All @@ -441,12 +473,24 @@ impl StorageProvider {
path: P,
bytes: Vec<u8>,
) -> Result<String, StorageError> {
let path: Path = path.into().into();
self.object_store.put(&path, bytes.into()).await?;
let path = path.into().into();
self.object_store
.put(&self.qualify_path(&path), bytes.into())
.await?;

Ok(format!("{}/{}", self.canonical_url, path))
}

fn qualify_path(&self, path: &Path) -> Path {
match self.config.key() {
Some(prefix) => {
let prefix_path: Path = prefix.to_string().into();
prefix_path.parts().chain(path.parts()).collect()
}
None => path.clone(),
}
}

pub async fn delete_if_present<P: Into<String>>(&self, path: P) -> Result<(), StorageError> {
let path = path.into();
return match self.object_store.delete(&path.into()).await {
Expand Down Expand Up @@ -501,6 +545,14 @@ impl StorageProvider {
/// Returns the canonical URL stored on this provider.
pub fn canonical_url(&self) -> &str {
    self.canonical_url.as_str()
}

/// Returns a URL that, together with `storage_options`, parses to the
/// same `ObjectStore` as `self.object_store`.
///
/// Required by systems that construct their own `ObjectStore` instead of
/// reusing this one, such as delta-rs.
pub fn object_store_base_url(&self) -> &str {
    self.object_store_base_url.as_str()
}

// Returns the option map held by this provider.
// The S3 constructor fills it from the S3 builder options; the GCS and
// local constructors leave it empty.
pub fn storage_options(&self) -> &HashMap<String, String> {
&self.storage_options
}
Expand Down
2 changes: 1 addition & 1 deletion arroyo-worker/src/connectors/filesystem/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>)
// Builds the table location string as "<provider URL>/<relative_table_path>".
// The provider URL has to be one that an independently constructed object
// store can parse (together with the provider's storage options), which is
// why object_store_base_url() is used here in place of canonical_url().
fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String {
format!(
"{}/{}",
storage_provider.canonical_url(),
storage_provider.object_store_base_url(),
relative_table_path
)
}