feat: Post tags (#87)
* Post tags endpoint: Get data from cache or graph
* Integration tests created
tipogi authored Aug 28, 2024
1 parent 9273500 commit ac262b7
Showing 22 changed files with 408 additions and 108 deletions.
2 changes: 1 addition & 1 deletion benches/post.rs
@@ -23,7 +23,7 @@ fn bench_get_post_by_id(c: &mut Criterion) {
&post_id,
|b, &id| {
b.to_async(&rt).iter(|| async {
let post = PostView::get_by_id(author_id, id, Some(viewer_id))
let post = PostView::get_by_id(author_id, id, Some(viewer_id), None, None)
.await
.unwrap();
criterion::black_box(post);
6 changes: 4 additions & 2 deletions benches/tag.rs
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion};
use pubky_nexus::models::tag::global::TagGlobal;
use pubky_nexus::models::tag::post::PostTags;
use pubky_nexus::models::tag::post::TagPost;
use pubky_nexus::models::tag::stream::HotTags;
use pubky_nexus::models::tag::user::UserTags;
use pubky_nexus::models::user::UserStreamType;
@@ -52,7 +52,9 @@ fn bench_get_post_tags(c: &mut Criterion) {
&[user_id, post_id],
|b, &params| {
b.to_async(&rt).iter(|| async {
let profile = PostTags::get_by_id(params[0], params[1]).await.unwrap();
let profile = TagPost::get_by_id(params[0], params[1], None, None)
.await
.unwrap();
criterion::black_box(profile);
});
},
24 changes: 8 additions & 16 deletions src/db/graph/queries/read.rs
@@ -77,23 +77,19 @@ pub fn get_users_details_by_ids(user_ids: &[&str]) -> Query {
.param("ids", user_ids)
}

/// Retrieve all the tags of the post
// Retrieve all the tags of the post
pub fn post_tags(user_id: &str, post_id: &str) -> neo4rs::Query {
query(
"
MATCH (u:User {id: $user_id})-[:AUTHORED]->(p:Post {id: $post_id})
CALL {
WITH p
MATCH (tagger:User)-[tag:TAGGED]->(p)
WITH tag.label AS name,
collect({
tag_id: tag.id,
indexed_at: tag.indexed_at,
tagger_id: tagger.id
}) AS from
WITH tag.label AS name, collect(DISTINCT tagger.id) AS tagger_ids
RETURN collect({
label: name,
tagged: from
taggers: tagger_ids,
taggers_count: SIZE(tagger_ids)
}) AS post_tags
}
RETURN
@@ -105,22 +101,18 @@ pub fn post_tags(user_id: &str, post_id: &str) -> neo4rs::Query {
.param("post_id", post_id)
}

// Retrieve all the tags of the user
pub fn user_tags(user_id: &str) -> neo4rs::Query {
query(
"
MATCH (u:User {id: $user_id})
CALL {
WITH u
MATCH (p:User)-[r:TAGGED]->(u)
WITH r.label AS name,
collect({
tag_id: r.id,
indexed_at: r.indexed_at,
tagger_id: p.id
}) AS from
MATCH (p:User)-[t:TAGGED]->(u)
WITH t.label AS name, collect(DISTINCT p.id) AS tagger_ids
RETURN collect({
label: name,
tagged: from
taggers: tagger_ids
}) AS user_tags
}
RETURN
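The reworked `post_tags` query folds what used to be one record per `TAGGED` relationship (with `tag_id`, `indexed_at`, `tagger_id`) into one record per label, carrying the distinct tagger ids and their count. As a hedged sketch of the shape each element of `post_tags` is expected to map onto (values invented, and assuming `serde_json` is available for the illustration), it lines up with the `TagDetails` struct added later in this commit:

```rust
use pubky_nexus::models::tag::TagDetails; // re-export introduced in src/models/tag/mod.rs
use serde_json::json;

fn main() -> Result<(), serde_json::Error> {
    // Invented example: a post tagged "bitcoin" by two users now yields a single
    // aggregated entry instead of two (tag, tagger) records.
    let row = json!({
        "label": "bitcoin",
        "taggers": ["tagger_id_a", "tagger_id_b"],
        "taggers_count": 2
    });
    let details: TagDetails = serde_json::from_value(row)?;
    assert_eq!(details.label, "bitcoin");
    Ok(())
}
```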
38 changes: 38 additions & 0 deletions src/db/kv/index/sets.rs
@@ -203,3 +203,41 @@ pub async fn get_multiple_sets(

Ok(taggers_list)
}

/// Adds elements to multiple Redis sets using a pipeline.
///
/// This asynchronous function adds elements to multiple Redis sets, with the keys generated by concatenating the provided `prefix`, `common_key`, and each element of `index`.
/// Each set is uniquely identified by a combination of these keys, ensuring that each element in each set is unique.
///
/// # Arguments
///
/// * `prefix` - A string slice representing the prefix for the Redis keys.
/// * `common_key` - A slice of string slices representing the common components of the Redis keys, which will be joined with colons (`:`).
/// * `index` - A slice of string slices representing the unique identifiers to append to the generated keys.
/// * `collections` - A slice of slices, where each inner slice contains elements to be added to the corresponding Redis set. The elements in each collection are added to the Redis set identified by the respective key from the `index`.
///
/// # Errors
///
/// This function will return an error if:
/// - The Redis connection cannot be established.
/// - The pipeline query fails to execute.
pub async fn put_multiple_sets(
prefix: &str,
common_key: &[&str],
index: &[&str],
collections: &[&[&str]],
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut redis_conn = get_redis_conn().await?;
let mut pipe = redis::pipe();

for (i, key) in index.iter().enumerate() {
let full_index = format!("{}:{}:{}", &prefix, common_key.join(":"), key);
if !collections[i].is_empty() {
pipe.sadd(full_index, collections[i]);
}
}

// Execute the pipeline
pipe.query_async(&mut redis_conn).await?;
Ok(())
}
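As a hedged usage sketch of the helper above (key parts invented, and assuming the function is reachable from the caller's module), a single pipelined round trip populates one set per `index` entry, keyed as `prefix:common_key:index[i]`; empty collections are skipped:

```rust
async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Produces the sets "Tag:Taggers:user_1:post_a" and "Tag:Taggers:user_1:post_b".
    let common_key = ["user_1"];
    let index = ["post_a", "post_b"];
    let collections: [&[&str]; 2] = [&["tagger_a", "tagger_b"], &["tagger_c"]];
    put_multiple_sets("Tag:Taggers", &common_key, &index, &collections).await
}
```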
43 changes: 43 additions & 0 deletions src/db/kv/traits.rs
@@ -432,4 +432,47 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let prefix = Self::prefix().await;
sets::get_multiple_sets(&prefix, key_parts_list, limit).await
}

/// Adds elements to multiple Redis sets using the provided keys and collections.
///
/// This asynchronous function allows you to add elements to multiple Redis sets,
/// with each set identified by a key generated from the `common_key` and each element of `index`.
/// The function ensures that each element in each set is unique.
///
/// # Arguments
///
/// * `common_key` - A slice of string slices representing the common part of the Redis keys.
/// This will be combined with each element in `index` to generate the full Redis key.
/// * `index` - A slice of string slices representing the unique identifiers to append to the `common_key` to form the full Redis keys.
/// * `collections_refs` - A slice of vectors, where each inner vector contains the elements to be added to the Redis set identified by the matching element of `index`.
///
/// # Returns
///
/// This function returns a `Result` indicating success or failure. A successful result means that
/// all elements were successfully added to their respective Redis sets.
///
/// # Errors
///
/// Returns an error if the operation fails, such as if the Redis connection is unavailable.
async fn put_multiple_set_indexes(
common_key: &[&str],
index: &[&str],
collections_refs: &[Vec<&str>],
) -> Result<(), Box<dyn Error + Send + Sync>> {
// Ensure the lengths of `index` and `collections_refs` match
if index.len() != collections_refs.len() {
return Err("`index` and `collections_refs` length mismatch".into());
}

// Get the prefix for the Redis keys
let prefix = Self::prefix().await;

let refs: Vec<&[&str]> = collections_refs
.iter()
.map(|inner_vec| inner_vec.as_slice())
.collect();
let slice: &[&[&str]] = refs.as_slice();

sets::put_multiple_sets(&prefix, common_key, index, slice).await
}
}
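A hedged sketch of how the trait method is meant to be called (ids invented; it assumes `TagPost` implements `RedisOps`, as the cache path in this commit suggests): the prefix comes from `Self::prefix()`, so the caller only supplies the common key parts, the per-set indexes, and the taggers to store.

```rust
use pubky_nexus::models::tag::post::TagPost;
// Assumed trait path; it lives in src/db/kv/traits.rs and may be re-exported differently.
use pubky_nexus::RedisOps;

async fn cache_post_tag_taggers() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // One Redis set per label, keyed roughly as "<prefix>:<author_id>:<post_id>:<label>".
    let common_key = ["author_id", "post_id"];
    let labels = ["bitcoin", "rust"];
    let taggers = [vec!["tagger_a", "tagger_b"], vec!["tagger_c"]];
    TagPost::put_multiple_set_indexes(&common_key, &labels, &taggers).await
}
```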
2 changes: 1 addition & 1 deletion src/models/post/stream.rs
@@ -254,7 +254,7 @@ impl PostStream {
let viewer_id = viewer_id.clone();
let post_id = post_id.to_string();
let handle = spawn(async move {
PostView::get_by_id(&author_id, &post_id, viewer_id.as_deref()).await
PostView::get_by_id(&author_id, &post_id, viewer_id.as_deref(), None, None).await
});
handles.push(handle);
}
10 changes: 6 additions & 4 deletions src/models/post/thread.rs
@@ -31,7 +31,8 @@ impl PostThread {
}

if let Some(row) = result.next().await? {
let root_post_view = PostView::get_by_id(author_id, post_id, viewer_id).await?;
let root_post_view =
PostView::get_by_id(author_id, post_id, viewer_id, None, None).await?;

let root_post_view = match root_post_view {
None => return Ok(None),
@@ -46,9 +47,10 @@
let reply_id: String = reply.get("reply_id")?;
let reply_author_id: String = reply.get("author_id")?;

let reply_view = PostView::get_by_id(&reply_author_id, &reply_id, viewer_id)
.await?
.unwrap_or_default();
let reply_view =
PostView::get_by_id(&reply_author_id, &reply_id, viewer_id, None, None)
.await?
.unwrap_or_default();
replies_view.push(reply_view);
}

8 changes: 5 additions & 3 deletions src/models/post/view.rs
@@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use super::{Bookmark, PostCounts, PostDetails, PostRelationships};
use crate::models::tag::post::PostTags;
use crate::models::tag::post::TagPost;

/// Represents a Pubky user with relational data including tags, counts, and relationship with a viewer.
#[derive(Serialize, Deserialize, ToSchema, Default)]
pub struct PostView {
details: PostDetails,
counts: PostCounts,
tags: PostTags,
tags: TagPost,
relationships: PostRelationships,
bookmark: Option<Bookmark>,
}
@@ -20,14 +20,16 @@ impl PostView {
author_id: &str,
post_id: &str,
viewer_id: Option<&str>,
max_tags: Option<usize>,
max_taggers: Option<usize>,
) -> Result<Option<Self>, Box<dyn std::error::Error + Send + Sync>> {
// Perform all operations concurrently
let (details, counts, bookmark, relationships, tags) = tokio::try_join!(
PostDetails::get_by_id(author_id, post_id),
PostCounts::get_by_id(author_id, post_id),
Bookmark::get_by_id(author_id, post_id, viewer_id),
PostRelationships::get_by_id(author_id, post_id),
PostTags::get_by_id(author_id, post_id),
TagPost::get_by_id(author_id, post_id, max_tags, max_taggers),
)?;

let details = match details {
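The two new optional parameters, `max_tags` and `max_taggers`, are forwarded to `TagPost::get_by_id` and presumably cap how many labels and taggers a view carries; `None` keeps the previous unbounded behaviour. A hedged example with invented ids:

```rust
use pubky_nexus::models::post::PostView; // path assumed from the existing module layout

// Fetch a compact view: no viewer, at most 5 labels, at most 10 taggers per label.
async fn compact_post_view() -> Result<Option<PostView>, Box<dyn std::error::Error + Send + Sync>>
{
    PostView::get_by_id("author_id", "post_id", None, Some(5), Some(10)).await
}
```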
64 changes: 64 additions & 0 deletions src/models/tag/details.rs
@@ -0,0 +1,64 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

type TagFieldsTuple<'a> = (Vec<(f64, &'a str)>, (Vec<&'a str>, Vec<Vec<&'a str>>));

#[derive(Serialize, Deserialize, ToSchema, Debug, Clone, Default)]
pub struct TagDetails {
pub label: String,
taggers: Vec<String>,
taggers_count: usize,
}

impl TagDetails {
/// Creates a list of `TagDetails` from cached tag scores and taggers.
/// # Arguments
/// * `tag_scores` - A `Vec` of tuples where each tuple contains a tag label as a `String` and a score as an `f64`.
/// * `taggers_list` - A `Vec` of `Option` tuples where each tuple contains a `Vec` of tagger identifiers as `String` and the taggers count as a `usize`.
///
/// # Returns
///
/// A `Vec` of `TagDetails` instances, filtered to include only those where `taggers_list` contains `Some` data.
pub fn from_index(
tag_scores: Vec<(String, f64)>,
taggers_list: Vec<Option<(Vec<String>, usize)>>,
) -> Vec<TagDetails> {
tag_scores
.into_iter()
.zip(taggers_list)
.filter_map(|((label, _), taggers)| {
// NOTE: `map` does not run on `None`, so entries without taggers are dropped by `filter_map`
taggers.map(|(taggers, taggers_count)| TagDetails {
label,
taggers,
taggers_count,
})
})
.collect()
}

/// Splits fields of `TagDetails` and calculates scores based on the number of taggers.
/// # Arguments
/// * `tag_details` - A reference to a slice of `TagDetails` instances.
/// # Returns
/// - A list of tag scores paired with their corresponding labels.
/// - A list of labels and a corresponding list of lists containing tagger identifiers.
///
pub fn process_tag_details<'a>(tag_details: &'a [TagDetails]) -> TagFieldsTuple {
let mut tag_scores: Vec<(f64, &str)> = Vec::with_capacity(tag_details.len());
let mut labels = Vec::with_capacity(tag_details.len());
let mut taggers_id = Vec::with_capacity(tag_details.len());
for tag in tag_details {
let label: &str = &tag.label;
let taggers = tag
.taggers
.iter()
.map(|s| s.as_str())
.collect::<Vec<&'a str>>();
tag_scores.push((tag.taggers.len() as f64, label));
labels.push(tag.label.as_str());
taggers_id.push(taggers);
}
(tag_scores, (labels, taggers_id))
}
}
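A hedged round-trip sketch (values invented, path assumed from the `mod.rs` re-export below) of how the two helpers complement each other: `from_index` rebuilds `TagDetails` from cached scores and taggers, dropping labels whose taggers are missing, while `process_tag_details` splits details back into the score and taggers shapes used when writing the index.

```rust
use pubky_nexus::models::tag::TagDetails;

fn round_trip() {
    let tag_scores = vec![("bitcoin".to_string(), 2.0), ("rust".to_string(), 1.0)];
    let taggers_list = vec![
        Some((vec!["tagger_a".to_string(), "tagger_b".to_string()], 2)),
        None, // labels with no cached taggers are filtered out
    ];

    let details = TagDetails::from_index(tag_scores, taggers_list);
    assert_eq!(details.len(), 1); // only "bitcoin" survives

    let (scores, (labels, taggers_ids)) = TagDetails::process_tag_details(&details);
    assert_eq!(scores, vec![(2.0, "bitcoin")]);
    assert_eq!(labels, vec!["bitcoin"]);
    assert_eq!(taggers_ids, vec![vec!["tagger_a", "tagger_b"]]);
}
```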
37 changes: 3 additions & 34 deletions src/models/tag/mod.rs
@@ -1,39 +1,8 @@
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use utoipa::ToSchema;

pub mod details;
pub mod global;
pub mod post;
pub mod stream;
pub mod user;

// Atomic struct to save in the cache
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct Tag {
tag_id: String, // TODO: Crobfordbase32 type
indexed_at: i64,
tagger_id: String,
}

impl Default for Tag {
fn default() -> Self {
Self {
tag_id: String::new(),
indexed_at: Utc::now().timestamp(),
tagger_id: String::new(),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, ToSchema, Default)]
pub struct Tags(Vec<Tag>);

// Implement Deref so TagList can be used like Vec<String>
impl Deref for Tags {
type Target = Vec<Tag>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
// TODO: Re-export the remaining tag structs in the same way
pub use details::TagDetails;