Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add graph return control for all ingestion events #253

Merged
merged 21 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/db/graph/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@ pub async fn exec_single_row(query: Query) -> Result<(), DynError> {
}

// Exec a graph query that has a single "boolean" return
pub async fn exec_boolean_row(query: Query) -> Result<bool, DynError> {
pub async fn exec_boolean_row(query: Query) -> Result<Option<bool>, DynError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we documented this well? For example, it can be document the implicit usage we are making of the returned type of this function to control for non existing objects.

This function is usually used for queries that will return a row where the response is:

  • None: Some dependency is missing (e.g., a reply's parent).
  • Some(true): The node/relationship already existed, and from this, we deduce that it is an EDIT.
  • Some(false): The node/relationship did not exist, so we have created a new node or relationship.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use the following enum

pub enum QueryResult {
    Pending,      // None: Some dependency is pending
    Edited,       // Some(true): The node/relationship existed (an edit was performed)
    Created,      // Some(false): The node/relationship did not exist (a new one was created)
}

let mut result;
{
let graph = get_neo4j_graph()?;
let graph = graph.lock().await;
result = graph.execute(query).await?;
}
let mut boolean = false;
let mut exist = None;
while let Some(row) = result.next().await? {
boolean = row.get("boolean")?;
let result: bool = row.get("boolean")?;
exist = Some(result);
}
Ok(boolean)
Ok(exist)
}

// Generic function to retrieve data from Neo4J
Expand Down
37 changes: 21 additions & 16 deletions src/db/graph/queries/del.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ pub fn delete_post(author_id: &str, post_id: &str) -> Query {
// Delete a follows relationship between two users
pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query {
query(
"MATCH (follower:User {id: $follower_id})-[r:FOLLOWS]->(followee:User {id: $followee_id})

DELETE r

// returns whether the relationship existed as 'boolean'
RETURN r IS NOT NULL AS boolean;",
"// Important that MATCH to check if both users are in the graph
MATCH (follower:User {id: $follower_id}), (followee:User {id: $followee_id})
// Check if follow already exist.
OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee)
DELETE existing
// returns whether the relationship existed as 'boolean'
RETURN existing IS NOT NULL AS boolean;",
)
.param("follower_id", follower_id.to_string())
.param("followee_id", followee_id.to_string())
Expand All @@ -38,8 +39,12 @@ pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query {
// Delete a muted relationship between two users
pub fn delete_mute(user_id: &str, muted_id: &str) -> Query {
query(
"MATCH (user:User {id: $user_id})-[r:MUTED]->(muted:User {id: $muted_id})
DELETE r;",
"// Important that MATCH to check if both users are in the graph
MATCH (user:User {id: $user_id}), (muted:User {id: $muted_id})
OPTIONAL MATCH (user)-[existing:MUTED]->(muted)
DELETE existing
// returns whether the relationship existed as 'boolean'
RETURN existing IS NOT NULL AS boolean;",
)
.param("user_id", user_id.to_string())
.param("muted_id", muted_id.to_string())
Expand All @@ -58,14 +63,14 @@ pub fn delete_bookmark(user_id: &str, bookmark_id: &str) -> Query {
}

// Delete a tagged relationship
pub fn delete_tag(user_id: &str, tag_id: &str) -> Query {
query(
"MATCH (user:User {id: $user_id})-[t:TAGGED {id: $tag_id}]->(target)
DELETE t",
)
.param("user_id", user_id)
.param("tag_id", tag_id)
}
// pub fn delete_tag(user_id: &str, tag_id: &str) -> Query {
// query(
// "MATCH (user:User {id: $user_id})-[t:TAGGED {id: $tag_id}]->(target)
// DELETE t",
// )
// .param("user_id", user_id)
// .param("tag_id", tag_id)
// }

// Delete a file node
pub fn delete_file(owner_id: &str, file_id: &str) -> Query {
Expand Down
14 changes: 10 additions & 4 deletions src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,11 @@ fn build_query_with_params(
pub fn user_is_safe_to_delete(user_id: &str) -> Query {
query(
"
MATCH (u:User {id: $user_id})-[r]-()
RETURN COUNT(r) = 0 AS boolean
MATCH (u:User {id: $user_id})
OPTIONAL MATCH (u)-[r]-()
WITH u, COUNT(r) = 0 AS boolean
// Returning a user_id, ensures to return no rows if the user does not exist
RETURN u.id, boolean
",
)
.param("user_id", user_id)
Expand All @@ -777,7 +780,8 @@ pub fn post_is_safe_to_delete(author_id: &str, post_id: &str) -> Query {
query(
"
MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id})
MATCH (p)-[r]-()
// Ensures missing relationships are handled
OPTIONAL MATCH (p)-[r]-()
WHERE NOT (
// Allowed relationships:
// 1. Incoming AUTHORED relationship from the specified user
Expand All @@ -789,7 +793,9 @@ pub fn post_is_safe_to_delete(author_id: &str, post_id: &str) -> Query {
// 3. Outgoing REPLIED relationship to another post
(type(r) = 'REPLIED' AND startNode(r) = p)
)
RETURN COUNT(r) = 0 AS boolean
WITH p, COUNT(r) = 0 AS boolean
// Returning a post_id, ensures to return no rows if the user or post does not exist
RETURN p.id, boolean
",
)
.param("author_id", author_id)
Expand Down
157 changes: 94 additions & 63 deletions src/db/graph/queries/put.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::events::uri::ParsedUri;
use crate::models::post::PostRelationships;
use crate::models::{file::FileDetails, post::PostDetails, user::UserDetails};
use crate::types::DynError;
use neo4rs::{query, Query};
Expand All @@ -21,74 +23,94 @@ pub fn create_user(user: &UserDetails) -> Result<Query, DynError> {
Ok(query)
}

// Create a post node
// TODO: DIscuss if it is necessary here or create a URI when we get the post_id, get_posts_details_by_id
pub fn create_post(post: &PostDetails) -> Result<Query, DynError> {
let kind = serde_json::to_string(&post.kind)?;
/// Creates a Cypher query to add or edit a post to the graph database and handles its relationships.
/// # Arguments
/// * `post` - A reference to a `PostDetails` struct containing information about the post to be created or edited
/// * `post_relationships` - A reference to a PostRelationships struct that define relationships
/// for the post (e.g., replies or reposts).
pub fn create_post(post: &PostDetails, post_relationships: &PostRelationships) -> Result<Query, DynError> {
let mut cypher = String::new();
let mut new_relationships = Vec::new();

let query = query(
"MATCH (u:User {id: $author_id})
// Check if all the dependencies are consistent in the graph
if let Some(_) = &post_relationships.replied {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    if post_relationships.replied.is_some() {

cypher.push_str("
MATCH (reply_parent_author:User {id: $reply_parent_author_id})-[:AUTHORED]->(reply_parent_post:Post {id: $reply_parent_post_id})
");
new_relationships.push("MERGE (new_post)-[:REPLIED]->(reply_parent_post)");
};
if let Some(_) = &post_relationships.reposted {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    if post_relationships.reposted.is_some() {

cypher.push_str("
MATCH (repost_parent_author:User {id: $repost_parent_author_id})-[:AUTHORED]->(repost_parent_post:Post {id: $repost_parent_post_id})
");
new_relationships.push("MERGE (new_post)-[:REPOSTED]->(repost_parent_post)");
}
// Create the new post
cypher.push_str("
MATCH (author:User {id: $author_id})
OPTIONAL MATCH (u)-[:AUTHORED]->(existing_post:Post {id: $post_id})
MERGE (author)-[:AUTHORED]->(new_post:Post {id: $post_id})
");

// Check if post already existed
OPTIONAL MATCH (u)-[:AUTHORED]->(existing:Post {id: $post_id})
// Add relationships to the query
cypher.push_str(&new_relationships.join("\n"));

// Write data
MERGE (u)-[:AUTHORED]->(p:Post {id: $post_id})
SET p.content = $content,
p.indexed_at = $indexed_at,
p.kind = $kind,
p.attachments = $attachments

// boolean == existed
RETURN existing IS NOT NULL AS boolean;",
)
.param("author_id", post.author.to_string())
.param("post_id", post.id.to_string())
.param("content", post.content.to_string())
.param("indexed_at", post.indexed_at)
.param("kind", kind.trim_matches('"'))
.param(
"attachments",
post.attachments.clone().unwrap_or(vec![] as Vec<String>),
cypher.push_str("
SET new_post.content = $content,
new_post.indexed_at = $indexed_at,
new_post.kind = $kind,
new_post.attachments = $attachments
RETURN existing_post IS NOT NULL AS boolean"
);

let kind = serde_json::to_string(&post.kind)?;

let mut cypher_query = query(&cypher)
.param("author_id", post.author.to_string())
.param("post_id", post.id.to_string())
.param("content", post.content.to_string())
.param("indexed_at", post.indexed_at)
.param("kind", kind.trim_matches('"'))
.param(
"attachments",
post.attachments.clone().unwrap_or(vec![] as Vec<String>),
);

Ok(query)
}
// Fill up relationships parameters
cypher_query = add_relationship_params(
cypher_query,
&post_relationships.replied,
"reply_parent_author_id",
"reply_parent_post_id",
)?;

/// Create a reply relationship between two posts
pub fn create_reply_relationship(
author_id: &str,
post_id: &str,
parent_author_id: &str,
parent_post_id: &str,
) -> Query {
query(
"MATCH (parent_author:User {id: $parent_author_id})-[:AUTHORED]->(parent_post:Post {id: $parent_post_id}),
(author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id})
MERGE (post)-[:REPLIED]->(parent_post)",
)
.param("author_id", author_id)
.param("post_id", post_id)
.param("parent_author_id", parent_author_id)
.param("parent_post_id", parent_post_id)
// Handle "reposted" relationship
cypher_query = add_relationship_params(
cypher_query,
&post_relationships.reposted,
"repost_parent_author_id",
"repost_parent_post_id",
)?;

Ok(cypher_query)
}

/// Create a repost relationship between two posts
pub fn create_repost_relationship(
author_id: &str,
post_id: &str,
reposted_author_id: &str,
reposted_post_id: &str,
) -> Query {
query(
"MATCH (reposted_author:User {id: $reposted_author_id})-[:AUTHORED]->(reposted_post:Post {id: $reposted_post_id}),
(author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id})
MERGE (post)-[:REPOSTED]->(reposted_post)",
)
.param("author_id", author_id)
.param("post_id", post_id)
.param("reposted_author_id", reposted_author_id)
.param("reposted_post_id", reposted_post_id)
fn add_relationship_params(
cypher_query: Query,
uri: &Option<String>,
author_param: &str,
post_param: &str,
) -> Result<Query, DynError> {
if let Some(uri) = uri {
let parsed_uri = ParsedUri::try_from(uri.as_str())?;
let parent_author_id = parsed_uri.user_id;
let parent_post_id = parsed_uri.post_id.ok_or("Missing post ID")?;

return Ok(cypher_query
.param(author_param, parent_author_id.as_str())
.param(post_param, parent_post_id.as_str()));
}
Ok(cypher_query)
}

// Create a mentioned relationship between a post and a user
Expand Down Expand Up @@ -132,7 +154,14 @@ pub fn create_follow(follower_id: &str, followee_id: &str, indexed_at: i64) -> Q
pub fn create_mute(user_id: &str, muted_id: &str, indexed_at: i64) -> Query {
query(
"MATCH (user:User {id: $user_id}), (muted:User {id: $muted_id})
MERGE (user)-[:MUTED {indexed_at: $indexed_at}]->(muted);",
// Check if follow already existed
OPTIONAL MATCH (user)-[existing:MUTED]->(muted)

MERGE (user)-[r:MUTED]->(muted)
SET r.indexed_at = $indexed_at

// boolean == existed
RETURN existing IS NOT NULL AS boolean;",
)
.param("user_id", user_id.to_string())
.param("muted_id", muted_id.to_string())
Expand All @@ -148,6 +177,7 @@ pub fn create_post_bookmark(
) -> Query {
query(
"MATCH (u:User {id: $user_id})
// We assume these nodes are already created. If not we would not be able to add a bookmark
MATCH (author:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id})

// Check if bookmark already existed
Expand Down Expand Up @@ -177,8 +207,9 @@ pub fn create_post_tag(
indexed_at: i64,
) -> Query {
query(
"MATCH (author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id})
MATCH (user:User {id: $user_id})
"MATCH (user:User {id: $user_id})
// We assume these nodes are already created. If not we would not be able to add a tag
MATCH (author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id})

// Check if tag already existed
OPTIONAL MATCH (user)-[existing:TAGGED {label: $label}]->(post)
Expand Down
8 changes: 6 additions & 2 deletions src/events/handlers/bookmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ pub async fn sync_put(
parsed_uri.post_id.ok_or("Bookmarked URI missing post_id")?,
);

// Save new bookmark relationship to the graph
// Save new bookmark relationship to the graph, only if the bookmarked user exists
let indexed_at = Utc::now().timestamp_millis();
let existed = Bookmark::put_to_graph(&author_id, &post_id, &user_id, &id, indexed_at).await?;
let existed = match Bookmark::put_to_graph(&author_id, &post_id, &user_id, &id, indexed_at).await? {
Some(exist) => exist,
// Should return an error that could not be inserted in the RetryManager
None => return Err("WATCHER: User not synchronized".into())
};

// SAVE TO INDEX
let bookmark_details = Bookmark { id, indexed_at };
Expand Down
23 changes: 12 additions & 11 deletions src/events/handlers/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,26 @@ pub async fn put(
async fn ingest(
user_id: &PubkyId,
file_id: &str,
pubkyapp_file: &PubkyAppFile,
//client: &PubkyClient,
pubkyapp_file: &PubkyAppFile
) -> Result<FileMeta, DynError> {
let pubky_client = PubkyConnector::get_pubky_client()?;

let blob = match pubky_client.get(pubkyapp_file.src.as_str()).await? {
Some(metadata) => metadata,
// TODO: Shape the error to avoid the retyManager
None => return Err("EVENT ERROR: no metadata in the file blob".into()),
};

debug!("File Metadata: {:?}\n{:?}", file_id, blob);

store_blob(file_id.to_string(), user_id.to_string(), &blob).await?;

let static_path = format!("{}/{}", user_id, file_id);
Ok(FileMeta {
urls: FileUrls { main: static_path },
urls: FileUrls { main: format!("{}/{}", user_id, file_id) },
})
}

async fn store_blob(name: String, path: String, blob: &Bytes) -> Result<(), DynError> {
let storage_path = Config::from_env().file_path;
// TODO: Is it well formatting. The file path already has / at the end
let full_path = format!("{}/{}", storage_path, path);

debug!("store blob in full_path: {}", full_path);
Expand Down Expand Up @@ -115,13 +115,14 @@ pub async fn del(user_id: &PubkyId, file_id: String) -> Result<(), DynError> {
)
.await?;

let file = &result[0];
if !result.is_empty() {
let file = &result[0];

if let Some(value) = file {
value.delete().await?;
if let Some(value) = file {
value.delete().await?;
}
remove_blob(file_id, user_id.to_string()).await?;
}

remove_blob(file_id, user_id.to_string()).await?;

Ok(())
}
Loading
Loading