diff --git a/.sqlx/query-7ab21e7613dd88e97cf602e76bff62170c13ceef8104a4ce4cb2d101f8ce4f48.json b/.sqlx/query-7ab21e7613dd88e97cf602e76bff62170c13ceef8104a4ce4cb2d101f8ce4f48.json deleted file mode 100644 index 5fb8d0de..00000000 --- a/.sqlx/query-7ab21e7613dd88e97cf602e76bff62170c13ceef8104a4ce4cb2d101f8ce4f48.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users\n SET balance = balance + $1\n WHERE id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "7ab21e7613dd88e97cf602e76bff62170c13ceef8104a4ce4cb2d101f8ce4f48" -} diff --git a/.sqlx/query-b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af.json b/.sqlx/query-b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af.json deleted file mode 100644 index 0db3e537..00000000 --- a/.sqlx/query-b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE versions\n SET downloads = downloads + 1\n WHERE id = ANY($1)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array" - ] - }, - "nullable": [] - }, - "hash": "b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af" -} diff --git a/.sqlx/query-d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b.json b/.sqlx/query-d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b.json deleted file mode 100644 index 7eab9304..00000000 --- a/.sqlx/query-d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE mods\n SET downloads = downloads + 1\n WHERE id = ANY($1)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array" - ] - }, - "nullable": [] - }, - "hash": "d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b" -} diff --git a/.sqlx/query-3d535886d8a239967e6556fb0cd0588b79a7787b9b3cbbd4f8968cd0d99ed49d.json b/.sqlx/query-f297b517bc3bbd8628c0c222c0e3daf8f4efbe628ee2e8ddbbb4b9734cc9c915.json similarity index 73% rename from .sqlx/query-3d535886d8a239967e6556fb0cd0588b79a7787b9b3cbbd4f8968cd0d99ed49d.json rename to .sqlx/query-f297b517bc3bbd8628c0c222c0e3daf8f4efbe628ee2e8ddbbb4b9734cc9c915.json index 371132b5..dc923578 100644 --- a/.sqlx/query-3d535886d8a239967e6556fb0cd0588b79a7787b9b3cbbd4f8968cd0d99ed49d.json +++ b/.sqlx/query-f297b517bc3bbd8628c0c222c0e3daf8f4efbe628ee2e8ddbbb4b9734cc9c915.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO moderation_external_files (sha1, external_license_id)\n SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])\n ", + "query": "\n INSERT INTO moderation_external_files (sha1, external_license_id)\n SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])\n ON CONFLICT (sha1) DO NOTHING\n ", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "3d535886d8a239967e6556fb0cd0588b79a7787b9b3cbbd4f8968cd0d99ed49d" + "hash": "f297b517bc3bbd8628c0c222c0e3daf8f4efbe628ee2e8ddbbb4b9734cc9c915" } diff --git a/Cargo.lock b/Cargo.lock index e32b9b09..d628d5a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -690,9 +690,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc" +checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" dependencies = [ "memchr", ] diff --git a/src/clickhouse/mod.rs b/src/clickhouse/mod.rs index c1763dc6..a89d47f4 100644 --- a/src/clickhouse/mod.rs +++ b/src/clickhouse/mod.rs @@ -42,14 +42,15 @@ pub async fn init_client_with_database( user_id UInt64, project_id UInt64, + monetized Bool DEFAULT True, ip IPv6, country String, user_agent String, - headers Array(Tuple(String, String)), + headers Array(Tuple(String, String)) ) ENGINE = MergeTree() - PRIMARY KEY (project_id, recorded) + PRIMARY KEY (project_id, recorded, ip) " )) .execute() @@ -71,10 +72,10 @@ pub async fn init_client_with_database( ip IPv6, country String, user_agent String, - headers Array(Tuple(String, String)), + headers Array(Tuple(String, String)) ) ENGINE = MergeTree() - PRIMARY KEY (project_id, recorded) + PRIMARY KEY (project_id, recorded, ip) " )) .execute() @@ -94,10 +95,10 @@ pub async fn init_client_with_database( loader String, game_version String, - parent UInt64, + parent UInt64 ) ENGINE = MergeTree() - PRIMARY KEY (project_id, recorded) + PRIMARY KEY (project_id, recorded, user_id) " )) .execute() diff --git a/src/lib.rs b/src/lib.rs index 4c74d3ab..05827e7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ use util::cors::default_cors; use crate::queue::moderation::AutomatedModerationQueue; use crate::{ - // queue::payouts::process_payout, + queue::payouts::process_payout, search::indexing::index_projects, util::env::{parse_strings_from_var, parse_var}, }; @@ -214,25 +214,25 @@ pub fn app_setup( }); } - // { - // let pool_ref = pool.clone(); - // let redis_ref = redis_pool.clone(); - // let client_ref = clickhouse.clone(); - // scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || { - // let pool_ref = pool_ref.clone(); - // let redis_ref = redis_ref.clone(); - // let client_ref = client_ref.clone(); - // - // async move { - // info!("Started running payouts"); - // let result = process_payout(&pool_ref, &redis_ref, &client_ref).await; - // if let Err(e) = result { - // warn!("Payouts run failed: {:?}", e); - // } - // info!("Done running payouts"); - // } - // }); - // } + { + let pool_ref = pool.clone(); + let redis_ref = redis_pool.clone(); + let client_ref = clickhouse.clone(); + scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || { + let pool_ref = pool_ref.clone(); + let redis_ref = redis_ref.clone(); + let client_ref = client_ref.clone(); + + async move { + info!("Started running payouts"); + let result = process_payout(&pool_ref, &redis_ref, &client_ref).await; + if let Err(e) = result { + warn!("Payouts run failed: {:?}", e); + } + info!("Done running payouts"); + } + }); + } let ip_salt = Pepper { pepper: models::ids::Base62Id(models::ids::random_base62(11)).to_string(), diff --git a/src/models/v3/analytics.rs b/src/models/v3/analytics.rs index 669175ad..b59254a7 100644 --- a/src/models/v3/analytics.rs +++ b/src/models/v3/analytics.rs @@ -34,6 +34,8 @@ pub struct PageView { pub user_id: u64, // Modrinth Project ID (used for payouts) pub project_id: u64, + // whether this view will be monetized / counted for payouts + pub monetized: bool, // The below information is used exclusively for data aggregation and fraud detection // (ex: page view botting). diff --git a/src/queue/analytics.rs b/src/queue/analytics.rs index 0d0fa869..ecb300a2 100644 --- a/src/queue/analytics.rs +++ b/src/queue/analytics.rs @@ -5,12 +5,15 @@ use crate::routes::ApiError; use dashmap::{DashMap, DashSet}; use redis::cmd; use sqlx::PgPool; +use std::collections::HashMap; +use std::net::Ipv6Addr; const DOWNLOADS_NAMESPACE: &str = "downloads"; +const VIEWS_NAMESPACE: &str = "views"; pub struct AnalyticsQueue { - views_queue: DashSet, - downloads_queue: DashMap, + views_queue: DashMap<(u64, u64), Vec>, + downloads_queue: DashMap<(u64, u64), Download>, playtime_queue: DashSet, } @@ -24,26 +27,37 @@ impl Default for AnalyticsQueue { impl AnalyticsQueue { pub fn new() -> Self { AnalyticsQueue { - views_queue: DashSet::with_capacity(1000), + views_queue: DashMap::with_capacity(1000), downloads_queue: DashMap::with_capacity(1000), playtime_queue: DashSet::with_capacity(1000), } } - pub fn add_view(&self, page_view: PageView) { - self.views_queue.insert(page_view); + fn strip_ip(ip: Ipv6Addr) -> u64 { + if let Some(ip) = ip.to_ipv4_mapped() { + let octets = ip.octets(); + u64::from_be_bytes([octets[0], octets[1], octets[2], octets[3], 0, 0, 0, 0]) + } else { + let octets = ip.octets(); + u64::from_be_bytes([ + octets[0], octets[1], octets[2], octets[3], octets[4], octets[5], octets[6], + octets[7], + ]) + } } + pub fn add_view(&self, page_view: PageView) { + let ip_stripped = Self::strip_ip(page_view.ip); + + self.views_queue + .entry((ip_stripped, page_view.project_id)) + .or_default() + .push(page_view); + } pub fn add_download(&self, download: Download) { - let ip_stripped = if let Some(ip) = download.ip.to_ipv4_mapped() { - let octets = ip.octets(); - u64::from_be_bytes([0, 0, 0, 0, octets[0], octets[1], octets[2], octets[3]]) - } else { - let octets = download.ip.octets(); - u64::from_be_bytes([0, 0, 0, 0, octets[0], octets[1], octets[2], octets[3]]) - }; + let ip_stripped = Self::strip_ip(download.ip); self.downloads_queue - .insert(format!("{}-{}", ip_stripped, download.project_id), download); + .insert((ip_stripped, download.project_id), download); } pub fn add_playtime(&self, playtime: Playtime) { @@ -65,16 +79,6 @@ impl AnalyticsQueue { let playtime_queue = self.playtime_queue.clone(); self.playtime_queue.clear(); - if !views_queue.is_empty() { - let mut views = client.insert("views")?; - - for view in views_queue { - views.write(&view).await?; - } - - views.end().await?; - } - if !playtime_queue.is_empty() { let mut playtimes = client.insert("playtime")?; @@ -85,6 +89,78 @@ impl AnalyticsQueue { playtimes.end().await?; } + if !views_queue.is_empty() { + let mut views_keys = Vec::new(); + let mut raw_views = Vec::new(); + + for (key, views) in views_queue { + views_keys.push(key); + raw_views.push((views, true)); + } + + let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?; + + let results = cmd("MGET") + .arg( + views_keys + .iter() + .map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1)) + .collect::>(), + ) + .query_async::<_, Vec>>(&mut redis) + .await + .map_err(DatabaseError::CacheError)?; + + let mut pipe = redis::pipe(); + for (idx, count) in results.into_iter().enumerate() { + let key = &views_keys[idx]; + + let new_count = if let Some((views, monetized)) = raw_views.get_mut(idx) { + if let Some(count) = count { + println!("len: {} count: {}", views.len(), count); + + if count > 3 { + *monetized = false; + continue; + } + + if (count + views.len() as u32) > 3 { + *monetized = false; + } + + count + (views.len() as u32) + } else { + views.len() as u32 + } + } else { + 1 + }; + + pipe.atomic().set_ex( + format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1), + new_count, + 6 * 60 * 60, + ); + } + pipe.query_async(&mut *redis) + .await + .map_err(DatabaseError::CacheError)?; + + let mut views = client.insert("views")?; + + for (all_views, monetized) in raw_views { + for (idx, mut view) in all_views.into_iter().enumerate() { + if idx != 0 || !monetized { + view.monetized = false; + } + + views.write(&view).await?; + } + } + + views.end().await?; + } + if !downloads_queue.is_empty() { let mut downloads_keys = Vec::new(); let raw_downloads = DashMap::new(); @@ -100,7 +176,7 @@ impl AnalyticsQueue { .arg( downloads_keys .iter() - .map(|x| format!("{}:{}", DOWNLOADS_NAMESPACE, x)) + .map(|x| format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1)) .collect::>(), ) .query_async::<_, Vec>>(&mut redis) @@ -123,7 +199,7 @@ impl AnalyticsQueue { }; pipe.atomic().set_ex( - format!("{}:{}", DOWNLOADS_NAMESPACE, key), + format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1), new_count, 6 * 60 * 60, ); @@ -132,37 +208,46 @@ impl AnalyticsQueue { .await .map_err(DatabaseError::CacheError)?; - let version_ids = raw_downloads - .iter() - .map(|x| x.version_id as i64) - .collect::>(); - let project_ids = raw_downloads - .iter() - .map(|x| x.project_id as i64) - .collect::>(); - let mut transaction = pool.begin().await?; let mut downloads = client.insert("downloads")?; + let mut version_downloads: HashMap = HashMap::new(); + let mut project_downloads: HashMap = HashMap::new(); + for (_, download) in raw_downloads { + *version_downloads + .entry(download.version_id as i64) + .or_default() += 1; + *project_downloads + .entry(download.project_id as i64) + .or_default() += 1; + downloads.write(&download).await?; } - sqlx::query!( - "UPDATE versions - SET downloads = downloads + 1 - WHERE id = ANY($1)", - &version_ids + sqlx::query( + " + UPDATE versions v + SET downloads = v.downloads + x.amount + FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount) + WHERE v.id = x.id + ", ) + .bind(version_downloads.keys().copied().collect::>()) + .bind(version_downloads.values().copied().collect::>()) .execute(&mut *transaction) .await?; - sqlx::query!( - "UPDATE mods - SET downloads = downloads + 1 - WHERE id = ANY($1)", - &project_ids + sqlx::query( + " + UPDATE mods m + SET downloads = m.downloads + x.amount + FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount) + WHERE m.id = x.id + ", ) + .bind(project_downloads.keys().copied().collect::>()) + .bind(project_downloads.values().copied().collect::>()) .execute(&mut *transaction) .await?; diff --git a/src/queue/moderation.rs b/src/queue/moderation.rs index beeb51e3..72e401d7 100644 --- a/src/queue/moderation.rs +++ b/src/queue/moderation.rs @@ -511,6 +511,7 @@ impl AutomatedModerationQueue { " INSERT INTO moderation_external_files (sha1, external_license_id) SELECT * FROM UNNEST ($1::bytea[], $2::bigint[]) + ON CONFLICT (sha1) DO NOTHING ", &insert_hashes[..], &insert_ids[..] diff --git a/src/queue/payouts.rs b/src/queue/payouts.rs index 53ba2005..301f9435 100644 --- a/src/queue/payouts.rs +++ b/src/queue/payouts.rs @@ -6,6 +6,8 @@ use crate::util::env::parse_var; use crate::{database::redis::RedisPool, models::projects::MonetizationStatus}; use base64::Engine; use chrono::{DateTime, Datelike, Duration, Utc, Weekday}; +use dashmap::DashMap; +use futures::TryStreamExt; use reqwest::Method; use rust_decimal::Decimal; use serde::de::DeserializeOwned; @@ -548,7 +550,7 @@ pub async fn process_payout( r#" SELECT COUNT(1) page_views, project_id FROM views - WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) + WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE) GROUP BY project_id ORDER BY page_views DESC "#, @@ -557,7 +559,7 @@ pub async fn process_payout( .bind(end.timestamp()) .fetch_all::(), client - .query("SELECT COUNT(1) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0)") + .query("SELECT COUNT(1) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE)") .bind(start.timestamp()) .bind(end.timestamp()) .fetch_one::(), @@ -636,7 +638,13 @@ pub async fn process_payout( .map(|x| x.to_string()) .collect::>(), ) - .fetch_all(&mut *transaction) + .fetch(&mut *transaction) + .try_fold(DashMap::new(), |acc: DashMap>, r| { + acc.entry(r.id) + .or_default() + .insert(r.user_id, r.payouts_split); + async move { Ok(acc) } + }) .await?; let project_team_members = sqlx::query!( @@ -653,20 +661,27 @@ pub async fn process_payout( .map(|x| x.to_string()) .collect::>(), ) - .fetch_all(&mut *transaction) + .fetch(&mut *transaction) + .try_fold( + DashMap::new(), + |acc: DashMap>, r| { + acc.entry(r.id) + .or_default() + .insert(r.user_id, r.payouts_split); + async move { Ok(acc) } + }, + ) .await?; for project_id in project_ids { let team_members: HashMap = project_team_members - .iter() - .filter(|r| r.id == project_id) - .map(|r| (r.user_id, r.payouts_split)) - .collect(); + .remove(&project_id) + .unwrap_or((0, HashMap::new())) + .1; let org_team_members: HashMap = project_org_members - .iter() - .filter(|r| r.id == project_id) - .map(|r| (r.user_id, r.payouts_split)) - .collect(); + .remove(&project_id) + .unwrap_or((0, HashMap::new())) + .1; let mut all_team_members = vec![]; @@ -711,6 +726,7 @@ pub async fn process_payout( let mut clear_cache_users = Vec::new(); let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); + let (mut update_user_ids, mut update_user_balances) = (Vec::new(), Vec::new()); for (id, project) in projects_map { if let Some(value) = &multipliers.values.get(&(id as u64)) { let project_multiplier: Decimal = @@ -728,17 +744,8 @@ pub async fn process_payout( insert_payouts.push(payout); insert_starts.push(start); - sqlx::query!( - " - UPDATE users - SET balance = balance + $1 - WHERE id = $2 - ", - payout, - user_id - ) - .execute(&mut *transaction) - .await?; + update_user_ids.push(user_id); + update_user_balances.push(payout); clear_cache_users.push(user_id); } @@ -747,6 +754,19 @@ pub async fn process_payout( } } + sqlx::query( + " + UPDATE users u + SET balance = u.balance + v.amount + FROM unnest($1::BIGINT[], $2::NUMERIC[]) AS v(id, amount) + WHERE u.id = v.id + ", + ) + .bind(&update_user_ids) + .bind(&update_user_balances) + .execute(&mut *transaction) + .await?; + sqlx::query!( " INSERT INTO payouts_values (user_id, mod_id, amount, created) diff --git a/src/routes/analytics.rs b/src/routes/analytics.rs index 04db14be..1d28f863 100644 --- a/src/routes/analytics.rs +++ b/src/routes/analytics.rs @@ -118,6 +118,7 @@ pub async fn page_view_ingest( .into_iter() .filter(|x| !FILTERED_HEADERS.contains(&&*x.0)) .collect(), + monetized: true, }; if let Some(segments) = url.path_segments() {