Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add search endpoint by username #73

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ harness = false
name = "thread"
harness = false

[[bench]]
name = "search"
harness = false


# Max performance profile
[profile.release]
Expand Down
60 changes: 60 additions & 0 deletions benches/search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use pubky_nexus::models::user::UserSearch;
use pubky_nexus::setup;
use pubky_nexus::Config;
use std::env;
use std::sync::Once;
use std::time::Duration;
use tokio::runtime::Runtime;

static INIT: Once = Once::new();

pub fn run_setup() {
INIT.call_once(|| {
let rt = Runtime::new().unwrap();
env::set_var("RUST_LOG", "error");
rt.block_on(async {
let config = Config::from_env();
setup(&config).await;
});
});
}

fn bench_user_search(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the user search.");
println!("***************************************");

run_setup();

let username = "a"; // Will match the anonymous users
let rt = Runtime::new().unwrap();

c.bench_with_input(
BenchmarkId::new("user_search", username),
&username,
|b, &username| {
b.to_async(&rt).iter(|| async {
let result = UserSearch::get_by_name(username, None, Some(40))
.await
.unwrap();
criterion::black_box(result);
});
},
);
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
.sample_size(100)
.warm_up_time(Duration::new(1, 0))
}

criterion_group! {
name = benches;
config = configure_criterion();
targets = bench_user_search,
}

criterion_main!(benches);
26 changes: 26 additions & 0 deletions benches/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,31 @@ fn bench_stream_posts_friends_reach(c: &mut Criterion) {
});
}

fn bench_stream_users_by_username_search(c: &mut Criterion) {
println!("***************************************");
println!("Benchmarking the user streams by username search.");
println!("***************************************");

run_setup();

let username = "An"; // Match all anonymous profiles
let rt = Runtime::new().unwrap();

c.bench_function("stream_users_by_username_search", |b| {
b.to_async(&rt).iter(|| async {
let user_stream = UserStream::get_from_username_search(
username,
None,
None,
Some(40), // Limit to 40 results
)
.await
.unwrap();
criterion::black_box(user_stream);
});
});
}

fn configure_criterion() -> Criterion {
Criterion::default()
.measurement_time(Duration::new(5, 0))
Expand All @@ -252,6 +277,7 @@ criterion_group! {
bench_stream_posts_following_reach,
bench_stream_posts_followers_reach,
bench_stream_posts_friends_reach,
bench_stream_users_by_username_search,
}

criterion_main!(benches);
50 changes: 50 additions & 0 deletions src/db/kv/index/sorted_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,53 @@ pub async fn get_range(
_ => Ok(Some(elements)),
}
}

/// Performs a lexicographical range search on the Redis sorted set.
///
/// # Arguments
///
/// * `min` - The minimum lexicographical bound (inclusive).
/// * `max` - The maximum lexicographical bound (exclusive).
/// * `limit` - The maximum number of elements to retrieve.
pub async fn get_lex_range(
prefix: &str,
key: &str,
min: &str,
max: &str,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Option<Vec<String>>, Box<dyn Error + Send + Sync>> {
let mut redis_conn = get_redis_conn().await?;
let index_key = format!("{}:{}", prefix, key);
let skip = skip.unwrap_or(0) as isize;
let limit = limit.unwrap_or(1000) as isize;

let elements: Vec<String> = redis_conn
.zrangebylex_limit(index_key, min, max, skip, limit)
.await?;

match elements.len() {
0 => Ok(None),
_ => Ok(Some(elements)),
}
}

/// Removes elements from the Redis sorted set.
///
/// # Arguments
///
/// * `items` - A slice of elements to remove.
pub async fn _remove(
prefix: &str,
key: &str,
items: &[&str],
) -> Result<(), Box<dyn Error + Send + Sync>> {
if items.is_empty() {
return Ok(());
}

let index_key = format!("{}:{}", prefix, key);
let mut redis_conn = get_redis_conn().await?;
redis_conn.zrem(&index_key, items).await?;
Ok(())
}
30 changes: 30 additions & 0 deletions src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,4 +343,34 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let key = key_parts.join(":");
sorted_sets::get_range("Sorted", &key, start, end, skip, limit, sorting).await
}

/// Retrieves a lexicographical range of elements from a Redis sorted set using the provided key parts.
///
/// This method fetches elements from a Redis sorted set stored under the key generated from the provided `key_parts`.
/// The range is defined by `min` and `max` lexicographical bounds.
///
/// # Arguments
///
/// * `key_parts` - A slice of string slices that represent the parts used to form the key under which the sorted set is stored.
/// * `min` - The minimum lexicographical bound (inclusive).
/// * `max` - The maximum lexicographical bound (exclusive).
/// * `limit` - An optional number of elements to return (useful for pagination).
///
/// # Returns
///
/// Returns a vector of elements if they exist, or an empty vector if no matching elements are found.
///
/// # Errors
///
/// Returns an error if the operation fails, such as if the Redis connection is unavailable.
async fn try_from_index_sorted_set_lex(
key_parts: &[&str],
min: &str,
max: &str,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Option<Vec<String>>, Box<dyn Error + Send + Sync>> {
let key = key_parts.join(":");
sorted_sets::get_lex_range("Sorted", &key, min, max, skip, limit).await
}
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum Error {
InternalServerError { source: Box<dyn std::error::Error> },
#[error("Bookmarks not found: {user_id}")]
BookmarksNotFound { user_id: String },
#[error("Invalid input: {message}")]
InvalidInput { message: String },
// Add other custom errors here
}

Expand All @@ -25,6 +27,7 @@ impl IntoResponse for Error {
Error::UserNotFound { .. } => StatusCode::NOT_FOUND,
Error::PostNotFound { .. } => StatusCode::NOT_FOUND,
Error::BookmarksNotFound { .. } => StatusCode::NOT_FOUND,
Error::InvalidInput { .. } => StatusCode::BAD_REQUEST,
Error::InternalServerError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
// Map other errors to appropriate status codes
};
Expand All @@ -38,6 +41,9 @@ impl IntoResponse for Error {
Error::BookmarksNotFound { user_id } => {
debug!("Bookmarks not found: {}", user_id)
}
Error::InvalidInput { message } => {
debug!("Invalid input: {}", message)
}
Error::InternalServerError { source } => error!("Internal server error: {:?}", source),
};

Expand Down
2 changes: 2 additions & 0 deletions src/models/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ where
Self::to_index(&existing_record_ids, existing_records).await?;
}

Self::add_to_sorted_sets(&missing_records).await;
Ok(missing_records)
}

Expand All @@ -119,4 +120,5 @@ where
}

fn graph_query(id_list: &[&str]) -> Query;
async fn add_to_sorted_sets(elements: &[std::option::Option<Self>]);
}
21 changes: 18 additions & 3 deletions src/models/user/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize};
use serde_json;
use utoipa::ToSchema;

use super::UserSearch;

/// Represents a user's single link with a title and URL.
#[derive(Serialize, Deserialize, ToSchema, Default, Clone, Debug)]
pub struct UserLink {
Expand Down Expand Up @@ -34,18 +36,31 @@ impl Collection for UserDetails {
fn graph_query(id_list: &[&str]) -> Query {
queries::get_users_details_by_ids(id_list)
}

async fn add_to_sorted_sets(details: &[std::option::Option<Self>]) {
// Filter out None and collect only the references to UserDetails
let user_details_refs: Vec<&UserDetails> = details
.iter()
.filter_map(|detail| detail.as_ref()) // Filter out None and unwrap Some
.collect();

// Pass the references to the add_many_to_username_sorted_set function
UserSearch::add_many_to_username_sorted_set(&user_details_refs)
.await
.unwrap();
}
}

/// Represents user data with name, bio, image, links, and status.
#[derive(Serialize, Deserialize, ToSchema, Default, Clone, Debug)]
pub struct UserDetails {
name: String,
pub name: String,
bio: String,
id: String,
pub id: String,
#[serde(deserialize_with = "deserialize_user_links")]
links: Vec<UserLink>,
status: String,
indexed_at: i64,
pub indexed_at: i64,
}

fn deserialize_user_links<'de, D>(deserializer: D) -> Result<Vec<UserLink>, D::Error>
Expand Down
2 changes: 2 additions & 0 deletions src/models/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod counts;
mod details;
mod follows;
mod relationship;
mod search;
mod stream;
mod tags;
mod view;
Expand All @@ -10,6 +11,7 @@ pub use counts::UserCounts;
pub use details::{UserDetails, UserLink};
pub use follows::{Followers, Following, Friends, UserFollows};
pub use relationship::Relationship;
pub use search::UserSearch;
pub use stream::{UserStream, UserStreamType};
pub use tags::ProfileTag;
pub use tags::UserTags;
Expand Down
81 changes: 81 additions & 0 deletions src/models/user/search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::error::Error;

use super::UserDetails;
use crate::RedisOps;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

const USER_NAME_KEY_PARTS: [&str; 2] = ["Users", "Name"];

#[derive(Serialize, Deserialize, ToSchema, Default)]
pub struct UserSearch(pub Vec<String>);

impl RedisOps for UserSearch {}

impl UserSearch {
pub async fn get_by_name(
name: &str,
skip: Option<usize>,
limit: Option<usize>,
) -> Result<Option<Self>, Box<dyn Error + Send + Sync>> {
// Convert the username to lowercase to ensure case-insensitive search
let name = name.to_lowercase();

let min = format!("[{}", name); // Inclusive range starting with "name"
let max = format!("({}~", name); // Exclusive range ending just after "name"

// Perform the lexicographical range search
let elements =
Self::try_from_index_sorted_set_lex(&USER_NAME_KEY_PARTS, &min, &max, skip, limit)
.await?;

// If elements exist, process them to extract user_ids
if let Some(elements) = elements {
let user_ids: Vec<String> = elements
.into_iter()
.filter_map(|element| {
// Split by `:` and take the second part (user_id)
element
.split_once(':')
.map(|(_, user_id)| user_id.to_string())
})
.collect();

return Ok(Some(UserSearch(user_ids)));
}

Ok(None)
}

/// Adds multiple `user_id`s to the Redis sorted set using the username as index.
///
/// This method takes a list of `UserDetails` and adds them all to the sorted set at once.
pub async fn add_many_to_username_sorted_set(
details_list: &[&UserDetails],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Collect all the `username:user_id` pairs and their corresponding scores
let mut items: Vec<(f64, String)> = Vec::with_capacity(details_list.len());

for details in details_list {
// Convert the username to lowercase before storing
let username = details.name.to_lowercase();
let user_id = &details.id;
let score = details.indexed_at as f64;

// The value in the sorted set will be `username:user_id`
let member = format!("{}:{}", username, user_id);

items.push((score, member));
}

// Perform a single Redis ZADD operation with all the items
Self::put_index_sorted_set(
&USER_NAME_KEY_PARTS,
&items
.iter()
.map(|(score, member)| (*score, member.as_str()))
.collect::<Vec<_>>(),
)
.await
}
}
Loading
Loading