Skip to content

Commit

Permalink
Database row fetch (AppFlowy-IO#345)
Browse files Browse the repository at this point in the history
* chore: chunk rows

* chore: log
  • Loading branch information
appflowy authored Dec 6, 2024
1 parent c4e23b6 commit 35181c7
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 16 deletions.
17 changes: 11 additions & 6 deletions collab-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,15 @@ impl Database {
pub fn init_database_rows<'a, T: Into<RowId> + Send + 'a>(
&'a self,
row_ids: Vec<T>,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Arc<RwLock<DatabaseRow>>, DatabaseError>> + 'a {
let row_ids_chunk_stream = stream::iter(
row_ids
.into_iter()
.map(Into::into)
.collect::<Vec<RowId>>()
.chunks(50)
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect::<Vec<Vec<RowId>>>(),
);
Expand All @@ -552,6 +553,7 @@ impl Database {
}
}

trace!("Initializing chunked database rows: {}", chunk.len());
self.body.block.init_database_rows(chunk).await
}
})
Expand Down Expand Up @@ -595,11 +597,12 @@ impl Database {
pub async fn get_rows_for_view(
&self,
view_id: &str,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + '_ {
let row_orders = self.get_row_orders_for_view(view_id);
self
.get_rows_from_row_orders(&row_orders, cancel_token)
.get_rows_from_row_orders(&row_orders, chunk_size, cancel_token)
.await
}

Expand All @@ -623,10 +626,11 @@ impl Database {
pub async fn get_rows_from_row_orders<'a>(
&'a self,
row_orders: &[RowOrder],
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + 'a {
let row_ids = row_orders.iter().map(|order| order.id.clone()).collect();
let rows_stream = self.init_database_rows(row_ids, cancel_token);
let rows_stream = self.init_database_rows(row_ids, chunk_size, cancel_token);
let database_id = self.get_database_id();
rows_stream.then(move |result| {
let database_id = database_id.clone();
Expand Down Expand Up @@ -1341,7 +1345,7 @@ impl Database {
let inline_view_id = self.body.get_inline_view_id(&txn);
let views = self.get_all_views();
let fields = self.body.get_fields_in_view(&txn, &inline_view_id, None);
let rows_stream = self.get_all_rows(None).await;
let rows_stream = self.get_all_rows(20, None).await;
let rows: Vec<Row> = rows_stream
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down Expand Up @@ -1372,6 +1376,7 @@ impl Database {

pub async fn get_all_rows(
&self,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + '_ {
let row_orders = {
Expand All @@ -1381,12 +1386,12 @@ impl Database {
};

self
.get_rows_from_row_orders(&row_orders, cancel_token)
.get_rows_from_row_orders(&row_orders, chunk_size, cancel_token)
.await
}

pub async fn collect_all_rows(&self) -> Vec<Result<Row, DatabaseError>> {
let rows_stream = self.get_all_rows(None).await;
let rows_stream = self.get_all_rows(20, None).await;
rows_stream.collect::<Vec<_>>().await
}

Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/database_test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct DatabaseTest {

impl DatabaseTest {
pub async fn get_rows_for_view(&self, view_id: &str) -> Vec<Row> {
let rows_stream = self.database.get_rows_for_view(view_id, None).await;
let rows_stream = self.database.get_rows_for_view(view_id, 10, None).await;
let rows: Vec<Row> = rows_stream
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/database_test/view_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn create_initial_database_test() {
let database_test = create_database(1, &database_id);

let all_rows: Vec<Row> = database_test
.get_all_rows(None)
.get_all_rows(20, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn create_template_test() {

// Assert num of rows
let rows: Vec<Row> = database
.get_all_rows(None)
.get_all_rows(10, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/template_test/import_csv_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn import_csv_test() {

let fields = database.get_fields_in_view(&database.get_inline_view_id(), None);
let rows: Vec<Row> = database
.get_all_rows(None)
.get_all_rows(20, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
12 changes: 6 additions & 6 deletions collab-database/tests/user_test/database_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn duplicate_database_inline_view_test() {
.unwrap();

assert_eq!(
db.get_rows_for_view(&duplicated_view_id, None)
db.get_rows_for_view(&duplicated_view_id, 20, None)
.await
.count()
.await,
Expand All @@ -140,7 +140,7 @@ async fn duplicate_database_inline_view_test() {
database
.read()
.await
.get_rows_for_view("v1", None)
.get_rows_for_view("v1", 10, None)
.await
.count()
.await,
Expand Down Expand Up @@ -185,13 +185,13 @@ async fn duplicate_database_view_test() {

// Duplicated database should have the same rows as the original database
assert_eq!(
db.get_rows_for_view(&duplicated_view.id, None)
db.get_rows_for_view(&duplicated_view.id, 10, None)
.await
.count()
.await,
1
);
assert_eq!(db.get_rows_for_view("v1", None).await.count().await, 1);
assert_eq!(db.get_rows_for_view("v1", 10, None).await.count().await, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -283,14 +283,14 @@ async fn duplicate_database_data_test() {

// compare rows
let original_rows: Vec<Row> = original
.get_rows_for_view("v1", None)
.get_rows_for_view("v1", 10, None)
.await
.filter_map(|result| async { result.ok() })
.collect()
.await;

let duplicate_rows: Vec<Row> = duplicate
.get_rows_for_view(duplicated_view_id, None)
.get_rows_for_view(duplicated_view_id, 10, None)
.await
.filter_map(|result| async { result.ok() })
.collect()
Expand Down

0 comments on commit 35181c7

Please sign in to comment.