Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tags streams #68

Merged
merged 12 commits into from
Aug 21, 2024
8 changes: 3 additions & 5 deletions benches/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ fn bench_stream_posts_total_engagement(c: &mut Criterion) {
});
}


fn bench_stream_most_followed(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the user streams for most followed users.");
Expand All @@ -131,11 +130,10 @@ fn bench_stream_most_followed(c: &mut Criterion) {
.await
.unwrap();
criterion::black_box(user_stream);
});
},
);
});
});
}

fn bench_stream_user_posts(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the post streams for a specific user.");
Expand Down
144 changes: 141 additions & 3 deletions benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down Expand Up @@ -56,7 +58,10 @@ fn bench_get_post_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("bench_get_post_tags", format!("user_id: {}, post_id: {}", user_id, post_id)),
BenchmarkId::new(
"bench_get_post_tags",
format!("user_id: {}, post_id: {}", user_id, post_id),
),
&[user_id, post_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
Expand All @@ -67,6 +72,136 @@ fn bench_get_post_tags(c: &mut Criterion) {
);
}

/// Benchmarks `HotTags::get_global_tags_stream(None, Some(40))`.
///
/// Prints a three-line banner, calls `run_setup()` once, then times the async
/// call on a dedicated `Runtime`, unwrapping the result on every iteration.
// NOTE(review): the two arguments look like skip/limit — confirm against `HotTags`.
fn bench_get_global_hot_tags(c: &mut Criterion) {
    let border = "******************************************************************************";
    println!("{}", border);
    println!("Test the performance of getting a global tags, using index or graph as needed");
    println!("{}", border);

    run_setup();

    let runtime = Runtime::new().unwrap();

    c.bench_function("bench_get_global_hot_tags", |bencher| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags = HotTags::get_global_tags_stream(None, Some(40))
                .await
                .unwrap();
            // Keep the optimizer from discarding the computed value.
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with `UserStreamType::Following`
/// for one hard-coded user id.
///
/// Prints a three-line banner, calls `run_setup()` once, then times the async
/// call on a dedicated `Runtime`. The benchmark id embeds the user id and the
/// `Debug` rendering of the reach variant.
fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
    let border =
        "****************************************************************************************";
    println!("{}", border);
    println!(
        "Test the performance of getting tags by following reach, using index or graph as needed"
    );
    println!("{}", border);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Following);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_following_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &[user_id], |bencher, &input| {
        bencher.to_async(&runtime).iter(|| async {
            // The callee takes an owned `String`, so the id is copied per iteration.
            let hot_tags = HotTags::get_stream_tags_by_reach(
                input[0].to_owned(),
                UserStreamType::Following,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with `UserStreamType::Followers`
/// for one hard-coded user id.
///
/// Prints a three-line banner, calls `run_setup()` once, then times the async
/// call on a dedicated `Runtime`. The benchmark id embeds the user id and the
/// `Debug` rendering of the reach variant.
fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
    let border =
        "****************************************************************************************";
    println!("{}", border);
    println!(
        "Test the performance of getting tags by followers reach, using index or graph as needed"
    );
    println!("{}", border);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Followers);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_followers_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &[user_id], |bencher, &input| {
        bencher.to_async(&runtime).iter(|| async {
            // The callee takes an owned `String`, so the id is copied per iteration.
            let hot_tags = HotTags::get_stream_tags_by_reach(
                input[0].to_owned(),
                UserStreamType::Followers,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with `UserStreamType::Friends`
/// for one hard-coded user id.
///
/// Prints a three-line banner, calls `run_setup()` once, then times the async
/// call on a dedicated `Runtime`. The benchmark id embeds the user id and the
/// `Debug` rendering of the reach variant.
fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
    let border =
        "****************************************************************************************";
    println!("{}", border);
    println!(
        "Test the performance of getting tags by friends reach, using index or graph as needed"
    );
    println!("{}", border);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Friends);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_friends_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &[user_id], |bencher, &input| {
        bencher.to_async(&runtime).iter(|| async {
            // The callee takes an owned `String`, so the id is copied per iteration.
            let hot_tags = HotTags::get_stream_tags_by_reach(
                input[0].to_owned(),
                UserStreamType::Friends,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
Expand All @@ -78,8 +213,11 @@ criterion_group! {
name = benches;
config = configure_criterion();
targets = bench_get_user_tags,
bench_get_post_tags

bench_get_post_tags,
bench_get_global_hot_tags,
bench_get_following_reach_hot_tags,
bench_get_followers_reach_hot_tags,
bench_get_friends_reach_hot_tags
}

criterion_main!(benches);
4 changes: 1 addition & 3 deletions benches/user.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use pubky_nexus::models::traits::Collection;
use pubky_nexus::models::user::{
Relationship, UserCounts, UserDetails, UserView,
};
use pubky_nexus::models::user::{Relationship, UserCounts, UserDetails, UserView};
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down
51 changes: 51 additions & 0 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,54 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
}
query(&query_string).param("user_id", user_id)
}

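// Retrieves popular ("hot") tags across the entire network.
// Returns [post count, label] pairs for every tag with at least `threshold` distinct posts.
// Note: the query contains no ORDER BY, so the pairs are not sorted by post count here.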
Copy link
Collaborator

@SHAcollision SHAcollision Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query does not seem to order the results by descending post count. It would need something like this before the RETURN statement:

ORDER BY uniquePosts DESC

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tipogi This query still does not sort by descending post count. Maybe it works as intended this way, but the docstring is not correct.

pub fn get_global_hot_tags_scores(threshold: u64) -> Query {
query(
"
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts
WHERE uniquePosts >= $threshold
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags
",
)
.param("threshold", threshold as f64)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this threshold is a good idea. It's okay if we use more memory on this sorted set.

}

// Builds the query that collects the taggers of the given hot tags across the entire network.
// For each label in `tag_list` it matches (User)-[TAGGED]->(Post) relationships whose
// `label` equals it, groups by label, and gathers the distinct user ids per label.
// The result is a single `tag_user_ids` value: a list holding one user-id list per label.
// NOTE(review): there is no ORDER BY, so confirm callers do not rely on the returned
// lists lining up positionally with `tag_list`.
pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
    let cypher = "
UNWIND $labels AS tag_name
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = tag_name
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS userIds
RETURN COLLECT(userIds) AS tag_user_ids
";
    query(cypher).param("labels", tag_list)
}

// Builds the query that summarizes tag usage for a given list of user ids.
// Tags are grouped by label; each group becomes a map with:
//   - label: the tag label
//   - tagger_ids: distinct ids of the users (from `users_id`) who applied it
//   - post_count: number of distinct posts carrying it
// The maps are ordered by post_count (descending) and returned as one `hot_tags` list.
// Note: only TAGGED relationships created by users in `users_id` are considered.
pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query {
    let cypher = "
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE u.id = id
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY hot_tag.post_count DESC
RETURN COLLECT(hot_tag) AS hot_tags
";
    query(cypher).param("ids", users_id)
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum Error {
InternalServerError { source: Box<dyn std::error::Error> },
#[error("Bookmarks not found: {user_id}")]
BookmarksNotFound { user_id: String },
#[error("Tags not found")]
TagsNotFound { reach: String },
// Add other custom errors here
}

Expand All @@ -25,6 +27,7 @@ impl IntoResponse for Error {
Error::UserNotFound { .. } => StatusCode::NOT_FOUND,
Error::PostNotFound { .. } => StatusCode::NOT_FOUND,
Error::BookmarksNotFound { .. } => StatusCode::NOT_FOUND,
Error::TagsNotFound { .. } => StatusCode::NOT_FOUND,
Error::InternalServerError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
// Map other errors to appropriate status codes
};
Expand All @@ -38,6 +41,9 @@ impl IntoResponse for Error {
Error::BookmarksNotFound { user_id } => {
debug!("Bookmarks not found: {}", user_id)
}
Error::TagsNotFound { reach } => {
debug!("Tags not found: {}", reach)
}
Error::InternalServerError { source } => error!("Internal server error: {:?}", source),
};

Expand Down
2 changes: 1 addition & 1 deletion src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod info;
pub mod post;
pub mod tag;
pub mod traits;
pub mod user;
pub mod traits;
2 changes: 2 additions & 0 deletions src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::ops::Deref;
use utoipa::ToSchema;

pub mod post;
pub mod stream;
pub mod user;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct Tag {
Expand Down
4 changes: 2 additions & 2 deletions src/models/tag/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ impl Deref for PostTags {
impl PostTags {
pub async fn get_by_id(
user_id: &str,
post_id: &str
post_id: &str,
) -> Result<Option<PostTags>, Box<dyn std::error::Error + Send + Sync>> {
Self::get_from_graph(user_id, post_id).await
}

async fn get_from_graph(
user_id: &str,
post_id: &str
post_id: &str,
) -> Result<Option<PostTags>, Box<dyn std::error::Error + Send + Sync>> {
let query = queries::post_tags(user_id, post_id);
let graph = get_neo4j_graph()?;
Expand Down
Loading