Skip to content

Commit

Permalink
replace the HashMap structure with a JSON-based implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tipogi committed Jan 14, 2025
1 parent bd6eeb2 commit f2dba56
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 117 deletions.
2 changes: 1 addition & 1 deletion benches/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pubky_app_specs::{PubkyAppUser, PubkyAppUserLink};
use pubky_common::crypto::Keypair;
use pubky_homeserver::Homeserver;
use pubky_nexus::{
events::manager::{RetryManager, SenderChannel},
events::retry::manager::{RetryManager, SenderChannel},
types::PubkyId,
EventProcessor,
};
Expand Down
50 changes: 0 additions & 50 deletions src/db/kv/index/hash_map.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/db/kv/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod hash_map;
/// Module for redis Indexing operations split into modules by Redis types
pub mod json;
pub mod lists;
Expand Down
38 changes: 0 additions & 38 deletions src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,42 +681,4 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let key = key_parts.join(":");
sorted_sets::get_lex_range("Sorted", &key, min, max, skip, limit).await
}

// ############################################################
// ############ HASH MAP related functions ####################
// ############################################################

/// Inserts a value into a Redis hash map at the specified key and field.
/// # Arguments
/// * `prefix` - An optional string slice representing the prefix for the key
/// * `key_parts` - A slice of string slices representing parts of the key
/// * `field` - A string slice representing the field in the hash map where the value will be stored.
/// * `value` - A `String` containing the value to be stored in the hash map
async fn put_index_hash_map(
prefix: Option<&str>,
key_parts: &[&str],
field: &str,
value: String,
) -> Result<(), DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
// Store the elements in the Redis sorted set
hash_map::put(prefix, &key, field, value).await
}

/// Retrieves a value from a Redis hash map at the specified key and field.
/// # Arguments
/// * `prefix` - An optional string slice representing the prefix for the key
/// * `key_parts` - A slice of string slices representing parts of the key
/// * `field` - A string slice representing the field in the hash map from which the value will be retrieved
async fn get_index_hash_map(
prefix: Option<&str>,
key_parts: &[&str],
field: &str,
) -> Result<Option<String>, DynError> {
let prefix = prefix.unwrap_or(SORTED_PREFIX);
let key = key_parts.join(":");
// Store the elements in the Redis sorted set
hash_map::get(prefix, &key, field).await
}
}
2 changes: 1 addition & 1 deletion src/events/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl EventProcessor {
// This block is unlikely to be reached, as it would typically fail during the validation process.
return Ok(());
};
let index_key = format!("{} {}", event.event_type, index);
let index_key = format!("{}:{}", event.event_type, index);
let sender = self.sender.lock().await;
match sender
.send(SenderMessage::Add(index_key, retry_event))
Expand Down
54 changes: 30 additions & 24 deletions src/events/retry/event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use chrono::Utc;
use serde::{Deserialize, Serialize};

Expand All @@ -6,6 +7,7 @@ use crate::{events::error::EventProcessorError, types::DynError, RedisOps};
pub const RETRY_MAMAGER_PREFIX: &str = "RetryManager";
pub const RETRY_MANAGER_EVENTS_INDEX: [&str; 1] = ["events"];
pub const RETRY_MANAGER_STATE_INDEX: [&str; 1] = ["state"];
pub const HOMESERVER_PROTOCOL: &str = "pubky:";
pub const HOMESERVER_PUBLIC_REPOSITORY: &str = "pub";
pub const HOMESERVER_APP_REPOSITORY: &str = "pubky.app";

Expand All @@ -17,7 +19,12 @@ pub struct RetryEvent {
pub error_type: EventProcessorError,
}

impl RedisOps for RetryEvent {}
#[async_trait]
impl RedisOps for RetryEvent {
async fn prefix() -> String {
String::from(RETRY_MAMAGER_PREFIX)
}
}

impl RetryEvent {
pub fn new(error_type: EventProcessorError) -> Self {
Expand All @@ -28,22 +35,27 @@ impl RetryEvent {
}

/// It processes a homeserver URI and extracts specific components to form a index key
/// in the format `"{pubkyId}:{repository_model}/{event_id}"`
/// in the format `"{pubkyId}:{repository_model}:{event_id}"`
/// # Parameters
/// - `event_uri`: A string slice representing the event URI to be processed
pub fn generate_index_key(event_uri: &str) -> Option<String> {
let parts: Vec<&str> = event_uri.split('/').collect();
if parts.len() >= 7
&& parts[0] == "pubky:"
&& parts[0] == HOMESERVER_PROTOCOL
&& parts[3] == HOMESERVER_PUBLIC_REPOSITORY
&& parts[4] == HOMESERVER_APP_REPOSITORY
{
Some(format!("{}:{}/{}", parts[2], parts[5], parts[6]))
Some(format!("{}:{}:{}", parts[2], parts[5], parts[6]))
} else {
None
}
}

/// Stores an event in both a sorted set and a JSON index in Redis.
/// It adds an event line to a Redis sorted set with a timestamp-based score
/// and also stores the event details in a separate JSON index for retrieval.
/// # Arguments
/// * `event_line` - A `String` representing the event line to be indexed.
pub async fn put_to_index(&self, event_line: String) -> Result<(), DynError> {
Self::put_index_sorted_set(
&RETRY_MANAGER_EVENTS_INDEX,
Expand All @@ -54,23 +66,20 @@ impl RetryEvent {
)
.await?;

let event_serialized = serde_json::to_string(self)?;
let index = &[RETRY_MANAGER_STATE_INDEX, [&event_line]].concat();
self.put_index_json(index, None).await?;

Self::put_index_hash_map(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_STATE_INDEX,
&event_line,
event_serialized,
)
.await?;
Ok(())
}

pub async fn check_uri(event_line: &str) -> Result<Option<isize>, DynError> {
/// Checks if a specific event exists in the Redis sorted set
/// # Arguments
/// * `event_index` - A `&str` representing the event index to check
pub async fn check_uri(event_index: &str) -> Result<Option<isize>, DynError> {
if let Some(post_details) = Self::check_sorted_set_member(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_EVENTS_INDEX,
&[event_line],
&[event_index],
)
.await?
{
Expand All @@ -79,17 +88,14 @@ impl RetryEvent {
Ok(None)
}

pub async fn get_from_index(pubky_uri: &str) -> Result<Option<Self>, DynError> {
/// Retrieves an event from the JSON index in Redis based on its index
/// # Arguments
/// * `event_index` - A `&str` representing the event index to retrieve
pub async fn get_from_index(event_index: &str) -> Result<Option<Self>, DynError> {
let mut found_event = None;
if let Some(event_state) = Self::get_index_hash_map(
Some(RETRY_MAMAGER_PREFIX),
&RETRY_MANAGER_STATE_INDEX,
pubky_uri,
)
.await?
{
let event = serde_json::from_str::<Self>(&event_state)?;
found_event = Some(event);
let index = &[RETRY_MANAGER_STATE_INDEX, [event_index]].concat();
if let Some(fail_event) = Self::try_from_index_json(index).await? {
found_event = Some(fail_event);
}
Ok(found_event)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/watcher/tags/retry_post_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn test_homeserver_post_tag_event_to_queue() -> Result<()> {
tokio::time::sleep(Duration::from_millis(500)).await;

let index_key = format!(
"{} {}",
"{}:{}",
EventType::Put,
RetryEvent::generate_index_key(&tag_url).unwrap()
);
Expand Down
2 changes: 1 addition & 1 deletion tests/watcher/tags/retry_user_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn test_homeserver_user_tag_event_to_queue() -> Result<()> {
tokio::time::sleep(Duration::from_millis(500)).await;

let index_key = format!(
"{} {}",
"{}:{}",
EventType::Put,
RetryEvent::generate_index_key(&tag_url).unwrap()
);
Expand Down

0 comments on commit f2dba56

Please sign in to comment.