feat(rust): implement a ListingCatalog #3300

Draft · wants to merge 14 commits into main
2 changes: 1 addition & 1 deletion rust/lance-io/src/object_store.rs
@@ -97,7 +97,7 @@ impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
-   scheme: String,
+   pub scheme: String,
    block_size: usize,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
6 changes: 6 additions & 0 deletions rust/lance/src/catalog.rs
@@ -0,0 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub(crate) mod catalog_trait;

pub use catalog_trait::Catalog;
276 changes: 276 additions & 0 deletions rust/lance/src/catalog/catalog_trait.rs
@@ -0,0 +1,276 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_array::RecordBatchIterator;
use arrow_schema::Schema;
use async_trait::async_trait;
use lance_io::object_store::{ObjectStore, ObjectStoreParams};
use lance_table::format::{Index, Manifest};
use lance_table::io::commit::{CommitError, CommitHandler, ManifestNamingScheme, ManifestWriter};
use object_store::path::Path;
use std::fmt::Debug;
use std::sync::Arc;
use url::Url;

use crate::dataset::builder::DatasetBuilder;
use crate::dataset::WriteParams;
use crate::{Dataset, Result};

/// Contains all the information that Lance needs to access a table
pub struct TableReference {
    /// Base URI of the table
    pub uri: String,
    /// Object store wrapper used to access the table
    pub object_store: ObjectStore,
    /// Commit handler used to handle new commits to the table
    pub commit_handler: Arc<dyn CommitHandler>,
    /// Parameters used to read / write to the object store
    pub store_params: Option<ObjectStoreParams>,
}

/// Trait to be implemented by any catalog that Lance can use
#[async_trait::async_trait]
pub trait Catalog: std::fmt::Debug + Send + Sync {
    /// Create a new table in the catalog
    ///
    /// Returns a table reference that can be used to read/write to the table
    async fn create_table(&self, name: &str, schema: Arc<Schema>) -> Result<TableReference>;

    /// Drop a table from the catalog
    async fn drop_table(&self, name: &str) -> Result<()>;

    /// Get a reference to a table in the catalog
    ///
    /// Returns a table reference that can be used to read/write to the table
    async fn get_table(&self, name: &str) -> Result<TableReference>;

    /// List all tables in the catalog
    ///
    /// The `start_after` parameter is an optional table name that, if provided, will
    /// start the listing after the named table. If the named table is not found, the
    /// listing will start at the position where that table would appear if it existed.
    ///
    /// The `limit` parameter is an optional limit on the number of tables returned.
    async fn list_tables(
        &self,
        start_after: Option<String>,
        limit: Option<u32>,
    ) -> Result<Vec<String>>;
}
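
As a quick illustration of the `start_after` / `limit` semantics described in the doc comments above, a caller could page through a catalog roughly like this. This is a hypothetical sketch against the trait as defined in this PR; the `all_table_names` helper and the page size of 100 are assumptions, not part of the change:

// Hypothetical pagination sketch against the `Catalog` trait above.
// `catalog` can be any implementation; the page size of 100 is arbitrary.
async fn all_table_names(catalog: &dyn Catalog) -> Result<Vec<String>> {
    let mut names = Vec::new();
    let mut start_after: Option<String> = None;
    loop {
        // Each call resumes listing after the last name seen so far.
        let page = catalog.list_tables(start_after.clone(), Some(100)).await?;
        if page.is_empty() {
            break;
        }
        start_after = page.last().cloned();
        names.extend(page);
    }
    Ok(names)
}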

/// A simplified catalog that puts all tables in a single base directory
///
/// The object store's CAS primitives are used for commit handling.
///
/// This catalog is simplistic but has zero dependencies (beyond an object store
/// of some kind)
#[derive(Debug)]
pub struct ListingCatalog {
    base_path: Path,
    object_store: ObjectStore,
    commit_handler: Arc<dyn CommitHandler>,
}

const LANCE_EXTENSION: &str = "lance";

fn format_table_url_or_path(is_url: bool, scheme: &str, base_path: &str, name: &str) -> String {
    if is_url {
        format!("{}:///{}/{}.{}", scheme, base_path, name, LANCE_EXTENSION)
    } else {
        format!("{}/{}.{}", base_path, name, LANCE_EXTENSION)
    }
}
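
For a sense of what this helper produces, the two branches would yield strings like the following. These values are illustrative assumptions based on the in-memory store used by the test below (a "memory" scheme and a base path of "catalog"; note that `object_store::path::Path` normalizes away leading slashes, so `Path::parse("/catalog")` renders as "catalog"):

// Illustrative only: assumes scheme = "memory" and base path = "catalog".
assert_eq!(
    format_table_url_or_path(true, "memory", "catalog", "test_table"),
    "memory:///catalog/test_table.lance"
);
assert_eq!(
    format_table_url_or_path(false, "memory", "catalog", "test_table"),
    "catalog/test_table.lance"
);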

impl ListingCatalog {
    pub fn new(base_path: Path, object_store: ObjectStore) -> Self {
        Self {
            base_path,
            object_store: object_store.clone(),
            commit_handler: Arc::new(ListingCommitHandler { object_store }),
        }
    }
}

Check warning on line 82 (GitHub Actions: linux-build stable/nightly, linux-arm): associated function `new` is never used

#[async_trait::async_trait]
impl Catalog for ListingCatalog {
Collaborator (author) commented: @westonpace I implemented a simple version for discussion. Please take a look when you have time.

Collaborator (author) commented: @westonpace Any feedback here? Please ignore the catalog trait's location for now; I define it here so the prototype implementation doesn't have to span two packages.

    async fn create_table(&self, name: &str, schema: Arc<Schema>) -> Result<TableReference> {
        let table_url = format_table_url_or_path(
            true,
            &self.object_store.scheme,
            self.base_path.as_ref(),
            name,
        );

        let ds = Dataset::write(
            RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
            &table_url,
            Some(WriteParams {
                commit_handler: Option::from(self.commit_handler.clone()),
                store_params: Some(ObjectStoreParams {
                    object_store: Some((
                        self.object_store.clone().inner,
                        Url::parse(&table_url).unwrap(),
                    )),
                    ..Default::default()
                }),
                ..Default::default()
            }),
        )
        .await?;

        Ok(TableReference {
            uri: ds.uri().to_string(),
            object_store: self.object_store.clone(),
            commit_handler: self.commit_handler.clone(),
            store_params: Some(ObjectStoreParams {
                object_store: Some((
                    self.object_store.clone().inner,
                    Url::parse(&table_url).unwrap(),
                )),
                ..Default::default()
            }),
        })
    }

    async fn drop_table(&self, name: &str) -> Result<()> {
        let table_path = format_table_url_or_path(
            false,
            &self.object_store.scheme,
            self.base_path.as_ref(),
            name,
        );

        self.object_store.remove_dir_all(table_path).await?;

        Ok(())
    }

    async fn get_table(&self, name: &str) -> Result<TableReference> {
        let table_url = format_table_url_or_path(
            true,
            &self.object_store.scheme,
            self.base_path.as_ref(),
            name,
        );

        let ds = DatasetBuilder::from_uri(&table_url)
            .with_commit_handler(self.commit_handler.clone())
            .with_object_store(
                self.object_store.clone().inner,
                Url::parse(&table_url).unwrap(),
                self.commit_handler.clone(),
            )
            .load()
            .await?;

        Ok(TableReference {
            uri: ds.uri().to_string(),
            object_store: self.object_store.clone(),
            commit_handler: self.commit_handler.clone(),
            store_params: Some(ObjectStoreParams {
                object_store: Some((
                    self.object_store.clone().inner,
                    Url::parse(&table_url).unwrap(),
                )),
                ..Default::default()
            }),
        })
    }

    async fn list_tables(
        &self,
        start_after: Option<String>,
        limit: Option<u32>,
    ) -> Result<Vec<String>> {
        let mut f = self
            .object_store
            .read_dir(self.base_path.clone())
            .await?
            .iter()
            .map(std::path::Path::new)
            .filter(|path| {
                let is_lance = path
                    .extension()
                    .and_then(|e| e.to_str())
                    .map(|e| e == LANCE_EXTENSION);
                is_lance.unwrap_or(false)
            })
            .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
            .collect::<Vec<String>>();
        f.sort();
        if let Some(start_after) = start_after {
            let index = f
                .iter()
                .position(|name| name.as_str() > start_after.as_str())
                .unwrap_or(f.len());
            f.drain(0..index);
        }
        if let Some(limit) = limit {
            f.truncate(limit as usize);
        }
        Ok(f)
    }
}

pub struct ListingCommitHandler {
    object_store: ObjectStore,
}

Check warning on line 213 (GitHub Actions: linux-build stable/nightly, linux-arm): field `object_store` is never read

impl Debug for ListingCommitHandler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListingCommitHandler").finish()
    }
}

#[async_trait]
impl CommitHandler for ListingCommitHandler {
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
    ) -> std::result::Result<Path, CommitError> {
        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
        manifest_writer(object_store, manifest, indices, &version_path).await?;

        Ok(version_path)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};

    #[tokio::test]
    async fn test_listing_catalog() {
        let object_store = ObjectStore::memory();
        let base_path = Path::parse("/catalog").unwrap();

        // Init a listing catalog
        let catalog = ListingCatalog::new(base_path.clone(), object_store.clone());

        // Create a table
        let field_a = ArrowField::new("a", DataType::Int32, true);
        let schema = Arc::new(ArrowSchema::new(vec![field_a.clone()]));
        let table_ref = catalog
            .create_table("test_table", schema.clone())
            .await
            .unwrap();

        // Verify the table was created
        let table_ref_fetched = catalog.get_table("test_table").await.unwrap();
        assert_eq!(table_ref.uri, table_ref_fetched.uri);

        // List tables
        let tables = catalog.list_tables(None, None).await.unwrap();
        assert_eq!(tables, vec!["test_table"]);

        // Drop the table
        catalog.drop_table("test_table").await.unwrap();

        // Verify the table was dropped
        let tables = catalog.list_tables(None, None).await.unwrap();
        assert!(tables.is_empty());
    }
}
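
Beyond the unit test above, the intended consumption path for a `TableReference` appears to be feeding its fields back into `DatasetBuilder`, much like `get_table` does internally. The following is a hedged sketch of that usage; the `open_table` helper is a hypothetical name and not part of this PR:

// Sketch: open a Dataset from a TableReference returned by a Catalog.
// Mirrors what `get_table` does internally; `open_table` is hypothetical.
async fn open_table(catalog: &dyn Catalog, name: &str) -> Result<Dataset> {
    let table = catalog.get_table(name).await?;
    DatasetBuilder::from_uri(&table.uri)
        .with_commit_handler(table.commit_handler.clone())
        .with_object_store(
            table.object_store.inner.clone(),
            Url::parse(&table.uri).unwrap(),
            table.commit_handler.clone(),
        )
        .load()
        .await
}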
1 change: 1 addition & 0 deletions rust/lance/src/lib.rs
@@ -75,6 +75,7 @@ pub use lance_core::{datatypes, error};
pub use lance_core::{Error, Result};

pub mod arrow;
+pub mod catalog;
pub mod datafusion;
pub mod dataset;
pub mod index;