Skip to content

Commit

Permalink
feat: Hot tags stream (#86)
Browse files Browse the repository at this point in the history
* Add hot tag taggers SET
* Adapt endpoint for client requirements
* Refactor and add endpoint to retrieve tag taggers
  • Loading branch information
tipogi authored Aug 23, 2024
1 parent 04d2562 commit ceef03b
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 133 deletions.
32 changes: 31 additions & 1 deletion benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::global::TagGlobal;
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
Expand Down Expand Up @@ -83,14 +84,42 @@ fn bench_get_global_hot_tags(c: &mut Criterion) {

c.bench_function("bench_get_global_hot_tags", |b| {
b.to_async(&rt).iter(|| async {
let stream_tag = HotTags::get_global_tags_stream(None, Some(40))
let stream_tag = HotTags::get_global_tags_stream(None, Some(40), Some(10))
.await
.unwrap();
criterion::black_box(stream_tag);
});
});
}

/// Benchmark resolving the taggers of a single global tag label from the index.
fn bench_get_global_tag_taggers(c: &mut Criterion) {
    println!(
        "****************************************************************************************"
    );
    println!("Test the performance of getting global tag taggers");
    println!(
        "****************************************************************************************"
    );

    run_setup();

    let tag_label = "ha";
    let runtime: Runtime = Runtime::new().unwrap();

    c.bench_with_input(
        BenchmarkId::new("bench_get_global_tag_taggers", format!("label: {}", tag_label)),
        &[tag_label],
        |bencher, &input| {
            bencher.to_async(&runtime).iter(|| async {
                // No reach filter: fetch the global taggers set for the label.
                let taggers = TagGlobal::get_tag_taggers(String::from(input[0]), None)
                    .await
                    .unwrap();
                criterion::black_box(taggers);
            });
        },
    );
}

fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
println!(
"****************************************************************************************"
Expand Down Expand Up @@ -215,6 +244,7 @@ criterion_group! {
targets = bench_get_user_tags,
bench_get_post_tags,
bench_get_global_hot_tags,
bench_get_global_tag_taggers,
bench_get_following_reach_hot_tags,
bench_get_followers_reach_hot_tags,
bench_get_friends_reach_hot_tags
Expand Down
11 changes: 6 additions & 5 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
query(&query_string).param("user_id", user_id)
}

// Retrieves popular tags across the entire network
// Retrieves popular tags and their taggers across the entire network
pub fn get_global_hot_tags_scores() -> Query {
query(
"
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts, COLLECT(DISTINCT u.id) AS user_ids
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags_score, COLLECT([label, user_ids]) AS hot_tags_users
",
)
}
Expand Down Expand Up @@ -248,8 +248,9 @@ pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query {
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
tagger_ids: taggers,
post_count: uniquePosts
taggers_id: taggers,
post_count: uniquePosts,
taggers_count: SIZE(taggers)
} AS hot_tag
ORDER BY hot_tag.post_count DESC
RETURN COLLECT(hot_tag) AS hot_tags
Expand Down
65 changes: 65 additions & 0 deletions src/db/kv/index/sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,68 @@ pub async fn check_set_member(
Ok((false, false))
}
}

/// Retrieves multiple sets from Redis in a single call using a pipeline.
///
/// This asynchronous function fetches multiple sets from Redis based on the provided keys using a
/// Redis pipeline (one round trip for all `SMEMBERS` commands). It returns a vector of optional
/// tuples, where each tuple contains the elements of the corresponding set (truncated to `limit`
/// if one is given) and the total cardinality of the set before truncation.
///
/// # Arguments
///
/// * `prefix` - A string slice representing the prefix to be prepended to each Redis key
///   (keys are built as `"{prefix}:{key}"`).
/// * `keys` - A slice of string slices representing the keys under which the sets are stored.
/// * `limit` - An optional `usize` specifying the maximum number of elements to retrieve from each set.
///   If `None`, all elements will be retrieved.
///
/// # Returns
///
/// Returns a `Result` containing:
/// * `Ok(Vec<Option<(Vec<String>, usize)>>)` - A vector where each element is an `Option` containing a tuple:
///   * `Some((Vec<String>, usize))` - The (possibly truncated) elements of the set and the total
///     number of elements the set holds in Redis. Callers can derive the excluded count as
///     `total - returned.len()`.
///   * `None` - The set produced no members. Note that `SMEMBERS` on a nonexistent key yields an
///     empty set, so a missing key and a genuinely empty set are both reported as `None`.
/// * `Err` - An error if the Redis operation fails.
///
/// # Errors
///
/// Returns an error if the Redis connection fails or the pipeline query encounters an issue.
pub async fn get_multiple_sets(
prefix: &str,
keys: &[&str],
limit: Option<usize>,
) -> Result<Vec<Option<(Vec<String>, usize)>>, Box<dyn Error + Send + Sync>> {
let mut redis_conn = get_redis_conn().await?;

// Create a Redis pipeline
let mut pipe = redis::pipe();

// Add each SMEMBERS command to the pipeline for all keys
for key in keys {
let index_key = format!("{}:{}", prefix, key);
pipe.smembers(index_key);
}

// Execute the pipeline: one Vec<String> of members per requested key, in order.
let results: Vec<Vec<String>> = pipe.query_async(&mut redis_conn).await?;

let taggers_list = results
.into_iter()
.map(|set| {
if set.is_empty() {
// Empty result: either the key does not exist or the set has no members.
None
} else {
// Record the full cardinality before any truncation is applied.
let set_length = set.len();
match limit {
Some(set_limit) if set_limit < set_length => {
let limited_set = set.into_iter().take(set_limit).collect();
Some((limited_set, set_length))
}
// No limit, or the limit does not cut anything off: return the whole set.
_ => Some((set, set_length)),
}
}
})
.collect();

Ok(taggers_list)
}
30 changes: 30 additions & 0 deletions src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,34 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let key = key_parts.join(":");
sorted_sets::get_lex_range("Sorted", &key, min, max, skip, limit).await
}

/// Fetches multiple sets from Redis using the specified key components.
///
/// This asynchronous function retrieves multiple sets from Redis based on the provided key components,
/// prefixed with this type's `prefix()`. It returns a vector with one entry per requested key.
/// If a particular set does not exist (or is empty), the corresponding position in the returned
/// vector will be `None`.
///
/// # Arguments
///
/// * `key_parts_list` - A slice of string slices, where each inner slice represents the components
///   used to construct the Redis key for the corresponding set.
/// * `limit` - An optional parameter specifying the maximum number of elements to fetch from each set.
///   If `None`, all elements will be retrieved.
///
/// # Returns
///
/// A `Vec<Option<(Vec<String>, usize)>>` where:
/// * `Some((elements, total))` contains the elements retrieved from a set (truncated to `limit`
///   when one is given) together with the total number of elements the set holds in Redis.
/// * `None` indicates that the set does not exist (or has no members) for the corresponding key.
///
/// # Errors
///
/// This function will return an error if the operation fails, such as in cases of a Redis connection issue.
async fn try_from_multiple_sets(
key_parts_list: &[&str],
limit: Option<usize>,
) -> Result<Vec<Option<(Vec<String>, usize)>>, Box<dyn Error + Send + Sync>> {
let prefix = Self::prefix().await;
sets::get_multiple_sets(&prefix, key_parts_list, limit).await
}
}
23 changes: 23 additions & 0 deletions src/models/tag/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use super::stream::Taggers;
use crate::{models::user::UserStreamType, RedisOps};
use std::error::Error;

pub struct TagGlobal {}

impl TagGlobal {
pub async fn get_tag_taggers(
label: String,
reach: Option<UserStreamType>,
) -> Result<Option<Vec<String>>, Box<dyn std::error::Error + Send + Sync>> {
match reach {
None => read_from_set(&label).await,
_ => Ok(None),
}
}
}

/// Reads the full global taggers set for `label` from the `Tags:Taggers` index
/// (no skip/limit applied). Presumably returns `None` when no set exists for
/// the label — TODO confirm against `try_from_index_set`.
pub async fn read_from_set(
label: &str,
) -> Result<Option<Vec<String>>, Box<dyn Error + Send + Sync>> {
Taggers::try_from_index_set(&[label], None, None).await
}
1 change: 1 addition & 0 deletions src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::ops::Deref;
use utoipa::ToSchema;

pub mod global;
pub mod post;
pub mod stream;
pub mod user;
Expand Down
3 changes: 1 addition & 2 deletions src/models/tag/post.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::ops::Deref;

use crate::db::connectors::neo4j::get_neo4j_graph;
use crate::queries;
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use utoipa::ToSchema;

use super::Tags;
Expand Down
70 changes: 53 additions & 17 deletions src/models/tag/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use axum::async_trait;
use neo4rs::Query;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand All @@ -12,13 +13,34 @@ use crate::{db::connectors::neo4j::get_neo4j_graph, queries};

pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"];

type TagList = Vec<String>;
/// Newtype over the list of user IDs that applied a given tag label.
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub struct Taggers(Vec<String>);

#[async_trait]
impl RedisOps for Taggers {
// Redis key namespace for taggers sets; full keys are "Tags:Taggers:<label>".
async fn prefix() -> String {
String::from("Tags:Taggers")
}
}

impl AsRef<[String]> for Taggers {
// Borrow the inner user-ID list as a slice (used when writing the index set).
fn as_ref(&self) -> &[String] {
&self.0
}
}

impl Taggers {
// Wrap an owned vector of user IDs without copying.
fn from_vec(vec: Vec<String>) -> Self {
Self(vec)
}
}

#[derive(Deserialize, Serialize, ToSchema, Debug)]
pub struct HotTag {
label: String,
tagger_ids: Vec<String>,
taggers_id: Taggers,
post_count: u64,
taggers_count: usize,
}

// Define a newtype wrapper
Expand All @@ -45,7 +67,6 @@ impl FromIterator<HotTag> for HotTags {
}

impl HotTags {
// TODO: Move another struct that is more related with reindexer process?
pub async fn set_global_tag_scores() -> Result<(), Box<dyn Error + Send + Sync>> {
let mut result;
{
Expand All @@ -57,14 +78,22 @@ impl HotTags {
}

if let Some(row) = result.next().await? {
let hot_tags: Vec<(f64, &str)> = row.get("hot_tags")?;
Self::put_index_sorted_set(&TAG_GLOBAL_HOT, hot_tags.as_slice()).await?
let hot_tags_score: Vec<(f64, &str)> = row.get("hot_tags_score")?;
Self::put_index_sorted_set(&TAG_GLOBAL_HOT, hot_tags_score.as_slice()).await?;
let hot_tags_users: Vec<(&str, Vec<String>)> = row.get("hot_tags_users")?;
// Add all the users_id in the SET
for (label, user_list) in hot_tags_users.into_iter() {
let label_user_list = Taggers::from_vec(user_list);
label_user_list.put_index_set(&[label]).await?;
}
}
Ok(())
}

pub async fn get_global_tags_stream(
skip: Option<usize>,
limit: Option<usize>,
taggers_limit: Option<usize>,
) -> Result<Option<Self>, Box<dyn Error + Send + Sync>> {
let hot_tags = Self::try_from_index_sorted_set(
&TAG_GLOBAL_HOT,
Expand All @@ -81,22 +110,28 @@ impl HotTags {
return Ok(None);
}

let tag_list: Vec<&str> = hot_tags.iter().map(|(label, _)| label.as_ref()).collect();
let query = queries::get_global_hot_tags_taggers(tag_list.as_slice());
let tag_user_list = retrieve_from_graph::<Vec<TagList>>(query, "tag_user_ids")
.await?
.unwrap();
// Collect the labels as a vector of string slices
let labels: Vec<&str> = hot_tags.iter().map(|(label, _)| label.as_str()).collect();
let label_slice: &[&str] = &labels;

let list = Taggers::try_from_multiple_sets(label_slice, taggers_limit).await?;

let hot_tags_stream: HotTags = hot_tags
.into_iter()
.zip(tag_user_list)
.map(|((label, score), tagger_ids)| HotTag {
label,
tagger_ids,
post_count: score as u64,
.zip(list)
.filter_map(|((label, score), user_ids)| match user_ids {
Some((tagger_list, taggers_count)) => {
let taggers_id = Taggers::from_vec(tagger_list);
Some(HotTag {
label,
taggers_id,
post_count: score as u64,
taggers_count,
})
}
None => None,
})
.collect();

Ok(Some(hot_tags_stream))
}

Expand All @@ -106,7 +141,8 @@ impl HotTags {
) -> Result<Option<HotTags>, Box<dyn Error + Send + Sync>> {
// We cannot use here limit and skip because we want to get all the users reach by
let users =
UserStream::get_user_list_from_reach(&user_id, reach, None, Some(10000)).await?;
UserStream::get_user_list_from_reach(&user_id, reach, None, Some(isize::MAX as usize))
.await?;
match users {
Some(users) => get_users_tags_by_reach(&users).await,
None => Ok(None),
Expand Down
8 changes: 8 additions & 0 deletions src/routes/v0/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,17 @@ pub const STREAM_POSTS_REACH_ROUTE: &str = concatcp!(STREAM_POSTS_ROUTE, "/reach
pub const STREAM_POSTS_BOOKMARKED_ROUTE: &str =
concatcp!(STREAM_POSTS_ROUTE, "/bookmarks/:user_id");
pub const STREAM_TAGS_ROUTE: &str = concatcp!(STREAM_PREFIX, "/tags");
// Changed
pub const STREAM_TAGS_GLOBAL_ROUTE: &str = concatcp!(STREAM_TAGS_ROUTE, "/global");
// Changed
pub const STREAM_TAGS_REACH_ROUTE: &str = concatcp!(STREAM_TAGS_ROUTE, "/reached/:user_id/:reach");

// Search routes
const SEARCH_PREFIX: &str = concatcp!(VERSION_ROUTE, "/search");
pub const SEARCH_USERS_ROUTE: &str = concatcp!(SEARCH_PREFIX, "/users");

// Tag routes
const TAG_PREFIX: &str = concatcp!(VERSION_ROUTE, "/tag");
pub const TAG_HOT_ROUTE: &str = concatcp!(TAG_PREFIX, "/hot");
pub const TAG_REACH_ROUTE: &str = concatcp!(TAG_PREFIX, "/reached/:user_id/:reach");
pub const TAG_TAGGERS_ROUTE: &str = concatcp!(TAG_PREFIX, "/taggers/:label");
Loading

0 comments on commit ceef03b

Please sign in to comment.