Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tags streams #68

Merged
merged 12 commits into from
Aug 21, 2024
8 changes: 3 additions & 5 deletions benches/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ fn bench_stream_posts_total_engagement(c: &mut Criterion) {
});
}


fn bench_stream_most_followed(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the user streams for most followed users.");
Expand All @@ -131,11 +130,10 @@ fn bench_stream_most_followed(c: &mut Criterion) {
.await
.unwrap();
criterion::black_box(user_stream);
});
},
);
});
});
}

fn bench_stream_user_posts(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the post streams for a specific user.");
Expand Down
96 changes: 70 additions & 26 deletions benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTag;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
use pubky_nexus::setup;
Expand Down Expand Up @@ -58,7 +58,10 @@ fn bench_get_post_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_post_tags", format!("user_id: {}, post_id: {}", user_id, post_id)),
BenchmarkId::new(
"bench_get_post_tags",
format!("user_id: {}, post_id: {}", user_id, post_id),
),
&[user_id, post_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
Expand All @@ -78,21 +81,26 @@ fn bench_get_global_hot_tags(c: &mut Criterion) {

let rt = Runtime::new().unwrap();

c.bench_function(
"bench_get_global_hot_tags",
|b| {
b.to_async(&rt).iter(|| async {
let stream_tag = HotTag::get_global_tags_stream(None, Some(40)).await.unwrap();
criterion::black_box(stream_tag);
});
},
);
c.bench_function("bench_get_global_hot_tags", |b| {
b.to_async(&rt).iter(|| async {
let stream_tag = HotTags::get_global_tags_stream(None, Some(40))
.await
.unwrap();
criterion::black_box(stream_tag);
});
});
}

fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by following reach, using index or graph as needed");
println!("****************************************************************************************");
println!(
"****************************************************************************************"
);
println!(
"Test the performance of getting tags by following reach, using index or graph as needed"
);
println!(
"****************************************************************************************"
);

run_setup();

Expand All @@ -101,21 +109,35 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_following_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
BenchmarkId::new(
"bench_get_user_following_hot_tags",
format!("user_id: {}, reach: {}", user_id, reach_by),
),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Following).await.unwrap();
let profile = HotTags::get_stream_tags_by_reach(
String::from(params[0]),
UserStreamType::Following,
)
.await
.unwrap();
criterion::black_box(profile);
});
},
);
}

fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by followers reach, using index or graph as needed");
println!("****************************************************************************************");
println!(
"****************************************************************************************"
);
println!(
"Test the performance of getting tags by followers reach, using index or graph as needed"
);
println!(
"****************************************************************************************"
);

run_setup();

Expand All @@ -124,21 +146,35 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_followers_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
BenchmarkId::new(
"bench_get_user_followers_hot_tags",
format!("user_id: {}, reach: {}", user_id, reach_by),
),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Followers).await.unwrap();
let profile = HotTags::get_stream_tags_by_reach(
String::from(params[0]),
UserStreamType::Followers,
)
.await
.unwrap();
criterion::black_box(profile);
});
},
);
}

fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
println!("****************************************************************************************");
println!("Test the performance of getting tags by friends reach, using index or graph as needed");
println!("****************************************************************************************");
println!(
"****************************************************************************************"
);
println!(
"Test the performance of getting tags by friends reach, using index or graph as needed"
);
println!(
"****************************************************************************************"
);

run_setup();

Expand All @@ -147,11 +183,19 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_user_friends_hot_tags", format!("user_id: {}, reach: {}", user_id, reach_by)),
BenchmarkId::new(
"bench_get_user_friends_hot_tags",
format!("user_id: {}, reach: {}", user_id, reach_by),
),
&[user_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = HotTag::get_stream_tags_by_reach(String::from(params[0]), UserStreamType::Friends).await.unwrap();
let profile = HotTags::get_stream_tags_by_reach(
String::from(params[0]),
UserStreamType::Friends,
)
.await
.unwrap();
criterion::black_box(profile);
});
},
Expand Down
4 changes: 1 addition & 3 deletions benches/user.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use pubky_nexus::models::traits::Collection;
use pubky_nexus::models::user::{
Relationship, UserCounts, UserDetails, UserView,
};
use pubky_nexus::models::user::{Relationship, UserCounts, UserDetails, UserView};
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down
57 changes: 19 additions & 38 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,69 +212,50 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
// Retrieves popular tags across the entire network
// Results ordered by post count (descending), effectively ranking "hot" tags.
Copy link
Collaborator

@SHAcollision SHAcollision Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query does not seem to order the results by desceding post count. It would need something like this before the RETURN statement:

ORDER BY uniquePosts DESC

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tipogi This query still does not sort by descending post count. Maybe it works as intended this way, but the docstring is not correct.

pub fn get_global_hot_tags_scores(threshold: u64) -> Query {
query("
query(
"
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts
WHERE uniquePosts >= $threshold
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags
")
",
)
.param("threshold", threshold as f64)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this threshold is a good idea. It's okay if we use more memory on this sorted set.

}

// Retrieves popular hot tags taggers across the entire network
pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
query("
query(
"
UNWIND $labels AS tag_name
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = tag_name
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS userIds
RETURN COLLECT(userIds) AS tag_user_ids
")
",
)
.param("labels", tag_list)
}

// Analyzes tag usage for a specific list of user IDs. Groups tags by name,
// showing for each: label, post count, list of user IDs and total usage count.
// Orders by user count and usage (descending).
// Note: Only considers users from the provided ID list.
pub fn get_tags_from_user_ids(user_ids: &[&str]) -> Query {
query("
// showing for each: label, post count and list of user IDs
// Orders by post_count (descending).
// Note: Only considers users from the provided users_id list.
pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query {
query(
"
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE u.id = id
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts, COUNT(*) AS times
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
times: times,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY times DESC
ORDER BY hot_tag.post_count DESC
RETURN COLLECT(hot_tag) AS hot_tags
")
.param("ids", user_ids)
",
)
.param("ids", users_id)
}

// Finds tags used by specified user IDs (Followers | Following | Friends), then counts their usage across all users.
// Note: Initial tag set from input user list, but final counts include all users.
pub fn get_general_count_tags_from_user_ids(user_ids: &[&str]) -> Query {
query("
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(Post)
WHERE u.id = id
WITH COLLECT(DISTINCT tag.label) AS userTags
UNWIND userTags AS label
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = label
WITH label, COUNT(*) AS times, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
times: times,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY times DESC
RETURN COLLECT(hot_tag) AS hot_tags
")
.param("ids", user_ids)
}
2 changes: 1 addition & 1 deletion src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod info;
pub mod post;
pub mod tag;
pub mod traits;
pub mod user;
pub mod traits;
2 changes: 1 addition & 1 deletion src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::ops::Deref;
use utoipa::ToSchema;

pub mod post;
pub mod user;
pub mod stream;
pub mod user;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
Expand Down
4 changes: 2 additions & 2 deletions src/models/tag/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ impl Deref for PostTags {
impl PostTags {
pub async fn get_by_id(
user_id: &str,
post_id: &str
post_id: &str,
) -> Result<Option<PostTags>, Box<dyn std::error::Error + Send + Sync>> {
Self::get_from_graph(user_id, post_id).await
}

async fn get_from_graph(
user_id: &str,
post_id: &str
post_id: &str,
) -> Result<Option<PostTags>, Box<dyn std::error::Error + Send + Sync>> {
let query = queries::post_tags(user_id, post_id);
let graph = get_neo4j_graph()?;
Expand Down
Loading