Skip to content

Commit

Permalink
fix: create row (AppFlowy-IO#177)
Browse files Browse the repository at this point in the history
* chore: fix create row

* chore: fix create row

* chore: fix create row

* chore: update logs

* chore: update logs
  • Loading branch information
appflowy authored Mar 14, 2024
1 parent 8f6dba4 commit 8d017eb
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions collab-database/src/blocks/task_controller.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Weak};
use std::time::Duration;

use anyhow::anyhow;
use async_trait::async_trait;
use collab::core::collab::{CollabDocState, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use collab_plugins::local_storage::kv::doc::CollabKVAction;
use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError};
use collab_plugins::local_storage::kv::KVTransactionDB;
use collab_plugins::CollabKVDB;
use tokio::sync::watch;

Expand Down Expand Up @@ -137,7 +135,7 @@ impl TaskHandler<BlockTask> for BlockTaskHandler {
{
let mut row_details = vec![];
for (oid, updates) in updates_by_oid {
if let Some(row_detail) = save_row(&collab_db, updates, *uid, oid) {
if let Some(row_detail) = save_row(&collab_db, updates, *uid, &RowId::from(oid)) {
row_details.push(row_detail);
}
}
Expand All @@ -155,11 +153,11 @@ impl TaskHandler<BlockTask> for BlockTaskHandler {
}
}

fn save_row<R: AsRef<str>>(
fn save_row(
collab_db: &Arc<CollabKVDB>,
collab_doc_state: CollabDocState,
uid: i64,
row_id: R,
row_id: &RowId,
) -> Option<RowDetail> {
if collab_doc_state.is_empty() {
tracing::error!("Unexpected empty row: {} collab update", row_id.as_ref());
Expand All @@ -173,41 +171,46 @@ fn save_row<R: AsRef<str>>(
vec![],
) {
Ok(collab) => {
let collab_guard =
collab
.try_lock_for(Duration::from_secs(1))
.ok_or(PersistenceError::Internal(anyhow!(
"Timeout while trying to acquire lock for saving database row"
)))?;

let txn = collab_guard.try_transaction().map_err(|err| {
PersistenceError::Internal(anyhow!(
"Failed to get transaction for saving database row: {:?}",
err
))
})?;
let collab_lock_guard = collab.lock();
let encode_collab = collab_lock_guard.encode_collab_v1();
let object_id = row_id.as_ref();
if let Err(e) = write_txn.create_new_doc(uid, object_id, &txn) {
tracing::error!("Failed to save the database row collab: {:?}", e);
if let Err(e) = write_txn.flush_doc(
uid,
object_id,
encode_collab.state_vector.to_vec(),
encode_collab.doc_state.to_vec(),
) {
tracing::error!(
"{} failed to save the database row collab: {:?}",
row_id.as_ref(),
e
);
}

let txn = collab_lock_guard.transact();
let row_detail = RowDetail::from_collab(&collab_lock_guard, &txn);
if row_detail.is_none() {
tracing::error!("{} doesn't have any row information in it", row_id.as_ref());
}
Ok(RowDetail::from_collab(&collab_guard, &txn))
Ok(row_detail)
},

Err(e) => {
tracing::error!("Failed to create database row collab: {:?}", e);
tracing::error!("Failed to deserialize doc state to row: {:?}", e);
Ok(None)
},
}
});

match row {
Ok(None) => {
tracing::error!("Unexpected empty row. The row should not be empty at this point.");
None
},
Ok(None) => None,
Ok(row) => row,
Err(e) => {
tracing::error!("Failed to save the database row collab: {:?}", e);
tracing::error!(
"{} failed to save the database row collab: {:?}",
row_id.as_ref(),
e
);
None
},
}
Expand Down

0 comments on commit 8d017eb

Please sign in to comment.