Skip to content

Commit

Permalink
small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tipogi committed Jan 9, 2025
1 parent b46c375 commit 7b290ec
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 19 deletions.
10 changes: 5 additions & 5 deletions src/db/graph/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use serde::de::DeserializeOwned;
pub enum OperationOutcome {
/// The query found and updated an existing node/relationship.
Updated,
/// The query changed the existence state of a node/relationship
/// (i.e., it was created or deleted).
ExistenceChanged,
/// This variant represents a structural mutation where the node/relationship
/// did not exist before the operation (creation) or no longer exists after the operation (deletion)
CreatedOrDeleted,
/// A required node/relationship was not found, indicating a missing dependency
/// (often due to the node/relationship not yet being indexed or otherwise unavailable).
Pending,
Expand All @@ -20,7 +20,7 @@ pub enum OperationOutcome {
/// "flag". Interprets the boolean as follows:
///
/// - `true` => Returns [`OperationOutcome::Updated`]
/// - `false` => Returns [`OperationOutcome::ExistenceChanged`]
/// - `false` => Returns [`OperationOutcome::CreatedOrDeleted`]
///
/// If no rows are returned, this function returns [`OperationOutcome::Pending`], typically
/// indicating a missing dependency or an unmatched query condition.
Expand All @@ -36,7 +36,7 @@ pub async fn execute_graph_operation(query: Query) -> Result<OperationOutcome, D
// The "flag" field indicates a specific condition in the query
Some(row) => match row.get("flag")? {
true => Ok(OperationOutcome::Updated),
false => Ok(OperationOutcome::ExistenceChanged),
false => Ok(OperationOutcome::CreatedOrDeleted),
},
None => Ok(OperationOutcome::Pending),
}
Expand Down
2 changes: 1 addition & 1 deletion src/events/handlers/bookmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn sync_put(
let indexed_at = Utc::now().timestamp_millis();
let existed =
match Bookmark::put_to_graph(&author_id, &post_id, &user_id, &id, indexed_at).await? {
OperationOutcome::ExistenceChanged => false,
OperationOutcome::CreatedOrDeleted => false,
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
Expand Down
4 changes: 2 additions & 2 deletions src/events/handlers/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn sync_put(follower_id: PubkyId, followee_id: PubkyId) -> Result<(),
return Err("WATCHER: Missing some dependency to index the model".into())
}
// The relationship did not exist, create all related indexes
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// Checks whether the followee was following the follower (Is this a new friendship?)
let will_be_friends =
is_followee_following_follower(&follower_id, &followee_id).await?;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub async fn sync_del(follower_id: PubkyId, followee_id: PubkyId) -> Result<(),
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// Check if the users are friends. Is this a break? :(
let were_friends = Friends::check(&follower_id, &followee_id).await?;

Expand Down
4 changes: 2 additions & 2 deletions src/events/handlers/mute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn sync_put(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynErro
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
Muted(vec![muted_id.to_string()])
.put_to_index(&user_id)
Expand All @@ -45,7 +45,7 @@ pub async fn sync_del(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynErro
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// REMOVE FROM INDEX
Muted(vec![muted_id.to_string()])
.del_from_index(&user_id)
Expand Down
6 changes: 3 additions & 3 deletions src/events/handlers/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn sync_put(
let mut post_relationships = PostRelationships::from_homeserver(&post);

let existed = match post_details.put_to_graph(&post_relationships).await? {
OperationOutcome::ExistenceChanged => false,
OperationOutcome::CreatedOrDeleted => false,
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
Expand Down Expand Up @@ -251,11 +251,11 @@ pub async fn del(author_id: PubkyId, post_id: String) -> Result<(), DynError> {
// Graph query to check if there is any edge at all to this post other than AUTHORED, is a reply or is a repost.
let query = post_is_safe_to_delete(&author_id, &post_id);

// If there is none other relationship (OperationOutcome::ExistenceChanged), we delete from graph and redis.
// If there is none other relationship (OperationOutcome::CreatedOrDeleted), we delete from graph and redis.
// But if there is any (OperationOutcome::Updated), then we simply update the post with keyword content [DELETED].
// A deleted post is a post whose content is EXACTLY `"[DELETED]"`
match execute_graph_operation(query).await? {
OperationOutcome::ExistenceChanged => sync_del(author_id, post_id).await?,
OperationOutcome::CreatedOrDeleted => sync_del(author_id, post_id).await?,
OperationOutcome::Updated => {
let existing_relationships = PostRelationships::get_by_id(&author_id, &post_id).await?;
let parent = match existing_relationships {
Expand Down
4 changes: 2 additions & 2 deletions src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async fn put_sync_post(
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEXES
let post_key_slice: &[&str] = &[&author_id, &post_id];

Expand Down Expand Up @@ -163,7 +163,7 @@ async fn put_sync_user(
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
// Update user counts for the tagged user
UserCounts::update(&tagged_user_id, "tagged", JsonAction::Increment(1)).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/events/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub async fn del(user_id: PubkyId) -> Result<(), DynError> {
// 1. Graph query to check if there is any edge at all to this user.
let query = user_is_safe_to_delete(&user_id);

// 2. If there is no relationships (OperationOutcome::ExistenceChanged), delete from graph and redis.
// 2. If there is no relationships (OperationOutcome::CreatedOrDeleted), delete from graph and redis.
// 3. But if there is any relationship (OperationOutcome::Updated), then we simply update the user with empty profile
// and keyword username [DELETED].
// A deleted user is a user whose profile is empty and has username `"[DELETED]"`
match execute_graph_operation(query).await? {
OperationOutcome::ExistenceChanged => {
OperationOutcome::CreatedOrDeleted => {
UserDetails::delete(&user_id).await?;
UserCounts::delete(&user_id).await?;
}
Expand Down
4 changes: 3 additions & 1 deletion src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub struct Event {
}

impl Event {
pub fn from_str(line: &str) -> Result<Option<Self>, Box<dyn std::error::Error + Sync + Send>> {
pub fn parse_event(
line: &str,
) -> Result<Option<Self>, Box<dyn std::error::Error + Sync + Send>> {
debug!("New event: {}", line);
let parts: Vec<&str> = line.split(' ').collect();
if parts.len() != 2 {
Expand Down
2 changes: 1 addition & 1 deletion src/events/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl EventProcessor {
info!("Cursor for the next request: {}", cursor);
}
} else {
let event = match Event::from_str(line) {
let event = match Event::parse_event(line) {
Ok(event) => event,
Err(e) => {
error!("Error while creating event line from line: {}", e);
Expand Down

0 comments on commit 7b290ec

Please sign in to comment.