diff --git a/benches/streams.rs b/benches/streams.rs index 5209b4ea..3cde9875 100644 --- a/benches/streams.rs +++ b/benches/streams.rs @@ -32,7 +32,7 @@ criterion_group! { tag::bench_stream_tag_timeline, tag::bench_stream_tag_total_engagement, user::bench_stream_users_by_username_search, - user::bench_stream_pioneers, + user::bench_stream_influencers, user::bench_stream_following, user::bench_stream_most_followed, kind::bench_stream_post_kind_short, diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index fa3d0e1e..4aaa2f55 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -1,6 +1,9 @@ use crate::run_setup; use criterion::Criterion; -use pubky_nexus::models::user::{UserStream, UserStreamSource}; +use pubky_nexus::{ + models::user::{UserStream, UserStreamInput, UserStreamSource}, + types::StreamReach, +}; use tokio::runtime::Runtime; /// USER STREAMS BENCHMARKS @@ -17,14 +20,17 @@ pub fn bench_stream_following(c: &mut Criterion) { c.bench_function("stream_following", |b| { b.to_async(&rt).iter(|| async { - let user_stream = UserStream::get_by_id( - Some(user_id), - None, - None, - Some(20), - UserStreamSource::Pioneers, - None, - ) + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: Some(String::from(user_id)), + viewer_id: None, + skip: None, + limit: Some(20), + source: UserStreamSource::Influencers, + reach: Some(StreamReach::Following), + depth: None, + timeframe: None, + preview: None, + }) .await .unwrap(); criterion::black_box(user_stream); @@ -43,14 +49,17 @@ pub fn bench_stream_most_followed(c: &mut Criterion) { c.bench_function("stream_most_followed", |b| { b.to_async(&rt).iter(|| async { - let user_stream = UserStream::get_by_id( - None, - None, - None, - Some(20), - UserStreamSource::MostFollowed, - None, - ) + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: None, + viewer_id: None, + skip: None, + limit: Some(20), + source: 
UserStreamSource::MostFollowed, + reach: None, + depth: None, + timeframe: None, + preview: None, + }) .await .unwrap(); criterion::black_box(user_stream); @@ -83,21 +92,30 @@ pub fn bench_stream_users_by_username_search(c: &mut Criterion) { }); } -pub fn bench_stream_pioneers(c: &mut Criterion) { +pub fn bench_stream_influencers(c: &mut Criterion) { println!("***************************************"); - println!("Benchmarking the user streams for pioneer users."); + println!("Benchmarking the user streams for influencer users."); println!("***************************************"); run_setup(); let rt = Runtime::new().unwrap(); - c.bench_function("stream_pioneers", |b| { + c.bench_function("stream_influencers", |b| { b.to_async(&rt).iter(|| async { - let user_stream = - UserStream::get_by_id(None, None, None, Some(20), UserStreamSource::Pioneers, None) - .await - .unwrap(); + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: None, + viewer_id: None, + skip: None, + limit: Some(20), + source: UserStreamSource::Influencers, + reach: Some(StreamReach::Wot(3)), + depth: None, + timeframe: None, + preview: None, + }) + .await + .unwrap(); criterion::black_box(user_stream); }); }); diff --git a/benches/tag.rs b/benches/tag.rs index 6b2bd902..e608f3fe 100644 --- a/benches/tag.rs +++ b/benches/tag.rs @@ -2,10 +2,10 @@ use criterion::{criterion_group, criterion_main}; use criterion::{BenchmarkId, Criterion}; use pubky_nexus::models::tag::global::TagGlobal; use pubky_nexus::models::tag::post::TagPost; -use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput, TagStreamReach}; +use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput}; use pubky_nexus::models::tag::traits::{TagCollection, TaggersCollection}; use pubky_nexus::models::tag::user::TagUser; -use pubky_nexus::types::{Pagination, Timeframe}; +use pubky_nexus::types::{Pagination, StreamReach, Timeframe}; use setup::run_setup; use std::time::Duration; use tokio::runtime::Runtime; @@ -245,7 
+245,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Following); + let reach_by = format!("{:?}", StreamReach::Following); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -265,7 +265,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Following), + Some(StreamReach::Following), &input, ) .await @@ -286,7 +286,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Followers); + let reach_by = format!("{:?}", StreamReach::Followers); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -306,7 +306,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Followers), + Some(StreamReach::Followers), &input, ) .await @@ -327,7 +327,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Friends); + let reach_by = format!("{:?}", StreamReach::Friends); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -347,7 +347,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Friends), + Some(StreamReach::Friends), &input, ) .await diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index bacc4312..9634de36 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -1,9 +1,9 @@ use crate::models::post::StreamSource; use crate::models::tag::stream::HotTagsInput; -use 
crate::models::tag::stream::TagStreamReach; use crate::types::Pagination; +use crate::types::StreamReach; use crate::types::StreamSorting; -use log::debug; +use crate::types::Timeframe; use neo4rs::{query, Query}; use pubky_app_specs::PubkyAppPostKind; @@ -414,21 +414,23 @@ pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query { .param("labels", tag_list) } -fn tag_stream_reach_to_graph_subquery(reach: &TagStreamReach) -> String { - let query = match reach { - TagStreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)", - TagStreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)", - TagStreamReach::Friends => { - "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)" +fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String { + match reach { + StreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)".to_string(), + StreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)".to_string(), + StreamReach::Friends => { + "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)".to_string() } - }; - String::from(query) + StreamReach::Wot(depth) => { + format!("MATCH (user:User)-[:FOLLOWS*1..{}]->(reach:User)", depth) /* bind `user`/`reach` like the other arms: callers filter on user.id and read reach */ + } + } } pub fn get_tag_taggers_by_reach( label: &str, user_id: &str, - reach: TagStreamReach, + reach: StreamReach, skip: usize, limit: usize, ) -> Query { @@ -443,7 +445,7 @@ pub fn get_tag_taggers_by_reach( SKIP $skip LIMIT $limit RETURN COLLECT(reach.id) as tagger_ids ", - tag_stream_reach_to_graph_subquery(&reach) + stream_reach_to_graph_subquery(&reach) ) .as_str(), ) @@ -455,7 +457,7 @@ pub fn get_tag_taggers_by_reach( pub fn get_hot_tags_by_reach( user_id: &str, - reach: TagStreamReach, + reach: StreamReach, tags_query: &HotTagsInput, ) -> Query { let input_tagged_type = match &tags_query.tagged_type { @@ -485,7 +487,7 @@ pub fn get_hot_tags_by_reach( SKIP $skip LIMIT $limit RETURN COLLECT(hot_tag) as hot_tags ", - 
tag_stream_reach_to_graph_subquery(&reach), + stream_reach_to_graph_subquery(&reach), input_tagged_type, tags_query.taggers_limit ) @@ -504,7 +506,6 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInput) -> Query { None => String::from("Post|User"), }; let (from, to) = tags_query.timeframe.to_timestamp_range(); - debug!("get_global_hot_tags query: {:?} {:?}", from, to); query( format!( " @@ -535,6 +536,84 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInput) -> Query { .param("to", to) } +pub fn get_influencers_by_reach( + user_id: &str, + reach: StreamReach, + skip: usize, + limit: usize, + timeframe: &Timeframe, +) -> Query { + let (from, to) = timeframe.to_timestamp_range(); + query( + format!( + " + {} + WHERE user.id = $user_id + + OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(reach) + WHERE follow.indexed_at >= $from AND follow.indexed_at < $to + + OPTIONAL MATCH (reach)-[tag:TAGGED]->(tagged:Post) + WHERE tag.indexed_at >= $from AND tag.indexed_at < $to + + OPTIONAL MATCH (reach)-[authored:AUTHORED]->(post:Post) + WHERE authored.indexed_at >= $from AND authored.indexed_at < $to + + WITH reach, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, + COUNT(DISTINCT post) AS posts_count + WITH {{ + id: reach.id, + score: (tags_count + posts_count) * sqrt(followers_count) + }} AS influencer + ORDER BY influencer.score DESC, reach.id ASC + SKIP $skip LIMIT $limit + RETURN COLLECT(influencer) as influencers + ", + stream_reach_to_graph_subquery(&reach), + ) + .as_str(), + ) + .param("user_id", user_id) + .param("skip", skip as i64) + .param("limit", limit as i64) + .param("from", from) + .param("to", to) +} + +pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) -> Query { + let (from, to) = timeframe.to_timestamp_range(); + query( + " + MATCH (user:User) + WITH DISTINCT user + + OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user) + WHERE follow.indexed_at >= $from AND follow.indexed_at < $to + + OPTIONAL 
MATCH (user)-[tag:TAGGED]->(tagged:Post) + WHERE tag.indexed_at >= $from AND tag.indexed_at < $to + + OPTIONAL MATCH (user)-[authored:AUTHORED]->(post:Post) + WHERE authored.indexed_at >= $from AND authored.indexed_at < $to + + WITH user, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, + COUNT(DISTINCT post) AS posts_count + WITH { + id: user.id, + score: (tags_count + posts_count) * sqrt(followers_count + 1) + } AS influencer + WHERE influencer.id IS NOT NULL + ORDER BY influencer.score DESC, influencer.id ASC + SKIP $skip LIMIT $limit + RETURN COLLECT(influencer) as influencers + ", + ) + .param("skip", skip as i64) + .param("limit", limit as i64) + .param("from", from) + .param("to", to) +} + pub fn get_files_by_ids(key_pair: &[&[&str]]) -> Query { query( " diff --git a/src/models/tag/global.rs b/src/models/tag/global.rs index 0aaeecdd..354fbcda 100644 --- a/src/models/tag/global.rs +++ b/src/models/tag/global.rs @@ -1,6 +1,6 @@ -use super::stream::{TagStreamReach, Taggers}; +use super::stream::Taggers; use crate::db::graph::exec::retrieve_from_graph; -use crate::types::DynError; +use crate::types::{DynError, StreamReach}; use crate::{queries, RedisOps}; pub struct TagGlobal {} @@ -9,7 +9,7 @@ impl TagGlobal { pub async fn get_tag_taggers( label: String, user_id: Option, - reach: Option, + reach: Option, skip: usize, limit: usize, ) -> Result>, DynError> { @@ -19,7 +19,7 @@ impl TagGlobal { get_tag_taggers_by_reach( &label, &id, - reach.unwrap_or(TagStreamReach::Friends), + reach.unwrap_or(StreamReach::Friends), skip, limit, ) @@ -40,7 +40,7 @@ pub async fn read_from_set( pub async fn get_tag_taggers_by_reach( label: &str, user_id: &str, - reach: TagStreamReach, + reach: StreamReach, skip: usize, limit: usize, ) -> Result>, DynError> { diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index 02584b8e..03e8734c 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -1,6 +1,6 @@ use 
crate::db::graph::exec::retrieve_from_graph; use crate::db::kv::index::sorted_sets::SortOrder; -use crate::types::{DynError, Timeframe}; +use crate::types::{DynError, StreamReach, Timeframe}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -13,19 +13,11 @@ use crate::{RedisOps, ScoreAction}; pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"]; -const GLOBAL_HOT_TAGS_PREFIX: &str = "Cached:Hot:Tags"; +const GLOBAL_HOT_TAGS_PREFIX: &str = "Cache:Hot:Tags"; #[derive(Serialize, Deserialize, Debug, ToSchema, Clone)] pub struct Taggers(pub Vec); -#[derive(Deserialize, Debug, ToSchema, Clone)] -#[serde(rename_all = "snake_case")] -pub enum TagStreamReach { - Followers, - Following, - Friends, -} - #[derive(Deserialize, Debug, ToSchema, Clone)] pub enum TaggedType { Post, @@ -231,14 +223,14 @@ impl HotTags { pub async fn get_hot_tags( user_id: Option, - reach: Option, + reach: Option, tags_query: &HotTagsInput, ) -> Result, DynError> { match user_id { Some(user_id) => { HotTags::get_hot_tags_by_reach( user_id, - reach.unwrap_or(TagStreamReach::Friends), + reach.unwrap_or(StreamReach::Friends), tags_query, ) .await @@ -249,7 +241,7 @@ impl HotTags { async fn get_hot_tags_by_reach( user_id: String, - reach: TagStreamReach, + reach: StreamReach, tags_query: &HotTagsInput, ) -> Result, DynError> { let query = queries::get::get_hot_tags_by_reach(user_id.as_str(), reach, tags_query); diff --git a/src/models/user/counts.rs b/src/models/user/counts.rs index 48349b1e..3250460e 100644 --- a/src/models/user/counts.rs +++ b/src/models/user/counts.rs @@ -77,7 +77,7 @@ impl UserCounts { pub async fn put_to_index(&self, user_id: &str) -> Result<(), DynError> { self.put_index_json(&[user_id], None).await?; UserStream::add_to_most_followed_sorted_set(user_id, self).await?; - UserStream::add_to_pioneers_sorted_set(user_id, self).await?; + UserStream::add_to_influencers_sorted_set(user_id, self).await?; Ok(()) } @@ -93,11 
+93,11 @@ impl UserCounts { pub async fn update(user_id: &str, field: &str, action: JsonAction) -> Result<(), DynError> { // Update user counts index Self::update_index_field(user_id, field, action).await?; - // Just update pioneer and most followed indexes, when that fields are updated + // Just update influencer and most followed indexes, when that fields are updated if field == "followers" || field == "tags" || field == "posts" { let exist_count = Self::get_by_id(user_id).await?; if let Some(user_counts) = exist_count { - UserStream::add_to_pioneers_sorted_set(user_id, &user_counts).await?; + UserStream::add_to_influencers_sorted_set(user_id, &user_counts).await?; // Increment followers if field == "followers" { UserStream::add_to_most_followed_sorted_set(user_id, &user_counts).await? diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs new file mode 100644 index 00000000..eb307fb0 --- /dev/null +++ b/src/models/user/influencers.rs @@ -0,0 +1,174 @@ +use crate::db::graph::exec::retrieve_from_graph; +use crate::db::kv::index::sorted_sets::SortOrder; +use crate::types::DynError; +use crate::types::StreamReach; +use crate::types::Timeframe; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::ops::Deref; +use utoipa::ToSchema; + +use crate::queries; +use crate::RedisOps; + +const GLOBAL_INFLUENCERS_PREFIX: &str = "Cache:Influencers"; + +#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)] +pub struct Influencer { + pub id: String, + pub score: f64, +} + +// Define a newtype wrapper +#[derive(Serialize, Deserialize, Debug, ToSchema, Default, Clone)] +pub struct Influencers(pub Vec); + +impl RedisOps for Influencers {} + +// Create a Influencers instance directly from an iterator of Influencer items +// Need it in collect() +impl FromIterator for Influencers { + fn from_iter>(iter: I) -> Self { + Influencers(iter.into_iter().collect()) + } +} + +// Implement Deref so Influencers can be used like Vec +impl Deref for 
Influencers { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Influencers { + pub async fn reindex() -> Result<(), DynError> { + Influencers::get_global_influencers(0, 100, &Timeframe::AllTime).await?; + Influencers::get_global_influencers(0, 100, &Timeframe::ThisMonth).await?; + Ok(()) + } + + fn get_cache_key_parts(timeframe: &Timeframe) -> Vec { + vec![timeframe.to_string()] + } + + async fn get_from_global_cache( + skip: usize, + limit: usize, + timeframe: &Timeframe, + ) -> Result, DynError> { + let key_parts = Influencers::get_cache_key_parts(timeframe); + let key_parts_vector: Vec<&str> = + key_parts.iter().map(|s| s.as_str()).collect::>(); + let ranking = Influencers::try_from_index_sorted_set( + key_parts_vector.as_slice(), + None, + None, + Some(skip), + Some(limit), + SortOrder::Descending, + Some(GLOBAL_INFLUENCERS_PREFIX), + ) + .await?; + + match ranking { + None => Ok(None), + Some(ranking) => { + let mut influencers = Vec::new(); + for (id, score) in ranking { + influencers.push(Influencer { id, score }); + } + Ok(Some(Influencers(influencers))) + } + } + } + + async fn put_to_global_cache( + result: Influencers, + timeframe: &Timeframe, + ) -> Result<(), DynError> { + let key_parts = Influencers::get_cache_key_parts(timeframe); + let key_parts_vector: Vec<&str> = + key_parts.iter().map(|s| s.as_str()).collect::>(); + + // store the ranking as sorted set in cache + Influencers::put_index_sorted_set( + key_parts_vector.as_slice(), + result + .iter() + .map(|influencer| (influencer.score, influencer.id.as_str())) + .collect::>() + .as_slice(), + Some(GLOBAL_INFLUENCERS_PREFIX), + Some(timeframe.to_cache_period()), + ) + .await?; + Ok(()) + } + + pub async fn get_influencers( + user_id: Option<&str>, + reach: Option, + skip: usize, + limit: usize, + timeframe: &Timeframe, + preview: bool, + ) -> Result, DynError> { + let (skip, limit) = if preview { + let skip = Utc::now().timestamp_subsec_micros() % 98; + (skip as 
usize, 3) + } else { + (skip, limit) + }; + match user_id { + Some(user_id) => { + Influencers::get_influencers_by_reach( + user_id, + reach.unwrap_or(StreamReach::Friends), + skip, + limit, + timeframe, + ) + .await + } + None => Influencers::get_global_influencers(skip, limit, timeframe).await, + } + } + + async fn get_influencers_by_reach( + user_id: &str, + reach: StreamReach, + skip: usize, + limit: usize, + timeframe: &Timeframe, + ) -> Result, DynError> { + let query = queries::get::get_influencers_by_reach(user_id, reach, skip, limit, timeframe); + retrieve_from_graph::(query, "influencers").await + } + + async fn get_global_influencers( + skip: usize, + limit: usize, + timeframe: &Timeframe, + ) -> Result, DynError> { + let cached_influencers = Influencers::get_from_global_cache(skip, limit, timeframe).await?; + if cached_influencers.is_some() { + return Ok(cached_influencers); + } + + let query = queries::get::get_global_influencers(0, 100, timeframe); + let result = retrieve_from_graph::(query, "influencers").await?; + + let influencers = match result { + Some(influencers) => influencers, + None => return Ok(None), + }; + + if !influencers.is_empty() { /* cache only a non-empty ranking; the read below pages (skip/limit) out of that cache */ + Influencers::put_to_global_cache(influencers.clone(), timeframe).await?; + } + + Influencers::get_from_global_cache(skip, limit, timeframe).await + } +} diff --git a/src/models/user/mod.rs b/src/models/user/mod.rs index c3014a0b..ae3d0663 100644 --- a/src/models/user/mod.rs +++ b/src/models/user/mod.rs @@ -1,6 +1,7 @@ mod counts; mod details; //mod id; +mod influencers; mod muted; mod relationship; mod search; @@ -11,11 +12,13 @@ mod view; pub use counts::UserCounts; pub use details::UserDetails; //pub use id::PubkyId; +pub use influencers::Influencers; pub use muted::Muted; pub use relationship::Relationship; pub use search::{UserSearch, USER_NAME_KEY_PARTS}; pub use stream::{ - UserStream, UserStreamSource, USER_MOSTFOLLOWED_KEY_PARTS, USER_PIONEERS_KEY_PARTS, + UserStream, UserStreamInput, 
UserStreamSource, USER_INFLUENCERS_KEY_PARTS, + USER_MOSTFOLLOWED_KEY_PARTS, }; pub use tags::ProfileTag; pub use tags::UserTags; diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index 39d84002..6320a927 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -1,6 +1,6 @@ -use super::{Muted, UserCounts, UserSearch, UserView}; +use super::{Influencers, Muted, UserCounts, UserSearch, UserView}; use crate::models::follow::{Followers, Following, Friends, UserFollows}; -use crate::types::DynError; +use crate::types::{DynError, StreamReach, Timeframe}; use crate::{db::kv::index::sorted_sets::SortOrder, RedisOps}; use crate::{get_neo4j_graph, queries}; use serde::{Deserialize, Serialize}; @@ -8,12 +8,12 @@ use tokio::task::spawn; use utoipa::ToSchema; pub const USER_MOSTFOLLOWED_KEY_PARTS: [&str; 2] = ["Users", "MostFollowed"]; -pub const USER_PIONEERS_KEY_PARTS: [&str; 2] = ["Users", "Pioneers"]; +pub const USER_INFLUENCERS_KEY_PARTS: [&str; 2] = ["Users", "Influencers"]; pub const CACHE_USER_RECOMMENDED_KEY_PARTS: [&str; 3] = ["Cache", "Users", "Recommended"]; // TTL, 12HR pub const CACHE_USER_RECOMMENDED_TTL: i64 = 12 * 60 * 60; -#[derive(Deserialize, ToSchema, Debug, Clone)] +#[derive(Deserialize, ToSchema, Debug, Clone, PartialEq)] #[serde(rename_all = "snake_case")] pub enum UserStreamSource { Followers, @@ -21,27 +21,43 @@ pub enum UserStreamSource { Friends, Muted, MostFollowed, - Pioneers, + Influencers, Recommended, } +pub struct UserStreamInput { + pub user_id: Option, + pub viewer_id: Option, + pub skip: Option, + pub limit: Option, + pub source: UserStreamSource, + pub reach: Option, + pub depth: Option, + pub timeframe: Option, + pub preview: Option, +} + #[derive(Serialize, Deserialize, ToSchema, Default)] pub struct UserStream(Vec); impl RedisOps for UserStream {} impl UserStream { - pub async fn get_by_id( - user_id: Option<&str>, - viewer_id: Option<&str>, - skip: Option, - limit: Option, - source: UserStreamSource, - 
depth: Option, - ) -> Result, DynError> { - let user_ids = Self::get_user_list_from_source(user_id, source, skip, limit).await?; + pub async fn get_by_id(input: &UserStreamInput) -> Result, DynError> { + let user_ids = Self::get_user_list_from_source( + input.user_id.as_deref(), + input.source.clone(), + input.reach.clone(), + input.skip, + input.limit, + input.timeframe.clone(), + input.preview, + ) + .await?; match user_ids { - Some(users) => Self::from_listed_user_ids(&users, viewer_id, depth).await, + Some(users) => { + Self::from_listed_user_ids(&users, input.viewer_id.as_deref(), input.depth).await + } None => Ok(None), } } @@ -112,12 +128,13 @@ impl UserStream { } /// Adds the post to a Redis sorted set using the follower counts as score. - pub async fn add_to_pioneers_sorted_set( + pub async fn add_to_influencers_sorted_set( user_id: &str, counts: &UserCounts, ) -> Result<(), DynError> { let score = (counts.tags + counts.posts) as f64 * (counts.followers as f64).sqrt(); - Self::put_index_sorted_set(&USER_PIONEERS_KEY_PARTS, &[(score, user_id)], None, None).await + Self::put_index_sorted_set(&USER_INFLUENCERS_KEY_PARTS, &[(score, user_id)], None, None) + .await } /// Retrieves recommended user IDs based on the specified criteria. async fn get_recommended_ids( @@ -191,8 +208,11 @@ impl UserStream { pub async fn get_user_list_from_source( user_id: Option<&str>, source: UserStreamSource, + reach: Option, skip: Option, limit: Option, + timeframe: Option, + preview: Option, ) -> Result>, DynError> { let user_ids = match source { UserStreamSource::Followers => Followers::get_by_id( @@ -237,17 +257,21 @@ impl UserStream { ) .await? 
.map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), - UserStreamSource::Pioneers => Self::try_from_index_sorted_set( - &USER_PIONEERS_KEY_PARTS, - None, - None, - skip, - limit, - SortOrder::Descending, - None, + UserStreamSource::Influencers => Influencers::get_influencers( + user_id, + Some(reach.unwrap_or(StreamReach::Wot(3))), + skip.unwrap_or(0), + limit.unwrap_or(10).min(100), + &timeframe.unwrap_or(Timeframe::AllTime), + preview.unwrap_or(false), ) .await? - .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), + .map(|result| { + result + .iter() + .map(|influencer| influencer.id.clone()) + .collect() + }), UserStreamSource::Recommended => { UserStream::get_recommended_ids( user_id.ok_or( diff --git a/src/reindex.rs b/src/reindex.rs index f0ce4556..882af2bf 100644 --- a/src/reindex.rs +++ b/src/reindex.rs @@ -7,7 +7,7 @@ use crate::models::tag::stream::HotTags; use crate::models::tag::traits::TagCollection; use crate::models::tag::user::TagUser; use crate::models::traits::Collection; -use crate::models::user::{Muted, UserDetails}; +use crate::models::user::{Influencers, Muted, UserDetails}; use crate::types::DynError; use crate::{ db::connectors::neo4j::get_neo4j_graph, @@ -69,6 +69,10 @@ pub async fn reindex() { .await .expect("Failed to store the global hot tags"); + Influencers::reindex() + .await + .expect("Failed to reindex influencers"); + TagSearch::reindex() .await .expect("Failed to store the global post tags"); diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 389aefe1..608303db 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -1,8 +1,8 @@ -use crate::models::user::{UserStream, UserStreamSource}; +use crate::models::user::{UserStream, UserStreamInput, UserStreamSource}; use crate::routes::v0::endpoints::{ STREAM_USERS_BY_IDS_ROUTE, STREAM_USERS_ROUTE, STREAM_USERS_USERNAME_SEARCH_ROUTE, }; -use crate::types::Pagination; +use 
crate::types::{Pagination, StreamReach, Timeframe}; use crate::{Error, Result}; use axum::extract::Query; use axum::Json; @@ -17,7 +17,10 @@ pub struct UserStreamQuery { skip: Option, limit: Option, source: Option, + reach: Option, depth: Option, + timeframe: Option, + preview: Option, } #[utoipa::path( @@ -26,12 +29,15 @@ pub struct UserStreamQuery { description = "Stream users", tag = "Stream", params( - ("user_id" = Option, Query, description = "User ID to use for streams with source 'following', 'followers', 'friends', 'muted' and 'recommended'"), + ("user_id" = Option, Query, description = "User ID to use for streams with source 'following', 'followers', 'friends', 'muted', 'most_followed', 'influencers' and 'recommended'"), ("viewer_id" = Option, Query, description = "Viewer Pubky ID"), ("skip" = Option, Query, description = "Skip N followers"), ("limit" = Option, Query, description = "Retrieve N followers"), ("source" = Option, Query, description = "Source of users for the stream."), - ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored") + ("reach" = Option, Query, description = "The target reach of the source. Supported in 'influencers' source."), + ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored"), + ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range"), + ("preview" = Option, Query, description = "Provide a random selection of size 3 for sources supporting preview. 
Passing preview ignores skip and limit parameters.") ), responses( (status = 200, description = "Users stream", body = UserStream), @@ -48,8 +54,9 @@ pub async fn stream_users_handler( ); let skip = query.skip.unwrap_or(0); - let limit = query.limit.unwrap_or(6).min(20); + let limit = query.limit.unwrap_or(5).min(20); let source = query.source.unwrap_or(UserStreamSource::Followers); + let timeframe = query.timeframe.unwrap_or(Timeframe::AllTime); if query.user_id.is_none() { match source { @@ -82,18 +89,36 @@ pub async fn stream_users_handler( .to_string(), }) } + UserStreamSource::Influencers => { + if query.reach.is_some() { + return Err(Error::InvalidInput { + message: + "reach query param must be provided for source 'influencers' with a user_id" + .to_string(), + }); + } + } _ => (), } + } else if source == UserStreamSource::Influencers && query.reach.is_none() { + return Err(Error::InvalidInput { + message: + "reach query param must be provided for source 'influencers' when you pass a user_id" + .to_string(), + }); } - match UserStream::get_by_id( - query.user_id.as_deref(), - query.viewer_id.as_deref(), - Some(skip), - Some(limit), - source.clone(), - query.depth, - ) + match UserStream::get_by_id(&UserStreamInput { + user_id: query.user_id.clone(), + viewer_id: query.viewer_id, + skip: Some(skip), + limit: Some(limit), + source: source.clone(), + reach: query.reach, + depth: query.depth, + timeframe: Some(timeframe), + preview: query.preview, + }) .await { Ok(Some(stream)) => Ok(Json(stream)), diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs index 33eb19b1..1c716801 100644 --- a/src/routes/v0/tag/global.rs +++ b/src/routes/v0/tag/global.rs @@ -1,9 +1,7 @@ use crate::models::tag::global::TagGlobal; -use crate::models::tag::stream::{ - HotTag, HotTags, HotTagsInput, TagStreamReach, TaggedType, Taggers, -}; +use crate::models::tag::stream::{HotTag, HotTags, HotTagsInput, TaggedType, Taggers}; use 
crate::routes::v0::endpoints::{TAGS_HOT_ROUTE, TAG_TAGGERS_ROUTE}; -use crate::types::{Pagination, Timeframe}; +use crate::types::{Pagination, StreamReach, Timeframe}; use crate::{Error, Result}; use axum::extract::{Path, Query}; use axum::Json; @@ -14,7 +12,7 @@ use utoipa::OpenApi; #[derive(Deserialize, Debug)] pub struct HotTagsQuery { user_id: Option, - reach: Option, + reach: Option, taggers_limit: Option, timeframe: Option, @@ -26,7 +24,7 @@ pub struct HotTagsQuery { pub struct TagTaggersQuery { pagination: Pagination, user_id: Option, - reach: Option, + reach: Option, } #[utoipa::path( @@ -36,7 +34,7 @@ pub struct TagTaggersQuery { tag = "Tags", params( ("label" = String, Path, description = "Tag name"), - ("reach" = TagStreamReach, Path, description = "Reach type: Follower | Following | Friends"), + ("reach" = StreamReach, Path, description = "Reach type: Follower | Following | Friends | Wot"), ("user_id" = Option, Query, description = "User ID to base reach on"), ), responses( @@ -71,7 +69,7 @@ pub async fn tag_taggers_handler( tag = "Tags", params( ("user_id" = Option, Query, description = "User Pubky ID"), - ("reach" = Option, Query, description = "Reach type: follower | following | friends"), + ("reach" = Option, Query, description = "Reach type: follower | following | friends | wot"), ("taggers_limit" = Option, Query, description = "Retrieve N user_id for each tag"), ("skip" = Option, Query, description = "Skip N tags"), ("limit" = Option, Query, description = "Retrieve N tag"), diff --git a/src/types/mod.rs b/src/types/mod.rs index 62cebb47..ee287d5c 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -19,3 +19,12 @@ pub enum StreamSorting { Timeline, TotalEngagement, } + +#[derive(Deserialize, Debug, ToSchema, Clone)] +#[serde(rename_all = "snake_case")] +pub enum StreamReach { + Followers, + Following, + Friends, + Wot(u8), +} diff --git a/tests/service/stream/user/influencers.rs b/tests/service/stream/user/influencers.rs new file mode 100644 
index 00000000..0fcf376e --- /dev/null +++ b/tests/service/stream/user/influencers.rs @@ -0,0 +1,277 @@ +use anyhow::Result; +use reqwest::StatusCode; + +use crate::service::utils::{make_request, make_wrong_request}; + +// TODO: Create deterministic integration tests + +const PEER_PUBKY: &str = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; + +#[tokio::test] +async fn test_global_influencers() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy", + "kzq3o8y8w1b7ffogpq73okop4gb3ahm31ytwwk1na8p6gpr4511o", + "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo", + "zdbg13k5gh4tfz9qz11quohrxetgqxs7awandu8h57147xddcuhy", + "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", + ]; + + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_preview() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&preview=true").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + // assert preview size is respected + assert_eq!(influencers.len(), 3); + + let first_influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // make the request a second time to ensure the preview is generating different results + let body = make_request("/v0/stream/users?source=influencers&preview=true").await?; + assert!(body.is_array()); + + let 
influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + // assert preview size is respected + assert_eq!(influencers.len(), 3); + + let second_influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + assert!(first_influencer_ids != second_influencer_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_skip_limit() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&skip=3&limit=3").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + // assert limit + assert_eq!(influencers.len(), 3); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "zdbg13k5gh4tfz9qz11quohrxetgqxs7awandu8h57147xddcuhy", + "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", + "7hq56kap6exmhghyedrw1q3ar8b1wutomq8ax9eazhajcpdfx3so", + ]; + + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_with_today_timeframe() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&timeframe=today&limit=4").await?; + + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "r91hi8kc3x6761gwfiigr7yn6nca1z47wm6jadhw1jbx1co93r9y", + "qumq6fady4bmw4w5tpsrj1tg36g3qo4tcfedga9p4bg4so4ikyzy", + "r4irb481b8qspaixq1brwre8o87cxybsbk9iwe1f6f9ukrxxs7bo", + "tkpeqpx3ywoawiw6q8e6kuo9o3egr7fnhx83rudznbrrmqgdmomo", + ]; + + // Verify that each expected user ID is present in the response + for id 
in &expected_user_ids { + let exists = influencer_ids.clone().into_iter().any(|item| item == *id); + assert!(exists, "Expected user ID not found: {}", id); + } + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_with_this_month_timeframe() -> Result<()> { + let body = + make_request("/v0/stream/users?source=influencers&timeframe=this_month&limit=5").await?; + + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "r91hi8kc3x6761gwfiigr7yn6nca1z47wm6jadhw1jbx1co93r9y", + "tkpeqpx3ywoawiw6q8e6kuo9o3egr7fnhx83rudznbrrmqgdmomo", + "pyc598poqkdgtx1wc4aeptx67mqg71mmywyh7uzkffzittjmbiuo", + "r4irb481b8qspaixq1brwre8o87cxybsbk9iwe1f6f9ukrxxs7bo", + "qumq6fady4bmw4w5tpsrj1tg36g3qo4tcfedga9p4bg4so4ikyzy", + ]; + + // Verify that each expected user ID is present in the response + for id in &expected_user_ids { + let exists = influencer_ids.clone().into_iter().any(|item| item == *id); + assert!(exists, "Expected user ID not found: {}", id); + } + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_reach_no_user_id() -> Result<()> { + let endpoint = + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&reach=following"; + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_reach_no_reach() -> Result<()> { + let endpoint = &format!( + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}", + PEER_PUBKY + ); + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_following_reach() -> Result<()> { + let endpoint = 
&format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=following", PEER_PUBKY); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "5g3fwnue819wfdjwiwm8qr35ww6uxxgbzrigrtdgmbi19ksioeoy", + "9arfi37owcrdywc9zqw3m5uc7gd5gqu1yfuykzo66od6tcayqk9y", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_followers_reach() -> Result<()> { + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=followers", PEER_PUBKY); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body.as_array().expect("Post stream should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "gxk8itzrnikrpshfsudgsgtxrz59ojp4iwmp4w9iff3ess6zfr4y", + "h3fghnb3x59oh7r53x8y6a5x38oatqyjym9b31ybss17zqdnhcoy", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_friends_reach() -> Result<()> { + let endpoint = &format!( + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=friends", + PEER_PUBKY + ); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body.as_array().expect("Post stream should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of 
expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "gxk8itzrnikrpshfsudgsgtxrz59ojp4iwmp4w9iff3ess6zfr4y", + "h3fghnb3x59oh7r53x8y6a5x38oatqyjym9b31ybss17zqdnhcoy", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} diff --git a/tests/service/stream/user/mod.rs b/tests/service/stream/user/mod.rs index 2b0900db..e436322d 100644 --- a/tests/service/stream/user/mod.rs +++ b/tests/service/stream/user/mod.rs @@ -1,3 +1,4 @@ +pub mod influencers; pub mod list; pub mod reach; pub mod score; diff --git a/tests/service/stream/user/score.rs b/tests/service/stream/user/score.rs index 2eba7954..6e87203c 100644 --- a/tests/service/stream/user/score.rs +++ b/tests/service/stream/user/score.rs @@ -75,76 +75,6 @@ async fn test_stream_most_followed() -> Result<()> { Ok(()) } -// ##### PIONEERS USERS ###### - -#[tokio::test] -async fn test_stream_pioneers() -> Result<()> { - let client = httpc_test::new_client(HOST_URL)?; - - // Test retrieving the most followed users - let res = client.do_get("/v0/stream/users?source=pioneers").await?; - assert_eq!(res.status(), 200); - - let body = res.json_body()?; - assert!(body.is_array()); - - let pioneers_users = body.as_array().expect("User stream should be an array"); - - // Check if the response has the expected number of users - assert!( - !pioneers_users.is_empty(), - "There should be at least one user in the most followed stream" - ); - - // List of expected user IDs - let expected_user_ids = vec![ - "pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy", - "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo", - "kzq3o8y8w1b7ffogpq73okop4gb3ahm31ytwwk1na8p6gpr4511o", - "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", - "7hq56kap6exmhghyedrw1q3ar8b1wutomq8ax9eazhajcpdfx3so", - ]; - - // Verify that each expected user ID is present in the response - for id in &expected_user_ids { - let exists = pioneers_users.iter().any(|f| f["details"]["id"] == *id); - 
assert!(exists, "Expected user ID not found: {}", id); - } - - // Additional checks for specific user attributes (e.g., name, follower counts) - for user in pioneers_users { - assert!( - user["details"]["name"].is_string(), - "Name should be a string" - ); - assert!(user["details"]["bio"].is_string(), "Bio should be a string"); - assert!( - user["counts"]["followers"].is_number(), - "Follower counts should be a number" - ); - } - - // Test limiting the results to 5 users - let res = client - .do_get("/v0/stream/users?source=pioneers&limit=5") - .await?; - assert_eq!(res.status(), 200); - - let body = res.json_body()?; - assert!(body.is_array()); - - let limited_users = body.as_array().expect("User stream should be an array"); - - // Check if the response has the expected number of users - assert_eq!( - limited_users.len(), - 5, - "Expected 5 users in the limited stream" - ); - - Ok(()) -} - // ##### RECOMMENDED USERS ###### #[tokio::test] diff --git a/tests/watcher/posts/engagement.rs b/tests/watcher/posts/engagement.rs index db224fd0..b382f4a9 100644 --- a/tests/watcher/posts/engagement.rs +++ b/tests/watcher/posts/engagement.rs @@ -56,7 +56,7 @@ async fn test_homeserver_post_engagement() -> Result<()> { let parent_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let reply = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Reply".to_string(), + content: "Watcher:PostInfluencer:Bob:Reply".to_string(), kind: PubkyAppPostKind::Short, parent: Some(parent_uri.clone()), embed: None, @@ -69,7 +69,7 @@ async fn test_homeserver_post_engagement() -> Result<()> { let post_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let repost = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Repost".to_string(), + content: "Watcher:PostInfluencer:Bob:Repost".to_string(), kind: PubkyAppPostKind::Short, parent: None, embed: Some(PubkyAppPostEmbed { diff --git a/tests/watcher/posts/pioneer.rs b/tests/watcher/posts/influencer.rs 
similarity index 62% rename from tests/watcher/posts/pioneer.rs rename to tests/watcher/posts/influencer.rs index 18f38f1a..93325662 100644 --- a/tests/watcher/posts/pioneer.rs +++ b/tests/watcher/posts/influencer.rs @@ -1,27 +1,26 @@ -use crate::watcher::users::utils::check_member_user_pioneer; -use crate::watcher::utils::watcher::WatcherTest; +use crate::watcher::{users::utils::check_member_user_influencer, utils::watcher::WatcherTest}; use anyhow::Result; use pubky_app_specs::{PubkyAppPost, PubkyAppPostEmbed, PubkyAppPostKind, PubkyAppUser}; use pubky_common::crypto::Keypair; #[tokio_shared_rt::test(shared)] -async fn test_homeserver_post_pioneer() -> Result<()> { +async fn test_homeserver_post_influencer() -> Result<()> { let mut test = WatcherTest::setup().await?; let alice_user_keypair = Keypair::random(); let alice_user = PubkyAppUser { - bio: Some("test_homeserver_post_pioneer".to_string()), + bio: Some("test_homeserver_post_influencer".to_string()), image: None, links: None, - name: "Watcher:PostPioneer:Alice".to_string(), + name: "Watcher:PostInfluencer:Alice".to_string(), status: None, }; let alice_id = test.create_user(&alice_user_keypair, &alice_user).await?; // Alice creates a new post let alice_post = PubkyAppPost { - content: "Watcher:PostPioneer:Alice:Post".to_string(), + content: "Watcher:PostInfluencer:Alice:Post".to_string(), kind: PubkyAppPostKind::Short, parent: None, embed: None, @@ -31,19 +30,19 @@ async fn test_homeserver_post_pioneer() -> Result<()> { let alice_post_id = test.create_post(&alice_id, &alice_post).await?; // CACHE_OP: Assert cache has not been updated. 
Missing followers - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&alice_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&alice_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // Create new user let bob_user_keypair = Keypair::random(); let bob_user = PubkyAppUser { - bio: Some("test_homeserver_post_pioneer".to_string()), + bio: Some("test_homeserver_post_influencer".to_string()), image: None, links: None, - name: "Watcher:PostPioneer:Bob".to_string(), + name: "Watcher:PostInfluencer:Bob".to_string(), status: None, }; let bob_id = test.create_user(&bob_user_keypair, &bob_user).await?; @@ -52,29 +51,28 @@ async fn test_homeserver_post_pioneer() -> Result<()> { test.create_follow(&bob_id, &alice_id).await?; // CACHE_OP: Assert if cache has been updated - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&alice_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 1); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&alice_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 1); // Bob replies to popular alice post let parent_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let reply = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Reply".to_string(), + content: "Watcher:PostInfluencer:Bob:Reply".to_string(), kind: PubkyAppPostKind::Short, parent: Some(parent_uri.clone()), - embed: None, attachments: None, + embed: None, }; - let _reply_id = test.create_post(&bob_id, &reply).await?; // Create repost of alice post let post_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let repost = 
PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Repost".to_string(), + content: "Watcher:PostInfluencer:Bob:Repost".to_string(), kind: PubkyAppPostKind::Short, parent: None, embed: Some(PubkyAppPostEmbed { @@ -87,18 +85,18 @@ async fn test_homeserver_post_pioneer() -> Result<()> { test.create_post(&bob_id, &repost).await?; // CACHE_OP: Assert if cache has been updated - let pioneer_score = check_member_user_pioneer(&bob_id).await.unwrap(); - assert!(pioneer_score.is_some()); - // Pioneer score does not update because popular user does not have any interaction - assert_eq!(pioneer_score.unwrap(), 0); + let influencer_score = check_member_user_influencer(&bob_id).await.unwrap(); + assert!(influencer_score.is_some()); + // Influencer score does not update because popular user does not have any interaction + assert_eq!(influencer_score.unwrap(), 0); // Follow Bob test.create_follow(&alice_id, &bob_id).await?; // CACHE_OP: Assert if cache has been updated - let pioneer_score = check_member_user_pioneer(&bob_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 2); + let influencer_score = check_member_user_influencer(&bob_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 2); // TODO: Impl DEL post. 
Assert the reply does not exist in Nexus // test.cleanup_post(&user_id, &reply_id).await?; diff --git a/tests/watcher/posts/mod.rs b/tests/watcher/posts/mod.rs index bfb554ac..3af1f496 100644 --- a/tests/watcher/posts/mod.rs +++ b/tests/watcher/posts/mod.rs @@ -16,7 +16,7 @@ mod engagement; mod fail_reply; mod fail_repost; mod fail_user; -mod pioneer; +mod influencer; mod raw; mod reply; mod reply_engagement; diff --git a/tests/watcher/posts/raw.rs b/tests/watcher/posts/raw.rs index 4db200b1..4c0f4dc9 100644 --- a/tests/watcher/posts/raw.rs +++ b/tests/watcher/posts/raw.rs @@ -2,7 +2,7 @@ use super::utils::{ check_member_global_timeline_user_post, check_member_user_post_timeline, find_post_counts, }; use crate::watcher::posts::utils::find_post_details; -use crate::watcher::users::utils::{check_member_user_pioneer, find_user_counts}; +use crate::watcher::users::utils::{check_member_user_influencer, find_user_counts}; use crate::watcher::utils::watcher::WatcherTest; use anyhow::Result; use pubky_app_specs::{PubkyAppPost, PubkyAppPostKind, PubkyAppUser}; @@ -80,10 +80,10 @@ async fn test_homeserver_put_post_event() -> Result<()> { assert!(post_timeline.is_some()); assert_eq!(post_timeline.unwrap(), post_details.indexed_at as isize); - // Has pioneer score. Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // Has influencer score. 
Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); let exist_count = find_user_counts(&user_id).await; assert_eq!(exist_count.posts, 1); diff --git a/tests/watcher/tags/user_to_self_put.rs b/tests/watcher/tags/user_to_self_put.rs index c60645e0..506a83ef 100644 --- a/tests/watcher/tags/user_to_self_put.rs +++ b/tests/watcher/tags/user_to_self_put.rs @@ -1,6 +1,8 @@ use super::utils::find_user_tag; -use crate::watcher::users::utils::{check_member_user_pioneer, find_user_counts}; -use crate::watcher::utils::watcher::WatcherTest; +use crate::watcher::{ + users::utils::{check_member_user_influencer, find_user_counts}, + utils::watcher::WatcherTest, +}; use anyhow::Result; use chrono::Utc; use pubky_app_specs::{traits::HashId, PubkyAppTag, PubkyAppUser}; @@ -70,12 +72,15 @@ async fn test_homeserver_put_tag_user_self() -> Result<()> { assert_eq!(user_counts.tags, 1); assert_eq!(user_counts.tagged, 1); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id) .await - .expect("Failed to check user pioneer score"); - assert!(pioneer_score.is_some(), "Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + .expect("Failed to check user influencer score"); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&user_id).await?; diff --git a/tests/watcher/tags/user_to_user_del.rs b/tests/watcher/tags/user_to_user_del.rs index c9d4e22c..647f1b5d 100644 --- a/tests/watcher/tags/user_to_user_del.rs +++ b/tests/watcher/tags/user_to_user_del.rs @@ -1,6 +1,6 @@ use super::utils::find_user_tag; use crate::watcher::{ - 
users::utils::{check_member_user_pioneer, find_user_counts}, + users::utils::{check_member_user_influencer, find_user_counts}, utils::watcher::WatcherTest, }; use anyhow::Result; @@ -80,13 +80,16 @@ async fn test_homeserver_del_tag_to_another_user() -> Result<()> { let user_counts = find_user_counts(&tagged_user_id).await; assert_eq!(user_counts.tagged, 0); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&tagger_user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&tagger_user_id) .await - .expect("Failed to check user pioneer score"); + .expect("Failed to check user influencer score"); - assert!(pioneer_score.is_some(), "Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&tagged_user_id).await?; diff --git a/tests/watcher/tags/user_to_user_put.rs b/tests/watcher/tags/user_to_user_put.rs index be6cd27d..88519253 100644 --- a/tests/watcher/tags/user_to_user_put.rs +++ b/tests/watcher/tags/user_to_user_put.rs @@ -1,6 +1,6 @@ use super::utils::find_user_tag; use crate::watcher::{ - users::utils::{check_member_user_pioneer, find_user_counts}, + users::utils::{check_member_user_influencer, find_user_counts}, utils::watcher::WatcherTest, }; use anyhow::Result; @@ -90,12 +90,15 @@ async fn test_homeserver_put_tag_user_another() -> Result<()> { let user_counts = find_user_counts(&tagger_user_id).await; assert_eq!(user_counts.tags, 1); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&tagged_user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&tagged_user_id) .await - .expect("Failed to check user pioneer score"); - assert!(pioneer_score.is_some(), 
"Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + .expect("Failed to check user influencer score"); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&tagged_user_id).await?; diff --git a/tests/watcher/users/raw.rs b/tests/watcher/users/raw.rs index 66be1874..768352d7 100644 --- a/tests/watcher/users/raw.rs +++ b/tests/watcher/users/raw.rs @@ -1,5 +1,5 @@ use crate::watcher::{ - users::utils::{check_member_most_followed, check_member_user_pioneer, find_user_details}, + users::utils::{check_member_most_followed, check_member_user_influencer, find_user_details}, utils::watcher::WatcherTest, }; use anyhow::Result; @@ -74,15 +74,15 @@ async fn test_homeserver_user_put_event() -> Result<()> { assert!(is_member.is_some()); assert_eq!(is_member.unwrap(), 0); - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // most_followed score: Sorted:Users:MostFollowed - let pioneer_score = check_member_most_followed(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + let influencer_score = check_member_most_followed(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup test.cleanup_user(&user_id).await?; diff --git a/tests/watcher/users/utils.rs b/tests/watcher/users/utils.rs index 2de7d68f..028e95ec 100644 --- a/tests/watcher/users/utils.rs +++ b/tests/watcher/users/utils.rs @@ -2,24 +2,26 @@ use anyhow::Result; use pubky_nexus::{ get_neo4j_graph, models::user::{ - UserCounts, 
UserDetails, UserStream, USER_MOSTFOLLOWED_KEY_PARTS, USER_PIONEERS_KEY_PARTS, + UserCounts, UserDetails, UserStream, USER_INFLUENCERS_KEY_PARTS, + USER_MOSTFOLLOWED_KEY_PARTS, }, queries, RedisOps, }; pub async fn check_member_most_followed(user_id: &str) -> Result> { - let pioneer_score = + let influencer_score = UserStream::check_sorted_set_member(&USER_MOSTFOLLOWED_KEY_PARTS, &[user_id]) .await .unwrap(); - Ok(pioneer_score) + Ok(influencer_score) } -pub async fn check_member_user_pioneer(user_id: &str) -> Result> { - let pioneer_score = UserStream::check_sorted_set_member(&USER_PIONEERS_KEY_PARTS, &[user_id]) - .await - .unwrap(); - Ok(pioneer_score) +pub async fn check_member_user_influencer(user_id: &str) -> Result> { + let influencer_score = + UserStream::check_sorted_set_member(&USER_INFLUENCERS_KEY_PARTS, &[user_id]) + .await + .unwrap(); + Ok(influencer_score) } pub async fn find_user_counts(user_id: &str) -> UserCounts {