From ceef03bd730752015f68bf1338f5a28281cedc5e Mon Sep 17 00:00:00 2001 From: Tipogi <103417381+tipogi@users.noreply.github.com> Date: Fri, 23 Aug 2024 13:06:22 +0200 Subject: [PATCH] feat: Hot tags stream (#86) * Add hot tag taggers SET * Adapt endpoint for client requirements * Refactor and add endpoint to retrive tag taggers --- benches/tag.rs | 32 ++++++++- src/db/graph/queries.rs | 11 +-- src/db/kv/index/sets.rs | 65 +++++++++++++++++ src/db/kv/traits.rs | 30 ++++++++ src/models/tag/global.rs | 23 ++++++ src/models/tag/mod.rs | 1 + src/models/tag/post.rs | 3 +- src/models/tag/stream.rs | 70 ++++++++++++++----- src/routes/v0/endpoints.rs | 8 +++ src/routes/v0/mod.rs | 5 ++ src/routes/v0/stream/mod.rs | 7 +- src/routes/v0/stream/tags.rs | 91 ------------------------ src/routes/v0/tag/global.rs | 131 +++++++++++++++++++++++++++++++++++ src/routes/v0/tag/mod.rs | 24 +++++++ tests/tags/hot.rs | 29 +++++--- 15 files changed, 397 insertions(+), 133 deletions(-) create mode 100644 src/models/tag/global.rs delete mode 100644 src/routes/v0/stream/tags.rs create mode 100644 src/routes/v0/tag/global.rs create mode 100644 src/routes/v0/tag/mod.rs diff --git a/benches/tag.rs b/benches/tag.rs index 7ddbcca9..b11cf735 100644 --- a/benches/tag.rs +++ b/benches/tag.rs @@ -1,5 +1,6 @@ use criterion::{criterion_group, criterion_main}; use criterion::{BenchmarkId, Criterion}; +use pubky_nexus::models::tag::global::TagGlobal; use pubky_nexus::models::tag::post::PostTags; use pubky_nexus::models::tag::stream::HotTags; use pubky_nexus::models::tag::user::UserTags; @@ -83,7 +84,7 @@ fn bench_get_global_hot_tags(c: &mut Criterion) { c.bench_function("bench_get_global_hot_tags", |b| { b.to_async(&rt).iter(|| async { - let stream_tag = HotTags::get_global_tags_stream(None, Some(40)) + let stream_tag = HotTags::get_global_tags_stream(None, Some(40), Some(10)) .await .unwrap(); criterion::black_box(stream_tag); @@ -91,6 +92,34 @@ fn bench_get_global_hot_tags(c: &mut Criterion) { }); } +fn bench_get_global_tag_taggers(c: &mut Criterion) { + println!( + "****************************************************************************************" + ); + println!("Test the performance of getting global tag taggers"); + println!( + "****************************************************************************************" + ); + + run_setup(); + + let label = "ha"; + let rt: Runtime = Runtime::new().unwrap(); + + c.bench_with_input( + BenchmarkId::new("bench_get_global_tag_taggers", format!("label: {}", label)), + &[label], + |b, ¶ms| { + b.to_async(&rt).iter(|| async { + let tag_taggers = TagGlobal::get_tag_taggers(String::from(params[0]), None) + .await + .unwrap(); + criterion::black_box(tag_taggers); + }); + }, + ); +} + fn bench_get_following_reach_hot_tags(c: &mut Criterion) { println!( "****************************************************************************************" @@ -215,6 +244,7 @@ criterion_group! 
{ targets = bench_get_user_tags, bench_get_post_tags, bench_get_global_hot_tags, + bench_get_global_tag_taggers, bench_get_following_reach_hot_tags, bench_get_followers_reach_hot_tags, bench_get_friends_reach_hot_tags diff --git a/src/db/graph/queries.rs b/src/db/graph/queries.rs index 08d53a04..73208224 100644 --- a/src/db/graph/queries.rs +++ b/src/db/graph/queries.rs @@ -210,13 +210,13 @@ pub fn get_user_following(user_id: &str, skip: Option, limit: Option Query { query( " MATCH (u:User)-[tag:TAGGED]->(p:Post) - WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts - RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags + WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts, COLLECT(DISTINCT u.id) AS user_ids + RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags_score, COLLECT([label, user_ids]) AS hot_tags_users ", ) } @@ -248,8 +248,9 @@ pub fn get_tags_by_user_ids(users_id: &[&str]) -> Query { WITH tag.label AS label, COLLECT(DISTINCT u.id) AS taggers, COUNT(DISTINCT p) AS uniquePosts WITH { label: label, - tagger_ids: taggers, - post_count: uniquePosts + taggers_id: taggers, + post_count: uniquePosts, + taggers_count: SIZE(taggers) } AS hot_tag ORDER BY hot_tag.post_count DESC RETURN COLLECT(hot_tag) AS hot_tags diff --git a/src/db/kv/index/sets.rs b/src/db/kv/index/sets.rs index 7e733e1d..796c79aa 100644 --- a/src/db/kv/index/sets.rs +++ b/src/db/kv/index/sets.rs @@ -138,3 +138,68 @@ pub async fn check_set_member( Ok((false, false)) } } + +/// Retrieves multiple sets from Redis in a single call using a pipeline. +/// +/// This asynchronous function fetches multiple sets from Redis based on the provided keys using a Redis pipeline. +/// It returns a vector of optional tuples, where each tuple contains a vector of elements from the corresponding set +/// and an integer representing the number of elements that were excluded if a limit was specified. +/// +/// # Arguments +/// +/// * `prefix` - A string slice representing the prefix to be prepended to each Redis key. +/// * `keys` - A slice of string slices representing the keys under which the sets are stored. +/// * `limit` - An optional `usize` specifying the maximum number of elements to retrieve from each set. +/// If `None`, all elements will be retrieved. +/// +/// # Returns +/// +/// Returns a `Result` containing: +/// * `Ok(Vec, usize)>>)` - A vector where each element is an `Option` containing a tuple: +/// * `Some((Vec, usize))` - The vector of elements from the set and the count of excluded elements. +/// * `None` - Indicates that the corresponding set does not exist. +/// * `Err` - An error if the Redis operation fails. +/// +/// # Errors +/// +/// Returns an error if the Redis connection fails or the pipeline query encounters an issue. 
+
+pub async fn get_multiple_sets(
+    prefix: &str,
+    keys: &[&str],
+    limit: Option<usize>,
+) -> Result<Vec<Option<(Vec<String>, usize)>>, Box<dyn std::error::Error + Send + Sync>> {
+    let mut redis_conn = get_redis_conn().await?;
+
+    // Create a Redis pipeline
+    let mut pipe = redis::pipe();
+
+    // Add each SMEMBERS command to the pipeline for all keys
+    for key in keys {
+        let index_key = format!("{}:{}", prefix, key);
+        pipe.smembers(index_key);
+    }
+
+    // Execute the pipeline
+    let results: Vec<Vec<String>> = pipe.query_async(&mut redis_conn).await?;
+
+    let taggers_list = results
+        .into_iter()
+        .map(|set| {
+            if set.is_empty() {
+                None
+            } else {
+                let set_length = set.len();
+                match limit {
+                    Some(set_limit) if set_limit < set_length => {
+                        let limited_set = set.into_iter().take(set_limit).collect();
+                        Some((limited_set, set_length))
+                    }
+                    _ => Some((set, set_length)),
+                }
+            }
+        })
+        .collect();
+
+    Ok(taggers_list)
+}
diff --git a/src/db/kv/traits.rs b/src/db/kv/traits.rs
index 2b58f4e5..48d01e71 100644
--- a/src/db/kv/traits.rs
+++ b/src/db/kv/traits.rs
@@ -373,4 +373,34 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
         let key = key_parts.join(":");
         sorted_sets::get_lex_range("Sorted", &key, min, max, skip, limit).await
     }
+
+    /// Fetches multiple sets from Redis using the specified key components.
+    ///
+    /// This asynchronous function retrieves multiple sets from Redis based on the provided key components.
+    /// It returns a vector where each element is an optional tuple containing the elements of the corresponding set
+    /// and that set's total size. If a particular set does not exist, the corresponding position in the returned vector will be `None`.
+    ///
+    /// # Arguments
+    ///
+    /// * `key_parts_list` - A slice of string slices, where each inner slice represents the components
+    ///   used to construct the Redis key for the corresponding set.
+    /// * `limit` - An optional parameter specifying the maximum number of elements to fetch from each set.
+    ///   If `None`, all elements will be retrieved.
+    ///
+    /// # Returns
+    ///
+    /// A `Vec<Option<(Vec<String>, usize)>>` where:
+    /// * Each inner `(Vec<String>, usize)` holds the (possibly truncated) members of a set and the set's full cardinality.
+    /// * `None` indicates that the set does not exist for the corresponding key.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if the operation fails, such as in cases of a Redis connection issue.
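
Reviewer note: a minimal usage sketch of the pipelined read above. The module path, the labels, and the limit of 20 are illustrative assumptions; in the patch itself callers go through `RedisOps::try_from_multiple_sets` (added just below), which prepends the implementor's key prefix before delegating to `get_multiple_sets`.

use crate::db::kv::index::sets;
use std::error::Error;

// Hypothetical caller: fetch the tagger sets of several labels in one round trip,
// keeping at most 20 member ids per set while still learning each set's full size.
async fn preview_taggers() -> Result<(), Box<dyn Error + Send + Sync>> {
    let labels = ["bitcoin", "pubky"];
    let sets = sets::get_multiple_sets("Tags:Taggers", &labels, Some(20)).await?;
    for (label, entry) in labels.iter().zip(sets) {
        match entry {
            // `members` holds at most 20 ids; `total` is the set's full cardinality.
            Some((members, total)) => println!("{label}: {} of {total} taggers", members.len()),
            // `None` means no set is indexed for this label.
            None => println!("{label}: no taggers indexed"),
        }
    }
    Ok(())
}
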
+ async fn try_from_multiple_sets( + key_parts_list: &[&str], + limit: Option, + ) -> Result, usize)>>, Box> { + let prefix = Self::prefix().await; + sets::get_multiple_sets(&prefix, key_parts_list, limit).await + } } diff --git a/src/models/tag/global.rs b/src/models/tag/global.rs new file mode 100644 index 00000000..a441821b --- /dev/null +++ b/src/models/tag/global.rs @@ -0,0 +1,23 @@ +use super::stream::Taggers; +use crate::{models::user::UserStreamType, RedisOps}; +use std::error::Error; + +pub struct TagGlobal {} + +impl TagGlobal { + pub async fn get_tag_taggers( + label: String, + reach: Option, + ) -> Result>, Box> { + match reach { + None => read_from_set(&label).await, + _ => Ok(None), + } + } +} + +pub async fn read_from_set( + label: &str, +) -> Result>, Box> { + Taggers::try_from_index_set(&[label], None, None).await +} diff --git a/src/models/tag/mod.rs b/src/models/tag/mod.rs index 49bfa9f1..663e6f50 100644 --- a/src/models/tag/mod.rs +++ b/src/models/tag/mod.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::ops::Deref; use utoipa::ToSchema; +pub mod global; pub mod post; pub mod stream; pub mod user; diff --git a/src/models/tag/post.rs b/src/models/tag/post.rs index 0ee84d00..af91dc8b 100644 --- a/src/models/tag/post.rs +++ b/src/models/tag/post.rs @@ -1,8 +1,7 @@ -use std::ops::Deref; - use crate::db::connectors::neo4j::get_neo4j_graph; use crate::queries; use serde::{Deserialize, Serialize}; +use std::ops::Deref; use utoipa::ToSchema; use super::Tags; diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index 3d37f062..78990a2a 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -1,3 +1,4 @@ +use axum::async_trait; use neo4rs::Query; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -12,13 +13,34 @@ use crate::{db::connectors::neo4j::get_neo4j_graph, queries}; pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"]; -type TagList = Vec; +#[derive(Serialize, Deserialize, Debug, ToSchema)] +pub struct Taggers(Vec); + +#[async_trait] +impl RedisOps for Taggers { + async fn prefix() -> String { + String::from("Tags:Taggers") + } +} + +impl AsRef<[String]> for Taggers { + fn as_ref(&self) -> &[String] { + &self.0 + } +} + +impl Taggers { + fn from_vec(vec: Vec) -> Self { + Self(vec) + } +} #[derive(Deserialize, Serialize, ToSchema, Debug)] pub struct HotTag { label: String, - tagger_ids: Vec, + taggers_id: Taggers, post_count: u64, + taggers_count: usize, } // Define a newtype wrapper @@ -45,7 +67,6 @@ impl FromIterator for HotTags { } impl HotTags { - // TODO: Move another struct that is more related with reindexer process? pub async fn set_global_tag_scores() -> Result<(), Box> { let mut result; { @@ -57,14 +78,22 @@ impl HotTags { } if let Some(row) = result.next().await? { - let hot_tags: Vec<(f64, &str)> = row.get("hot_tags")?; - Self::put_index_sorted_set(&TAG_GLOBAL_HOT, hot_tags.as_slice()).await? 
+ let hot_tags_score: Vec<(f64, &str)> = row.get("hot_tags_score")?; + Self::put_index_sorted_set(&TAG_GLOBAL_HOT, hot_tags_score.as_slice()).await?; + let hot_tags_users: Vec<(&str, Vec)> = row.get("hot_tags_users")?; + // Add all the users_id in the SET + for (label, user_list) in hot_tags_users.into_iter() { + let label_user_list = Taggers::from_vec(user_list); + label_user_list.put_index_set(&[label]).await?; + } } Ok(()) } + pub async fn get_global_tags_stream( skip: Option, limit: Option, + taggers_limit: Option, ) -> Result, Box> { let hot_tags = Self::try_from_index_sorted_set( &TAG_GLOBAL_HOT, @@ -81,22 +110,28 @@ impl HotTags { return Ok(None); } - let tag_list: Vec<&str> = hot_tags.iter().map(|(label, _)| label.as_ref()).collect(); - let query = queries::get_global_hot_tags_taggers(tag_list.as_slice()); - let tag_user_list = retrieve_from_graph::>(query, "tag_user_ids") - .await? - .unwrap(); + // Collect the labels as a vector of string slices + let labels: Vec<&str> = hot_tags.iter().map(|(label, _)| label.as_str()).collect(); + let label_slice: &[&str] = &labels; + + let list = Taggers::try_from_multiple_sets(label_slice, taggers_limit).await?; let hot_tags_stream: HotTags = hot_tags .into_iter() - .zip(tag_user_list) - .map(|((label, score), tagger_ids)| HotTag { - label, - tagger_ids, - post_count: score as u64, + .zip(list) + .filter_map(|((label, score), user_ids)| match user_ids { + Some((tagger_list, taggers_count)) => { + let taggers_id = Taggers::from_vec(tagger_list); + Some(HotTag { + label, + taggers_id, + post_count: score as u64, + taggers_count, + }) + } + None => None, }) .collect(); - Ok(Some(hot_tags_stream)) } @@ -106,7 +141,8 @@ impl HotTags { ) -> Result, Box> { // We cannot use here limit and skip because we want to get all the users reach by let users = - UserStream::get_user_list_from_reach(&user_id, reach, None, Some(10000)).await?; + UserStream::get_user_list_from_reach(&user_id, reach, None, Some(isize::MAX as usize)) + .await?; match users { Some(users) => get_users_tags_by_reach(&users).await, None => Ok(None), diff --git a/src/routes/v0/endpoints.rs b/src/routes/v0/endpoints.rs index 3ce9fb3e..85624e5d 100644 --- a/src/routes/v0/endpoints.rs +++ b/src/routes/v0/endpoints.rs @@ -42,9 +42,17 @@ pub const STREAM_POSTS_REACH_ROUTE: &str = concatcp!(STREAM_POSTS_ROUTE, "/reach pub const STREAM_POSTS_BOOKMARKED_ROUTE: &str = concatcp!(STREAM_POSTS_ROUTE, "/bookmarks/:user_id"); pub const STREAM_TAGS_ROUTE: &str = concatcp!(STREAM_PREFIX, "/tags"); +// Changed pub const STREAM_TAGS_GLOBAL_ROUTE: &str = concatcp!(STREAM_TAGS_ROUTE, "/global"); +// Changed pub const STREAM_TAGS_REACH_ROUTE: &str = concatcp!(STREAM_TAGS_ROUTE, "/reached/:user_id/:reach"); // Search routes const SEARCH_PREFIX: &str = concatcp!(VERSION_ROUTE, "/search"); pub const SEARCH_USERS_ROUTE: &str = concatcp!(SEARCH_PREFIX, "/users"); + +// Tag routes +const TAG_PREFIX: &str = concatcp!(VERSION_ROUTE, "/tag"); +pub const TAG_HOT_ROUTE: &str = concatcp!(TAG_PREFIX, "/hot"); +pub const TAG_REACH_ROUTE: &str = concatcp!(TAG_PREFIX, "/reached/:user_id/:reach"); +pub const TAG_TAGGERS_ROUTE: &str = concatcp!(TAG_PREFIX, "/taggers/:label"); diff --git a/src/routes/v0/mod.rs b/src/routes/v0/mod.rs index 1bbc3e2f..7142f731 100644 --- a/src/routes/v0/mod.rs +++ b/src/routes/v0/mod.rs @@ -7,6 +7,7 @@ pub mod info; pub mod post; pub mod search; pub mod stream; +pub mod tag; pub mod user; pub fn routes() -> Router { @@ -15,6 +16,7 @@ pub fn routes() -> Router { let route_user = user::routes(); 
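
Reviewer note: after this change the hot-tags data lives in two Redis structures: a global sorted set that ranks labels by unique-post count (keyed from `TAG_GLOBAL_HOT`), and one plain set of tagger ids per label under the `Tags:Taggers` prefix, which `get_global_tags_stream` now reads back in bulk instead of re-querying the graph. The sketch below shows the equivalent raw writes; the exact sorted-set key name and the sample values are assumptions, not taken from this patch.

// Conceptual equivalent of what `set_global_tag_scores` persists per label
// (key names and sample data are illustrative; the real code goes through the
// `RedisOps` helpers shown above).
async fn index_hot_tag_example(
    conn: &mut redis::aio::MultiplexedConnection,
) -> redis::RedisResult<()> {
    let mut pipe = redis::pipe();
    // Ranking structure: the label scored by its unique post count.
    pipe.zadd("Sorted:Tags:Global:Hot", "bitcoin", 16_f64).ignore();
    // Per-label tagger set, later fetched in one pipeline by `get_multiple_sets`.
    pipe.sadd("Tags:Taggers:bitcoin", vec!["user_a", "user_b"]).ignore();
    let _: () = pipe.query_async(conn).await?;
    Ok(())
}
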
let route_stream = stream::routes(); let route_search = search::routes(); + let route_tag = tag::routes(); let route_openapi = SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::merge_docs()); @@ -23,6 +25,7 @@ pub fn routes() -> Router { .merge(route_user) .merge(route_stream) .merge(route_search) + .merge(route_tag) .merge(route_openapi) } @@ -37,6 +40,8 @@ impl ApiDoc { combined.merge(user::UserApiDoc::merge_docs()); combined.merge(stream::StreamApiDoc::merge_docs()); combined.merge(search::SearchApiDoc::merge_docs()); + combined.merge(search::SearchApiDoc::merge_docs()); + combined.merge(tag::TagApiDoc::merge_docs()); combined } } diff --git a/src/routes/v0/stream/mod.rs b/src/routes/v0/stream/mod.rs index 70f07f1e..12d9e8a5 100644 --- a/src/routes/v0/stream/mod.rs +++ b/src/routes/v0/stream/mod.rs @@ -4,7 +4,6 @@ use axum::Router; use utoipa::OpenApi; mod posts; -mod tags; mod users; pub fn routes() -> Router { @@ -17,10 +16,7 @@ pub fn routes() -> Router { endpoints::STREAM_POSTS_ROUTE => posts::stream_global_posts_handler, endpoints::STREAM_POSTS_USER_ROUTE => posts::stream_user_posts_handler, endpoints::STREAM_POSTS_REACH_ROUTE => posts::stream_posts_by_reach_handler, - endpoints::STREAM_POSTS_BOOKMARKED_ROUTE => posts::stream_bookmarked_posts_handler, - // Tags stream - endpoints::STREAM_TAGS_GLOBAL_ROUTE => tags::stream_hot_tags_handler, - endpoints::STREAM_TAGS_REACH_ROUTE => tags::stream_tags_by_reach_handler + endpoints::STREAM_POSTS_BOOKMARKED_ROUTE => posts::stream_bookmarked_posts_handler ) } @@ -32,7 +28,6 @@ impl StreamApiDoc { pub fn merge_docs() -> utoipa::openapi::OpenApi { let mut combined = users::StreamUsersApiDocs::openapi(); combined.merge(posts::StreamPostsApiDocs::openapi()); - combined.merge(tags::StreamTagsApiDocs::openapi()); combined } } diff --git a/src/routes/v0/stream/tags.rs b/src/routes/v0/stream/tags.rs deleted file mode 100644 index c1442ad4..00000000 --- a/src/routes/v0/stream/tags.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::models::tag::stream::{HotTag, HotTags}; -use crate::models::user::UserStreamType; -use crate::routes::v0::endpoints::{STREAM_TAGS_GLOBAL_ROUTE, STREAM_TAGS_REACH_ROUTE}; -use crate::{Error, Result}; -use axum::extract::{Path, Query}; -use axum::Json; -use log::{error, info}; -use serde::Deserialize; -use utoipa::OpenApi; - -#[derive(Deserialize)] -pub struct HotTagsQuery { - skip: Option, - limit: Option, -} - -#[utoipa::path( - get, - path = STREAM_TAGS_GLOBAL_ROUTE, - params( - ("skip" = Option, Query, description = "Skip N tags"), - ("limit" = Option, Query, description = "Retrieve N tag") - ), - tag = "Stream hot Tags", - responses( - (status = 200, description = "Retrieve hot tags stream", body = Vec), - (status = 404, description = "Hot tags not found"), - (status = 500, description = "Internal server error") - ) -)] -pub async fn stream_hot_tags_handler(Query(query): Query) -> Result> { - info!("GET {STREAM_TAGS_GLOBAL_ROUTE}"); - - let skip = query.skip.unwrap_or(0); - let limit = query.limit.unwrap_or(40); - - match HotTags::get_global_tags_stream(Some(skip), Some(limit)).await { - Ok(Some(hot_tags)) => Ok(Json(hot_tags)), - Ok(None) => Err(Error::TagsNotFound { - reach: String::from("GLOBAL"), - }), - Err(source) => Err(Error::InternalServerError { source }), - } -} - -#[derive(Deserialize)] -pub struct StreamTagsReachQuery { - user_id: String, - reach: Option, -} - -#[utoipa::path( - get, - path = STREAM_TAGS_REACH_ROUTE, - tag = "Stream Tags by reach", - params( - ("user_id" = String, Path, 
description = "User Pubky ID"), - ("reach" = UserStreamType, Path, description = "Reach type: Follower | Following | Friends") - ), - responses( - (status = 200, description = "Retrieve tags by reach cluster", body = Vec), - (status = 404, description = "Hot tags not found"), - (status = 500, description = "Internal server error") - ) -)] -pub async fn stream_tags_by_reach_handler( - Path(path): Path, -) -> Result> { - info!("GET {STREAM_TAGS_REACH_ROUTE}"); - - let reach = path.reach.unwrap_or(UserStreamType::Following); - let user_id = path.user_id; - - match HotTags::get_stream_tags_by_reach(user_id, reach).await { - Ok(Some(hot_tags)) => Ok(Json(hot_tags)), - Ok(None) => Err(Error::TagsNotFound { - reach: String::from("REACH"), - }), - Err(source) => { - error!("Internal Server ERROR: {:?}", source); - Err(Error::InternalServerError { source }) - } - } -} - -#[derive(OpenApi)] -#[openapi( - paths(stream_hot_tags_handler, stream_tags_by_reach_handler), - components(schemas(HotTags, HotTag)) -)] -pub struct StreamTagsApiDocs; diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs new file mode 100644 index 00000000..61e79ad6 --- /dev/null +++ b/src/routes/v0/tag/global.rs @@ -0,0 +1,131 @@ +use crate::models::tag::global::TagGlobal; +use crate::models::tag::stream::{HotTag, HotTags, Taggers}; +use crate::models::user::UserStreamType; +use crate::routes::v0::endpoints::{TAG_HOT_ROUTE, TAG_REACH_ROUTE, TAG_TAGGERS_ROUTE}; +use crate::{Error, Result}; +use axum::extract::{Path, Query}; +use axum::Json; +use log::{error, info}; +use serde::Deserialize; +use utoipa::OpenApi; + +#[derive(Deserialize)] +pub struct HotTagsQuery { + skip: Option, + limit: Option, + max_taggers: Option, +} + +#[utoipa::path( + get, + path = TAG_HOT_ROUTE, + params( + ("skip" = Option, Query, description = "Skip N tags"), + ("limit" = Option, Query, description = "Retrieve N tag"), + ("max_taggers" = Option, Query, description = "Retrieve N user_id for each tag") + ), + tag = "Global hot Tags", + responses( + // TODO: Add hot tags + (status = 200, description = "Retrieve hot tags", body = Vec), + (status = 404, description = "Hot tags not found"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn hot_tags_handler(Query(query): Query) -> Result> { + info!("GET {TAG_HOT_ROUTE}"); + + let skip = query.skip.unwrap_or(0); + let limit = query.limit.unwrap_or(40); + let max_taggers = query.max_taggers.unwrap_or(20); + + match HotTags::get_global_tags_stream(Some(skip), Some(limit), Some(max_taggers)).await { + Ok(Some(hot_tags)) => Ok(Json(hot_tags)), + Ok(None) => Err(Error::TagsNotFound { + reach: String::from("GLOBAL"), + }), + Err(source) => Err(Error::InternalServerError { source }), + } +} + +#[derive(Deserialize)] +pub struct TagTaggersQuery { + reach: Option, +} + +#[utoipa::path( + get, + path = TAG_TAGGERS_ROUTE, + tag = "Global tag Taggers", + params( + ("label" = String, Path, description = "Tag name"), + ("reach" = UserStreamType, Path, description = "Reach type: Follower | Following | Friends") + ), + responses( + (status = 200, description = "Taggers", body = Vec), + (status = 404, description = "Tag not found"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn tag_taggers_handler( + Path(label): Path, + Query(query): Query, +) -> Result>> { + info!( + "GET {TAG_TAGGERS_ROUTE} label:{}, reach:{:?}", + label, query.reach + ); + + let reach = query.reach; + + match TagGlobal::get_tag_taggers(label.clone(), reach).await { + Ok(Some(post)) 
=> Ok(Json(post)), + Ok(None) => Err(Error::TagsNotFound { reach: label }), + Err(source) => Err(Error::InternalServerError { source }), + } +} + +#[derive(Deserialize)] +pub struct TagsByReachQuery { + user_id: String, + reach: Option, +} + +#[utoipa::path( + get, + path = TAG_REACH_ROUTE, + tag = "Global Tags by reach", + params( + ("user_id" = String, Path, description = "User Pubky ID"), + ("reach" = UserStreamType, Path, description = "Reach type: Follower | Following | Friends") + ), + responses( + (status = 200, description = "Retrieve tags by reach cluster", body = Vec), + (status = 404, description = "Hot tags not found"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn tags_by_reach_handler(Path(path): Path) -> Result> { + info!("GET {TAG_REACH_ROUTE}"); + + let reach = path.reach.unwrap_or(UserStreamType::Following); + let user_id = path.user_id; + + match HotTags::get_stream_tags_by_reach(user_id, reach).await { + Ok(Some(hot_tags)) => Ok(Json(hot_tags)), + Ok(None) => Err(Error::TagsNotFound { + reach: String::from("REACH"), + }), + Err(source) => { + error!("Internal Server ERROR: {:?}", source); + Err(Error::InternalServerError { source }) + } + } +} + +#[derive(OpenApi)] +#[openapi( + paths(hot_tags_handler, tag_taggers_handler, tags_by_reach_handler), + components(schemas(HotTags, HotTag, Taggers)) +)] +pub struct TagGlobalApiDoc; diff --git a/src/routes/v0/tag/mod.rs b/src/routes/v0/tag/mod.rs new file mode 100644 index 00000000..552d8d11 --- /dev/null +++ b/src/routes/v0/tag/mod.rs @@ -0,0 +1,24 @@ +use crate::register_routes; +use crate::routes::v0::endpoints; +use axum::Router; +use utoipa::OpenApi; + +mod global; + +pub fn routes() -> Router { + register_routes!(Router::new(), + endpoints::TAG_HOT_ROUTE => global::hot_tags_handler, + endpoints::TAG_REACH_ROUTE => global::tags_by_reach_handler, + endpoints::TAG_TAGGERS_ROUTE => global::tag_taggers_handler + ) +} + +#[derive(OpenApi)] +#[openapi()] +pub struct TagApiDoc; + +impl TagApiDoc { + pub fn merge_docs() -> utoipa::openapi::OpenApi { + global::TagGlobalApiDoc::openapi() + } +} diff --git a/tests/tags/hot.rs b/tests/tags/hot.rs index 4ae10e4a..44136173 100644 --- a/tests/tags/hot.rs +++ b/tests/tags/hot.rs @@ -10,14 +10,16 @@ struct StreamTagMockup { label: String, tagger_ids: usize, post_count: u64, + taggers_count: usize, } impl StreamTagMockup { - fn new(label: String, tagger_ids: usize, post_count: u64) -> Self { + fn new(label: String, tagger_ids: usize, post_count: u64, taggers_count: usize) -> Self { Self { label, tagger_ids, post_count, + taggers_count, } } } @@ -38,13 +40,17 @@ fn analyse_hot_tags_structure(tags: &Vec) { for tag in tags { assert!(tag["label"].is_string(), "label should be a string"); assert!( - tag["tagger_ids"].is_array(), + tag["taggers_id"].is_array(), "tagger_ids should be an array" ); assert!( tag["post_count"].is_number(), "post_count should be a number" ); + assert!( + tag["taggers_count"].is_number(), + "post_count should be a number" + ); } } @@ -52,13 +58,14 @@ fn analyse_hot_tags_structure(tags: &Vec) { fn compare_unit_hot_tag(tag: &Value, hot_tag: StreamTagMockup) { assert_eq!(tag["post_count"], hot_tag.post_count); assert_eq!(tag["label"], hot_tag.label); - let tagger_ids = tag["tagger_ids"].as_array().unwrap(); + assert_eq!(tag["taggers_count"], hot_tag.taggers_count); + let tagger_ids = tag["taggers_id"].as_array().unwrap(); assert_eq!(tagger_ids.len(), hot_tag.tagger_ids); } #[tokio::test] async fn test_global_hot_tags() -> Result<()> { - let 
body = make_request("/v0/stream/tags/global").await?; + let body = make_request("/v0/tag/hot").await?; assert!(body.is_array()); @@ -68,7 +75,7 @@ async fn test_global_hot_tags() -> Result<()> { analyse_hot_tags_structure(tags); // Analyse the tag that is in the 4th index - let hot_tag = StreamTagMockup::new(String::from("ha"), 9, 16); + let hot_tag = StreamTagMockup::new(String::from("ha"), 9, 16, 9); compare_unit_hot_tag(&tags[4], hot_tag); Ok(()) @@ -77,7 +84,7 @@ async fn test_global_hot_tags() -> Result<()> { #[tokio::test] async fn test_hot_tags_by_following_reach() -> Result<()> { let endpoint = &format!( - "/v0/stream/tags/reached/{}/{:?}", + "/v0/tag/reached/{}/{:?}", PEER_PUBKY, UserStreamType::Following ); @@ -91,7 +98,7 @@ async fn test_hot_tags_by_following_reach() -> Result<()> { analyse_hot_tags_structure(tags); // Analyse the tag that is in the 1st index - let hot_tag = StreamTagMockup::new(String::from("pubky"), 4, 5); + let hot_tag = StreamTagMockup::new(String::from("pubky"), 4, 5, 4); compare_unit_hot_tag(&tags[1], hot_tag); Ok(()) @@ -100,7 +107,7 @@ async fn test_hot_tags_by_following_reach() -> Result<()> { #[tokio::test] async fn test_hot_tags_by_followers_reach() -> Result<()> { let endpoint = &format!( - "/v0/stream/tags/reached/{}/{:?}", + "/v0/tag/reached/{}/{:?}", PEER_PUBKY, UserStreamType::Followers ); @@ -114,7 +121,7 @@ async fn test_hot_tags_by_followers_reach() -> Result<()> { analyse_hot_tags_structure(tags); // Analyse the tag that is in the 1st index - let hot_tag = StreamTagMockup::new(String::from("test"), 3, 3); + let hot_tag = StreamTagMockup::new(String::from("test"), 3, 3, 3); compare_unit_hot_tag(&tags[1], hot_tag); Ok(()) @@ -123,7 +130,7 @@ async fn test_hot_tags_by_followers_reach() -> Result<()> { #[tokio::test] async fn test_hot_tags_by_friends_reach() -> Result<()> { let endpoint = &format!( - "/v0/stream/tags/reached/{}/{:?}", + "/v0/tag/reached/{}/{:?}", PEER_PUBKY, UserStreamType::Friends ); @@ -137,7 +144,7 @@ async fn test_hot_tags_by_friends_reach() -> Result<()> { analyse_hot_tags_structure(tags); // Analyse the tag that is in the 1st index - let hot_tag = StreamTagMockup::new(String::from("pubky"), 2, 3); + let hot_tag = StreamTagMockup::new(String::from("pubky"), 2, 3, 2); compare_unit_hot_tag(&tags[1], hot_tag); Ok(())
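
Reviewer note: the updated tests in this file exercise the renamed routes and the new `taggers_count` field, but this hunk does not show coverage for the new `max_taggers` query parameter or the `/v0/tag/taggers/:label` endpoint. A possible follow-up test is sketched below; the parameter values are arbitrary and it assumes `make_request` accepts a query string, since it is a plain GET helper.

#[tokio::test]
async fn test_global_hot_tags_with_taggers_limit() -> Result<()> {
    let body = make_request("/v0/tag/hot?skip=0&limit=5&max_taggers=3").await?;
    let tags = body.as_array().expect("Hot tags should be an array");
    assert!(tags.len() <= 5);
    for tag in tags {
        // `taggers_id` is truncated by `max_taggers`, while `taggers_count`
        // still reports the full size of the underlying taggers set.
        let taggers_id = tag["taggers_id"].as_array().unwrap();
        assert!(taggers_id.len() <= 3);
        assert!(tag["taggers_count"].as_u64().unwrap() as usize >= taggers_id.len());
    }

    // The per-label endpoint returns the raw list of tagger ids for "ha",
    // one of the labels already present in the test dataset.
    let taggers = make_request("/v0/tag/taggers/ha").await?;
    assert!(taggers.is_array());
    Ok(())
}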