-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add tags streams #68
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work!! 🚀 I left a few comments.
Automated tests are not yet existing in this PR as it was opened before we created the workflows. You need to run cargo fmt
for tests to not fail in main
.
src/db/graph/queries.rs
Outdated
@@ -208,3 +208,71 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz | |||
} | |||
query(&query_string).param("user_id", user_id) | |||
} | |||
|
|||
// Retrieves popular tags across the entire network | |||
// Results ordered by post count (descending), effectively ranking "hot" tags. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This query does not seem to order the results by desceding post count. It would need something like this before the RETURN
statement:
ORDER BY uniquePosts DESC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tipogi This query still does not sort by descending post count. Maybe it works as intended this way, but the docstring is not correct.
src/db/graph/queries.rs
Outdated
|
||
// Analyzes tag usage for a specific list of user IDs. Groups tags by name, | ||
// showing for each: label, post count, list of user IDs and total usage count. | ||
// Orders by user count and usage (descending). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ordering seems to be only by usage count (times), not by user count.
src/db/graph/queries.rs
Outdated
// showing for each: label, post count, list of user IDs and total usage count. | ||
// Orders by user count and usage (descending). | ||
// Note: Only considers users from the provided ID list. | ||
pub fn get_tags_from_user_ids(user_ids: &[&str]) -> Query { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: when naming functions we are typically using ..._by_user_ids
instead of from
src/reindex.rs
Outdated
|
||
impl RedisOps for HotTagsScore {} | ||
|
||
async fn retrive_global_hot_tags() -> Result<(), Box<dyn Error + Send + Sync>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not belong in the reindexer.rs
All this logic for reindexing should be in the Tag Stream model.
src/models/tag/stream.rs
Outdated
} | ||
} | ||
|
||
async fn retrieve_users_tags_by_reach(users: &[String]) -> Result<Option<Vec<HotTag>>, Box<dyn Error + Send + Sync>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We typically use get_
on our function names. Let's avoid using new lexicon (retrieve_
) for the same meaning and functionality.
If all of these functions are specific for HotTags maybe they belong as HotTags methods. We can also avoid using hot_tags
in the function names if that is the case.
src/routes/v0/stream/tags.rs
Outdated
} | ||
|
||
#[derive(Deserialize)] | ||
pub struct StreamTagsReachInput { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far we call these ones Query
instead of Input
for example PostStreamReachQuery
pub const GLOBAL_HOT_TAGS: [&str; 3] = ["Tags", "Global", "Hot"]; | ||
|
||
#[derive(Deserialize, Serialize, ToSchema, Debug)] | ||
pub struct HotTag { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if too late or if it matters. It feels like we have HotTag
as unit and HotTags
should the a Vec<HotTags>
that implements all of the functionality? In the same way as our other streams are pub struct UserStream(Vec<UserView>);
or pub struct PostStream(Vec<PostView>);
I do not think it matters much at all, but it is a slightly different approach :)
src/routes/v0/stream/tags.rs
Outdated
|
||
match HotTag::get_stream_tags_by_reach(user_id, reach).await { | ||
Ok(Some(hot_tags)) => Ok(Json(hot_tags)), | ||
Ok(None) => Err(Error::TagsNotFound { reach: String::from("REACH") }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe here we intended to enter the query input reach
as part of the tags not found message error?
src/routes/v0/stream/tags.rs
Outdated
("reach" = UserStreamType, Path, description = "Reach type: Follower | Following | Friends") | ||
), | ||
responses( | ||
(status = 200, description = "Retrieve tags by reach cluster", body = StreamTags), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/reindex.rs
Outdated
|
||
if let Some(row) = result.next().await? { | ||
let hot_tags: Vec<(f64, &str)> = row.get("hot_tags")?; | ||
HotTagsScore::put_index_sorted_set( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just move this (putting the index_sorted_set) within HotTag::retrieve_hot_tags_from_graph(query)
and we call it once our reindexer. Then take we take it out of ``HotTag::get_global_tags_stream(.....)` for global tags as it should always be served from reddis.
This whole function does not belong in the reindexer, there should be no need to import GLOBAL_HOT_TAGS or RedisOps, or Serde, and queries and such.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @tipogi, I left some comments for a few smaller changes! I think once we fix those small things and fix the merge conflicts we can merge!
src/db/graph/queries.rs
Outdated
MATCH (u:User)-[tag:TAGGED]->(p:Post) | ||
WITH tag.label AS label, COUNT(DISTINCT p) AS uniquePosts | ||
WHERE uniquePosts >= $threshold | ||
RETURN COLLECT([toFloat(uniquePosts), label]) AS hot_tags | ||
") | ||
", | ||
) | ||
.param("threshold", threshold as f64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this threshold is a good idea. It's okay if we use more memory on this sorted set.
src/db/graph/queries.rs
Outdated
@@ -208,3 +208,71 @@ pub fn get_user_following(user_id: &str, skip: Option<usize>, limit: Option<usiz | |||
} | |||
query(&query_string).param("user_id", user_id) | |||
} | |||
|
|||
// Retrieves popular tags across the entire network | |||
// Results ordered by post count (descending), effectively ranking "hot" tags. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tipogi This query still does not sort by descending post count. Maybe it works as intended this way, but the docstring is not correct.
// We cannot use here limit and skip because we want to get all the users reach by | ||
let users = UserStream::get_user_list_from_reach(&user_id, reach, None, Some(10000)).await?; | ||
let users = | ||
UserStream::get_user_list_from_reach(&user_id, reach, None, Some(10000)).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to make sure to get all users we can use Some(isize::MAX as usize)
instead of Some(10000)
. But I believe we might want to limit this to maybe ~500 users as for really big accounts this will become expensive. Same problem we have with the stream of posts by reach.
src/reindex.rs
Outdated
@@ -66,7 +62,10 @@ pub async fn reindex() { | |||
} | |||
} | |||
|
|||
retrive_global_hot_tags().await.expect("Failed to store the global hot tags"); | |||
info!("Retrieving tags global score..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not logging info!
during the rest of the reindexer. So we should either keep everything as debug!
or be consistent and log the other reindexing routines with info!
Pre-submission Checklist
cargo clippy
cargo test
.cargo bench