Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tags streams #68

Merged
merged 12 commits into from
Aug 21, 2024
98 changes: 96 additions & 2 deletions benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTag;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down Expand Up @@ -67,6 +69,95 @@ fn bench_get_post_tags(c: &mut Criterion) {
);
}

fn bench_get_global_hot_tags(c: &mut Criterion) {
println!("******************************************************************************");
println!("Test the performance of getting a global tags, using index or graph as needed");
println!("******************************************************************************");

run_setup();

let rt = Runtime::new().unwrap();

c.bench_function(
"bench_get_global_hot_tags",
|b| {
b.to_async(&rt).iter(|| async {
let stream_tag = HotTag::get_global_tags_stream(None, Some(40)).await.unwrap();
criterion::black_box(stream_tag);
});
},
);
}

fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by following reach, using index or graph as needed");
println!("****************************************************************************************");

run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", UserStreamType::Following);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_following_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Following).await.unwrap();
criterion::black_box(profile);
});
},
);
}

fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by followers reach, using index or graph as needed");
println!("****************************************************************************************");

run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", UserStreamType::Followers);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_followers_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Followers).await.unwrap();
criterion::black_box(profile);
});
},
);
}

fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by friends reach, using index or graph as needed");
println!("****************************************************************************************");

run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", UserStreamType::Friends);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_friends_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Friends).await.unwrap();
criterion::black_box(profile);
});
},
);
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
Expand All @@ -78,8 +169,11 @@ criterion_group! {
name = benches;
config = configure_criterion();
targets = bench_get_user_tags,
bench_get_post_tags

bench_get_post_tags,
bench_get_global_hot_tags,
bench_get_following_reach_hot_tags,
bench_get_followers_reach_hot_tags,
bench_get_friends_reach_hot_tags
}

criterion_main!(benches);
68 changes: 68 additions & 0 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,71 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
}
query(&query_string).param("user_id", user_id)
}

// Retrieves popular tags across the entire network
// Results ordered by post count (descending), effectively ranking "hot" tags.
Copy link
Collaborator

@SHAcollision SHAcollision Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query does not seem to order the results by desceding post count. It would need something like this before the RETURN statement:

ORDER BY uniquePosts DESC

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tipogi This query still does not sort by descending post count. Maybe it works as intended this way, but the docstring is not correct.

pub fn get_global_hot_tags_scores() -> Query {
query("
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags
")
}

// Retrieves popular hot tags taggers across the entire network
pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
query("
UNWIND $labels AS tag_name
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = tag_name
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS userIds
RETURN COLLECT(userIds) AS tag_user_ids
")
.param("labels", tag_list)
}

// Analyzes tag usage for a specific list of user IDs. Groups tags by name,
// showing for each: label, post count, list of user IDs and total usage count.
// Orders by user count and usage (descending).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering seems to be only by usage count (times), not by user count.

// Note: Only considers users from the provided ID list.
pub fn get_tags_from_user_ids(user_ids: &[&str]) -> Query {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: when naming functions we are typically using ..._by_user_ids instead of from

query("
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE u.id = id
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts, COUNT(*) AS times
WITH {
label: label,
times: times,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY times DESC
RETURN COLLECT(hot_tag) AS hot_tags
")
.param("ids", user_ids)
}

// Finds tags used by specified user IDs (Followers | Following | Friends), then counts their usage across all users.
// Note: Initial tag set from input user list, but final counts include all users.
pub fn get_general_count_tags_from_user_ids(user_ids: &[&str]) -> Query {
query("
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(Post)
WHERE u.id = id
WITH COLLECT(DISTINCT tag.label) AS userTags
UNWIND userTags AS label
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference of

WITH COLLECT(DISTINCT tag.label) AS userTags
UNWIND userTags AS label

and

WITH DISTINCT tag.label AS label

It is generally more efficient than the COLLECT and UNWIND approach because it avoids the overhead of creating a list and then unwinding it. But maybe it is not a direct replacement here?

MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = label
WITH label, COUNT(*) AS times, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
times: times,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY times DESC
RETURN COLLECT(hot_tag) AS hot_tags
")
.param("ids", user_ids)
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum Error {
InternalServerError { source: Box<dyn std::error::Error> },
#[error("Bookmarks not found: {user_id}")]
BookmarksNotFound { user_id: String },
#[error("Tags not found")]
TagsNotFound { reach: String },
// Add other custom errors here
}

Expand All @@ -25,6 +27,7 @@ impl IntoResponse for Error {
Error::UserNotFound { .. } => StatusCode::NOT_FOUND,
Error::PostNotFound { .. } => StatusCode::NOT_FOUND,
Error::BookmarksNotFound { .. } => StatusCode::NOT_FOUND,
Error::TagsNotFound { .. } => StatusCode::NOT_FOUND,
Error::InternalServerError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
// Map other errors to appropriate status codes
};
Expand All @@ -38,6 +41,9 @@ impl IntoResponse for Error {
Error::BookmarksNotFound { user_id } => {
debug!("Bookmarks not found: {}", user_id)
}
Error::TagsNotFound { reach } => {
debug!("Tags not found: {}", reach)
}
Error::InternalServerError { source } => error!("Internal server error: {:?}", source),
};

Expand Down
2 changes: 2 additions & 0 deletions src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use utoipa::ToSchema;

pub mod post;
pub mod user;
pub mod stream;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct Tag {
Expand Down
105 changes: 105 additions & 0 deletions src/models/tag/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use neo4rs::Query;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use std::error::Error;

use crate::db::kv::index::sorted_sets::Sorting;
use crate::RedisOps;
use crate::{db::connectors::neo4j::get_neo4j_graph, queries};
use crate::models::user::{UserStream, UserStreamType};

pub const GLOBAL_HOT_TAGS: [&str; 3] = ["Tags", "Global", "Hot"];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TAG_GLOBAL_HOT to stick with the other existing constant naming sorted sets.


#[derive(Deserialize, Serialize, ToSchema, Debug)]
pub struct HotTag {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if too late or if it matters. It feels like we have HotTag as unit and HotTags should the a Vec<HotTags> that implements all of the functionality? In the same way as our other streams are pub struct UserStream(Vec<UserView>); or pub struct PostStream(Vec<PostView>);

I do not think it matters much at all, but it is a slightly different approach :)

label: String,
tagger_ids: Vec<String>,
post_count: u64
}

impl HotTag {
fn new(label: String, tagger_ids: Vec<String>, post_count: u64) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly needed, although we still have many of these new() on our codebase.

Self { label, tagger_ids, post_count }
}
}

impl RedisOps for HotTag {}

type TagList = Vec<String>;



impl HotTag {
pub async fn get_global_tags_stream(skip: Option<usize>, limit: Option<usize>) -> Result<Option<Vec<Self>>, Box<dyn Error + Send + Sync>> {
let hot_tags = match Self::try_from_index_sorted_set(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: this one is a matter of taste. Instead of a match we could do

let hot_tags = Self::try_from....
....
....
.await?
.unwrap_or_default();

if hot_tags.is_empty() {
    return Ok(None);
}

&GLOBAL_HOT_TAGS,
None,
None,
skip,
limit,
Sorting::Descending
)
.await? {
Some(tags) => tags,
None => return Ok(None)
};

let tag_list: Vec<&str> = hot_tags.iter().map(|(label, _)| label.as_ref()).collect();
let query = queries::get_global_hot_tags_taggers(tag_list.as_slice());
let tag_user_list = retrieve_hot_tags_from_graph(query).await?.unwrap();

let hot_tags_stream: Vec<HotTag> = hot_tags
.into_iter()
.zip(tag_user_list)
.map(|((label, score), tagger_ids)| {
HotTag::new(label, tagger_ids, score as u64)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we delete new() we can do
HotTag { label, tagger_ids, post_count: socre as u64 }

}).collect();

Ok(Some(hot_tags_stream))
}

pub async fn get_stream_tags_by_reach(user_id: String, reach: UserStreamType) -> Result<Option<Vec<Self>>, Box<dyn Error + Send + Sync>> {
// We cannot use here limit and skip because we want to get all the users reach by
let users = UserStream::get_user_list_from_reach(&user_id, reach, None, Some(10000)).await?;
match users {
Some(users) => retrieve_users_tags_by_reach(&users).await,
None => Ok(None),
}
}
}

async fn retrieve_users_tags_by_reach(users: &[String]) -> Result<Option<Vec<HotTag>>, Box<dyn Error + Send + Sync>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We typically use get_ on our function names. Let's avoid using new lexicon (retrieve_) for the same meaning and functionality.

If all of these functions are specific for HotTags maybe they belong as HotTags methods. We can also avoid using hot_tags in the function names if that is the case.

let user_slice = users.iter().map(AsRef::as_ref).collect::<Vec<&str>>();
let query = queries::get_tags_from_user_ids(user_slice.as_slice());
retrieve_by_reach_hot_tags(query).await
}

async fn retrieve_hot_tags_from_graph(query: Query) -> Result<Option<Vec<TagList>>, Box<dyn Error + Send + Sync>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't think much of this, but maybe we can reduce repetitive code. The code for retrieving results from Neo4j is similar across these functions. Maybe something like this with some fixes works?

// Helper function to abstract fetching data from the Neo4j graph
async fn retrieve_from_graph<T: serde::de::DeserializeOwned>(
    query: Query,
) -> Result<Vec<T>, Box<dyn Error + Send + Sync>> {
    let graph = get_neo4j_graph()?.lock().await;
    let mut result = graph.execute(query).await?;
    let mut data = Vec::new();

    while let Some(row) = result.next().await? {
        data.push(row.get::<T>(0)?);
    }

    Ok(data)
}

let mut result;
{
let graph = get_neo4j_graph()?;

let graph = graph.lock().await;
result = graph.execute(query).await?;
}
if let Some(row) = result.next().await? {
let hot_tags: Vec<TagList> = row.get("tag_user_ids")?;
return Ok(Some(hot_tags));
}
Ok(None)
}

async fn retrieve_by_reach_hot_tags(query: Query) -> Result<Option<Vec<HotTag>>, Box<dyn Error + Send + Sync>> {
let mut result;
{
let graph = get_neo4j_graph()?;

let graph = graph.lock().await;
result = graph.execute(query).await?;
}
if let Some(row) = result.next().await? {
let hot_tags: Vec<HotTag> = row.get("hot_tags")?;
return Ok(Some(hot_tags));
}
Ok(None)
}
Loading