feat: add graph return control for all ingestion events #253
src/db/graph/queries/put.rs
Outdated
");

// Create the generic part of the query depending the post type
if action == "replies" {
Let's use a `match`, and instead of matching on the strings "replies" and "repost", let's make `action` an enum.
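A minimal sketch of what this suggestion could look like. The enum name `PostAction`, its variants, and the helper `relationship_clause` are hypothetical; only the Cypher strings are taken from the diff.

```rust
/// Hypothetical enum replacing the `action` string; the name and variants are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostAction {
    Reply,
    Repost,
}

/// Returns the MERGE clause for the given action.
/// The Cypher strings are copied from the diff under review.
pub fn relationship_clause(action: PostAction) -> &'static str {
    // The compiler checks that every variant is handled, which a string comparison cannot do.
    match action {
        PostAction::Reply => "MERGE (new_post)-[:REPLIED]->(reply_parent_post)",
        PostAction::Repost => "MERGE (new_post)-[:REPOSTED]->(repost_parent_post)",
    }
}
```

With an enum, a typo such as "replys" becomes a compile error instead of a silently skipped branch.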
It's looking cleaner!
src/db/graph/queries/put.rs
Outdated
new_relationships.push("MERGE (new_post)-[:REPOSTED]->(repost_parent_post)");
}
}
if let Some(_) = &post_relationships.replied {
if post_relationships.replied.is_some() {
src/db/graph/queries/put.rs
Outdated
");
new_relationships.push("MERGE (new_post)-[:REPLIED]->(reply_parent_post)");
};
if let Some(_) = &post_relationships.reposted {
if post_relationships.reposted.is_some() {
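For reference, a small sketch of both checks written with `is_some()`. The struct here is a reduced, hypothetical stand-in for `PostRelationships` (only the two fields seen in the diff, assumed to be `Option<String>`), and `relationship_clauses` is an illustrative helper, not a function from the PR.

```rust
/// Hypothetical, reduced stand-in for `PostRelationships`.
#[derive(Default)]
pub struct PostRelationships {
    pub replied: Option<String>,
    pub reposted: Option<String>,
}

/// Collects the MERGE clauses for the relationships that are present.
pub fn relationship_clauses(rel: &PostRelationships) -> Vec<&'static str> {
    let mut new_relationships = Vec::new();
    // `is_some()` states the intent directly; the inner value is not needed here.
    if rel.replied.is_some() {
        new_relationships.push("MERGE (new_post)-[:REPLIED]->(reply_parent_post)");
    }
    if rel.reposted.is_some() {
        new_relationships.push("MERGE (new_post)-[:REPOSTED]->(repost_parent_post)");
    }
    new_relationships
}
```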
src/models/post/relationships.rs
Outdated
@@ -88,6 +89,23 @@ impl PostRelationships {
}
}

/// Constructs a `Self` instance by extracting relationships from a `PubkyAppPost` object
pub fn get_from_homeserver(post: &PubkyAppPost) -> Self {
`from_homeserver()`, if we want to match the convention we are using in other models.
if let Some(parent_uri) = &post.parent {
    relationship.replied = Some(parent_uri.to_string());
}
This `if` can be removed and simplified to:
relationship.replied = post.parent.clone();
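A quick sketch showing that the two forms behave the same. Both structs are reduced, hypothetical stand-ins: only `parent` and `replied` are modeled, and `parent` is assumed to be an `Option<String>`.

```rust
/// Hypothetical, reduced stand-in for `PubkyAppPost`; only `parent` is modeled,
/// and its type is assumed to be `Option<String>`.
pub struct PubkyAppPost {
    pub parent: Option<String>,
}

/// Hypothetical, reduced stand-in for `PostRelationships`.
#[derive(Debug, Default, PartialEq)]
pub struct PostRelationships {
    pub replied: Option<String>,
}

impl PostRelationships {
    /// Verbose form, as in the diff under review.
    pub fn get_from_homeserver(post: &PubkyAppPost) -> Self {
        let mut relationship = Self::default();
        if let Some(parent_uri) = &post.parent {
            relationship.replied = Some(parent_uri.to_string());
        }
        relationship
    }

    /// Simplified form, renamed to follow the `from_homeserver` convention.
    pub fn from_homeserver(post: &PubkyAppPost) -> Self {
        // Cloning the Option copies `Some(uri)` and leaves `None` as `None`.
        Self { replied: post.parent.clone() }
    }
}
```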
src/events/handlers/post.rs
Outdated
@@ -104,44 +99,55 @@ pub async fn sync_put(
)
.await?;
}
// Populate the reply parent keys to after index the reply
// Populate the... to after....
I suggest improving this comment to:
Define the reply parent key to index the reply later.
This applies to the other comments that contain "to after".
Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.134 to 1.0.135.
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](serde-rs/json@v1.0.134...v1.0.135)
---
updated-dependencies:
- dependency-name: serde_json
  dependency-type: direct:production
  update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
We are now implicitly using the `Option<bool>` returned by most graph queries to get stricter control over what really happened when we ran a graph query. However, `Option<bool>` is not very expressive, even though we use it very systematically. This will be hard to understand for anyone reading the code, and even for us when we add new queries or revisit existing ones in the future.
Do you think there's a way to make this even clearer and more obvious in (1) the queries themselves, (2) the way we execute them, and (3) the returned type (something more expressive than `Option<bool>`)?
In addition, `exec_boolean_row` could be renamed to something that better reflects its new, very specific purpose, as long as we are consistent in the way we use this function to execute queries.
src/db/graph/exec.rs
Outdated
@@ -13,18 +13,19 @@ pub async fn exec_single_row(query: Query) -> Result<(), DynError> {
}

// Exec a graph query that has a single "boolean" return
- pub async fn exec_boolean_row(query: Query) -> Result<bool, DynError> {
+ pub async fn exec_boolean_row(query: Query) -> Result<Option<bool>, DynError> {
Have we documented this well? For example, we could document the implicit use we make of this function's return type to detect non-existing objects.
This function is usually used for queries that return a row where the response is:
- None: Some dependency is missing (e.g., a reply's parent).
- Some(true): The node/relationship already existed, and from this, we deduce that it is an EDIT.
- Some(false): The node/relationship did not exist, so we have created a new node or relationship.
We could use the following enum
pub enum QueryResult {
    Pending, // None: Some dependency is pending
    Edited,  // Some(true): The node/relationship existed (an edit was performed)
    Created, // Some(false): The node/relationship did not exist (a new one was created)
}
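As a rough sketch of how this enum could sit on top of the existing `Option<bool>` return, the mapping described above can live in one place via a `From` impl. The `From` impl is an assumption for illustration and is not part of the PR.

```rust
/// Outcome of a graph query, as proposed in the comment above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryResult {
    /// No row returned: some dependency is pending (e.g., a reply's parent).
    Pending,
    /// Flag was `true`: the node/relationship existed (an edit was performed).
    Edited,
    /// Flag was `false`: the node/relationship did not exist (a new one was created).
    Created,
}

/// Hypothetical conversion from the raw `Option<bool>` flag.
impl From<Option<bool>> for QueryResult {
    fn from(flag: Option<bool>) -> Self {
        match flag {
            None => QueryResult::Pending,
            Some(true) => QueryResult::Edited,
            Some(false) => QueryResult::Created,
        }
    }
}
```

Callers then match on named variants instead of remembering what `Some(true)` means.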
src/db/graph/exec.rs
Outdated
if let Some(row) = result.next().await? {
    // The "flag" field indicates a specific condition in the query
    match row.get("flag")? {
        true => Ok(OperationOutcome::Updated),
        false => Ok(OperationOutcome::Created),
    }
} else {
    Ok(OperationOutcome::Pending)
}
match result.next().await? {
    Some(row) => match row.get("flag")? {
        true => Ok(OperationOutcome::Updated),
        false => Ok(OperationOutcome::Created),
    },
    None => Ok(OperationOutcome::Pending),
}
Woop, woop!! This is a true quality upgrade. Hoping to see far fewer partial synchronization issues from now on 🥳
tests/watcher/utils/watcher.rs
Outdated
/// # Arguments
/// * `event_line` - A string slice that represents the URI of the event to be retrieved
/// from the homeserver. It contains the event type and the homeserver uri
pub async fn retrieve_event_from_homeserver(event_line: &str) -> Result<(), DynError> {
- pub async fn retrieve_event_from_homeserver(event_line: &str) -> Result<(), DynError> {
+ pub async fn retrieve_and_handle_event_line(event_line: &str) -> Result<(), DynError> {

// Switch OFF the event processor to simulate the pending events to index
// In that case, shadow user
test = test.remove_event_processing().await;
smart
tests/watcher/tags/fail_index.rs
Outdated
test.put(tag_url.as_str(), tag_blob).await?;

// Create raw event line to retrieve the content from the homeserver. Event processor is deactivated
// Like this, we can trigger the error in that test
// Like this, we can trigger the error in that test
Please clarify: like what? And which test?
tests/watcher/tags/fail_index.rs
Outdated
assert!(
    sync_fail,
    "Cannot exist the tag because it is not in sync the graph with events"
Clarify
Pre-submission Checklist
cargo test
cargo bench
PR description
The initial implementation was risky due to the lack of clear feedback when an event failed to index in the graph. This made troubleshooting difficult. We’ve improved observability and can now identify specific failure scenarios, such as missing relationships in the graph. Knowing this, we can avoid ineffective retries and instead take alternative actions for such cases.
One proposed solution is to implement a `RetryManager` (#247), which would manage retries intelligently by analyzing failures and determining the best course of action, whether retrying after resolving dependencies or flagging for manual intervention. This approach enhances both the reliability and efficiency of the indexing process.
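To illustrate the idea, here is a minimal sketch of how an event handler could act on the outcome instead of retrying blindly. This is not the `RetryManager` API from #247, which is not shown here; `NextStep` and `next_step` are hypothetical names, and `OperationOutcome` is redeclared locally with the three variants that appear in the review.

```rust
/// Local copy of the outcome type with the variants seen in the review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationOutcome {
    Created,
    Updated,
    Pending,
}

/// Hypothetical follow-up action for an ingested event.
#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    /// The graph write succeeded, so indexing can continue.
    ContinueIndexing,
    /// A dependency is missing in the graph; park the event until it is resolved.
    RetryAfterDependency { event_uri: String },
}

/// Decides what to do with an event based on the graph query outcome.
pub fn next_step(outcome: OperationOutcome, event_uri: &str) -> NextStep {
    match outcome {
        OperationOutcome::Created | OperationOutcome::Updated => NextStep::ContinueIndexing,
        // Retrying immediately would fail again, so defer instead.
        OperationOutcome::Pending => NextStep::RetryAfterDependency {
            event_uri: event_uri.to_string(),
        },
    }
}
```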