From 8d017eb521d5783f44adbd392dac0efd9058bd3f Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 14 Mar 2024 15:42:58 +0800 Subject: [PATCH] fix: create row (#177) * chore: fix create row * chore: fix create row * chore: fix create row * chore: update logs * chore: update logs --- collab-database/src/blocks/task_controller.rs | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/collab-database/src/blocks/task_controller.rs b/collab-database/src/blocks/task_controller.rs index 331e7245e..4ff2c17c8 100644 --- a/collab-database/src/blocks/task_controller.rs +++ b/collab-database/src/blocks/task_controller.rs @@ -1,15 +1,13 @@ use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::sync::{Arc, Weak}; -use std::time::Duration; -use anyhow::anyhow; use async_trait::async_trait; use collab::core::collab::{CollabDocState, MutexCollab}; use collab::core::origin::CollabOrigin; use collab_entity::CollabType; use collab_plugins::local_storage::kv::doc::CollabKVAction; -use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::CollabKVDB; use tokio::sync::watch; @@ -137,7 +135,7 @@ impl TaskHandler for BlockTaskHandler { { let mut row_details = vec![]; for (oid, updates) in updates_by_oid { - if let Some(row_detail) = save_row(&collab_db, updates, *uid, oid) { + if let Some(row_detail) = save_row(&collab_db, updates, *uid, &RowId::from(oid)) { row_details.push(row_detail); } } @@ -155,11 +153,11 @@ impl TaskHandler for BlockTaskHandler { } } -fn save_row>( +fn save_row( collab_db: &Arc, collab_doc_state: CollabDocState, uid: i64, - row_id: R, + row_id: &RowId, ) -> Option { if collab_doc_state.is_empty() { tracing::error!("Unexpected empty row: {} collab update", row_id.as_ref()); @@ -173,41 +171,46 @@ fn save_row>( vec![], ) { Ok(collab) => { - let collab_guard = - collab - .try_lock_for(Duration::from_secs(1)) - .ok_or(PersistenceError::Internal(anyhow!( - "Timeout while trying to acquire lock for saving database row" - )))?; - - let txn = collab_guard.try_transaction().map_err(|err| { - PersistenceError::Internal(anyhow!( - "Failed to get transaction for saving database row: {:?}", - err - )) - })?; + let collab_lock_guard = collab.lock(); + let encode_collab = collab_lock_guard.encode_collab_v1(); let object_id = row_id.as_ref(); - if let Err(e) = write_txn.create_new_doc(uid, object_id, &txn) { - tracing::error!("Failed to save the database row collab: {:?}", e); + if let Err(e) = write_txn.flush_doc( + uid, + object_id, + encode_collab.state_vector.to_vec(), + encode_collab.doc_state.to_vec(), + ) { + tracing::error!( + "{} failed to save the database row collab: {:?}", + row_id.as_ref(), + e + ); + } + + let txn = collab_lock_guard.transact(); + let row_detail = RowDetail::from_collab(&collab_lock_guard, &txn); + if row_detail.is_none() { + tracing::error!("{} doesn't have any row information in it", row_id.as_ref()); } - Ok(RowDetail::from_collab(&collab_guard, &txn)) + Ok(row_detail) }, Err(e) => { - tracing::error!("Failed to create database row collab: {:?}", e); + tracing::error!("Failed to deserialize doc state to row: {:?}", e); Ok(None) }, } }); match row { - Ok(None) => { - tracing::error!("Unexpected empty row. The row should not be empty at this point."); - None - }, + Ok(None) => None, Ok(row) => row, Err(e) => { - tracing::error!("Failed to save the database row collab: {:?}", e); + tracing::error!( + "{} failed to save the database row collab: {:?}", + row_id.as_ref(), + e + ); None }, }