Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: improve influencers endpoint #234

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
22519d0
rename pioneers to influencers
amirRamirfatahi Nov 29, 2024
65d5bf6
Merge branch 'feature/hot-tags' into feature/influencers
amirRamirfatahi Nov 29, 2024
233bbf3
add influencers
amirRamirfatahi Nov 29, 2024
97a8a65
Add queries
amirRamirfatahi Nov 30, 2024
188fe22
fix build
amirRamirfatahi Dec 3, 2024
e3f19c4
Add reindex
amirRamirfatahi Dec 3, 2024
d62f98f
Use new influencers in user stream
amirRamirfatahi Dec 3, 2024
4549801
Merge branch 'main' into feature/influencers
amirRamirfatahi Dec 3, 2024
42d9688
fix lint issues
amirRamirfatahi Dec 3, 2024
99e0a59
Add more tests and fix review issues
amirRamirfatahi Dec 4, 2024
1fdac36
Merge branch 'feature/hot-tags' into feature/influencers
amirRamirfatahi Dec 5, 2024
b76cdd2
Add preview option to influencers
amirRamirfatahi Dec 5, 2024
bd37289
fix broken test from merge conflict
amirRamirfatahi Dec 5, 2024
4d52912
remove reach from influencer enum and move it to query param
amirRamirfatahi Dec 5, 2024
85f7465
fix query string syntax error
amirRamirfatahi Dec 5, 2024
6b91172
Merge branch 'main' into feature/influencers
amirRamirfatahi Dec 11, 2024
2b4d5a2
Refactor Influencers cache
amirRamirfatahi Dec 11, 2024
4f18737
fix lint issues
amirRamirfatahi Dec 11, 2024
1b49dbd
Add tests and fix found bugs
amirRamirfatahi Dec 11, 2024
f762204
rename source_reach to reach
amirRamirfatahi Dec 13, 2024
072a65a
fix cache prefixes using cached instead of cache
amirRamirfatahi Dec 13, 2024
66ab11c
remove unsafe unwraps
amirRamirfatahi Dec 13, 2024
7caf20f
Merge branch 'main' into feature/influencers
amirRamirfatahi Dec 13, 2024
4162acc
Merge branch 'main' into feature/influencers
amirRamirfatahi Jan 13, 2025
c47b9dd
Merge branch 'main' into feature/influencers
amirRamirfatahi Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ criterion_group! {
tag::bench_stream_tag_timeline,
tag::bench_stream_tag_total_engagement,
user::bench_stream_users_by_username_search,
user::bench_stream_pioneers,
user::bench_stream_influencers,
user::bench_stream_following,
user::bench_stream_most_followed,
kind::bench_stream_post_kind_short,
Expand Down
66 changes: 42 additions & 24 deletions benches/streams_benches/user.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::run_setup;
use criterion::Criterion;
use pubky_nexus::models::user::{UserStream, UserStreamSource};
use pubky_nexus::{
models::user::{UserStream, UserStreamInput, UserStreamSource},
types::StreamReach,
};
use tokio::runtime::Runtime;

/// USER STREAMS BENCHMARKS
Expand All @@ -17,14 +20,17 @@ pub fn bench_stream_following(c: &mut Criterion) {

c.bench_function("stream_following", |b| {
b.to_async(&rt).iter(|| async {
let user_stream = UserStream::get_by_id(
Some(user_id),
None,
None,
Some(20),
UserStreamSource::Pioneers,
None,
)
let user_stream = UserStream::get_by_id(&UserStreamInput {
user_id: Some(String::from(user_id)),
viewer_id: None,
skip: None,
limit: Some(20),
source: UserStreamSource::Influencers,
reach: Some(StreamReach::Following),
depth: None,
timeframe: None,
preview: None,
})
.await
.unwrap();
criterion::black_box(user_stream);
Expand All @@ -43,14 +49,17 @@ pub fn bench_stream_most_followed(c: &mut Criterion) {

c.bench_function("stream_most_followed", |b| {
b.to_async(&rt).iter(|| async {
let user_stream = UserStream::get_by_id(
None,
None,
None,
Some(20),
UserStreamSource::MostFollowed,
None,
)
let user_stream = UserStream::get_by_id(&UserStreamInput {
user_id: None,
viewer_id: None,
skip: None,
limit: Some(20),
source: UserStreamSource::MostFollowed,
reach: None,
depth: None,
timeframe: None,
preview: None,
})
.await
.unwrap();
criterion::black_box(user_stream);
Expand Down Expand Up @@ -83,21 +92,30 @@ pub fn bench_stream_users_by_username_search(c: &mut Criterion) {
});
}

pub fn bench_stream_pioneers(c: &mut Criterion) {
pub fn bench_stream_influencers(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the user streams for pioneer users.");
println!("Benchmarking the user streams for influencer users.");
println!("***************************************");

run_setup();

let rt = Runtime::new().unwrap();

c.bench_function("stream_pioneers", |b| {
c.bench_function("stream_influencers", |b| {
b.to_async(&rt).iter(|| async {
let user_stream =
UserStream::get_by_id(None, None, None, Some(20), UserStreamSource::Pioneers, None)
.await
.unwrap();
let user_stream = UserStream::get_by_id(&UserStreamInput {
user_id: None,
viewer_id: None,
skip: None,
limit: Some(20),
source: UserStreamSource::Influencers,
reach: Some(StreamReach::Wot(3)),
depth: None,
timeframe: None,
preview: None,
})
.await
.unwrap();
criterion::black_box(user_stream);
});
});
Expand Down
16 changes: 8 additions & 8 deletions benches/tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::global::TagGlobal;
use pubky_nexus::models::tag::post::TagPost;
use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput, TagStreamReach};
use pubky_nexus::models::tag::stream::{HotTags, HotTagsInput};
use pubky_nexus::models::tag::traits::{TagCollection, TaggersCollection};
use pubky_nexus::models::tag::user::TagUser;
use pubky_nexus::types::{Pagination, Timeframe};
use pubky_nexus::types::{Pagination, StreamReach, Timeframe};
use setup::run_setup;
use std::time::Duration;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -245,7 +245,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", TagStreamReach::Following);
let reach_by = format!("{:?}", StreamReach::Following);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
Expand All @@ -265,7 +265,7 @@ fn bench_get_following_reach_hot_tags(c: &mut Criterion) {
};
let profile = HotTags::get_hot_tags(
Some(String::from(params[0])),
Some(TagStreamReach::Following),
Some(StreamReach::Following),
&input,
)
.await
Expand All @@ -286,7 +286,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", TagStreamReach::Followers);
let reach_by = format!("{:?}", StreamReach::Followers);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
Expand All @@ -306,7 +306,7 @@ fn bench_get_followers_reach_hot_tags(c: &mut Criterion) {
};
let profile = HotTags::get_hot_tags(
Some(String::from(params[0])),
Some(TagStreamReach::Followers),
Some(StreamReach::Followers),
&input,
)
.await
Expand All @@ -327,7 +327,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
run_setup();

let user_id = "o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo";
let reach_by = format!("{:?}", TagStreamReach::Friends);
let reach_by = format!("{:?}", StreamReach::Friends);
let rt: Runtime = Runtime::new().unwrap();

c.bench_with_input(
Expand All @@ -347,7 +347,7 @@ fn bench_get_friends_reach_hot_tags(c: &mut Criterion) {
};
let profile = HotTags::get_hot_tags(
Some(String::from(params[0])),
Some(TagStreamReach::Friends),
Some(StreamReach::Friends),
&input,
)
.await
Expand Down
109 changes: 94 additions & 15 deletions src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::models::post::StreamSource;
use crate::models::tag::stream::HotTagsInput;
use crate::models::tag::stream::TagStreamReach;
use crate::types::Pagination;
use crate::types::StreamReach;
use crate::types::StreamSorting;
use log::debug;
use crate::types::Timeframe;
use neo4rs::{query, Query};
use pubky_app_specs::PubkyAppPostKind;

Expand Down Expand Up @@ -414,21 +414,23 @@ pub fn get_global_hot_tags_taggers(tag_list: &[&str]) -> Query {
.param("labels", tag_list)
}

fn tag_stream_reach_to_graph_subquery(reach: &TagStreamReach) -> String {
let query = match reach {
TagStreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)",
TagStreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)",
TagStreamReach::Friends => {
"MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)"
fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String {
match reach {
StreamReach::Followers => "MATCH (user:User)<-[:FOLLOWS]-(reach:User)".to_string(),
StreamReach::Following => "MATCH (user:User)-[:FOLLOWS]->(reach:User)".to_string(),
StreamReach::Friends => {
"MATCH (user:User)-[:FOLLOWS]->(reach:User), (user)<-[:FOLLOWS]-(reach)".to_string()
}
};
String::from(query)
StreamReach::Wot(depth) => {
format!("MATCH (viewer)-[:FOLLOWS*1..{}]->(tagger:User)", depth)
}
}
}

pub fn get_tag_taggers_by_reach(
label: &str,
user_id: &str,
reach: TagStreamReach,
reach: StreamReach,
skip: usize,
limit: usize,
) -> Query {
Expand All @@ -443,7 +445,7 @@ pub fn get_tag_taggers_by_reach(
SKIP $skip LIMIT $limit
RETURN COLLECT(reach.id) as tagger_ids
",
tag_stream_reach_to_graph_subquery(&reach)
stream_reach_to_graph_subquery(&reach)
)
.as_str(),
)
Expand All @@ -455,7 +457,7 @@ pub fn get_tag_taggers_by_reach(

pub fn get_hot_tags_by_reach(
user_id: &str,
reach: TagStreamReach,
reach: StreamReach,
tags_query: &HotTagsInput,
) -> Query {
let input_tagged_type = match &tags_query.tagged_type {
Expand Down Expand Up @@ -485,7 +487,7 @@ pub fn get_hot_tags_by_reach(
SKIP $skip LIMIT $limit
RETURN COLLECT(hot_tag) as hot_tags
",
tag_stream_reach_to_graph_subquery(&reach),
stream_reach_to_graph_subquery(&reach),
input_tagged_type,
tags_query.taggers_limit
)
Expand All @@ -504,7 +506,6 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInput) -> Query {
None => String::from("Post|User"),
};
let (from, to) = tags_query.timeframe.to_timestamp_range();
debug!("get_global_hot_tags query: {:?} {:?}", from, to);
query(
format!(
"
Expand Down Expand Up @@ -535,6 +536,84 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInput) -> Query {
.param("to", to)
}

/// Builds the Cypher query that ranks the users within `reach` of `user_id` by
/// influence over `timeframe`.
///
/// The leading `MATCH` fragment is produced by `stream_reach_to_graph_subquery` and is
/// anchored to the user whose id equals `$user_id`. For every reached user the query
/// counts, restricted to relationships whose `indexed_at` falls in `[from, to)` (the
/// pair returned by `timeframe.to_timestamp_range()`):
///
/// * distinct incoming `FOLLOWS` relationships (`followers_count`),
/// * distinct outgoing `TAGGED` relationships to posts (`tags_count`),
/// * distinct posts reached through `AUTHORED` (`posts_count`),
///
/// and scores the user as `(tags_count + posts_count) * sqrt(followers_count)`.
/// Rows are ordered by score descending then id ascending, paginated with
/// `skip`/`limit`, and collected into one `influencers` list of `{id, score}` maps.
// NOTE(review): `get_global_influencers` in this file scores with
// `sqrt(followers_count + 1)`; without the `+ 1` here, a reached user with no new
// followers inside the window scores 0 regardless of activity — confirm the
// difference is intentional.
pub fn get_influencers_by_reach(
    user_id: &str,
    reach: StreamReach,
    skip: usize,
    limit: usize,
    timeframe: &Timeframe,
) -> Query {
    let (from, to) = timeframe.to_timestamp_range();
    let reach_match = stream_reach_to_graph_subquery(&reach);

    // `{{`/`}}` are literal braces of the Cypher map; the single `{}` receives the
    // reach-specific MATCH clause.
    let cypher = format!(
        "
{}
WHERE user.id = $user_id

OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(reach)
WHERE follow.indexed_at >= $from AND follow.indexed_at < $to

OPTIONAL MATCH (reach)-[tag:TAGGED]->(tagged:Post)
WHERE tag.indexed_at >= $from AND tag.indexed_at < $to

OPTIONAL MATCH (reach)-[authored:AUTHORED]->(post:Post)
WHERE authored.indexed_at >= $from AND authored.indexed_at < $to

WITH reach, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count,
COUNT(DISTINCT post) AS posts_count
WITH {{
id: reach.id,
score: (tags_count + posts_count) * sqrt(followers_count)
}} AS influencer
ORDER BY influencer.score DESC, reach.id ASC
SKIP $skip LIMIT $limit
RETURN COLLECT(influencer) as influencers
",
        reach_match,
    );

    query(cypher.as_str())
        .param("user_id", user_id)
        .param("skip", skip as i64)
        .param("limit", limit as i64)
        .param("from", from)
        .param("to", to)
}

/// Builds the Cypher query that ranks every `User` node by influence over `timeframe`.
///
/// For each user the query counts, restricted to relationships whose `indexed_at`
/// falls in `[from, to)` (the pair returned by `timeframe.to_timestamp_range()`):
///
/// * distinct incoming `FOLLOWS` relationships (`followers_count`),
/// * distinct outgoing `TAGGED` relationships to posts (`tags_count`),
/// * distinct posts reached through `AUTHORED` (`posts_count`),
///
/// and scores the user as `(tags_count + posts_count) * sqrt(followers_count + 1)`.
/// Entries whose id is null are dropped; the rest are ordered by score descending
/// then id ascending, paginated with `skip`/`limit`, and collected into one
/// `influencers` list of `{id, score}` maps.
pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) -> Query {
    let (from, to) = timeframe.to_timestamp_range();

    // Static query text: nothing is interpolated, every input travels as a parameter.
    let cypher = "
MATCH (user:User)
WITH DISTINCT user

OPTIONAL MATCH (others:User)-[follow:FOLLOWS]->(user)
WHERE follow.indexed_at >= $from AND follow.indexed_at < $to

OPTIONAL MATCH (user)-[tag:TAGGED]->(tagged:Post)
WHERE tag.indexed_at >= $from AND tag.indexed_at < $to

OPTIONAL MATCH (user)-[authored:AUTHORED]->(post:Post)
WHERE authored.indexed_at >= $from AND authored.indexed_at < $to

WITH user, COUNT(DISTINCT follow) AS followers_count, COUNT(DISTINCT tag) AS tags_count,
COUNT(DISTINCT post) AS posts_count
WITH {
id: user.id,
score: (tags_count + posts_count) * sqrt(followers_count + 1)
} AS influencer
WHERE influencer.id IS NOT NULL
ORDER BY influencer.score DESC, influencer.id ASC
SKIP $skip LIMIT $limit
RETURN COLLECT(influencer) as influencers
";

    let paged = query(cypher)
        .param("skip", skip as i64)
        .param("limit", limit as i64);

    paged.param("from", from).param("to", to)
}

pub fn get_files_by_ids(key_pair: &[&[&str]]) -> Query {
query(
"
Expand Down
8 changes: 4 additions & 4 deletions src/models/tag/global.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::stream::{TagStreamReach, Taggers};
use super::stream::Taggers;
use crate::db::graph::exec::retrieve_from_graph;
use crate::types::DynError;
use crate::types::{DynError, StreamReach};
use crate::{queries, RedisOps};

pub struct TagGlobal {}
Expand All @@ -9,7 +9,7 @@ impl TagGlobal {
pub async fn get_tag_taggers(
label: String,
user_id: Option<String>,
reach: Option<TagStreamReach>,
reach: Option<StreamReach>,
skip: usize,
limit: usize,
) -> Result<Option<Vec<String>>, DynError> {
Expand All @@ -31,7 +31,7 @@ pub async fn read_from_set(
pub async fn get_tag_taggers_by_reach(
label: &str,
user_id: &str,
reach: TagStreamReach,
reach: StreamReach,
skip: usize,
limit: usize,
) -> Result<Option<Vec<String>>, DynError> {
Expand Down
Loading
Loading