Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tags streams #68

Merged
merged 12 commits into from
Aug 21, 2024
139 changes: 137 additions & 2 deletions benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down Expand Up @@ -70,6 +72,136 @@ fn bench_get_post_tags(c: &mut Criterion) {
);
}

/// Benchmarks `HotTags::get_global_tags_stream`.
///
/// Prints a banner, runs the shared benchmark setup, then drives the async
/// call on a freshly created runtime. Every iteration awaits the call with
/// the arguments `(None, Some(40))`, unwraps the result (a failure panics
/// the benchmark) and passes the value through `black_box` so the work is
/// not optimized away.
fn bench_get_global_hot_tags(c: &mut Criterion) {
    let banner = "******************************************************************************";
    println!("{}", banner);
    println!("Test the performance of getting a global tags, using index or graph as needed");
    println!("{}", banner);

    run_setup();

    let runtime = Runtime::new().unwrap();

    c.bench_function("bench_get_global_hot_tags", |bencher| {
        bencher.to_async(&runtime).iter(|| async {
            // NOTE(review): the two arguments look like (skip, limit) — confirm
            // against `HotTags::get_global_tags_stream`.
            let hot_tags = HotTags::get_global_tags_stream(None, Some(40))
                .await
                .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with
/// `UserStreamType::Following` for one hard-coded user id.
///
/// The benchmark id is `bench_get_user_following_hot_tags`, parameterised
/// with a label built from the user id and the `Debug` name of the reach
/// variant. Each iteration builds an owned `String` from the user id, awaits
/// the call, unwraps the result (a failure panics the benchmark) and feeds
/// the value to `black_box`.
fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
    let banner =
        "****************************************************************************************";
    println!("{}", banner);
    println!(
        "Test the performance of getting tags by following reach, using index or graph as needed"
    );
    println!("{}", banner);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Following);
    let runtime: Runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_following_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    // The input is a one-element array, so the closure reads the user id from index 0.
    c.bench_with_input(bench_id, &[user_id], |bencher, &inputs| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags = HotTags::get_stream_tags_by_reach(
                String::from(inputs[0]),
                UserStreamType::Following,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with
/// `UserStreamType::Followers` for one hard-coded user id.
///
/// The benchmark id is `bench_get_user_followers_hot_tags`, parameterised
/// with a label built from the user id and the `Debug` name of the reach
/// variant. Each iteration builds an owned `String` from the user id, awaits
/// the call, unwraps the result (a failure panics the benchmark) and feeds
/// the value to `black_box`.
fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
    let banner =
        "****************************************************************************************";
    println!("{}", banner);
    println!(
        "Test the performance of getting tags by followers reach, using index or graph as needed"
    );
    println!("{}", banner);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Followers);
    let runtime: Runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_followers_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    // The input is a one-element array, so the closure reads the user id from index 0.
    c.bench_with_input(bench_id, &[user_id], |bencher, &inputs| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags = HotTags::get_stream_tags_by_reach(
                String::from(inputs[0]),
                UserStreamType::Followers,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` with
/// `UserStreamType::Friends` for one hard-coded user id.
///
/// The benchmark id is `bench_get_user_friends_hot_tags`, parameterised
/// with a label built from the user id and the `Debug` name of the reach
/// variant. Each iteration builds an owned `String` from the user id, awaits
/// the call, unwraps the result (a failure panics the benchmark) and feeds
/// the value to `black_box`.
fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
    let banner =
        "****************************************************************************************";
    println!("{}", banner);
    println!(
        "Test the performance of getting tags by friends reach, using index or graph as needed"
    );
    println!("{}", banner);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Friends);
    let runtime: Runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_friends_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    // The input is a one-element array, so the closure reads the user id from index 0.
    c.bench_with_input(bench_id, &[user_id], |bencher, &inputs| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags = HotTags::get_stream_tags_by_reach(
                String::from(inputs[0]),
                UserStreamType::Friends,
            )
            .await
            .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
Expand All @@ -81,8 +213,11 @@ criterion_group! {
name = benches;
config = configure_criterion();
targets = bench_get_user_tags,
bench_get_post_tags

bench_get_post_tags,
bench_get_global_hot_tags,
bench_get_following_reach_hot_tags,
bench_get_followers_reach_hot_tags,
bench_get_friends_reach_hot_tags
}

criterion_main!(benches);
49 changes: 49 additions & 0 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub async fn setup_graph() -> Result<(), Box<dyn std::error::Error>> {
let indexes = [
"CREATE INDEX userIdIndex IF NOT EXISTS FOR (u:User) ON (u.id)",
"CREATE INDEX postIdIndex IF NOT EXISTS FOR (p:Post) ON (p.id)",
"CREATE INDEX taggedLabelIndex IF NOT EXISTS FOR ()-[r:TAGGED]-() ON (r.label)",
];

let queries = constraints.iter().chain(indexes.iter());
Expand Down Expand Up @@ -209,6 +210,54 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
query(&query_string).param("user_id", user_id)
}

// Builds the query that scores every tag label used across the whole graph.
// It matches every (User)-[TAGGED]->(Post) relationship, groups by the
// relationship's `label` and counts the distinct posts carrying that label.
// Result: a single row whose `hot_tags` column is a list of
// `[score, label]` pairs, where `score` is the distinct post count cast to
// a float. The query itself applies no ordering and no limit.
pub fn get_global_hot_tags_scores() -> Query {
query(
"
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags
",
)
}

// Builds the query that lists, for each given tag label, the users who
// applied that label to any post across the whole graph.
// `tag_list` is bound to the `$labels` parameter; for every label the query
// collects the distinct ids of users with a matching TAGGED relationship.
// Result: a single row whose `tag_user_ids` column is a list of user-id
// lists, one inner list per label that had at least one match.
// NOTE(review): the query has no ORDER BY, and labels without any matching
// relationship produce no inner list, so the query itself does not tie the
// position of an inner list to the position of its label in `tag_list` —
// confirm callers do not rely on positional alignment.
pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
query(
"
UNWIND $labels AS tag_name
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE tag.label = tag_name
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS userIds
RETURN COLLECT(userIds) AS tag_user_ids
",
)
.param("labels", tag_list)
}

// Builds the query that summarises tag usage for a given set of users.
// `users_id` is bound to the `$ids` parameter; only TAGGED relationships
// whose source user id is in that list are considered.
// Tags are grouped by label, and each group becomes a map with:
//   - `label`: the tag label
//   - `tagger_ids`: distinct ids of the listed users who applied the label
//   - `post_count`: number of distinct posts tagged with the label by them
// The maps are sorted by `post_count` (descending) before being collected.
// Result: a single row whose `hot_tags` column is that list of maps.
pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query {
query(
"
UNWIND $ids AS id
MATCH (u:User)-[tag:TAGGED]->(p:Post)
WHERE u.id = id
WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts
WITH {
label: label,
tagger_ids: taggers,
post_count: uniquePosts
} AS hot_tag
ORDER BY hot_tag.post_count DESC
RETURN COLLECT(hot_tag) AS hot_tags
",
)
.param("ids", users_id)
}

pub fn get_thread(author_id: &str, post_id: &str, skip: usize, limit: usize) -> Query {
query(
"
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum Error {
InternalServerError { source: Box<dyn std::error::Error> },
#[error("Bookmarks not found: {user_id}")]
BookmarksNotFound { user_id: String },
#[error("Tags not found")]
TagsNotFound { reach: String },
#[error("Invalid input: {message}")]
InvalidInput { message: String },
// Add other custom errors here
Expand All @@ -27,6 +29,7 @@ impl IntoResponse for Error {
Error::UserNotFound { .. } => StatusCode::NOT_FOUND,
Error::PostNotFound { .. } => StatusCode::NOT_FOUND,
Error::BookmarksNotFound { .. } => StatusCode::NOT_FOUND,
Error::TagsNotFound { .. } => StatusCode::NOT_FOUND,
Error::InvalidInput { .. } => StatusCode::BAD_REQUEST,
Error::InternalServerError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
// Map other errors to appropriate status codes
Expand All @@ -41,6 +44,9 @@ impl IntoResponse for Error {
Error::BookmarksNotFound { user_id } => {
debug!("Bookmarks not found: {}", user_id)
}
Error::TagsNotFound { reach } => {
debug!("Tags not found: {}", reach)
}
Error::InvalidInput { message } => {
debug!("Invalid input: {}", message)
}
Expand Down
2 changes: 2 additions & 0 deletions src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::ops::Deref;
use utoipa::ToSchema;

pub mod post;
pub mod stream;
pub mod user;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct Tag {
Expand Down
Loading
Loading