Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry manager #247

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions benches/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ use pubky::PubkyClient;
use pubky_app_specs::{PubkyAppUser, PubkyAppUserLink};
use pubky_common::crypto::Keypair;
use pubky_homeserver::Homeserver;
use pubky_nexus::EventProcessor;
use pubky_nexus::{
events::retry::manager::{RetryManager, SenderChannel},
types::PubkyId,
EventProcessor,
};
use setup::run_setup;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::mpsc};

mod setup;

Expand All @@ -16,7 +20,7 @@ mod setup;
/// 2. Sign up the user
/// 3. Upload a profile.json
/// 4. Delete the profile.json
async fn create_homeserver_with_events() -> (Testnet, String) {
async fn create_homeserver_with_events() -> (Testnet, String, SenderChannel) {
// Create the test environment
let testnet = Testnet::new(3);
let homeserver = Homeserver::start_test(&testnet).await.unwrap();
Expand All @@ -27,6 +31,14 @@ async fn create_homeserver_with_events() -> (Testnet, String) {
let keypair = Keypair::random();
let user_id = keypair.public_key().to_z32();

let retry_manager = RetryManager::initialise(mpsc::channel(1024));
// Prepare the sender channel to send the messages to the retry manager
let sender_clone = retry_manager.sender.clone();
// Create new asynchronous task to control the failed events
tokio::spawn(async move {
let _ = retry_manager.exec().await;
});

// Create and delete a user profile (as per your requirement)
client
.signup(&keypair, &homeserver.public_key())
Expand All @@ -53,7 +65,7 @@ async fn create_homeserver_with_events() -> (Testnet, String) {
// Delete the user profile
client.delete(url.as_str()).await.unwrap();

(testnet, homeserver_url)
(testnet, homeserver_url, sender_clone)
}

fn bench_create_delete_user(c: &mut Criterion) {
Expand All @@ -65,13 +77,22 @@ fn bench_create_delete_user(c: &mut Criterion) {

// Set up the environment only once
let rt = Runtime::new().unwrap();
let (_, homeserver_url) = rt.block_on(create_homeserver_with_events());
let (_, homeserver_url, sender) = rt.block_on(create_homeserver_with_events());

c.bench_function("create_delete_homeserver_user", |b| {
b.to_async(&rt).iter(|| async {
// Benchmark the event processor initialization and run
let mut event_processor = EventProcessor::test(homeserver_url.clone()).await;
event_processor.run().await.unwrap();
b.to_async(&rt).iter(|| {
let sender_clone = sender.clone(); // Clone the sender for each iteration
let homeserver_url_clone = homeserver_url.clone();
async move {
// Create hardcoded homeserver pubkyId
let id = PubkyId::try_from("66h9hkdaud4ekkuummh3b4zhk68iggzirqbomyktfhq5s84jirno")
.unwrap();

// Benchmark the event processor initialization and run
let mut event_processor =
EventProcessor::test(homeserver_url_clone, id, sender_clone).await;
event_processor.run().await.unwrap();
}
});
});
}
Expand Down
13 changes: 11 additions & 2 deletions examples/from_file.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use pubky_nexus::events::retry::manager::RetryManager;
use pubky_nexus::{setup, types::DynError, Config, EventProcessor};
use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use tokio::sync::mpsc;

// Create that file and add the file with that format
// PUT homeserver_uri
Expand All @@ -14,10 +16,17 @@ async fn main() -> Result<(), DynError> {
let config = Config::from_env();
setup(&config).await;

let mut event_processor = EventProcessor::from_config(&config).await?;
let retry_manager = RetryManager::initialise(mpsc::channel(1024));
// Prepare the sender channel to send the messages to the retry manager
let sender_clone = retry_manager.sender.clone();
// Create new asynchronous task to control the failed events
tokio::spawn(async move {
let _ = retry_manager.exec().await;
});

let events = read_events_from_file().unwrap();
let mut event_processor = EventProcessor::from_config(&config, sender_clone).await?;

let events = read_events_from_file().unwrap();
event_processor.process_event_lines(events).await?;

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/db/graph/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum OperationOutcome {
CreatedOrDeleted,
/// A required node/relationship was not found, indicating a missing dependency
/// (often due to the node/relationship not yet being indexed or otherwise unavailable).
Pending,
MissingDependency,
}

/// Executes a graph query expected to return exactly one row containing a boolean column named
Expand All @@ -22,7 +22,7 @@ pub enum OperationOutcome {
/// - `true` => Returns [`OperationOutcome::Updated`]
/// - `false` => Returns [`OperationOutcome::CreatedOrDeleted`]
///
/// If no rows are returned, this function returns [`OperationOutcome::Pending`], typically
/// If no rows are returned, this function returns [`OperationOutcome::MissingDependency`], typically
/// indicating a missing dependency or an unmatched query condition.
pub async fn execute_graph_operation(query: Query) -> Result<OperationOutcome, DynError> {
let mut result;
Expand All @@ -38,7 +38,7 @@ pub async fn execute_graph_operation(query: Query) -> Result<OperationOutcome, D
true => Ok(OperationOutcome::Updated),
false => Ok(OperationOutcome::CreatedOrDeleted),
},
None => Ok(OperationOutcome::Pending),
None => Ok(OperationOutcome::MissingDependency),
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,6 @@ pub fn post_stream(
cypher.push_str(&format!("LIMIT {}\n", limit));
}

println!("{:?}", cypher);

// Build the query and apply parameters using `param` method
build_query_with_params(&cypher, &source, tags, kind, &pagination)
}
Expand Down
4 changes: 3 additions & 1 deletion src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,14 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
/// * `key_parts` - A slice of string slices that represent the parts used to form the key under which the sorted set is stored.
/// * `member` - A slice of string slices that represent the parts used to form the key identifying the member within the sorted set.
async fn check_sorted_set_member(
prefix: Option<&str>,
key_parts: &[&str],
member: &[&str],
) -> Result<Option<isize>, DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
let member_key = member.join(":");
sorted_sets::check_member(SORTED_PREFIX, &key, &member_key).await
sorted_sets::check_member(prefix, &key, &member_key).await
}

/// Adds elements to a Redis sorted set using the provided key parts.
Expand Down
18 changes: 18 additions & 0 deletions src/events/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub enum EventProcessorError {
#[error("The user could not be indexed in nexus")]
UserNotSync,
#[error("The event could not be indexed because some graph dependency is missing")]
MissingDependency { dependency: Vec<String> },
#[error("The event does not exist anymore in the homeserver")]
ContentNotFound { dependency: String },
#[error("PubkyClient could not reach/resolve the homeserver")]
NotResolvedHomeserver,
#[error("The event could not be parsed from a line")]
InvalidEvent,
#[error("")]
PubkyClientError,
}
6 changes: 4 additions & 2 deletions src/events/handlers/bookmark.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::uri::ParsedUri;
use crate::models::post::Bookmark;
use crate::models::user::UserCounts;
Expand Down Expand Up @@ -40,8 +41,9 @@ pub async fn sync_put(
OperationOutcome::CreatedOrDeleted => false,
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
let dependency = vec![format!("{author_id}:posts:{post_id}")];
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
};

Expand Down
10 changes: 5 additions & 5 deletions src/events/handlers/follow.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::models::follow::{Followers, Following, Friends, UserFollows};
use crate::models::notification::Notification;
use crate::models::user::UserCounts;
Expand All @@ -23,10 +24,9 @@ pub async fn sync_put(follower_id: PubkyId, followee_id: PubkyId) -> Result<(),
match Followers::put_to_graph(&follower_id, &followee_id).await? {
// Do not duplicate the follow relationship
OperationOutcome::Updated => return Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
let dependency = vec![format!("{followee_id}:user:profile.json")];
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
// The relationship did not exist, create all related indexes
OperationOutcome::CreatedOrDeleted => {
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn sync_del(follower_id: PubkyId, followee_id: PubkyId) -> Result<(),
match Followers::del_from_graph(&follower_id, &followee_id).await? {
// Both users exists but they do not have that relationship
OperationOutcome::Updated => Ok(()),
OperationOutcome::Pending => {
OperationOutcome::MissingDependency => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::CreatedOrDeleted => {
Expand Down
9 changes: 5 additions & 4 deletions src/events/handlers/mute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::graph::exec::OperationOutcome;
use crate::events::error::EventProcessorError;
use crate::models::user::Muted;
use crate::types::DynError;
use crate::types::PubkyId;
Expand All @@ -19,9 +20,9 @@ pub async fn sync_put(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynErro
// (user_id)-[:MUTED]->(muted_id)
match Muted::put_to_graph(&user_id, &muted_id).await? {
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
let dependency = vec![format!("{muted_id}:user:profile.json")];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
Expand All @@ -42,7 +43,7 @@ pub async fn sync_del(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynErro
match Muted::del_from_graph(&user_id, &muted_id).await? {
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
OperationOutcome::MissingDependency => {
Err("WATCHER: Missing some dependency to index the model".into())
}
OperationOutcome::CreatedOrDeleted => {
Expand Down
26 changes: 21 additions & 5 deletions src/events/handlers/post.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::db::graph::exec::{exec_single_row, execute_graph_operation, OperationOutcome};
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::retry::event::RetryEvent;
use crate::events::uri::ParsedUri;
use crate::models::notification::{Notification, PostChangedSource, PostChangedType};
use crate::models::post::{
Expand Down Expand Up @@ -41,10 +43,24 @@ pub async fn sync_put(
let existed = match post_details.put_to_graph(&post_relationships).await? {
OperationOutcome::CreatedOrDeleted => false,
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
let mut dependency = Vec::new();
if let Some(replied_uri) = &post_relationships.replied {
let reply_dependency = RetryEvent::generate_index_key(replied_uri)
// This block is unlikely to be reached, as it would typically fail during the validation process
.unwrap_or_else(|| replied_uri.clone());
dependency.push(reply_dependency);
}
if let Some(reposted_uri) = &post_relationships.reposted {
let reply_dependency = RetryEvent::generate_index_key(reposted_uri)
// This block is unlikely to be reached, as it would typically fail during the validation process
.unwrap_or_else(|| reposted_uri.clone());
dependency.push(reply_dependency);
}
if dependency.is_empty() {
dependency.push(format!("{author_id}:user:profile.json"))
}
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
};

Expand Down Expand Up @@ -276,7 +292,7 @@ pub async fn del(author_id: PubkyId, post_id: String) -> Result<(), DynError> {
}
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
OperationOutcome::MissingDependency => {
return Err("WATCHER: Missing some dependency to index the model".into())
}
};
Expand Down
17 changes: 9 additions & 8 deletions src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::uri::ParsedUri;
use crate::models::notification::Notification;
use crate::models::post::{PostCounts, PostStream};
Expand Down Expand Up @@ -79,10 +80,10 @@ async fn put_sync_post(
.await?
{
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
// Ensure that dependencies follow the same format as the RetryManager keys
let dependency = vec![format!("{author_id}:posts:{post_id}")];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEXES
Expand Down Expand Up @@ -158,10 +159,10 @@ async fn put_sync_user(
.await?
{
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
OperationOutcome::MissingDependency => {
// Ensure that dependencies follow the same format as the RetryManager keys
let dependency = vec![format!("{tagged_user_id}:user:profile.json")];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
Expand Down
8 changes: 6 additions & 2 deletions src/events/handlers/user.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::graph::exec::{execute_graph_operation, OperationOutcome};
use crate::events::error::EventProcessorError;
use crate::models::user::UserSearch;
use crate::models::{
traits::Collection,
Expand All @@ -25,7 +26,10 @@ pub async fn sync_put(user: PubkyAppUser, user_id: PubkyId) -> Result<(), DynErr
// Create UserDetails object
let user_details = UserDetails::from_homeserver(user, &user_id).await?;
// SAVE TO GRAPH
user_details.put_to_graph().await?;
match user_details.put_to_graph().await {
Ok(_) => (),
Err(_) => return Err(EventProcessorError::UserNotSync.into()),
}
// SAVE TO INDEX
let user_id = user_details.id.clone();
UserSearch::put_to_index(&[&user_details]).await?;
Expand Down Expand Up @@ -65,7 +69,7 @@ pub async fn del(user_id: PubkyId) -> Result<(), DynError> {
}
// Should return an error that could not be inserted in the RetryManager
// TODO: WIP, it will be fixed in the comming PRs the error messages
OperationOutcome::Pending => {
OperationOutcome::MissingDependency => {
return Err("WATCHER: Missing some dependency to index the model".into())
}
}
Expand Down
Loading
Loading