Skip to content

Commit

Permalink
feat: add tags streams (#68)
Browse files Browse the repository at this point in the history
* Cypher queries for tags
* Global hot tags endpoint
* Hot tags by reach endpoint: Following, Followers, Friend, Most Followed

---------

Co-authored-by: SHAcollision
  • Loading branch information
tipogi authored Aug 21, 2024
1 parent 91ec606 commit 9c96ee5
Show file tree
Hide file tree
Showing 14 changed files with 629 additions and 27 deletions.
139 changes: 137 additions & 2 deletions benches/tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
Expand Down Expand Up @@ -70,6 +72,136 @@ fn bench_get_post_tags(c: &mut Criterion) {
);
}

/// Benchmarks `HotTags::get_global_tags_stream(None, Some(40))`.
///
/// Prints a banner, runs the shared benchmark setup, then drives the async
/// call on a freshly created runtime. The returned stream is passed through
/// `black_box` so the optimizer cannot discard the work.
// NOTE(review): the two arguments look like skip/limit — confirm against
// `HotTags::get_global_tags_stream`.
fn bench_get_global_hot_tags(c: &mut Criterion) {
    let banner = [
        "******************************************************************************",
        "Test the performance of getting a global tags, using index or graph as needed",
        "******************************************************************************",
    ];
    for line in banner {
        println!("{}", line);
    }

    run_setup();

    let runtime = Runtime::new().unwrap();

    c.bench_function("bench_get_global_hot_tags", |bencher| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags = HotTags::get_global_tags_stream(None, Some(40))
                .await
                .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` for a fixed user id with
/// `UserStreamType::Following`.
///
/// The benchmark id embeds the user id and the `Debug` rendering of the reach
/// variant. Each iteration builds an owned `String` from the id, awaits the
/// call, unwraps the result and feeds it to `black_box`.
fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
    let separator =
        "****************************************************************************************";
    println!("{}", separator);
    println!(
        "Test the performance of getting tags by following reach, using index or graph as needed"
    );
    println!("{}", separator);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Following);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_following_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &user_id, |bencher, &uid| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags =
                HotTags::get_stream_tags_by_reach(uid.to_owned(), UserStreamType::Following)
                    .await
                    .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` for a fixed user id with
/// `UserStreamType::Followers`.
///
/// The benchmark id embeds the user id and the `Debug` rendering of the reach
/// variant. Each iteration builds an owned `String` from the id, awaits the
/// call, unwraps the result and feeds it to `black_box`.
fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
    let separator =
        "****************************************************************************************";
    println!("{}", separator);
    println!(
        "Test the performance of getting tags by followers reach, using index or graph as needed"
    );
    println!("{}", separator);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Followers);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_followers_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &user_id, |bencher, &uid| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags =
                HotTags::get_stream_tags_by_reach(uid.to_owned(), UserStreamType::Followers)
                    .await
                    .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

/// Benchmarks `HotTags::get_stream_tags_by_reach` for a fixed user id with
/// `UserStreamType::Friends`.
///
/// The benchmark id embeds the user id and the `Debug` rendering of the reach
/// variant. Each iteration builds an owned `String` from the id, awaits the
/// call, unwraps the result and feeds it to `black_box`.
fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
    let separator =
        "****************************************************************************************";
    println!("{}", separator);
    println!(
        "Test the performance of getting tags by friends reach, using index or graph as needed"
    );
    println!("{}", separator);

    run_setup();

    let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
    let reach_label = format!("{:?}", UserStreamType::Friends);
    let runtime = Runtime::new().unwrap();

    let bench_id = BenchmarkId::new(
        "bench_get_user_friends_hot_tags",
        format!("user_id: {}, reach: {}", user_id, reach_label),
    );

    c.bench_with_input(bench_id, &user_id, |bencher, &uid| {
        bencher.to_async(&runtime).iter(|| async {
            let hot_tags =
                HotTags::get_stream_tags_by_reach(uid.to_owned(), UserStreamType::Friends)
                    .await
                    .unwrap();
            criterion::black_box(hot_tags);
        });
    });
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
Expand All @@ -81,8 +213,11 @@ criterion_group! {
name = benches;
config = configure_criterion();
targets = bench_get_user_tags,
bench_get_post_tags

bench_get_post_tags,
bench_get_global_hot_tags,
bench_get_following_reach_hot_tags,
bench_get_followers_reach_hot_tags,
bench_get_friends_reach_hot_tags
}

criterion_main!(benches);
49 changes: 49 additions & 0 deletions src/db/graph/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub async fn setup_graph() -> Result<(), Box<dyn std::error::Error>> {
let indexes = [
"CREATE INDEX userIdIndex IF NOT EXISTS FOR (u:User) ON (u.id)",
"CREATE INDEX postIdIndex IF NOT EXISTS FOR (p:Post) ON (p.id)",
"CREATE INDEX taggedLabelIndex IF NOT EXISTS FOR ()-[r:TAGGED]-() ON (r.label)",
];

let queries = constraints.iter().chain(indexes.iter());
Expand Down Expand Up @@ -209,6 +210,54 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz
query(&query_string).param("user_id", user_id)
}

/// Builds the query that scores tags across the entire network.
///
/// Matches every `(User)-[TAGGED]->(Post)` relationship, groups by the
/// relationship's `label`, and counts the distinct posts per label. The query
/// returns a single `hot_tags` column: a list of `[toFloat(uniquePosts), label]`
/// pairs. The query takes no parameters.
pub fn get_global_hot_tags_scores() -> Query {
    const CYPHER: &str = "\n\
        MATCH (u:User)-[tag:TAGGED]->(p:Post)\n\
        WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts\n\
        RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags\n";

    query(CYPHER)
}

/// Builds the query that lists the taggers of the given tag labels across the
/// entire network.
///
/// `tag_list` is bound to the `$labels` parameter. For each label, the query
/// collects the distinct ids of users that have a `TAGGED` relationship with
/// that label to a post, and returns a single `tag_user_ids` column holding
/// the list of those id lists.
// NOTE(review): a label with no matching relationship produces no row after
// the MATCH, so `tag_user_ids` may be shorter than `tag_list`; confirm callers
// do not rely on positional alignment with the input labels.
pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
    const CYPHER: &str = "\n\
        UNWIND $labels AS tag_name\n\
        MATCH (u:User)-[tag:TAGGED]->(p:Post)\n\
        WHERE tag.label = tag_name\n\
        WITH tag.label AS label, COLLECT(DISTINCT u.id) AS userIds\n\
        RETURN COLLECT(userIds) AS tag_user_ids\n";

    let statement = query(CYPHER);
    statement.param("labels", tag_list)
}

/// Builds the query that analyzes tag usage for a specific list of user ids.
///
/// `users_id` is bound to the `$ids` parameter; only `TAGGED` relationships
/// whose source user id appears in that list are considered. Results are
/// grouped by tag label, and each group becomes a map with:
/// - `label`: the tag label,
/// - `tagger_ids`: the distinct ids of the users who applied it,
/// - `post_count`: the number of distinct posts carrying it.
///
/// The maps are ordered by `post_count` descending and returned as a single
/// `hot_tags` list.
pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query {
    const CYPHER: &str = "\n\
        UNWIND $ids AS id\n\
        MATCH (u:User)-[tag:TAGGED]->(p:Post)\n\
        WHERE u.id = id\n\
        WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts\n\
        WITH {\n\
        label: label,\n\
        tagger_ids: taggers,\n\
        post_count: uniquePosts\n\
        } AS hot_tag\n\
        ORDER BY hot_tag.post_count DESC\n\
        RETURN COLLECT(hot_tag) AS hot_tags\n";

    let statement = query(CYPHER);
    statement.param("ids", users_id)
}

pub fn get_thread(author_id: &str, post_id: &str, skip: usize, limit: usize) -> Query {
query(
"
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum Error {
InternalServerError { source: Box<dyn std::error::Error> },
#[error("Bookmarks not found: {user_id}")]
BookmarksNotFound { user_id: String },
#[error("Tags not found")]
TagsNotFound { reach: String },
#[error("Invalid input: {message}")]
InvalidInput { message: String },
// Add other custom errors here
Expand All @@ -27,6 +29,7 @@ impl IntoResponse for Error {
Error::UserNotFound { .. } => StatusCode::NOT_FOUND,
Error::PostNotFound { .. } => StatusCode::NOT_FOUND,
Error::BookmarksNotFound { .. } => StatusCode::NOT_FOUND,
Error::TagsNotFound { .. } => StatusCode::NOT_FOUND,
Error::InvalidInput { .. } => StatusCode::BAD_REQUEST,
Error::InternalServerError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
// Map other errors to appropriate status codes
Expand All @@ -41,6 +44,9 @@ impl IntoResponse for Error {
Error::BookmarksNotFound { user_id } => {
debug!("Bookmarks not found: {}", user_id)
}
Error::TagsNotFound { reach } => {
debug!("Tags not found: {}", reach)
}
Error::InvalidInput { message } => {
debug!("Invalid input: {}", message)
}
Expand Down
2 changes: 2 additions & 0 deletions src/models/tag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::ops::Deref;
use utoipa::ToSchema;

pub mod post;
pub mod stream;
pub mod user;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct Tag {
Expand Down
Loading

0 comments on commit 9c96ee5

Please sign in to comment.