Skip to content

Commit

Permalink
error control in PUT events for RetryManager and add tag related inte…
Browse files Browse the repository at this point in the history
…gration tests
  • Loading branch information
tipogi committed Jan 13, 2025
1 parent afa71d9 commit cade794
Show file tree
Hide file tree
Showing 31 changed files with 574 additions and 149 deletions.
15 changes: 0 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ reqwest = "0.12.9"
base32 = "0.5.1"
blake3 = "1.5.5"
url = "2.5.4"
dashmap = "6.1.0"
async-trait = "0.1.84"

[dev-dependencies]
Expand Down
9 changes: 7 additions & 2 deletions benches/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use pubky_common::crypto::Keypair;
use pubky_homeserver::Homeserver;
use pubky_nexus::{
events::retry::{RetryManager, SenderChannel},
types::PubkyId,
EventProcessor,
};
use setup::run_setup;
Expand Down Expand Up @@ -35,7 +36,7 @@ async fn create_homeserver_with_events() -> (Testnet, String, SenderChannel) {
let sender_clone = retry_manager.sender.clone();
// Create new asynchronous task to control the failed events
tokio::spawn(async move {
retry_manager.exec().await;
let _ = retry_manager.exec().await;
});

// Create and delete a user profile (as per your requirement)
Expand Down Expand Up @@ -83,9 +84,13 @@ fn bench_create_delete_user(c: &mut Criterion) {
let sender_clone = sender.clone(); // Clone the sender for each iteration
let homeserver_url_clone = homeserver_url.clone();
async move {
// Create hardcoded homeserver pubkyId
let id = PubkyId::try_from("66h9hkdaud4ekkuummh3b4zhk68iggzirqbomyktfhq5s84jirno")
.unwrap();

// Benchmark the event processor initialization and run
let mut event_processor =
EventProcessor::test(homeserver_url_clone, sender_clone).await;
EventProcessor::test(homeserver_url_clone, id, sender_clone).await;
event_processor.run().await.unwrap();
}
});
Expand Down
13 changes: 11 additions & 2 deletions examples/from_file.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use pubky_nexus::events::retry::RetryManager;
use pubky_nexus::{setup, types::DynError, Config, EventProcessor};
use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use tokio::sync::mpsc;

// Create that file and add the file with that format
// PUT homeserver_uri
Expand All @@ -14,10 +16,17 @@ async fn main() -> Result<(), DynError> {
let config = Config::from_env();
setup(&config).await;

let mut event_processor = EventProcessor::from_config(&config).await?;
let retry_manager = RetryManager::initialise(mpsc::channel(1024));
// Prepare the sender channel to send the messages to the retry manager
let sender_clone = retry_manager.sender.clone();
// Create new asynchronous task to control the failed events
tokio::spawn(async move {
let _ = retry_manager.exec().await;
});

let events = read_events_from_file().unwrap();
let mut event_processor = EventProcessor::from_config(&config, sender_clone).await?;

let events = read_events_from_file().unwrap();
event_processor.process_event_lines(events).await?;

Ok(())
Expand Down
50 changes: 50 additions & 0 deletions src/db/kv/index/hash_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::db::connectors::redis::get_redis_conn;
use crate::types::DynError;
use redis::AsyncCommands;

/// Inserts a value into a Redis hash map under the specified prefix, key, and field
/// * `prefix` - A string slice representing the prefix for the key. This is typically used to group related keys in Redis
/// * `key` - A string slice representing the main key in the hash map
/// * `field` - A string slice representing the field within the hash map where the value will be stored
/// * `value` - A `String` containing the value to be stored in the hash map
pub async fn put(prefix: &str, key: &str, field: &str, value: String) -> Result<(), DynError> {
let index_key = format!("{}:{}", prefix, key);
let mut redis_conn = get_redis_conn().await?;
// HSETNX sets the field only if it does not exist. Returns 1 (true) or 0 (false).
let _: bool = redis_conn.hset(&index_key, field, value).await?;
Ok(())
}

/// Retrieves a value from a Redis hash map using the specified prefix, key, and field
/// # Arguments
/// * `prefix` - A string slice representing the prefix for the key. This is used to group related keys in Redis
/// * `key` - A string slice representing the main key in the hash map
/// * `field` - A string slice representing the field within the hash map from which the value will be retrieved
pub async fn get(prefix: &str, key: &str, field: &str) -> Result<Option<String>, DynError> {
let index_key = format!("{}:{}", prefix, key);
let mut redis_conn = get_redis_conn().await?;

// HGET retrieves the value for the given field.
let value: Option<String> = redis_conn.hget(&index_key, field).await?;
Ok(value)
}

/// Deletes one or more fields from a Redis hash map under the specified prefix and key.
/// # Arguments
/// * `prefix` - A string slice representing the prefix for the key. This is used to group related keys in Redis
/// * `key` - A string slice representing the main key in the hash map
/// * `fields` - A slice of string slices representing the fields to be removed from the hash map
pub async fn _del(prefix: &str, key: &str, fields: &[&str]) -> Result<(), DynError> {
if fields.is_empty() {
return Ok(());
}

let index_key = format!("{}:{}", prefix, key);
let mut redis_conn = get_redis_conn().await?;

// The HDEL command is used to remove one or more fields from a hash.
// It returns the number of fields that were removed
let _removed_count: i32 = redis_conn.hdel(index_key, fields).await?;

Ok(())
}
1 change: 1 addition & 0 deletions src/db/kv/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod hash_map;
/// Module for redis Indexing operations split into modules by Redis types
pub mod json;
pub mod lists;
Expand Down
42 changes: 41 additions & 1 deletion src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,14 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
/// * `key_parts` - A slice of string slices that represent the parts used to form the key under which the sorted set is stored.
/// * `member` - A slice of string slices that represent the parts used to form the key identifying the member within the sorted set.
async fn check_sorted_set_member(
prefix: Option<&str>,
key_parts: &[&str],
member: &[&str],
) -> Result<Option<isize>, DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
let member_key = member.join(":");
sorted_sets::check_member(SORTED_PREFIX, &key, &member_key).await
sorted_sets::check_member(prefix, &key, &member_key).await
}

/// Adds elements to a Redis sorted set using the provided key parts.
Expand Down Expand Up @@ -679,4 +681,42 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let key = key_parts.join(":");
sorted_sets::get_lex_range("Sorted", &key, min, max, skip, limit).await
}

// ############################################################
// ############ HASH MAP related functions ####################
// ############################################################

/// Inserts a value into a Redis hash map at the specified key and field.
/// # Arguments
/// * `prefix` - An optional string slice representing the prefix for the key
/// * `key_parts` - A slice of string slices representing parts of the key
/// * `field` - A string slice representing the field in the hash map where the value will be stored.
/// * `value` - A `String` containing the value to be stored in the hash map
async fn put_index_hash_map(
prefix: Option<&str>,
key_parts: &[&str],
field: &str,
value: String,
) -> Result<(), DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
// Store the elements in the Redis sorted set
hash_map::put(prefix, &key, field, value).await
}

/// Retrieves a value from a Redis hash map at the specified key and field.
/// # Arguments
/// * `prefix` - An optional string slice representing the prefix for the key
/// * `key_parts` - A slice of string slices representing parts of the key
/// * `field` - A string slice representing the field in the hash map from which the value will be retrieved
async fn get_index_hash_map(
prefix: Option<&str>,
key_parts: &[&str],
field: &str,
) -> Result<Option<String>, DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
// Store the elements in the Redis sorted set
hash_map::get(prefix, &key, field).await
}
}
17 changes: 17 additions & 0 deletions src/events/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum EventProcessorError {
#[error("The user could not be indexed in nexus")]
UserNotSync,
#[error("The event could not be indexed because some graph dependency is missing")]
MissingDependency { dependency: Vec<String> },
#[error("The event does not exist anymore in the homeserver")]
ContentNotFound { dependency: String },
#[error("PubkyClient could not reach/resolve the homeserver")]
NotResolvedHomeserver,
#[error("The event could not be parsed from a line")]
InvalidEvent,
#[error("")]
PubkyClientError,
}
4 changes: 3 additions & 1 deletion src/events/handlers/bookmark.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::uri::ParsedUri;
use crate::models::post::Bookmark;
use crate::models::user::UserCounts;
Expand Down Expand Up @@ -41,7 +42,8 @@ pub async fn sync_put(
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
let dependency = vec![format!("pubky://{author_id}/pub/pubky.app/posts/{post_id}")];
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/events/handlers/follow.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::models::follow::{Followers, Following, Friends, UserFollows};
use crate::models::notification::Notification;
use crate::models::user::UserCounts;
Expand All @@ -23,10 +24,9 @@ pub async fn sync_put(follower_id: PubkyId, followee_id: PubkyId) -> Result<(),
match Followers::put_to_graph(&follower_id, &followee_id).await? {
// Do not duplicate the follow relationship
OperationOutcome::Updated => return Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
let dependency = vec![format!("pubky://{followee_id}/pub/pubky.app/profile.json")];
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
// The relationship did not exist, create all related indexes
OperationOutcome::CreatedOrDeleted => {
Expand Down
5 changes: 3 additions & 2 deletions src/events/handlers/mute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::graph::exec::OperationOutcome;
use crate::events::error::EventProcessorError;
use crate::models::user::Muted;
use crate::types::DynError;
use crate::types::PubkyId;
Expand All @@ -19,9 +20,9 @@ pub async fn sync_put(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynErro
// (user_id)-[:MUTED]->(muted_id)
match Muted::put_to_graph(&user_id, &muted_id).await? {
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
let dependency = vec![format!("pubky://{muted_id}/pub/pubky.app/profile.json")];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
Expand Down
15 changes: 12 additions & 3 deletions src/events/handlers/post.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::{exec_single_row, execute_graph_operation, OperationOutcome};
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::uri::ParsedUri;
use crate::models::notification::{Notification, PostChangedSource, PostChangedType};
use crate::models::post::{
Expand Down Expand Up @@ -41,10 +42,18 @@ pub async fn sync_put(
let existed = match post_details.put_to_graph(&post_relationships).await? {
OperationOutcome::CreatedOrDeleted => false,
OperationOutcome::Updated => true,
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
return Err("WATCHER: Missing some dependency to index the model".into())
let mut dependency = Vec::new();
if let Some(replied_uri) = &post_relationships.replied {
dependency.push(replied_uri.clone());
}
if let Some(reposted_uri) = &post_relationships.reposted {
dependency.push(reposted_uri.clone());
}
if dependency.is_empty() {
dependency.push(format!("pubky://{author_id}/pub/pubky.app/profile.json"))
}
return Err(EventProcessorError::MissingDependency { dependency }.into());
}
};

Expand Down
24 changes: 18 additions & 6 deletions src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db::graph::exec::OperationOutcome;
use crate::db::kv::index::json::JsonAction;
use crate::events::error::EventProcessorError;
use crate::events::uri::ParsedUri;
use crate::models::notification::Notification;
use crate::models::post::{PostCounts, PostStream};
Expand Down Expand Up @@ -79,10 +80,9 @@ async fn put_sync_post(
.await?
{
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
let dependency = vec![format!("pubky://{author_id}/pub/pubky.app/posts/{post_id}")];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEXES
Expand Down Expand Up @@ -158,10 +158,11 @@ async fn put_sync_user(
.await?
{
OperationOutcome::Updated => Ok(()),
// TODO: Should return an error that should be processed by RetryManager
// WIP: Create a custom error type to pass enough info to the RetryManager
OperationOutcome::Pending => {
Err("WATCHER: Missing some dependency to index the model".into())
let dependency = vec![format!(
"pubky://{tagged_user_id}/pub/pubky.app/profile.json"
)];
Err(EventProcessorError::MissingDependency { dependency }.into())
}
OperationOutcome::CreatedOrDeleted => {
// SAVE TO INDEX
Expand Down Expand Up @@ -189,6 +190,17 @@ async fn put_sync_user(
Ok(())
}
}
// TagUser::put_to_graph(
// &tagger_user_id,
// &tagged_user_id,
// None,
// &tag_id,
// &tag_label,
// indexed_at,
// )
// .await?;
// let dependency = format!("pubky://{tagged_user_id}/pub/pubky.app/profile.json");
// Err(EventProcessorError::MissingDependency { dependency }.into())
}

pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), DynError> {
Expand Down
6 changes: 5 additions & 1 deletion src/events/handlers/user.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::graph::exec::{execute_graph_operation, OperationOutcome};
use crate::events::error::EventProcessorError;
use crate::models::user::UserSearch;
use crate::models::{
traits::Collection,
Expand All @@ -25,7 +26,10 @@ pub async fn sync_put(user: PubkyAppUser, user_id: PubkyId) -> Result<(), DynErr
// Create UserDetails object
let user_details = UserDetails::from_homeserver(user, &user_id).await?;
// SAVE TO GRAPH
user_details.put_to_graph().await?;
match user_details.put_to_graph().await {
Ok(_) => (),
Err(_) => return Err(EventProcessorError::UserNotSync.into()),
}
// SAVE TO INDEX
let user_id = user_details.id.clone();
UserSearch::put_to_index(&[&user_details]).await?;
Expand Down
Loading

0 comments on commit cade794

Please sign in to comment.