From 22519d00c667871899d709188f4ae6fb4c48e7d4 Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 29 Nov 2024 14:23:20 -0500 Subject: [PATCH 01/18] rename pioneers to influencers --- benches/streams.rs | 2 +- benches/streams_benches/user.rs | 22 +++++---- src/models/user/counts.rs | 6 +-- src/models/user/mod.rs | 2 +- src/models/user/stream.rs | 13 ++--- tests/service/stream/user/score.rs | 14 +++--- tests/watcher/posts/engagement.rs | 4 +- .../posts/{pioneer.rs => influencer.rs} | 48 +++++++++---------- tests/watcher/posts/mod.rs | 2 +- tests/watcher/posts/raw.rs | 10 ++-- tests/watcher/tags/user_to_self_put.rs | 15 +++--- tests/watcher/tags/user_to_user_del.rs | 15 +++--- tests/watcher/tags/user_to_user_put.rs | 15 +++--- tests/watcher/users/raw.rs | 16 +++---- tests/watcher/users/utils.rs | 18 +++---- 15 files changed, 110 insertions(+), 92 deletions(-) rename tests/watcher/posts/{pioneer.rs => influencer.rs} (61%) diff --git a/benches/streams.rs b/benches/streams.rs index 5209b4ea..3cde9875 100644 --- a/benches/streams.rs +++ b/benches/streams.rs @@ -32,7 +32,7 @@ criterion_group! { tag::bench_stream_tag_timeline, tag::bench_stream_tag_total_engagement, user::bench_stream_users_by_username_search, - user::bench_stream_pioneers, + user::bench_stream_influencers, user::bench_stream_following, user::bench_stream_most_followed, kind::bench_stream_post_kind_short, diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index fa3d0e1e..7080ddac 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -22,7 +22,7 @@ pub fn bench_stream_following(c: &mut Criterion) { None, None, Some(20), - UserStreamSource::Pioneers, + UserStreamSource::Influencers, None, ) .await @@ -83,21 +83,27 @@ pub fn bench_stream_users_by_username_search(c: &mut Criterion) { }); } -pub fn bench_stream_pioneers(c: &mut Criterion) { +pub fn bench_stream_influencers(c: &mut Criterion) { println!("***************************************"); - println!("Benchmarking the user streams for pioneer users."); + println!("Benchmarking the user streams for influencer users."); println!("***************************************"); run_setup(); let rt = Runtime::new().unwrap(); - c.bench_function("stream_pioneers", |b| { + c.bench_function("stream_influencers", |b| { b.to_async(&rt).iter(|| async { - let user_stream = - UserStream::get_by_id(None, None, None, Some(20), UserStreamSource::Pioneers, None) - .await - .unwrap(); + let user_stream = UserStream::get_by_id( + None, + None, + None, + Some(20), + UserStreamSource::Influencers, + None, + ) + .await + .unwrap(); criterion::black_box(user_stream); }); }); diff --git a/src/models/user/counts.rs b/src/models/user/counts.rs index 16824f6e..ac55fe1e 100644 --- a/src/models/user/counts.rs +++ b/src/models/user/counts.rs @@ -77,7 +77,7 @@ impl UserCounts { pub async fn put_to_index(&self, user_id: &str) -> Result<(), DynError> { self.put_index_json(&[user_id]).await?; UserStream::add_to_most_followed_sorted_set(user_id, self).await?; - UserStream::add_to_pioneers_sorted_set(user_id, self).await?; + UserStream::add_to_influencers_sorted_set(user_id, self).await?; Ok(()) } @@ -93,11 +93,11 @@ impl UserCounts { pub async fn update(user_id: &str, field: &str, action: JsonAction) -> Result<(), DynError> { // Update user counts index Self::update_index_field(user_id, field, action).await?; - // Just update pioneer and most followed indexes, when that fields are updated + // Just update influencer and most followed indexes, when that fields are updated if field == "followers" || field == "tags" || field == "posts" { let exist_count = Self::get_by_id(user_id).await?; if let Some(user_counts) = exist_count { - UserStream::add_to_pioneers_sorted_set(user_id, &user_counts).await?; + UserStream::add_to_influencers_sorted_set(user_id, &user_counts).await?; // Increment followers if field == "followers" { UserStream::add_to_most_followed_sorted_set(user_id, &user_counts).await? diff --git a/src/models/user/mod.rs b/src/models/user/mod.rs index c3014a0b..32bb3903 100644 --- a/src/models/user/mod.rs +++ b/src/models/user/mod.rs @@ -15,7 +15,7 @@ pub use muted::Muted; pub use relationship::Relationship; pub use search::{UserSearch, USER_NAME_KEY_PARTS}; pub use stream::{ - UserStream, UserStreamSource, USER_MOSTFOLLOWED_KEY_PARTS, USER_PIONEERS_KEY_PARTS, + UserStream, UserStreamSource, USER_INFLUENCERS_KEY_PARTS, USER_MOSTFOLLOWED_KEY_PARTS, }; pub use tags::ProfileTag; pub use tags::UserTags; diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index 39d84002..5cd178c0 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -8,7 +8,7 @@ use tokio::task::spawn; use utoipa::ToSchema; pub const USER_MOSTFOLLOWED_KEY_PARTS: [&str; 2] = ["Users", "MostFollowed"]; -pub const USER_PIONEERS_KEY_PARTS: [&str; 2] = ["Users", "Pioneers"]; +pub const USER_INFLUENCERS_KEY_PARTS: [&str; 2] = ["Users", "Influencers"]; pub const CACHE_USER_RECOMMENDED_KEY_PARTS: [&str; 3] = ["Cache", "Users", "Recommended"]; // TTL, 12HR pub const CACHE_USER_RECOMMENDED_TTL: i64 = 12 * 60 * 60; @@ -21,7 +21,7 @@ pub enum UserStreamSource { Friends, Muted, MostFollowed, - Pioneers, + Influencers, Recommended, } @@ -112,12 +112,13 @@ impl UserStream { } /// Adds the post to a Redis sorted set using the follower counts as score. - pub async fn add_to_pioneers_sorted_set( + pub async fn add_to_influencers_sorted_set( user_id: &str, counts: &UserCounts, ) -> Result<(), DynError> { let score = (counts.tags + counts.posts) as f64 * (counts.followers as f64).sqrt(); - Self::put_index_sorted_set(&USER_PIONEERS_KEY_PARTS, &[(score, user_id)], None, None).await + Self::put_index_sorted_set(&USER_INFLUENCERS_KEY_PARTS, &[(score, user_id)], None, None) + .await } /// Retrieves recommended user IDs based on the specified criteria. async fn get_recommended_ids( @@ -237,8 +238,8 @@ impl UserStream { ) .await? .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), - UserStreamSource::Pioneers => Self::try_from_index_sorted_set( - &USER_PIONEERS_KEY_PARTS, + UserStreamSource::Influencers => Self::try_from_index_sorted_set( + &USER_INFLUENCERS_KEY_PARTS, None, None, skip, diff --git a/tests/service/stream/user/score.rs b/tests/service/stream/user/score.rs index 2eba7954..b22811fe 100644 --- a/tests/service/stream/user/score.rs +++ b/tests/service/stream/user/score.rs @@ -78,21 +78,21 @@ async fn test_stream_most_followed() -> Result<()> { // ##### PIONEERS USERS ###### #[tokio::test] -async fn test_stream_pioneers() -> Result<()> { +async fn test_stream_influencers() -> Result<()> { let client = httpc_test::new_client(HOST_URL)?; // Test retrieving the most followed users - let res = client.do_get("/v0/stream/users?source=pioneers").await?; + let res = client.do_get("/v0/stream/users?source=influencers").await?; assert_eq!(res.status(), 200); let body = res.json_body()?; assert!(body.is_array()); - let pioneers_users = body.as_array().expect("User stream should be an array"); + let influencers_users = body.as_array().expect("User stream should be an array"); // Check if the response has the expected number of users assert!( - !pioneers_users.is_empty(), + !influencers_users.is_empty(), "There should be at least one user in the most followed stream" ); @@ -107,12 +107,12 @@ async fn test_stream_pioneers() -> Result<()> { // Verify that each expected user ID is present in the response for id in &expected_user_ids { - let exists = pioneers_users.iter().any(|f| f["details"]["id"] == *id); + let exists = influencers_users.iter().any(|f| f["details"]["id"] == *id); assert!(exists, "Expected user ID not found: {}", id); } // Additional checks for specific user attributes (e.g., name, follower counts) - for user in pioneers_users { + for user in influencers_users { assert!( user["details"]["name"].is_string(), "Name should be a string" @@ -126,7 +126,7 @@ async fn test_stream_pioneers() -> Result<()> { // Test limiting the results to 5 users let res = client - .do_get("/v0/stream/users?source=pioneers&limit=5") + .do_get("/v0/stream/users?source=influencers&limit=5") .await?; assert_eq!(res.status(), 200); diff --git a/tests/watcher/posts/engagement.rs b/tests/watcher/posts/engagement.rs index bb875f36..04f940a8 100644 --- a/tests/watcher/posts/engagement.rs +++ b/tests/watcher/posts/engagement.rs @@ -56,7 +56,7 @@ async fn test_homeserver_post_engagement() -> Result<()> { let parent_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let reply = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Reply".to_string(), + content: "Watcher:PostInfluencer:Bob:Reply".to_string(), kind: PostKind::Short, parent: Some(parent_uri.clone()), embed: None, @@ -69,7 +69,7 @@ async fn test_homeserver_post_engagement() -> Result<()> { let post_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let repost = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Repost".to_string(), + content: "Watcher:PostInfluencer:Bob:Repost".to_string(), kind: PostKind::Short, parent: None, embed: Some(PostEmbed { diff --git a/tests/watcher/posts/pioneer.rs b/tests/watcher/posts/influencer.rs similarity index 61% rename from tests/watcher/posts/pioneer.rs rename to tests/watcher/posts/influencer.rs index 3b061e0b..9c0c2a5f 100644 --- a/tests/watcher/posts/pioneer.rs +++ b/tests/watcher/posts/influencer.rs @@ -1,26 +1,26 @@ -use crate::watcher::{users::utils::check_member_user_pioneer, utils::WatcherTest}; +use crate::watcher::{users::utils::check_member_user_influencer, utils::WatcherTest}; use anyhow::Result; use pubky_app_specs::{PostEmbed, PostKind, PubkyAppPost, PubkyAppUser}; use pubky_common::crypto::Keypair; #[tokio::test] -async fn test_homeserver_post_pioneer() -> Result<()> { +async fn test_homeserver_post_influencer() -> Result<()> { let mut test = WatcherTest::setup().await?; let alice_user_keypair = Keypair::random(); let alice_user = PubkyAppUser { - bio: Some("test_homeserver_post_pioneer".to_string()), + bio: Some("test_homeserver_post_influencer".to_string()), image: None, links: None, - name: "Watcher:PostPioneer:Alice".to_string(), + name: "Watcher:PostInfluencer:Alice".to_string(), status: None, }; let alice_id = test.create_user(&alice_user_keypair, &alice_user).await?; // Alice creates a new post let alice_post = PubkyAppPost { - content: "Watcher:PostPioneer:Alice:Post".to_string(), + content: "Watcher:PostInfluencer:Alice:Post".to_string(), kind: PostKind::Short, parent: None, embed: None, @@ -30,19 +30,19 @@ async fn test_homeserver_post_pioneer() -> Result<()> { let alice_post_id = test.create_post(&alice_id, &alice_post).await?; // CACHE_OP: Assert cache has not been updated. Missing followers - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&alice_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&alice_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // Create new user let bob_user_keypair = Keypair::random(); let bob_user = PubkyAppUser { - bio: Some("test_homeserver_post_pioneer".to_string()), + bio: Some("test_homeserver_post_influencer".to_string()), image: None, links: None, - name: "Watcher:PostPioneer:Bob".to_string(), + name: "Watcher:PostInfluencer:Bob".to_string(), status: None, }; let bob_id = test.create_user(&bob_user_keypair, &bob_user).await?; @@ -51,16 +51,16 @@ async fn test_homeserver_post_pioneer() -> Result<()> { test.create_follow(&bob_id, &alice_id).await?; // CACHE_OP: Assert if cache has been updated - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&alice_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 1); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&alice_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 1); // Bob replies to popular alice post let parent_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let reply = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Reply".to_string(), + content: "Watcher:PostInfluencer:Bob:Reply".to_string(), kind: PostKind::Short, parent: Some(parent_uri.clone()), embed: None, @@ -73,7 +73,7 @@ async fn test_homeserver_post_pioneer() -> Result<()> { let post_uri = format!("pubky://{}/pub/pubky.app/posts/{}", alice_id, alice_post_id); let repost = PubkyAppPost { - content: "Watcher:PostPioneer:Bob:Repost".to_string(), + content: "Watcher:PostInfluencer:Bob:Repost".to_string(), kind: PostKind::Short, parent: None, embed: Some(PostEmbed { @@ -86,18 +86,18 @@ async fn test_homeserver_post_pioneer() -> Result<()> { test.create_post(&bob_id, &repost).await?; // CACHE_OP: Assert if cache has been updated - let pioneer_score = check_member_user_pioneer(&bob_id).await.unwrap(); - assert!(pioneer_score.is_some()); - // Pioneer score does not update because popular user does not have any interaction - assert_eq!(pioneer_score.unwrap(), 0); + let influencer_score = check_member_user_influencer(&bob_id).await.unwrap(); + assert!(influencer_score.is_some()); + // Influencer score does not update because popular user does not have any interaction + assert_eq!(influencer_score.unwrap(), 0); // Follow Bob test.create_follow(&alice_id, &bob_id).await?; // CACHE_OP: Assert if cache has been updated - let pioneer_score = check_member_user_pioneer(&bob_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 2); + let influencer_score = check_member_user_influencer(&bob_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 2); // TODO: Impl DEL post. Assert the reply does not exist in Nexus // test.cleanup_post(&user_id, &reply_id).await?; diff --git a/tests/watcher/posts/mod.rs b/tests/watcher/posts/mod.rs index 825cc143..1b21b27b 100644 --- a/tests/watcher/posts/mod.rs +++ b/tests/watcher/posts/mod.rs @@ -13,7 +13,7 @@ mod edit_reply_parent_notification; mod edit_reposted_notification; mod edit_tagged_notification; mod engagement; -mod pioneer; +mod influencer; mod raw; mod reply; mod reply_engagement; diff --git a/tests/watcher/posts/raw.rs b/tests/watcher/posts/raw.rs index ef50a3c5..acccba36 100644 --- a/tests/watcher/posts/raw.rs +++ b/tests/watcher/posts/raw.rs @@ -2,7 +2,7 @@ use super::utils::{ check_member_global_timeline_user_post, check_member_user_post_timeline, find_post_counts, }; use crate::watcher::posts::utils::find_post_details; -use crate::watcher::users::utils::{check_member_user_pioneer, find_user_counts}; +use crate::watcher::users::utils::{check_member_user_influencer, find_user_counts}; use crate::watcher::utils::WatcherTest; use anyhow::Result; use pubky_app_specs::{PostKind, PubkyAppPost, PubkyAppUser}; @@ -80,10 +80,10 @@ async fn test_homeserver_put_post_event() -> Result<()> { assert!(post_timeline.is_some()); assert_eq!(post_timeline.unwrap(), post_details.indexed_at as isize); - // Has pioneer score. Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // Has influencer score. Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); let exist_count = find_user_counts(&user_id).await; assert_eq!(exist_count.posts, 1); diff --git a/tests/watcher/tags/user_to_self_put.rs b/tests/watcher/tags/user_to_self_put.rs index f86812b4..5cfef389 100644 --- a/tests/watcher/tags/user_to_self_put.rs +++ b/tests/watcher/tags/user_to_self_put.rs @@ -1,6 +1,6 @@ use super::utils::find_user_tag; use crate::watcher::{ - users::utils::{check_member_user_pioneer, find_user_counts}, + users::utils::{check_member_user_influencer, find_user_counts}, utils::WatcherTest, }; use anyhow::Result; @@ -72,12 +72,15 @@ async fn test_homeserver_put_tag_user_self() -> Result<()> { assert_eq!(user_counts.tags, 1); assert_eq!(user_counts.tagged, 1); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id) .await - .expect("Failed to check user pioneer score"); - assert!(pioneer_score.is_some(), "Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + .expect("Failed to check user influencer score"); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&user_id).await?; diff --git a/tests/watcher/tags/user_to_user_del.rs b/tests/watcher/tags/user_to_user_del.rs index 26f989c0..24241ffd 100644 --- a/tests/watcher/tags/user_to_user_del.rs +++ b/tests/watcher/tags/user_to_user_del.rs @@ -1,6 +1,6 @@ use super::utils::find_user_tag; use crate::watcher::{ - users::utils::{check_member_user_pioneer, find_user_counts}, + users::utils::{check_member_user_influencer, find_user_counts}, utils::WatcherTest, }; use anyhow::Result; @@ -80,13 +80,16 @@ async fn test_homeserver_del_tag_to_another_user() -> Result<()> { let user_counts = find_user_counts(&tagged_user_id).await; assert_eq!(user_counts.tagged, 0); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&tagger_user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&tagger_user_id) .await - .expect("Failed to check user pioneer score"); + .expect("Failed to check user influencer score"); - assert!(pioneer_score.is_some(), "Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&tagged_user_id).await?; diff --git a/tests/watcher/tags/user_to_user_put.rs b/tests/watcher/tags/user_to_user_put.rs index de22966b..17a05b28 100644 --- a/tests/watcher/tags/user_to_user_put.rs +++ b/tests/watcher/tags/user_to_user_put.rs @@ -1,6 +1,6 @@ use super::utils::find_user_tag; use crate::watcher::{ - users::utils::{check_member_user_pioneer, find_user_counts}, + users::utils::{check_member_user_influencer, find_user_counts}, utils::WatcherTest, }; use anyhow::Result; @@ -90,12 +90,15 @@ async fn test_homeserver_put_tag_user_another() -> Result<()> { let user_counts = find_user_counts(&tagger_user_id).await; assert_eq!(user_counts.tags, 1); - // Check user pionner score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&tagged_user_id) + // Check user pionner score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&tagged_user_id) .await - .expect("Failed to check user pioneer score"); - assert!(pioneer_score.is_some(), "Pioneer score should be present"); - assert_eq!(pioneer_score.unwrap(), 0); + .expect("Failed to check user influencer score"); + assert!( + influencer_score.is_some(), + "Influencer score should be present" + ); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup user test.cleanup_user(&tagged_user_id).await?; diff --git a/tests/watcher/users/raw.rs b/tests/watcher/users/raw.rs index 0382b792..25d16d90 100644 --- a/tests/watcher/users/raw.rs +++ b/tests/watcher/users/raw.rs @@ -1,5 +1,5 @@ use crate::watcher::{ - users::utils::{check_member_most_followed, check_member_user_pioneer, find_user_details}, + users::utils::{check_member_most_followed, check_member_user_influencer, find_user_details}, utils::WatcherTest, }; use anyhow::Result; @@ -74,15 +74,15 @@ async fn test_homeserver_user_put_event() -> Result<()> { assert!(is_member.is_some()); assert_eq!(is_member.unwrap(), 0); - // pioneers score: Sorted:Users:Pioneers - let pioneer_score = check_member_user_pioneer(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + // influencers score: Sorted:Users:Influencers + let influencer_score = check_member_user_influencer(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // most_followed score: Sorted:Users:MostFollowed - let pioneer_score = check_member_most_followed(&user_id).await.unwrap(); - assert!(pioneer_score.is_some()); - assert_eq!(pioneer_score.unwrap(), 0); + let influencer_score = check_member_most_followed(&user_id).await.unwrap(); + assert!(influencer_score.is_some()); + assert_eq!(influencer_score.unwrap(), 0); // Cleanup test.cleanup_user(&user_id).await?; diff --git a/tests/watcher/users/utils.rs b/tests/watcher/users/utils.rs index 2de7d68f..028e95ec 100644 --- a/tests/watcher/users/utils.rs +++ b/tests/watcher/users/utils.rs @@ -2,24 +2,26 @@ use anyhow::Result; use pubky_nexus::{ get_neo4j_graph, models::user::{ - UserCounts, UserDetails, UserStream, USER_MOSTFOLLOWED_KEY_PARTS, USER_PIONEERS_KEY_PARTS, + UserCounts, UserDetails, UserStream, USER_INFLUENCERS_KEY_PARTS, + USER_MOSTFOLLOWED_KEY_PARTS, }, queries, RedisOps, }; pub async fn check_member_most_followed(user_id: &str) -> Result> { - let pioneer_score = + let influencer_score = UserStream::check_sorted_set_member(&USER_MOSTFOLLOWED_KEY_PARTS, &[user_id]) .await .unwrap(); - Ok(pioneer_score) + Ok(influencer_score) } -pub async fn check_member_user_pioneer(user_id: &str) -> Result> { - let pioneer_score = UserStream::check_sorted_set_member(&USER_PIONEERS_KEY_PARTS, &[user_id]) - .await - .unwrap(); - Ok(pioneer_score) +pub async fn check_member_user_influencer(user_id: &str) -> Result> { + let influencer_score = + UserStream::check_sorted_set_member(&USER_INFLUENCERS_KEY_PARTS, &[user_id]) + .await + .unwrap(); + Ok(influencer_score) } pub async fn find_user_counts(user_id: &str) -> UserCounts { From 233bbf3956fcf2c99ec8bbd6bda5a456182de6be Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 29 Nov 2024 16:04:03 -0500 Subject: [PATCH 02/18] add influencers --- src/db/graph/queries/get.rs | 18 +-- src/models/tag/stream.rs | 66 +---------- src/models/user/influencers.rs | 195 +++++++++++++++++++++++++++++++++ src/models/user/mod.rs | 1 + src/routes/v0/tag/global.rs | 4 +- src/types/mod.rs | 10 ++ src/types/timeframe.rs | 53 +++++++++ 7 files changed, 274 insertions(+), 73 deletions(-) create mode 100644 src/models/user/influencers.rs create mode 100644 src/types/timeframe.rs diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index 6b00bb02..52a53ade 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -1,7 +1,7 @@ use crate::models::post::StreamSource; use crate::models::tag::stream::HotTagsInput; -use crate::models::tag::stream::TagStreamReach; use crate::types::Pagination; +use crate::types::StreamReach; use crate::types::StreamSorting; use neo4rs::{query, Query}; use pubky_app_specs::PostKind; @@ -413,11 +413,11 @@ pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query { .param("labels", tag_list) } -fn tag_stream_reach_to_graph_subquery(reach: &TagStreamReach) -> String { +fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String { let query = match reach { - TagStreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)", - TagStreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)", - TagStreamReach::Friends => { + StreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)", + StreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)", + StreamReach::Friends => { "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)" } }; @@ -427,7 +427,7 @@ fn tag_stream_reach_to_graph_subquery(reach: &TagStreamReach) -> String { pub fn get_tag_taggers_by_reach( label: &str, user_id: &str, - reach: TagStreamReach, + reach: StreamReach, skip: usize, limit: usize, ) -> Query { @@ -442,7 +442,7 @@ pub fn get_tag_taggers_by_reach( SKIP $skip LIMIT $limit RETURN COLLECT(reach.id) as tagger_ids ", - tag_stream_reach_to_graph_subquery(&reach) + stream_reach_to_graph_subquery(&reach) ) .as_str(), ) @@ -454,7 +454,7 @@ pub fn get_tag_taggers_by_reach( pub fn get_hot_tags_by_reach( user_id: &str, - reach: TagStreamReach, + reach: StreamReach, tags_query: &HotTagsInput, ) -> Query { let input_tagged_type = match &tags_query.tagged_type { @@ -484,7 +484,7 @@ pub fn get_hot_tags_by_reach( SKIP $skip LIMIT $limit RETURN COLLECT(hot_tag) as hot_tags ", - tag_stream_reach_to_graph_subquery(&reach), + stream_reach_to_graph_subquery(&reach), input_tagged_type, tags_query.taggers_limit ) diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index e354e8e1..664baf96 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -1,15 +1,14 @@ use crate::db::graph::exec::retrieve_from_graph; use crate::db::kv::index::sorted_sets::SortOrder; -use crate::types::DynError; +use crate::types::{DynError, StreamReach, Timeframe}; use axum::async_trait; -use chrono::Datelike; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Display; use std::ops::Deref; use utoipa::ToSchema; -use crate::{db::connectors::neo4j::get_neo4j_graph, queries}; +use crate::queries; use crate::{RedisOps, ScoreAction}; pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"]; @@ -19,14 +18,6 @@ const GLOBAL_HOT_TAGS_PREFIX: &str = "Hot_Tags"; #[derive(Serialize, Deserialize, Debug, ToSchema, Clone)] pub struct Taggers(pub Vec); -#[derive(Deserialize, Debug, ToSchema, Clone)] -#[serde(rename_all = "snake_case")] -pub enum TagStreamReach { - Followers, - Following, - Friends, -} - #[derive(Deserialize, Debug, ToSchema, Clone)] pub enum TaggedType { Post, @@ -42,55 +33,6 @@ impl Display for TaggedType { } } -#[derive(Deserialize, Debug, ToSchema, Clone)] -pub enum Timeframe { - Today, - ThisMonth, - AllTime, -} - -impl Display for Timeframe { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Timeframe::Today => write!(f, "Today"), - Timeframe::ThisMonth => write!(f, "ThisMonth"), - Timeframe::AllTime => write!(f, "AllTime"), - } - } -} - -impl Timeframe { - pub fn to_timestamp_range(&self) -> (i64, i64) { - let now = chrono::Utc::now(); - let start = match self { - Timeframe::Today => now - .date_naive() - .and_hms_opt(0, 0, 0) - .unwrap() - .and_utc() - .timestamp_millis(), - Timeframe::ThisMonth => now - .date_naive() - .with_day(1) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap() - .and_utc() - .timestamp_millis(), - Timeframe::AllTime => 0, - }; - (start, now.timestamp_millis()) - } - - pub fn to_cache_period(&self) -> i64 { - match self { - Timeframe::Today => 60 * 60, - Timeframe::ThisMonth => 60 * 60 * 24, - Timeframe::AllTime => 60 * 60 * 24, - } - } -} - pub struct HotTagsInput { pub timeframe: Timeframe, pub skip: usize, @@ -278,7 +220,7 @@ impl HotTags { pub async fn get_hot_tags( user_id: Option, - reach: Option, + reach: Option, tags_query: &HotTagsInput, ) -> Result, DynError> { match user_id { @@ -291,7 +233,7 @@ impl HotTags { async fn get_hot_tags_by_reach( user_id: String, - reach: TagStreamReach, + reach: StreamReach, tags_query: &HotTagsInput, ) -> Result, DynError> { let query = queries::get::get_hot_tags_by_reach(user_id.as_str(), reach, tags_query); diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs new file mode 100644 index 00000000..89153998 --- /dev/null +++ b/src/models/user/influencers.rs @@ -0,0 +1,195 @@ +use crate::db::graph::exec::retrieve_from_graph; +use crate::db::kv::index::sorted_sets::SortOrder; +use crate::types::DynError; +use crate::types::StreamReach; +use crate::types::Timeframe; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::ops::Deref; +use utoipa::ToSchema; + +use crate::queries; +use crate::RedisOps; + +const GLOBAL_INFLUENCERS_PREFIX: &str = "Influencers"; + +#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)] +pub struct Influencer { + id: String, + score: f64, +} + +// Define a newtype wrapper +#[derive(Serialize, Deserialize, Debug, ToSchema, Default, Clone)] +pub struct Influencers(pub Vec); + +impl RedisOps for Influencers {} + +#[derive(Serialize, Deserialize, Debug, ToSchema, Default)] +pub struct InfluencersData(pub HashMap); + +impl RedisOps for InfluencersData {} + +// Create a Influencers instance directly from an iterator of Influencer items +// Need it in collect() +impl FromIterator for Influencers { + fn from_iter>(iter: I) -> Self { + Influencers(iter.into_iter().collect()) + } +} + +// Implement Deref so Influencers can be used like Vec +impl Deref for Influencers { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Influencers { + pub async fn reindex() -> Result<(), DynError> { + Influencers::get_global_influencers(0, 100, Timeframe::AllTime).await?; + Influencers::get_global_influencers(0, 100, Timeframe::ThisMonth).await?; + Ok(()) + } + + fn get_cache_key_parts(timeframe: Timeframe) -> Vec { + vec![timeframe.to_string()] + } + + async fn get_from_global_cache( + skip: usize, + limit: usize, + timeframe: Timeframe, + ) -> Result, DynError> { + let key_parts = Influencers::get_cache_key_parts(timeframe); + let key_parts_vector: Vec<&str> = + key_parts.iter().map(|s| s.as_str()).collect::>(); + let data = + InfluencersData::try_from_index_json(key_parts_vector.clone().as_slice()).await?; + let ranking = Influencers::try_from_index_sorted_set( + key_parts_vector.as_slice(), + None, + None, + Some(skip), + Some(limit), + SortOrder::Descending, + Some(GLOBAL_INFLUENCERS_PREFIX), + ) + .await?; + + if data.is_none() || ranking.is_none() { + return Ok(None); + } + + let mapping = data.unwrap(); + + // for each value in ranking, look up the value in data + let mut influencers = Vec::new(); + for (id, _) in ranking.unwrap() { + if let Some(influencer) = mapping.0.get(&id) { + influencers.push(Influencer { + id, + score: influencer.score, + }); + } + } + Ok(Some(Influencers(influencers))) + } + + async fn set_to_global_cache( + result: Influencers, + timeframe: Timeframe, + ) -> Result<(), DynError> { + let key_parts = Influencers::get_cache_key_parts(timeframe.clone()); + let key_parts_vector: Vec<&str> = + key_parts.iter().map(|s| s.as_str()).collect::>(); + + // turn result which is a vector of Influencer, into a mapping from label to Influencer + let mapping = result.clone(); + let influencers_data: HashMap = mapping + .iter() + .map(|item| item.clone()) + .into_iter() + .map(|influencer| (influencer.id.clone(), influencer)) + .collect(); + + // store the data as json in cache + InfluencersData::put_index_json( + &InfluencersData(influencers_data), + key_parts_vector.as_slice(), + Some(timeframe.to_cache_period()), + ) + .await?; + + // store the ranking as sorted set in cache + Influencers::put_index_sorted_set( + key_parts_vector.as_slice(), + result + .iter() + .map(|influencer| (influencer.score as f64, influencer.id.as_str())) + .collect::>() + .as_slice(), + Some(GLOBAL_INFLUENCERS_PREFIX), + Some(timeframe.to_cache_period()), + ) + .await?; + Ok(()) + } + + pub async fn get_influencers( + user_id: Option, + reach: Option, + skip: usize, + limit: usize, + timeframe: Timeframe, + ) -> Result, DynError> { + match user_id { + Some(user_id) => { + Influencers::get_influencers_by_reach( + user_id, + reach.unwrap(), + skip, + limit, + timeframe, + ) + .await + } + None => Influencers::get_global_influencers(skip, limit, timeframe).await, + } + } + + async fn get_influencers_by_reach( + user_id: String, + reach: StreamReach, + skip: usize, + limit: usize, + timeframe: Timeframe, + ) -> Result, DynError> { + let query = + queries::get::get_influencers_by_reach(user_id.as_str(), reach, skip, limit, timeframe); + retrieve_from_graph::(query, "influencers").await + } + + async fn get_global_influencers( + skip: usize, + limit: usize, + timeframe: Timeframe, + ) -> Result, DynError> { + let cached_influencers = Influencers::get_from_global_cache(skip, limit, timeframe).await?; + if cached_influencers.is_some() { + return Ok(cached_influencers); + } + + let query = queries::get::get_global_influencers(0, 100, timeframe.clone()); + let result = retrieve_from_graph::(query, "influencers").await?; + + let influencers = result.unwrap(); + if influencers.len() > 0 { + Influencers::set_to_global_cache(influencers.clone(), timeframe).await?; + } + + Influencers::get_from_global_cache(skip, limit, timeframe).await + } +} diff --git a/src/models/user/mod.rs b/src/models/user/mod.rs index 32bb3903..e2cbb833 100644 --- a/src/models/user/mod.rs +++ b/src/models/user/mod.rs @@ -1,6 +1,7 @@ mod counts; mod details; //mod id; +mod influencers; mod muted; mod relationship; mod search; diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs index e3600565..ce7883c8 100644 --- a/src/routes/v0/tag/global.rs +++ b/src/routes/v0/tag/global.rs @@ -1,9 +1,9 @@ use crate::models::tag::global::TagGlobal; use crate::models::tag::stream::{ - HotTag, HotTags, HotTagsInput, TagStreamReach, TaggedType, Taggers, Timeframe, + HotTag, HotTags, HotTagsInput, TagStreamReach, TaggedType, Taggers, }; use crate::routes::v0::endpoints::{TAGS_HOT_ROUTE, TAG_TAGGERS_ROUTE}; -use crate::types::Pagination; +use crate::types::{Pagination, Timeframe}; use crate::{Error, Result}; use axum::extract::{Path, Query}; use axum::Json; diff --git a/src/types/mod.rs b/src/types/mod.rs index 84055af5..69e18a2c 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,8 +1,10 @@ mod pagination; mod pubky; +mod timeframe; pub use pagination::Pagination; pub use pubky::PubkyId; +pub use timeframe::Timeframe; use serde::{Deserialize, Serialize}; use std::error::Error; @@ -17,3 +19,11 @@ pub enum StreamSorting { Timeline, TotalEngagement, } + +#[derive(Deserialize, Debug, ToSchema, Clone)] +#[serde(rename_all = "snake_case")] +pub enum StreamReach { + Followers, + Following, + Friends, +} diff --git a/src/types/timeframe.rs b/src/types/timeframe.rs new file mode 100644 index 00000000..9d9277f5 --- /dev/null +++ b/src/types/timeframe.rs @@ -0,0 +1,53 @@ +use chrono::Datelike; +use serde::Deserialize; +use std::fmt::Display; +use utoipa::ToSchema; + +#[derive(Deserialize, Debug, ToSchema, Clone)] +pub enum Timeframe { + Today, + ThisMonth, + AllTime, +} + +impl Display for Timeframe { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Timeframe::Today => write!(f, "Today"), + Timeframe::ThisMonth => write!(f, "ThisMonth"), + Timeframe::AllTime => write!(f, "AllTime"), + } + } +} + +impl Timeframe { + pub fn to_timestamp_range(&self) -> (i64, i64) { + let now = chrono::Utc::now(); + let start = match self { + Timeframe::Today => now + .date_naive() + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp_millis(), + Timeframe::ThisMonth => now + .date_naive() + .with_day(1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp_millis(), + Timeframe::AllTime => 0, + }; + (start, now.timestamp_millis()) + } + + pub fn to_cache_period(&self) -> i64 { + match self { + Timeframe::Today => 60 * 60, + Timeframe::ThisMonth => 60 * 60 * 24, + Timeframe::AllTime => 60 * 60 * 24, + } + } +} From 97a8a658bfb561d8fbe3596b6c0a6c138a540f61 Mon Sep 17 00:00:00 2001 From: amir Date: Sat, 30 Nov 2024 00:39:37 -0500 Subject: [PATCH 03/18] Add queries --- src/db/graph/queries/get.rs | 95 +++++++++++++++++++++++++++++++--- src/models/user/influencers.rs | 27 +++++----- src/types/mod.rs | 1 + 3 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index 52a53ade..33464300 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -3,6 +3,7 @@ use crate::models::tag::stream::HotTagsInput; use crate::types::Pagination; use crate::types::StreamReach; use crate::types::StreamSorting; +use crate::types::Timeframe; use neo4rs::{query, Query}; use pubky_app_specs::PostKind; @@ -414,14 +415,19 @@ pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query { } fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String { - let query = match reach { - StreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)", - StreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)", + match reach { + StreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)".to_string(), + StreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)".to_string(), StreamReach::Friends => { - "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)" + "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)".to_string() } - }; - String::from(query) + StreamReach::Wot(depth) => { + format!( + "MATCH (viewer)-[:FOLLOWS*1..{}]->(tagger:User)", + depth.clone() + ) + } + } } pub fn get_tag_taggers_by_reach( @@ -533,6 +539,83 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInput) -> Query { .param("to", to) } +pub fn get_influencers_by_reach( + user_id: &str, + reach: StreamReach, + skip: usize, + limit: usize, + timeframe: &Timeframe, +) -> Query { + let (from, to) = timeframe.to_timestamp_range(); + query( + format!( + " + {} + WHERE user.id = $user_id + + OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(reach) + WHERE follow.indexed_at >= $from AND follow.indexed_at < $to + + OPTIONAL MATCH (reach)-[tag:TAGGED]->(tagged:Post) + WHERE tag.indexed_at >= $from AND tag.indexed_at < $to + + OPTIONAL MATCH (reach)-[post:AUTHORED]->(post:Post) + WHERE post.indexed_at >= $from AND post.indexed_at < $to + + WITH reach, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, + COUNT(DISTINCT post) AS posts_count + WITH {{ + id: reach.id, + score: (tags_count + posts_count) * sqrt(followers_count), + }} AS influencer + ORDER BY influencer.score DESC, reach.id ASC + SKIP $skip LIMIT $limit + RETURN COLLECT(influencer) as influencers + ", + stream_reach_to_graph_subquery(&reach), + ) + .as_str(), + ) + .param("user_id", user_id) + .param("skip", skip as i64) + .param("limit", limit as i64) + .param("from", from) + .param("to", to) +} + +pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) -> Query { + let (from, to) = timeframe.to_timestamp_range(); + query( + format!( + " + OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user:User) + WHERE follow.indexed_at >= $from AND follow.indexed_at < $to + + OPTIONAL MATCH (user)-[tag:TAGGED]->(tagged:Post) + WHERE tag.indexed_at >= $from AND tag.indexed_at < $to + + OPTIONAL MATCH (user)-[post:AUTHORED]->(post:Post) + WHERE post.indexed_at >= $from AND post.indexed_at < $to + + WITH user, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, + COUNT(DISTINCT post) AS posts_count + WITH {{ + id: user.id, + score: (tags_count + posts_count) * sqrt(followers_count), + }} AS influencer + ORDER BY influencer.score DESC, user.id ASC + SKIP $skip LIMIT $limit + RETURN COLLECT(influencer) as influencers + ", + ) + .as_str(), + ) + .param("skip", skip as i64) + .param("limit", limit as i64) + .param("from", from) + .param("to", to) +} + pub fn get_files_by_ids(key_pair: &[&[&str]]) -> Query { query( " diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index 89153998..386a70f7 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -49,19 +49,19 @@ impl Deref for Influencers { impl Influencers { pub async fn reindex() -> Result<(), DynError> { - Influencers::get_global_influencers(0, 100, Timeframe::AllTime).await?; - Influencers::get_global_influencers(0, 100, Timeframe::ThisMonth).await?; + Influencers::get_global_influencers(0, 100, &Timeframe::AllTime).await?; + Influencers::get_global_influencers(0, 100, &Timeframe::ThisMonth).await?; Ok(()) } - fn get_cache_key_parts(timeframe: Timeframe) -> Vec { + fn get_cache_key_parts(timeframe: &Timeframe) -> Vec { vec![timeframe.to_string()] } async fn get_from_global_cache( skip: usize, limit: usize, - timeframe: Timeframe, + timeframe: &Timeframe, ) -> Result, DynError> { let key_parts = Influencers::get_cache_key_parts(timeframe); let key_parts_vector: Vec<&str> = @@ -98,11 +98,11 @@ impl Influencers { Ok(Some(Influencers(influencers))) } - async fn set_to_global_cache( + async fn put_to_global_cache( result: Influencers, - timeframe: Timeframe, + timeframe: &Timeframe, ) -> Result<(), DynError> { - let key_parts = Influencers::get_cache_key_parts(timeframe.clone()); + let key_parts = Influencers::get_cache_key_parts(timeframe); let key_parts_vector: Vec<&str> = key_parts.iter().map(|s| s.as_str()).collect::>(); @@ -143,7 +143,7 @@ impl Influencers { reach: Option, skip: usize, limit: usize, - timeframe: Timeframe, + timeframe: &Timeframe, ) -> Result, DynError> { match user_id { Some(user_id) => { @@ -165,7 +165,7 @@ impl Influencers { reach: StreamReach, skip: usize, limit: usize, - timeframe: Timeframe, + timeframe: &Timeframe, ) -> Result, DynError> { let query = queries::get::get_influencers_by_reach(user_id.as_str(), reach, skip, limit, timeframe); @@ -175,19 +175,20 @@ impl Influencers { async fn get_global_influencers( skip: usize, limit: usize, - timeframe: Timeframe, + timeframe: &Timeframe, ) -> Result, DynError> { - let cached_influencers = Influencers::get_from_global_cache(skip, limit, timeframe).await?; + let cached_influencers = + Influencers::get_from_global_cache(skip, limit, &timeframe).await?; if cached_influencers.is_some() { return Ok(cached_influencers); } - let query = queries::get::get_global_influencers(0, 100, timeframe.clone()); + let query = queries::get::get_global_influencers(0, 100, &timeframe); let result = retrieve_from_graph::(query, "influencers").await?; let influencers = result.unwrap(); if influencers.len() > 0 { - Influencers::set_to_global_cache(influencers.clone(), timeframe).await?; + Influencers::put_to_global_cache(influencers.clone(), timeframe).await?; } Influencers::get_from_global_cache(skip, limit, timeframe).await diff --git a/src/types/mod.rs b/src/types/mod.rs index 69e18a2c..ee287d5c 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -26,4 +26,5 @@ pub enum StreamReach { Followers, Following, Friends, + Wot(u8), } From 188fe2240e8b15fa2c240199a5c86b2e8a364854 Mon Sep 17 00:00:00 2001 From: amir Date: Tue, 3 Dec 2024 01:20:08 -0500 Subject: [PATCH 04/18] fix build --- benches/tag.rs | 17 ++++++++--------- src/db/graph/queries/get.rs | 12 ++++++------ src/models/tag/global.rs | 8 ++++---- src/models/user/mod.rs | 1 + src/routes/v0/tag/global.rs | 14 ++++++-------- 5 files changed, 25 insertions(+), 27 deletions(-) diff --git a/benches/tag.rs b/benches/tag.rs index ff62c7a8..e608f3fe 100644 --- a/benches/tag.rs +++ b/benches/tag.rs @@ -1,12 +1,11 @@ -use chrono::Utc; use criterion::{criterion_group, criterion_main}; use criterion::{BenchmarkId, Criterion}; use pubky_nexus::models::tag::global::TagGlobal; use pubky_nexus::models::tag::post::TagPost; -use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput, TagStreamReach, Timeframe}; +use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput}; use pubky_nexus::models::tag::traits::{TagCollection, TaggersCollection}; use pubky_nexus::models::tag::user::TagUser; -use pubky_nexus::types::Pagination; +use pubky_nexus::types::{Pagination, StreamReach, Timeframe}; use setup::run_setup; use std::time::Duration; use tokio::runtime::Runtime; @@ -246,7 +245,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Following); + let reach_by = format!("{:?}", StreamReach::Following); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -266,7 +265,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Following), + Some(StreamReach::Following), &input, ) .await @@ -287,7 +286,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Followers); + let reach_by = format!("{:?}", StreamReach::Followers); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -307,7 +306,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Followers), + Some(StreamReach::Followers), &input, ) .await @@ -328,7 +327,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) { run_setup(); let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; - let reach_by = format!("{:?}", TagStreamReach::Friends); + let reach_by = format!("{:?}", StreamReach::Friends); let rt: Runtime = Runtime::new().unwrap(); c.bench_with_input( @@ -348,7 +347,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) { }; let profile = HotTags::get_hot_tags( Some(String::from(params[0])), - Some(TagStreamReach::Friends), + Some(StreamReach::Friends), &input, ) .await diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index 33464300..ac45dab8 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -559,14 +559,14 @@ pub fn get_influencers_by_reach( OPTIONAL MATCH (reach)-[tag:TAGGED]->(tagged:Post) WHERE tag.indexed_at >= $from AND tag.indexed_at < $to - OPTIONAL MATCH (reach)-[post:AUTHORED]->(post:Post) - WHERE post.indexed_at >= $from AND post.indexed_at < $to + OPTIONAL MATCH (reach)-[authored:AUTHORED]->(post:Post) + WHERE authored.indexed_at >= $from AND authored.indexed_at < $to WITH reach, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, COUNT(DISTINCT post) AS posts_count WITH {{ id: reach.id, - score: (tags_count + posts_count) * sqrt(followers_count), + score: (tags_count + posts_count) * sqrt(followers_count) }} AS influencer ORDER BY influencer.score DESC, reach.id ASC SKIP $skip LIMIT $limit @@ -594,14 +594,14 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) OPTIONAL MATCH (user)-[tag:TAGGED]->(tagged:Post) WHERE tag.indexed_at >= $from AND tag.indexed_at < $to - OPTIONAL MATCH (user)-[post:AUTHORED]->(post:Post) - WHERE post.indexed_at >= $from AND post.indexed_at < $to + OPTIONAL MATCH (user)-[authored:AUTHORED]->(post:Post) + WHERE authored.indexed_at >= $from AND authored.indexed_at < $to WITH user, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, COUNT(DISTINCT post) AS posts_count WITH {{ id: user.id, - score: (tags_count + posts_count) * sqrt(followers_count), + score: (tags_count + posts_count) * sqrt(followers_count) }} AS influencer ORDER BY influencer.score DESC, user.id ASC SKIP $skip LIMIT $limit diff --git a/src/models/tag/global.rs b/src/models/tag/global.rs index c9e03360..aea881e8 100644 --- a/src/models/tag/global.rs +++ b/src/models/tag/global.rs @@ -1,6 +1,6 @@ -use super::stream::{TagStreamReach, Taggers}; +use super::stream::Taggers; use crate::db::graph::exec::retrieve_from_graph; -use crate::types::DynError; +use crate::types::{DynError, StreamReach}; use crate::{queries, RedisOps}; pub struct TagGlobal {} @@ -9,7 +9,7 @@ impl TagGlobal { pub async fn get_tag_taggers( label: String, user_id: Option, - reach: Option, + reach: Option, skip: usize, limit: usize, ) -> Result>, DynError> { @@ -31,7 +31,7 @@ pub async fn read_from_set( pub async fn get_tag_taggers_by_reach( label: &str, user_id: &str, - reach: TagStreamReach, + reach: StreamReach, skip: usize, limit: usize, ) -> Result>, DynError> { diff --git a/src/models/user/mod.rs b/src/models/user/mod.rs index e2cbb833..22fc533c 100644 --- a/src/models/user/mod.rs +++ b/src/models/user/mod.rs @@ -12,6 +12,7 @@ mod view; pub use counts::UserCounts; pub use details::UserDetails; //pub use id::PubkyId; +pub use influencers::Influencers; pub use muted::Muted; pub use relationship::Relationship; pub use search::{UserSearch, USER_NAME_KEY_PARTS}; diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs index ce7883c8..24871bfb 100644 --- a/src/routes/v0/tag/global.rs +++ b/src/routes/v0/tag/global.rs @@ -1,9 +1,7 @@ use crate::models::tag::global::TagGlobal; -use crate::models::tag::stream::{ - HotTag, HotTags, HotTagsInput, TagStreamReach, TaggedType, Taggers, -}; +use crate::models::tag::stream::{HotTag, HotTags, HotTagsInput, TaggedType, Taggers}; use crate::routes::v0::endpoints::{TAGS_HOT_ROUTE, TAG_TAGGERS_ROUTE}; -use crate::types::{Pagination, Timeframe}; +use crate::types::{Pagination, StreamReach, Timeframe}; use crate::{Error, Result}; use axum::extract::{Path, Query}; use axum::Json; @@ -14,7 +12,7 @@ use utoipa::OpenApi; #[derive(Deserialize, Debug)] pub struct HotTagsQuery { user_id: Option, - reach: Option, + reach: Option, taggers_limit: Option, timeframe: Option, tagged_type: Option, @@ -27,7 +25,7 @@ pub struct HotTagsQuery { pub struct TagTaggersQuery { pagination: Pagination, user_id: Option, - reach: Option, + reach: Option, } #[utoipa::path( @@ -37,7 +35,7 @@ pub struct TagTaggersQuery { tag = "Tags", params( ("label" = String, Path, description = "Tag name"), - ("reach" = TagStreamReach, Path, description = "Reach type: Follower | Following | Friends"), + ("reach" = StreamReach, Path, description = "Reach type: Follower | Following | Friends"), ("user_id" = Option, Query, description = "User ID to base reach on"), ), responses( @@ -72,7 +70,7 @@ pub async fn tag_taggers_handler( tag = "Tags", params( ("user_id" = Option, Query, description = "User Pubky ID"), - ("reach" = Option, Query, description = "Reach type: Follower | Following | Friends"), + ("reach" = Option, Query, description = "Reach type: Follower | Following | Friends"), ("taggers_limit" = Option, Query, description = "Retrieve N user_id for each tag"), ("skip" = Option, Query, description = "Skip N tags"), ("limit" = Option, Query, description = "Retrieve N tag"), From e3f19c4e5e1421b2cbe1fb252ac31b2a8eab2369 Mon Sep 17 00:00:00 2001 From: amir Date: Tue, 3 Dec 2024 01:38:40 -0500 Subject: [PATCH 05/18] Add reindex --- src/reindex.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reindex.rs b/src/reindex.rs index f0ce4556..882af2bf 100644 --- a/src/reindex.rs +++ b/src/reindex.rs @@ -7,7 +7,7 @@ use crate::models::tag::stream::HotTags; use crate::models::tag::traits::TagCollection; use crate::models::tag::user::TagUser; use crate::models::traits::Collection; -use crate::models::user::{Muted, UserDetails}; +use crate::models::user::{Influencers, Muted, UserDetails}; use crate::types::DynError; use crate::{ db::connectors::neo4j::get_neo4j_graph, @@ -69,6 +69,10 @@ pub async fn reindex() { .await .expect("Failed to store the global hot tags"); + Influencers::reindex() + .await + .expect("Failed to reindex influencers"); + TagSearch::reindex() .await .expect("Failed to store the global post tags"); From d62f98f2474ea073a44757c49cc06252b487c473 Mon Sep 17 00:00:00 2001 From: amir Date: Tue, 3 Dec 2024 08:23:41 -0500 Subject: [PATCH 06/18] Use new influencers in user stream --- benches/streams_benches/user.rs | 12 +++++++++--- src/models/user/influencers.rs | 11 +++++------ src/models/user/stream.rs | 32 +++++++++++++++++++------------- src/routes/v0/stream/users.rs | 8 ++++++-- 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index 7080ddac..0b20a76c 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -1,6 +1,9 @@ use crate::run_setup; use criterion::Criterion; -use pubky_nexus::models::user::{UserStream, UserStreamSource}; +use pubky_nexus::{ + models::user::{UserStream, UserStreamSource}, + types::StreamReach, +}; use tokio::runtime::Runtime; /// USER STREAMS BENCHMARKS @@ -22,7 +25,8 @@ pub fn bench_stream_following(c: &mut Criterion) { None, None, Some(20), - UserStreamSource::Influencers, + UserStreamSource::Influencers(StreamReach::Following), + None, None, ) .await @@ -50,6 +54,7 @@ pub fn bench_stream_most_followed(c: &mut Criterion) { Some(20), UserStreamSource::MostFollowed, None, + None, ) .await .unwrap(); @@ -99,7 +104,8 @@ pub fn bench_stream_influencers(c: &mut Criterion) { None, None, Some(20), - UserStreamSource::Influencers, + UserStreamSource::Influencers(StreamReach::Wot(3)), + None, None, ) .await diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index 386a70f7..c021d866 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -15,8 +15,8 @@ const GLOBAL_INFLUENCERS_PREFIX: &str = "Influencers"; #[derive(Deserialize, Serialize, ToSchema, Debug, Clone)] pub struct Influencer { - id: String, - score: f64, + pub id: String, + pub score: f64, } // Define a newtype wrapper @@ -139,7 +139,7 @@ impl Influencers { } pub async fn get_influencers( - user_id: Option, + user_id: Option<&str>, reach: Option, skip: usize, limit: usize, @@ -161,14 +161,13 @@ impl Influencers { } async fn get_influencers_by_reach( - user_id: String, + user_id: &str, reach: StreamReach, skip: usize, limit: usize, timeframe: &Timeframe, ) -> Result, DynError> { - let query = - queries::get::get_influencers_by_reach(user_id.as_str(), reach, skip, limit, timeframe); + let query = queries::get::get_influencers_by_reach(user_id, reach, skip, limit, timeframe); retrieve_from_graph::(query, "influencers").await } diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index 5cd178c0..4e6748b0 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -1,6 +1,6 @@ -use super::{Muted, UserCounts, UserSearch, UserView}; +use super::{Influencers, Muted, UserCounts, UserSearch, UserView}; use crate::models::follow::{Followers, Following, Friends, UserFollows}; -use crate::types::DynError; +use crate::types::{DynError, StreamReach, Timeframe}; use crate::{db::kv::index::sorted_sets::SortOrder, RedisOps}; use crate::{get_neo4j_graph, queries}; use serde::{Deserialize, Serialize}; @@ -21,7 +21,7 @@ pub enum UserStreamSource { Friends, Muted, MostFollowed, - Influencers, + Influencers(StreamReach), Recommended, } @@ -38,8 +38,10 @@ impl UserStream { limit: Option, source: UserStreamSource, depth: Option, + timeframe: Option, ) -> Result, DynError> { - let user_ids = Self::get_user_list_from_source(user_id, source, skip, limit).await?; + let user_ids = + Self::get_user_list_from_source(user_id, source, skip, limit, timeframe).await?; match user_ids { Some(users) => Self::from_listed_user_ids(&users, viewer_id, depth).await, None => Ok(None), @@ -194,6 +196,7 @@ impl UserStream { source: UserStreamSource, skip: Option, limit: Option, + timeframe: Option, ) -> Result>, DynError> { let user_ids = match source { UserStreamSource::Followers => Followers::get_by_id( @@ -238,17 +241,20 @@ impl UserStream { ) .await? .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), - UserStreamSource::Influencers => Self::try_from_index_sorted_set( - &USER_INFLUENCERS_KEY_PARTS, - None, - None, - skip, - limit, - SortOrder::Descending, - None, + UserStreamSource::Influencers(reach) => Influencers::get_influencers( + user_id, + Some(reach), + skip.unwrap_or(0), + limit.unwrap_or(10).min(100), + &timeframe.unwrap_or(Timeframe::AllTime), ) .await? - .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), + .map(|result| { + result + .iter() + .map(|influencer| influencer.id.clone()) + .collect() + }), UserStreamSource::Recommended => { UserStream::get_recommended_ids( user_id.ok_or( diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 389aefe1..7113515a 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -2,7 +2,7 @@ use crate::models::user::{UserStream, UserStreamSource}; use crate::routes::v0::endpoints::{ STREAM_USERS_BY_IDS_ROUTE, STREAM_USERS_ROUTE, STREAM_USERS_USERNAME_SEARCH_ROUTE, }; -use crate::types::Pagination; +use crate::types::{Pagination, Timeframe}; use crate::{Error, Result}; use axum::extract::Query; use axum::Json; @@ -18,6 +18,7 @@ pub struct UserStreamQuery { limit: Option, source: Option, depth: Option, + timeframe: Option, } #[utoipa::path( @@ -31,7 +32,8 @@ pub struct UserStreamQuery { ("skip" = Option, Query, description = "Skip N followers"), ("limit" = Option, Query, description = "Retrieve N followers"), ("source" = Option, Query, description = "Source of users for the stream."), - ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored") + ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored"), + ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range") ), responses( (status = 200, description = "Users stream", body = UserStream), @@ -50,6 +52,7 @@ pub async fn stream_users_handler( let skip = query.skip.unwrap_or(0); let limit = query.limit.unwrap_or(6).min(20); let source = query.source.unwrap_or(UserStreamSource::Followers); + let timeframe = query.timeframe.unwrap_or(Timeframe::AllTime); if query.user_id.is_none() { match source { @@ -93,6 +96,7 @@ pub async fn stream_users_handler( Some(limit), source.clone(), query.depth, + Some(timeframe), ) .await { From 42d9688785e3afba030923a9110e529a612aefa6 Mon Sep 17 00:00:00 2001 From: amir Date: Tue, 3 Dec 2024 10:00:33 -0500 Subject: [PATCH 07/18] fix lint issues --- src/db/graph/queries/get.rs | 5 +---- src/models/tag/stream.rs | 3 +-- src/models/user/influencers.rs | 10 ++++------ 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index 254ae072..ceb6d56e 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -586,8 +586,7 @@ pub fn get_influencers_by_reach( pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) -> Query { let (from, to) = timeframe.to_timestamp_range(); query( - format!( - " + " OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user:User) WHERE follow.indexed_at >= $from AND follow.indexed_at < $to @@ -607,8 +606,6 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) SKIP $skip LIMIT $limit RETURN COLLECT(influencer) as influencers ", - ) - .as_str(), ) .param("skip", skip as i64) .param("limit", limit as i64) diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index 664baf96..a795ca7c 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -190,8 +190,7 @@ impl HotTags { let mapping = result.clone(); let hot_tags_data: HashMap = mapping .iter() - .map(|item| item.clone()) - .into_iter() + .cloned() .map(|tag| (tag.label.clone(), tag)) .collect(); diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index c021d866..d1b75962 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -110,8 +110,7 @@ impl Influencers { let mapping = result.clone(); let influencers_data: HashMap = mapping .iter() - .map(|item| item.clone()) - .into_iter() + .cloned() .map(|influencer| (influencer.id.clone(), influencer)) .collect(); @@ -128,7 +127,7 @@ impl Influencers { key_parts_vector.as_slice(), result .iter() - .map(|influencer| (influencer.score as f64, influencer.id.as_str())) + .map(|influencer| (influencer.score, influencer.id.as_str())) .collect::>() .as_slice(), Some(GLOBAL_INFLUENCERS_PREFIX), @@ -176,13 +175,12 @@ impl Influencers { limit: usize, timeframe: &Timeframe, ) -> Result, DynError> { - let cached_influencers = - Influencers::get_from_global_cache(skip, limit, &timeframe).await?; + let cached_influencers = Influencers::get_from_global_cache(skip, limit, timeframe).await?; if cached_influencers.is_some() { return Ok(cached_influencers); } - let query = queries::get::get_global_influencers(0, 100, &timeframe); + let query = queries::get::get_global_influencers(0, 100, timeframe); let result = retrieve_from_graph::(query, "influencers").await?; let influencers = result.unwrap(); From 99e0a59ab60b0b0b8e6c265da3b00f313243e86e Mon Sep 17 00:00:00 2001 From: amir Date: Wed, 4 Dec 2024 02:15:42 -0500 Subject: [PATCH 08/18] Add more tests and fix review issues --- src/models/tag/stream.rs | 2 +- src/routes/v0/tag/global.rs | 10 +++------- src/types/timeframe.rs | 1 + tests/service/tags/hot.rs | 30 ++++++++++++++++++++++++++---- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index a795ca7c..3d909412 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -13,7 +13,7 @@ use crate::{RedisOps, ScoreAction}; pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"]; -const GLOBAL_HOT_TAGS_PREFIX: &str = "Hot_Tags"; +const GLOBAL_HOT_TAGS_PREFIX: &str = "Hot:Tags"; #[derive(Serialize, Deserialize, Debug, ToSchema, Clone)] pub struct Taggers(pub Vec); diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs index 24871bfb..dc7969b9 100644 --- a/src/routes/v0/tag/global.rs +++ b/src/routes/v0/tag/global.rs @@ -15,7 +15,6 @@ pub struct HotTagsQuery { reach: Option, taggers_limit: Option, timeframe: Option, - tagged_type: Option, #[serde(flatten)] pagination: Pagination, @@ -70,13 +69,11 @@ pub async fn tag_taggers_handler( tag = "Tags", params( ("user_id" = Option, Query, description = "User Pubky ID"), - ("reach" = Option, Query, description = "Reach type: Follower | Following | Friends"), + ("reach" = Option, Query, description = "Reach type: follower | following | friends | wot"), ("taggers_limit" = Option, Query, description = "Retrieve N user_id for each tag"), ("skip" = Option, Query, description = "Skip N tags"), ("limit" = Option, Query, description = "Retrieve N tag"), - ("from" = Option, Query, description = "Retrieve hot tags from this timestamp"), - ("to" = Option, Query, description = "Retrieve hot tags up to this timestamp"), - ("tagged_type" = Option, Query, description = "Retrieve hot tags by the type of entities tagged with it"), + ("timeframe" = Option, Query, description = "Retrieve hot tags for this specific timeframe. Defaults to all_time"), ), responses( (status = 200, description = "Retrieve tags by reach cluster", body = Vec), @@ -98,14 +95,13 @@ pub async fn hot_tags_handler(Query(query): Query) -> Result Result<()> { } #[tokio::test] -async fn test_global_hot_tags_for_posts() -> Result<()> { - let body = make_request("/v0/tags/hot?tagged_type=Post").await?; +async fn test_global_hot_tags_skip_limit() -> Result<()> { + let body = make_request("/v0/tags/hot?skip=3&limit=5").await?; assert!(body.is_array()); @@ -82,9 +83,12 @@ async fn test_global_hot_tags_for_posts() -> Result<()> { // Validate that the posts belong to the specified user's bookmarks analyse_hot_tags_structure(tags); + // assert limit + assert_eq!(tags.len(), 5); + // Analyse the tag that is in the 4th index let hot_tag = StreamTagMockup::new(String::from("ha"), 9, 16, 9); - compare_unit_hot_tag(&tags[3], hot_tag); + compare_unit_hot_tag(&tags[0], hot_tag); Ok(()) } @@ -108,6 +112,24 @@ async fn test_hot_tags_by_following_reach() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_hot_tags_by_reach_no_user_id() -> Result<()> { + let endpoint = "/v0/tags/hot?reach=following"; + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_hot_tags_by_reach_no_reach() -> Result<()> { + let endpoint = &format!("/v0/tags/hot?user_id={}", PEER_PUBKY); + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + #[tokio::test] async fn test_hot_tags_by_following_using_taggers_limit() -> Result<()> { let endpoint = &format!( From b76cdd21231b4cb56b10692d7104e27d696d1469 Mon Sep 17 00:00:00 2001 From: amir Date: Thu, 5 Dec 2024 03:21:26 -0500 Subject: [PATCH 09/18] Add preview option to influencers --- src/models/user/influencers.rs | 8 ++++++++ src/models/user/stream.rs | 6 +++++- src/routes/v0/stream/users.rs | 5 ++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index d1b75962..7f5a5fb2 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -3,6 +3,7 @@ use crate::db::kv::index::sorted_sets::SortOrder; use crate::types::DynError; use crate::types::StreamReach; use crate::types::Timeframe; +use chrono::Utc; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ops::Deref; @@ -143,7 +144,14 @@ impl Influencers { skip: usize, limit: usize, timeframe: &Timeframe, + preview: bool, ) -> Result, DynError> { + let (skip, limit) = if preview { + let skip = Utc::now().timestamp_subsec_micros() % 98; + (skip as usize, 3) + } else { + (skip, limit) + }; match user_id { Some(user_id) => { Influencers::get_influencers_by_reach( diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index 4e6748b0..ba91a14a 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -39,9 +39,11 @@ impl UserStream { source: UserStreamSource, depth: Option, timeframe: Option, + preview: Option, ) -> Result, DynError> { let user_ids = - Self::get_user_list_from_source(user_id, source, skip, limit, timeframe).await?; + Self::get_user_list_from_source(user_id, source, skip, limit, timeframe, preview) + .await?; match user_ids { Some(users) => Self::from_listed_user_ids(&users, viewer_id, depth).await, None => Ok(None), @@ -197,6 +199,7 @@ impl UserStream { skip: Option, limit: Option, timeframe: Option, + preview: Option, ) -> Result>, DynError> { let user_ids = match source { UserStreamSource::Followers => Followers::get_by_id( @@ -247,6 +250,7 @@ impl UserStream { skip.unwrap_or(0), limit.unwrap_or(10).min(100), &timeframe.unwrap_or(Timeframe::AllTime), + preview.unwrap_or(false), ) .await? .map(|result| { diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 7113515a..16699d95 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -19,6 +19,7 @@ pub struct UserStreamQuery { source: Option, depth: Option, timeframe: Option, + preview: Option, } #[utoipa::path( @@ -33,7 +34,8 @@ pub struct UserStreamQuery { ("limit" = Option, Query, description = "Retrieve N followers"), ("source" = Option, Query, description = "Source of users for the stream."), ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored"), - ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range") + ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range"), + ("preview" = Option, Query, description = "Provide a random selection of size 3 for sources supporting preview. Passing preview ignores skip and limit parameters.") ), responses( (status = 200, description = "Users stream", body = UserStream), @@ -97,6 +99,7 @@ pub async fn stream_users_handler( source.clone(), query.depth, Some(timeframe), + query.preview, ) .await { From bd37289c64288aaaca29b05c76af3e266c5dd169 Mon Sep 17 00:00:00 2001 From: amir Date: Thu, 5 Dec 2024 03:38:12 -0500 Subject: [PATCH 10/18] fix broken test from merge conflict --- tests/service/tags/hot.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/service/tags/hot.rs b/tests/service/tags/hot.rs index fed8b90f..fbe68ca7 100644 --- a/tests/service/tags/hot.rs +++ b/tests/service/tags/hot.rs @@ -92,6 +92,7 @@ async fn test_global_hot_tags_skip_limit() -> Result<()> { Ok(()) } +#[tokio::test] async fn test_global_hot_tags_with_today_timeframe() -> Result<()> { let body = make_request("/v0/tags/hot?timeframe=today").await?; From 4d52912459b271615499a4168d3e4a751fde5b38 Mon Sep 17 00:00:00 2001 From: amir Date: Thu, 5 Dec 2024 03:57:58 -0500 Subject: [PATCH 11/18] remove reach from influencer enum and move it to query param --- benches/streams_benches/user.rs | 10 ++++++++-- src/db/graph/queries/get.rs | 5 +---- src/models/user/stream.rs | 21 +++++++++++++++------ src/routes/v0/stream/users.rs | 5 ++++- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index 0b20a76c..ee9f3dd5 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -25,7 +25,9 @@ pub fn bench_stream_following(c: &mut Criterion) { None, None, Some(20), - UserStreamSource::Influencers(StreamReach::Following), + UserStreamSource::Influencers, + Some(StreamReach::Following), + None, None, None, ) @@ -55,6 +57,8 @@ pub fn bench_stream_most_followed(c: &mut Criterion) { UserStreamSource::MostFollowed, None, None, + None, + None, ) .await .unwrap(); @@ -104,7 +108,9 @@ pub fn bench_stream_influencers(c: &mut Criterion) { None, None, Some(20), - UserStreamSource::Influencers(StreamReach::Wot(3)), + UserStreamSource::Influencers, + Some((StreamReach::Wot(3))), + None, None, None, ) diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index ceb6d56e..ec8b4992 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -422,10 +422,7 @@ fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String { "MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)".to_string() } StreamReach::Wot(depth) => { - format!( - "MATCH (viewer)-[:FOLLOWS*1..{}]->(tagger:User)", - depth.clone() - ) + format!("MATCH (viewer)-[:FOLLOWS*1..{}]->(tagger:User)", depth) } } } diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index ba91a14a..575a6854 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -21,7 +21,7 @@ pub enum UserStreamSource { Friends, Muted, MostFollowed, - Influencers(StreamReach), + Influencers, Recommended, } @@ -37,13 +37,21 @@ impl UserStream { skip: Option, limit: Option, source: UserStreamSource, + source_reach: Option, depth: Option, timeframe: Option, preview: Option, ) -> Result, DynError> { - let user_ids = - Self::get_user_list_from_source(user_id, source, skip, limit, timeframe, preview) - .await?; + let user_ids = Self::get_user_list_from_source( + user_id, + source, + source_reach, + skip, + limit, + timeframe, + preview, + ) + .await?; match user_ids { Some(users) => Self::from_listed_user_ids(&users, viewer_id, depth).await, None => Ok(None), @@ -196,6 +204,7 @@ impl UserStream { pub async fn get_user_list_from_source( user_id: Option<&str>, source: UserStreamSource, + source_reach: Option, skip: Option, limit: Option, timeframe: Option, @@ -244,9 +253,9 @@ impl UserStream { ) .await? .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), - UserStreamSource::Influencers(reach) => Influencers::get_influencers( + UserStreamSource::Influencers => Influencers::get_influencers( user_id, - Some(reach), + Some(source_reach.unwrap_or(StreamReach::Wot(3))), skip.unwrap_or(0), limit.unwrap_or(10).min(100), &timeframe.unwrap_or(Timeframe::AllTime), diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 16699d95..553eb42c 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -2,7 +2,7 @@ use crate::models::user::{UserStream, UserStreamSource}; use crate::routes::v0::endpoints::{ STREAM_USERS_BY_IDS_ROUTE, STREAM_USERS_ROUTE, STREAM_USERS_USERNAME_SEARCH_ROUTE, }; -use crate::types::{Pagination, Timeframe}; +use crate::types::{Pagination, StreamReach, Timeframe}; use crate::{Error, Result}; use axum::extract::Query; use axum::Json; @@ -17,6 +17,7 @@ pub struct UserStreamQuery { skip: Option, limit: Option, source: Option, + source_reach: Option, depth: Option, timeframe: Option, preview: Option, @@ -33,6 +34,7 @@ pub struct UserStreamQuery { ("skip" = Option, Query, description = "Skip N followers"), ("limit" = Option, Query, description = "Retrieve N followers"), ("source" = Option, Query, description = "Source of users for the stream."), + ("source_reach" = Option, Query, description = "The target reach of the source. Supported in 'influencers' source."), ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored"), ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range"), ("preview" = Option, Query, description = "Provide a random selection of size 3 for sources supporting preview. Passing preview ignores skip and limit parameters.") @@ -97,6 +99,7 @@ pub async fn stream_users_handler( Some(skip), Some(limit), source.clone(), + query.source_reach, query.depth, Some(timeframe), query.preview, From 85f74653b11c22dd70f5a0680f465a9dadccf122 Mon Sep 17 00:00:00 2001 From: amir Date: Thu, 5 Dec 2024 04:07:17 -0500 Subject: [PATCH 12/18] fix query string syntax error --- src/db/graph/queries/get.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index ec8b4992..c72e6614 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -595,10 +595,10 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) WITH user, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count, COUNT(DISTINCT post) AS posts_count - WITH {{ + WITH { id: user.id, score: (tags_count + posts_count) * sqrt(followers_count) - }} AS influencer + } AS influencer ORDER BY influencer.score DESC, user.id ASC SKIP $skip LIMIT $limit RETURN COLLECT(influencer) as influencers From 2b4d5a297f9877b0ae15e78b5a670d46f04d5ced Mon Sep 17 00:00:00 2001 From: amir Date: Wed, 11 Dec 2024 01:36:38 -0500 Subject: [PATCH 13/18] Refactor Influencers cache --- src/models/user/influencers.rs | 49 +++++++--------------------------- src/routes/v0/tag/global.rs | 2 +- 2 files changed, 10 insertions(+), 41 deletions(-) diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index 7f5a5fb2..7d51b82b 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -5,14 +5,13 @@ use crate::types::StreamReach; use crate::types::Timeframe; use chrono::Utc; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::ops::Deref; use utoipa::ToSchema; use crate::queries; use crate::RedisOps; -const GLOBAL_INFLUENCERS_PREFIX: &str = "Influencers"; +const GLOBAL_INFLUENCERS_PREFIX: &str = "Cached:Influencers"; #[derive(Deserialize, Serialize, ToSchema, Debug, Clone)] pub struct Influencer { @@ -26,11 +25,6 @@ pub struct Influencers(pub Vec); impl RedisOps for Influencers {} -#[derive(Serialize, Deserialize, Debug, ToSchema, Default)] -pub struct InfluencersData(pub HashMap); - -impl RedisOps for InfluencersData {} - // Create a Influencers instance directly from an iterator of Influencer items // Need it in collect() impl FromIterator for Influencers { @@ -67,8 +61,6 @@ impl Influencers { let key_parts = Influencers::get_cache_key_parts(timeframe); let key_parts_vector: Vec<&str> = key_parts.iter().map(|s| s.as_str()).collect::>(); - let data = - InfluencersData::try_from_index_json(key_parts_vector.clone().as_slice()).await?; let ranking = Influencers::try_from_index_sorted_set( key_parts_vector.as_slice(), None, @@ -80,23 +72,16 @@ impl Influencers { ) .await?; - if data.is_none() || ranking.is_none() { - return Ok(None); - } - - let mapping = data.unwrap(); - - // for each value in ranking, look up the value in data - let mut influencers = Vec::new(); - for (id, _) in ranking.unwrap() { - if let Some(influencer) = mapping.0.get(&id) { - influencers.push(Influencer { - id, - score: influencer.score, - }); + match ranking { + None => Ok(None), + Some(ranking) => { + let mut influencers = Vec::new(); + for (id, score) in ranking { + influencers.push(Influencer { id, score }); + } + Ok(Some(Influencers(influencers))) } } - Ok(Some(Influencers(influencers))) } async fn put_to_global_cache( @@ -107,22 +92,6 @@ impl Influencers { let key_parts_vector: Vec<&str> = key_parts.iter().map(|s| s.as_str()).collect::>(); - // turn result which is a vector of Influencer, into a mapping from label to Influencer - let mapping = result.clone(); - let influencers_data: HashMap = mapping - .iter() - .cloned() - .map(|influencer| (influencer.id.clone(), influencer)) - .collect(); - - // store the data as json in cache - InfluencersData::put_index_json( - &InfluencersData(influencers_data), - key_parts_vector.as_slice(), - Some(timeframe.to_cache_period()), - ) - .await?; - // store the ranking as sorted set in cache Influencers::put_index_sorted_set( key_parts_vector.as_slice(), diff --git a/src/routes/v0/tag/global.rs b/src/routes/v0/tag/global.rs index dc7969b9..1c716801 100644 --- a/src/routes/v0/tag/global.rs +++ b/src/routes/v0/tag/global.rs @@ -34,7 +34,7 @@ pub struct TagTaggersQuery { tag = "Tags", params( ("label" = String, Path, description = "Tag name"), - ("reach" = StreamReach, Path, description = "Reach type: Follower | Following | Friends"), + ("reach" = StreamReach, Path, description = "Reach type: Follower | Following | Friends | Wot"), ("user_id" = Option, Query, description = "User ID to base reach on"), ), responses( From 4f18737674503ab80afb2a18f092efd9a7ec10e8 Mon Sep 17 00:00:00 2001 From: amir Date: Wed, 11 Dec 2024 01:49:02 -0500 Subject: [PATCH 14/18] fix lint issues --- benches/streams_benches/user.rs | 68 ++++++++++++++++----------------- src/models/user/mod.rs | 3 +- src/models/user/stream.rs | 42 +++++++++++--------- src/routes/v0/stream/users.rs | 24 ++++++------ 4 files changed, 71 insertions(+), 66 deletions(-) diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index ee9f3dd5..e9564150 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -1,7 +1,7 @@ use crate::run_setup; use criterion::Criterion; use pubky_nexus::{ - models::user::{UserStream, UserStreamSource}, + models::user::{UserStream, UserStreamInput, UserStreamSource}, types::StreamReach, }; use tokio::runtime::Runtime; @@ -20,17 +20,17 @@ pub fn bench_stream_following(c: &mut Criterion) { c.bench_function("stream_following", |b| { b.to_async(&rt).iter(|| async { - let user_stream = UserStream::get_by_id( - Some(user_id), - None, - None, - Some(20), - UserStreamSource::Influencers, - Some(StreamReach::Following), - None, - None, - None, - ) + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: Some(String::from(user_id)), + viewer_id: None, + skip: None, + limit: Some(20), + source: UserStreamSource::Influencers, + source_reach: Some(StreamReach::Following), + depth: None, + timeframe: None, + preview: None, + }) .await .unwrap(); criterion::black_box(user_stream); @@ -49,17 +49,17 @@ pub fn bench_stream_most_followed(c: &mut Criterion) { c.bench_function("stream_most_followed", |b| { b.to_async(&rt).iter(|| async { - let user_stream = UserStream::get_by_id( - None, - None, - None, - Some(20), - UserStreamSource::MostFollowed, - None, - None, - None, - None, - ) + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: None, + viewer_id: None, + skip: None, + limit: Some(20), + source: UserStreamSource::MostFollowed, + source_reach: None, + depth: None, + timeframe: None, + preview: None, + }) .await .unwrap(); criterion::black_box(user_stream); @@ -103,17 +103,17 @@ pub fn bench_stream_influencers(c: &mut Criterion) { c.bench_function("stream_influencers", |b| { b.to_async(&rt).iter(|| async { - let user_stream = UserStream::get_by_id( - None, - None, - None, - Some(20), - UserStreamSource::Influencers, - Some((StreamReach::Wot(3))), - None, - None, - None, - ) + let user_stream = UserStream::get_by_id(&UserStreamInput { + user_id: None, + viewer_id: None, + skip: None, + limit: Some(20), + source: UserStreamSource::Influencers, + source_reach: Some(StreamReach::Wot(3)), + depth: None, + timeframe: None, + preview: None, + }) .await .unwrap(); criterion::black_box(user_stream); diff --git a/src/models/user/mod.rs b/src/models/user/mod.rs index 22fc533c..ae3d0663 100644 --- a/src/models/user/mod.rs +++ b/src/models/user/mod.rs @@ -17,7 +17,8 @@ pub use muted::Muted; pub use relationship::Relationship; pub use search::{UserSearch, USER_NAME_KEY_PARTS}; pub use stream::{ - UserStream, UserStreamSource, USER_INFLUENCERS_KEY_PARTS, USER_MOSTFOLLOWED_KEY_PARTS, + UserStream, UserStreamInput, UserStreamSource, USER_INFLUENCERS_KEY_PARTS, + USER_MOSTFOLLOWED_KEY_PARTS, }; pub use tags::ProfileTag; pub use tags::UserTags; diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index 575a6854..d611598d 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -25,35 +25,39 @@ pub enum UserStreamSource { Recommended, } +pub struct UserStreamInput { + pub user_id: Option, + pub viewer_id: Option, + pub skip: Option, + pub limit: Option, + pub source: UserStreamSource, + pub source_reach: Option, + pub depth: Option, + pub timeframe: Option, + pub preview: Option, +} + #[derive(Serialize, Deserialize, ToSchema, Default)] pub struct UserStream(Vec); impl RedisOps for UserStream {} impl UserStream { - pub async fn get_by_id( - user_id: Option<&str>, - viewer_id: Option<&str>, - skip: Option, - limit: Option, - source: UserStreamSource, - source_reach: Option, - depth: Option, - timeframe: Option, - preview: Option, - ) -> Result, DynError> { + pub async fn get_by_id(input: &UserStreamInput) -> Result, DynError> { let user_ids = Self::get_user_list_from_source( - user_id, - source, - source_reach, - skip, - limit, - timeframe, - preview, + input.user_id.as_deref(), + input.source.clone(), + input.source_reach.clone(), + input.skip, + input.limit, + input.timeframe.clone(), + input.preview, ) .await?; match user_ids { - Some(users) => Self::from_listed_user_ids(&users, viewer_id, depth).await, + Some(users) => { + Self::from_listed_user_ids(&users, input.viewer_id.as_deref(), input.depth).await + } None => Ok(None), } } diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 553eb42c..746330ca 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -1,4 +1,4 @@ -use crate::models::user::{UserStream, UserStreamSource}; +use crate::models::user::{UserStream, UserStreamInput, UserStreamSource}; use crate::routes::v0::endpoints::{ STREAM_USERS_BY_IDS_ROUTE, STREAM_USERS_ROUTE, STREAM_USERS_USERNAME_SEARCH_ROUTE, }; @@ -93,17 +93,17 @@ pub async fn stream_users_handler( } } - match UserStream::get_by_id( - query.user_id.as_deref(), - query.viewer_id.as_deref(), - Some(skip), - Some(limit), - source.clone(), - query.source_reach, - query.depth, - Some(timeframe), - query.preview, - ) + match UserStream::get_by_id(&UserStreamInput { + user_id: query.user_id.clone(), + viewer_id: query.viewer_id, + skip: Some(skip), + limit: Some(limit), + source: source.clone(), + source_reach: query.source_reach, + depth: query.depth, + timeframe: Some(timeframe), + preview: query.preview, + }) .await { Ok(Some(stream)) => Ok(Json(stream)), From 1b49dbde4384d868b35c9f6f5b52a9e6376e0de0 Mon Sep 17 00:00:00 2001 From: amir Date: Wed, 11 Dec 2024 04:23:13 -0500 Subject: [PATCH 15/18] Add tests and fix found bugs --- src/db/graph/queries/get.rs | 10 +- src/models/user/stream.rs | 2 +- src/routes/v0/stream/users.rs | 18 +- tests/service/stream/user/influencers.rs | 274 +++++++++++++++++++++++ tests/service/stream/user/mod.rs | 1 + tests/service/stream/user/score.rs | 70 ------ tests/service/tags/hot.rs | 26 +-- 7 files changed, 312 insertions(+), 89 deletions(-) create mode 100644 tests/service/stream/user/influencers.rs diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index c72e6614..e3fd0307 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -584,7 +584,10 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) let (from, to) = timeframe.to_timestamp_range(); query( " - OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user:User) + MATCH (user:User) + WITH DISTINCT user + + OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user) WHERE follow.indexed_at >= $from AND follow.indexed_at < $to OPTIONAL MATCH (user)-[tag:TAGGED]->(tagged:Post) @@ -597,9 +600,10 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) COUNT(DISTINCT post) AS posts_count WITH { id: user.id, - score: (tags_count + posts_count) * sqrt(followers_count) + score: (tags_count + posts_count) * sqrt(followers_count + 1) } AS influencer - ORDER BY influencer.score DESC, user.id ASC + WHERE influencer.id IS NOT NULL + ORDER BY influencer.score DESC, influencer.id ASC SKIP $skip LIMIT $limit RETURN COLLECT(influencer) as influencers ", diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index d611598d..c2df4b78 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -13,7 +13,7 @@ pub const CACHE_USER_RECOMMENDED_KEY_PARTS: [&str; 3] = ["Cache", "Users", "Reco // TTL, 12HR pub const CACHE_USER_RECOMMENDED_TTL: i64 = 12 * 60 * 60; -#[derive(Deserialize, ToSchema, Debug, Clone)] +#[derive(Deserialize, ToSchema, Debug, Clone, PartialEq)] #[serde(rename_all = "snake_case")] pub enum UserStreamSource { Followers, diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index 746330ca..c6fb338d 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -54,7 +54,7 @@ pub async fn stream_users_handler( ); let skip = query.skip.unwrap_or(0); - let limit = query.limit.unwrap_or(6).min(20); + let limit = query.limit.unwrap_or(5).min(20); let source = query.source.unwrap_or(UserStreamSource::Followers); let timeframe = query.timeframe.unwrap_or(Timeframe::AllTime); @@ -89,8 +89,24 @@ pub async fn stream_users_handler( .to_string(), }) } + UserStreamSource::Influencers => match query.source_reach { + None => (), + Some(_) => { + return Err(Error::InvalidInput { + message: + "source_reach query param must be provided for source 'influencers' with a user_id" + .to_string(), + }) + } + }, _ => (), } + } else if source == UserStreamSource::Influencers && query.source_reach.is_none() { + return Err(Error::InvalidInput { + message: + "source_reach query param must be provided for source 'influencers' when you pass a user_id" + .to_string(), + }); } match UserStream::get_by_id(&UserStreamInput { diff --git a/tests/service/stream/user/influencers.rs b/tests/service/stream/user/influencers.rs new file mode 100644 index 00000000..b493c91f --- /dev/null +++ b/tests/service/stream/user/influencers.rs @@ -0,0 +1,274 @@ +use anyhow::Result; +use reqwest::StatusCode; + +use crate::service::utils::{make_request, make_wrong_request}; + +// TODO: Create deterministic integration tests + +const PEER_PUBKY: &str = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"; + +#[tokio::test] +async fn test_global_influencers() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy", + "kzq3o8y8w1b7ffogpq73okop4gb3ahm31ytwwk1na8p6gpr4511o", + "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo", + "zdbg13k5gh4tfz9qz11quohrxetgqxs7awandu8h57147xddcuhy", + "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", + ]; + + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_preview() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&preview=true").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + // assert preview size is respected + assert_eq!(influencers.len(), 3); + + let first_influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // make the request a second time to ensure the preview is generating different results + let body = make_request("/v0/stream/users?source=influencers&preview=true").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + assert!(!influencers.is_empty(), "Influencers should not be empty"); + + // assert preview size is respected + assert_eq!(influencers.len(), 3); + + let second_influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + assert!(first_influencer_ids != second_influencer_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_skip_limit() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&skip=3&limit=3").await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + // assert limit + assert_eq!(influencers.len(), 3); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "zdbg13k5gh4tfz9qz11quohrxetgqxs7awandu8h57147xddcuhy", + "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", + "7hq56kap6exmhghyedrw1q3ar8b1wutomq8ax9eazhajcpdfx3so", + ]; + + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_with_today_timeframe() -> Result<()> { + let body = make_request("/v0/stream/users?source=influencers&timeframe=today&limit=4").await?; + + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "r91hi8kc3x6761gwfiigr7yn6nca1z47wm6jadhw1jbx1co93r9y", + "qumq6fady4bmw4w5tpsrj1tg36g3qo4tcfedga9p4bg4so4ikyzy", + "r4irb481b8qspaixq1brwre8o87cxybsbk9iwe1f6f9ukrxxs7bo", + "tkpeqpx3ywoawiw6q8e6kuo9o3egr7fnhx83rudznbrrmqgdmomo", + ]; + + // Verify that each expected user ID is present in the response + for id in &expected_user_ids { + let exists = influencer_ids.clone().into_iter().any(|item| item == *id); + assert!(exists, "Expected user ID not found: {}", id); + } + + Ok(()) +} + +#[tokio::test] +async fn test_global_influencers_with_this_month_timeframe() -> Result<()> { + let body = + make_request("/v0/stream/users?source=influencers&timeframe=this_month&limit=5").await?; + + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "r91hi8kc3x6761gwfiigr7yn6nca1z47wm6jadhw1jbx1co93r9y", + "tkpeqpx3ywoawiw6q8e6kuo9o3egr7fnhx83rudznbrrmqgdmomo", + "pyc598poqkdgtx1wc4aeptx67mqg71mmywyh7uzkffzittjmbiuo", + "r4irb481b8qspaixq1brwre8o87cxybsbk9iwe1f6f9ukrxxs7bo", + "qumq6fady4bmw4w5tpsrj1tg36g3qo4tcfedga9p4bg4so4ikyzy", + ]; + + // Verify that each expected user ID is present in the response + for id in &expected_user_ids { + let exists = influencer_ids.clone().into_iter().any(|item| item == *id); + assert!(exists, "Expected user ID not found: {}", id); + } + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_source_reach_no_user_id() -> Result<()> { + let endpoint = + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&source_reach=following"; + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_source_reach_no_reach() -> Result<()> { + let endpoint = &format!( + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}", + PEER_PUBKY + ); + + make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_following_reach() -> Result<()> { + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=following", PEER_PUBKY); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body + .as_array() + .expect("Stream influencers should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "5g3fwnue819wfdjwiwm8qr35ww6uxxgbzrigrtdgmbi19ksioeoy", + "9arfi37owcrdywc9zqw3m5uc7gd5gqu1yfuykzo66od6tcayqk9y", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_followers_reach() -> Result<()> { + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=followers", PEER_PUBKY); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body.as_array().expect("Post stream should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "gxk8itzrnikrpshfsudgsgtxrz59ojp4iwmp4w9iff3ess6zfr4y", + "h3fghnb3x59oh7r53x8y6a5x38oatqyjym9b31ybss17zqdnhcoy", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} + +#[tokio::test] +async fn test_influencers_by_friends_reach() -> Result<()> { + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=friends", PEER_PUBKY); + + let body = make_request(endpoint).await?; + assert!(body.is_array()); + + let influencers = body.as_array().expect("Post stream should be an array"); + + let influencer_ids = influencers + .iter() + .map(|f| f["details"]["id"].as_str().unwrap()) + .collect::>(); + + // List of expected user IDs + let expected_user_ids = vec![ + "4snwyct86m383rsduhw5xgcxpw7c63j3pq8x4ycqikxgik8y64ro", + "gxk8itzrnikrpshfsudgsgtxrz59ojp4iwmp4w9iff3ess6zfr4y", + "h3fghnb3x59oh7r53x8y6a5x38oatqyjym9b31ybss17zqdnhcoy", + ]; + assert!(influencer_ids == expected_user_ids); + + Ok(()) +} diff --git a/tests/service/stream/user/mod.rs b/tests/service/stream/user/mod.rs index 2b0900db..e436322d 100644 --- a/tests/service/stream/user/mod.rs +++ b/tests/service/stream/user/mod.rs @@ -1,3 +1,4 @@ +pub mod influencers; pub mod list; pub mod reach; pub mod score; diff --git a/tests/service/stream/user/score.rs b/tests/service/stream/user/score.rs index b22811fe..6e87203c 100644 --- a/tests/service/stream/user/score.rs +++ b/tests/service/stream/user/score.rs @@ -75,76 +75,6 @@ async fn test_stream_most_followed() -> Result<()> { Ok(()) } -// ##### PIONEERS USERS ###### - -#[tokio::test] -async fn test_stream_influencers() -> Result<()> { - let client = httpc_test::new_client(HOST_URL)?; - - // Test retrieving the most followed users - let res = client.do_get("/v0/stream/users?source=influencers").await?; - assert_eq!(res.status(), 200); - - let body = res.json_body()?; - assert!(body.is_array()); - - let influencers_users = body.as_array().expect("User stream should be an array"); - - // Check if the response has the expected number of users - assert!( - !influencers_users.is_empty(), - "There should be at least one user in the most followed stream" - ); - - // List of expected user IDs - let expected_user_ids = vec![ - "pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy", - "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo", - "kzq3o8y8w1b7ffogpq73okop4gb3ahm31ytwwk1na8p6gpr4511o", - "y4euc58gnmxun9wo87gwmanu6kztt9pgw1zz1yp1azp7trrsjamy", - "7hq56kap6exmhghyedrw1q3ar8b1wutomq8ax9eazhajcpdfx3so", - ]; - - // Verify that each expected user ID is present in the response - for id in &expected_user_ids { - let exists = influencers_users.iter().any(|f| f["details"]["id"] == *id); - assert!(exists, "Expected user ID not found: {}", id); - } - - // Additional checks for specific user attributes (e.g., name, follower counts) - for user in influencers_users { - assert!( - user["details"]["name"].is_string(), - "Name should be a string" - ); - assert!(user["details"]["bio"].is_string(), "Bio should be a string"); - assert!( - user["counts"]["followers"].is_number(), - "Follower counts should be a number" - ); - } - - // Test limiting the results to 5 users - let res = client - .do_get("/v0/stream/users?source=influencers&limit=5") - .await?; - assert_eq!(res.status(), 200); - - let body = res.json_body()?; - assert!(body.is_array()); - - let limited_users = body.as_array().expect("User stream should be an array"); - - // Check if the response has the expected number of users - assert_eq!( - limited_users.len(), - 5, - "Expected 5 users in the limited stream" - ); - - Ok(()) -} - // ##### RECOMMENDED USERS ###### #[tokio::test] diff --git a/tests/service/tags/hot.rs b/tests/service/tags/hot.rs index fbe68ca7..bf76b167 100644 --- a/tests/service/tags/hot.rs +++ b/tests/service/tags/hot.rs @@ -73,8 +73,9 @@ async fn test_global_hot_tags() -> Result<()> { } #[tokio::test] -async fn test_global_hot_tags_skip_limit() -> Result<()> { - let body = make_request("/v0/tags/hot?skip=3&limit=5").await?; +async fn test_global_hot_tags_with_today_timeframe() -> Result<()> { + let body = make_request("/v0/tags/hot?timeframe=today").await?; + assert!(body.is_array()); let tags = body.as_array().expect("Stream tags should be an array"); @@ -82,9 +83,6 @@ async fn test_global_hot_tags_skip_limit() -> Result<()> { // Validate that the posts belong to the specified user's bookmarks analyse_hot_tags_structure(tags); - // assert limit - assert_eq!(tags.len(), 5); - // Analyse the tag that is in the 4th index let hot_tag = StreamTagMockup::new(String::from("tag2"), 2, 1, 2); compare_unit_hot_tag(&tags[0], hot_tag); @@ -93,8 +91,8 @@ async fn test_global_hot_tags_skip_limit() -> Result<()> { } #[tokio::test] -async fn test_global_hot_tags_with_today_timeframe() -> Result<()> { - let body = make_request("/v0/tags/hot?timeframe=today").await?; +async fn test_global_hot_tags_with_this_month_timeframe() -> Result<()> { + let body = make_request("/v0/tags/hot?timeframe=this_month").await?; assert!(body.is_array()); @@ -103,19 +101,16 @@ async fn test_global_hot_tags_with_today_timeframe() -> Result<()> { // Validate that the posts belong to the specified user's bookmarks analyse_hot_tags_structure(tags); - // assert limit - assert_eq!(tags.len(), 5); - // Analyse the tag that is in the 4th index - let hot_tag = StreamTagMockup::new(String::from("tag2"), 2, 1, 2); + let hot_tag = StreamTagMockup::new(String::from("tag2"), 3, 2, 3); compare_unit_hot_tag(&tags[0], hot_tag); Ok(()) } #[tokio::test] -async fn test_global_hot_tags_with_this_month_timeframe() -> Result<()> { - let body = make_request("/v0/tags/hot?timeframe=this_month").await?; +async fn test_global_hot_tags_skip_limit() -> Result<()> { + let body = make_request("/v0/tags/hot?skip=3&limit=5").await?; assert!(body.is_array()); @@ -124,8 +119,11 @@ async fn test_global_hot_tags_with_this_month_timeframe() -> Result<()> { // Validate that the posts belong to the specified user's bookmarks analyse_hot_tags_structure(tags); + // assert limit + assert_eq!(tags.len(), 5); + // Analyse the tag that is in the 4th index - let hot_tag = StreamTagMockup::new(String::from("tag2"), 3, 2, 3); + let hot_tag = StreamTagMockup::new(String::from("ha"), 9, 16, 9); compare_unit_hot_tag(&tags[0], hot_tag); Ok(()) From f762204eade248e33ca5a4fa3c21fef2e2711bbe Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 13 Dec 2024 17:00:12 -0500 Subject: [PATCH 16/18] rename source_reach to reach --- benches/streams_benches/user.rs | 6 +++--- src/models/user/stream.rs | 8 ++++---- src/routes/v0/stream/users.rs | 24 +++++++++++------------- tests/service/stream/user/influencers.rs | 15 +++++++++------ 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/benches/streams_benches/user.rs b/benches/streams_benches/user.rs index e9564150..4aaa2f55 100644 --- a/benches/streams_benches/user.rs +++ b/benches/streams_benches/user.rs @@ -26,7 +26,7 @@ pub fn bench_stream_following(c: &mut Criterion) { skip: None, limit: Some(20), source: UserStreamSource::Influencers, - source_reach: Some(StreamReach::Following), + reach: Some(StreamReach::Following), depth: None, timeframe: None, preview: None, @@ -55,7 +55,7 @@ pub fn bench_stream_most_followed(c: &mut Criterion) { skip: None, limit: Some(20), source: UserStreamSource::MostFollowed, - source_reach: None, + reach: None, depth: None, timeframe: None, preview: None, @@ -109,7 +109,7 @@ pub fn bench_stream_influencers(c: &mut Criterion) { skip: None, limit: Some(20), source: UserStreamSource::Influencers, - source_reach: Some(StreamReach::Wot(3)), + reach: Some(StreamReach::Wot(3)), depth: None, timeframe: None, preview: None, diff --git a/src/models/user/stream.rs b/src/models/user/stream.rs index c2df4b78..6320a927 100644 --- a/src/models/user/stream.rs +++ b/src/models/user/stream.rs @@ -31,7 +31,7 @@ pub struct UserStreamInput { pub skip: Option, pub limit: Option, pub source: UserStreamSource, - pub source_reach: Option, + pub reach: Option, pub depth: Option, pub timeframe: Option, pub preview: Option, @@ -47,7 +47,7 @@ impl UserStream { let user_ids = Self::get_user_list_from_source( input.user_id.as_deref(), input.source.clone(), - input.source_reach.clone(), + input.reach.clone(), input.skip, input.limit, input.timeframe.clone(), @@ -208,7 +208,7 @@ impl UserStream { pub async fn get_user_list_from_source( user_id: Option<&str>, source: UserStreamSource, - source_reach: Option, + reach: Option, skip: Option, limit: Option, timeframe: Option, @@ -259,7 +259,7 @@ impl UserStream { .map(|set| set.into_iter().map(|(user_id, _score)| user_id).collect()), UserStreamSource::Influencers => Influencers::get_influencers( user_id, - Some(source_reach.unwrap_or(StreamReach::Wot(3))), + Some(reach.unwrap_or(StreamReach::Wot(3))), skip.unwrap_or(0), limit.unwrap_or(10).min(100), &timeframe.unwrap_or(Timeframe::AllTime), diff --git a/src/routes/v0/stream/users.rs b/src/routes/v0/stream/users.rs index c6fb338d..5eeb4fff 100644 --- a/src/routes/v0/stream/users.rs +++ b/src/routes/v0/stream/users.rs @@ -17,7 +17,7 @@ pub struct UserStreamQuery { skip: Option, limit: Option, source: Option, - source_reach: Option, + reach: Option, depth: Option, timeframe: Option, preview: Option, @@ -34,7 +34,7 @@ pub struct UserStreamQuery { ("skip" = Option, Query, description = "Skip N followers"), ("limit" = Option, Query, description = "Retrieve N followers"), ("source" = Option, Query, description = "Source of users for the stream."), - ("source_reach" = Option, Query, description = "The target reach of the source. Supported in 'influencers' source."), + ("reach" = Option, Query, description = "The target reach of the source. Supported in 'influencers' source."), ("depth" = Option, Query, description = "User trusted network depth, user following users distance. Numbers bigger than 4, will be ignored"), ("timeframe" = Option, Query, description = "Timeframe for sources supporting a range"), ("preview" = Option, Query, description = "Provide a random selection of size 3 for sources supporting preview. Passing preview ignores skip and limit parameters.") @@ -89,22 +89,20 @@ pub async fn stream_users_handler( .to_string(), }) } - UserStreamSource::Influencers => match query.source_reach { + UserStreamSource::Influencers => match query.reach { None => (), - Some(_) => { - return Err(Error::InvalidInput { - message: - "source_reach query param must be provided for source 'influencers' with a user_id" - .to_string(), - }) - } + Some(_) => return Err(Error::InvalidInput { + message: + "reach query param must be provided for source 'influencers' with a user_id" + .to_string(), + }), }, _ => (), } - } else if source == UserStreamSource::Influencers && query.source_reach.is_none() { + } else if source == UserStreamSource::Influencers && query.reach.is_none() { return Err(Error::InvalidInput { message: - "source_reach query param must be provided for source 'influencers' when you pass a user_id" + "reach query param must be provided for source 'influencers' when you pass a user_id" .to_string(), }); } @@ -115,7 +113,7 @@ pub async fn stream_users_handler( skip: Some(skip), limit: Some(limit), source: source.clone(), - source_reach: query.source_reach, + reach: query.reach, depth: query.depth, timeframe: Some(timeframe), preview: query.preview, diff --git a/tests/service/stream/user/influencers.rs b/tests/service/stream/user/influencers.rs index b493c91f..0fcf376e 100644 --- a/tests/service/stream/user/influencers.rs +++ b/tests/service/stream/user/influencers.rs @@ -175,9 +175,9 @@ async fn test_global_influencers_with_this_month_timeframe() -> Result<()> { } #[tokio::test] -async fn test_influencers_by_source_reach_no_user_id() -> Result<()> { +async fn test_influencers_by_reach_no_user_id() -> Result<()> { let endpoint = - "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&source_reach=following"; + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&reach=following"; make_wrong_request(endpoint, Some(StatusCode::BAD_REQUEST.as_u16())).await?; @@ -185,7 +185,7 @@ async fn test_influencers_by_source_reach_no_user_id() -> Result<()> { } #[tokio::test] -async fn test_influencers_by_source_reach_no_reach() -> Result<()> { +async fn test_influencers_by_reach_no_reach() -> Result<()> { let endpoint = &format!( "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}", PEER_PUBKY @@ -198,7 +198,7 @@ async fn test_influencers_by_source_reach_no_reach() -> Result<()> { #[tokio::test] async fn test_influencers_by_following_reach() -> Result<()> { - let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=following", PEER_PUBKY); + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=following", PEER_PUBKY); let body = make_request(endpoint).await?; assert!(body.is_array()); @@ -225,7 +225,7 @@ async fn test_influencers_by_following_reach() -> Result<()> { #[tokio::test] async fn test_influencers_by_followers_reach() -> Result<()> { - let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=followers", PEER_PUBKY); + let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=followers", PEER_PUBKY); let body = make_request(endpoint).await?; assert!(body.is_array()); @@ -250,7 +250,10 @@ async fn test_influencers_by_followers_reach() -> Result<()> { #[tokio::test] async fn test_influencers_by_friends_reach() -> Result<()> { - let endpoint = &format!("/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&source_reach=friends", PEER_PUBKY); + let endpoint = &format!( + "/v0/stream/users?source=influencers&timeframe=this_month&limit=3&user_id={}&reach=friends", + PEER_PUBKY + ); let body = make_request(endpoint).await?; assert!(body.is_array()); From 072a65ac824ef95381b5d566ef5b860b145ea163 Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 13 Dec 2024 17:01:01 -0500 Subject: [PATCH 17/18] fix cache prefixes using cached instead of cache --- src/models/tag/stream.rs | 2 +- src/models/user/influencers.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/models/tag/stream.rs b/src/models/tag/stream.rs index 3a47932a..3d698593 100644 --- a/src/models/tag/stream.rs +++ b/src/models/tag/stream.rs @@ -13,7 +13,7 @@ use crate::{RedisOps, ScoreAction}; pub const TAG_GLOBAL_HOT: [&str; 3] = ["Tags", "Global", "Hot"]; -const GLOBAL_HOT_TAGS_PREFIX: &str = "Cached:Hot:Tags"; +const GLOBAL_HOT_TAGS_PREFIX: &str = "Cache:Hot:Tags"; #[derive(Serialize, Deserialize, Debug, ToSchema, Clone)] pub struct Taggers(pub Vec); diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index 7d51b82b..40149fa5 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -11,7 +11,7 @@ use utoipa::ToSchema; use crate::queries; use crate::RedisOps; -const GLOBAL_INFLUENCERS_PREFIX: &str = "Cached:Influencers"; +const GLOBAL_INFLUENCERS_PREFIX: &str = "Cache:Influencers"; #[derive(Deserialize, Serialize, ToSchema, Debug, Clone)] pub struct Influencer { From 66ab11c5cb6e6b09db7b205160fbf3d87078e9b6 Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 13 Dec 2024 17:03:25 -0500 Subject: [PATCH 18/18] remove unsafe unwraps --- src/models/user/influencers.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/models/user/influencers.rs b/src/models/user/influencers.rs index 40149fa5..a2c4f011 100644 --- a/src/models/user/influencers.rs +++ b/src/models/user/influencers.rs @@ -125,7 +125,7 @@ impl Influencers { Some(user_id) => { Influencers::get_influencers_by_reach( user_id, - reach.unwrap(), + reach.unwrap_or(StreamReach::Friends), skip, limit, timeframe, @@ -160,7 +160,10 @@ impl Influencers { let query = queries::get::get_global_influencers(0, 100, timeframe); let result = retrieve_from_graph::(query, "influencers").await?; - let influencers = result.unwrap(); + let influencers = match result { + Some(influencers) => influencers, + None => return Ok(None), + }; if influencers.len() > 0 { Influencers::put_to_global_cache(influencers.clone(), timeframe).await?; }