Skip to content

Commit

Permalink
refactor RetryEvent and crete new folder for RetryManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tipogi committed Jan 14, 2025
1 parent cade794 commit bd6eeb2
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 253 deletions.
2 changes: 1 addition & 1 deletion benches/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pubky_app_specs::{PubkyAppUser, PubkyAppUserLink};
use pubky_common::crypto::Keypair;
use pubky_homeserver::Homeserver;
use pubky_nexus::{
events::retry::{RetryManager, SenderChannel},
events::manager::{RetryManager, SenderChannel},
types::PubkyId,
EventProcessor,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/from_file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use pubky_nexus::events::retry::RetryManager;
use pubky_nexus::events::retry::manager::RetryManager;
use pubky_nexus::{setup, types::DynError, Config, EventProcessor};
use std::fs::File;
use std::io::{self, BufRead};
Expand Down
3 changes: 2 additions & 1 deletion src/events/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[derive(Error, Debug)]
#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub enum EventProcessorError {
#[error("The user could not be indexed in nexus")]
UserNotSync,
Expand Down
11 changes: 0 additions & 11 deletions src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,6 @@ async fn put_sync_user(
Ok(())
}
}
// TagUser::put_to_graph(
// &tagger_user_id,
// &tagged_user_id,
// None,
// &tag_id,
// &tag_label,
// indexed_at,
// )
// .await?;
// let dependency = format!("pubky://{tagged_user_id}/pub/pubky.app/profile.json");
// Err(EventProcessorError::MissingDependency { dependency }.into())
}

pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), DynError> {
Expand Down
12 changes: 12 additions & 0 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt;

use crate::{
db::connectors::pubky::PubkyConnector,
types::{DynError, PubkyId},
Expand Down Expand Up @@ -50,6 +52,16 @@ pub enum EventType {
Del,
}

impl fmt::Display for EventType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let upper_case_str = match self {
EventType::Put => "PUT",
EventType::Del => "DEL",
};
write!(f, "{}", upper_case_str)
}
}

#[derive(Debug, Clone)]
pub struct Event {
uri: String,
Expand Down
28 changes: 12 additions & 16 deletions src/events/processor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::error::EventProcessorError;
use super::retry::SenderChannel;
use super::retry::SenderMessage;
use super::retry::manager::{SenderChannel, SenderMessage};
use super::Event;
use crate::events::retry::RetryEvent;
use crate::events::retry::event::RetryEvent;
use crate::types::DynError;
use crate::types::PubkyId;
use crate::{models::homeserver::Homeserver, Config};
Expand Down Expand Up @@ -153,25 +152,22 @@ impl EventProcessor {
error!("Error while handling event: {}", e);

let retry_event = match e.downcast_ref::<EventProcessorError>() {
Some(EventProcessorError::UserNotSync) => RetryEvent::new(
&event.uri,
&event.event_type,
None,
EventErrorType::GraphError,
),
Some(EventProcessorError::MissingDependency { dependency }) => RetryEvent::new(
&event.uri,
&event.event_type,
Some(dependency.clone()),
EventErrorType::MissingDependency,
),
Some(event_processor_error) => RetryEvent::new(event_processor_error.clone()),
// Other retry errors must be ignored
_ => return Ok(()),
};

// Generate a compress index to save in the cache
let index = if let Some(retry_index) = RetryEvent::generate_index_key(&event.uri) {
retry_index
} else {
// This block is unlikely to be reached, as it would typically fail during the validation process.
return Ok(());
};
let index_key = format!("{} {}", event.event_type, index);
let sender = self.sender.lock().await;
match sender
.send(SenderMessage::Add(self.homeserver.id.clone(), retry_event))
.send(SenderMessage::Add(index_key, retry_event))
.await
{
Ok(_) => {
Expand Down
177 changes: 0 additions & 177 deletions src/events/retry.rs

This file was deleted.

96 changes: 96 additions & 0 deletions src/events/retry/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use chrono::Utc;
use serde::{Deserialize, Serialize};

use crate::{events::error::EventProcessorError, types::DynError, RedisOps};

pub const RETRY_MAMAGER_PREFIX: &str = "RetryManager";
pub const RETRY_MANAGER_EVENTS_INDEX: [&str; 1] = ["events"];
pub const RETRY_MANAGER_STATE_INDEX: [&str; 1] = ["state"];
pub const HOMESERVER_PUBLIC_REPOSITORY: &str = "pub";
pub const HOMESERVER_APP_REPOSITORY: &str = "pubky.app";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
// Number of retries attempted
pub retry_count: u32,
// This determines how the event should be processed during the retry process
pub error_type: EventProcessorError,
}

impl RedisOps for RetryEvent {}

impl RetryEvent {
pub fn new(error_type: EventProcessorError) -> Self {
Self {
retry_count: 0,
error_type,
}
}

/// It processes a homeserver URI and extracts specific components to form a index key
/// in the format `"{pubkyId}:{repository_model}/{event_id}"`
/// # Parameters
/// - `event_uri`: A string slice representing the event URI to be processed
pub fn generate_index_key(event_uri: &str) -> Option<String> {
let parts: Vec<&str> = event_uri.split('/').collect();
if parts.len() >= 7
&& parts[0] == "pubky:"
&& parts[3] == HOMESERVER_PUBLIC_REPOSITORY
&& parts[4] == HOMESERVER_APP_REPOSITORY
{
Some(format!("{}:{}/{}", parts[2], parts[5], parts[6]))
} else {
None
}
}

pub async fn put_to_index(&self, event_line: String) -> Result<(), DynError> {
Self::put_index_sorted_set(
&RETRY_MANAGER_EVENTS_INDEX,
// NOTE: Don't know if we should use now timestamp or the event timestamp
&[(Utc::now().timestamp_millis() as f64, &event_line)],
Some(RETRY_MAMAGER_PREFIX),
None,
)
.await?;

let event_serialized = serde_json::to_string(self)?;

Self::put_index_hash_map(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_STATE_INDEX,
&event_line,
event_serialized,
)
.await?;
Ok(())
}

pub async fn check_uri(event_line: &str) -> Result<Option<isize>, DynError> {
if let Some(post_details) = Self::check_sorted_set_member(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_EVENTS_INDEX,
&[event_line],
)
.await?
{
return Ok(Some(post_details));
}
Ok(None)
}

pub async fn get_from_index(pubky_uri: &str) -> Result<Option<Self>, DynError> {
let mut found_event = None;
if let Some(event_state) = Self::get_index_hash_map(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_STATE_INDEX,
pubky_uri,
)
.await?
{
let event = serde_json::from_str::<Self>(&event_state)?;
found_event = Some(event);
}
Ok(found_event)
}
}
Loading

0 comments on commit bd6eeb2

Please sign in to comment.