From 55c4ea1eb69160d75446bcd6a9ab2d785d607935 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 7 Aug 2024 17:26:45 -0600 Subject: [PATCH] fix(tap-agent): update retry logic (#281) * fix(tap-agent): update retry logic Signed-off-by: Gustavo Inacio * test(tap-agent): fix retry test Signed-off-by: Gustavo Inacio --------- Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_account.rs | 80 +++++++++++++++--------- tap-agent/src/agent/sender_allocation.rs | 19 +++--- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 76501629..0c4803fa 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -71,12 +71,18 @@ lazy_static! { type RavMap = HashMap; type Balance = U256; +#[derive(Debug, Eq, PartialEq)] +pub enum ReceiptFees { + NewValue(UnaggregatedReceipts), + Retry, +} + #[derive(Debug)] pub enum SenderAccountMessage { UpdateBalanceAndLastRavs(Balance, RavMap), UpdateAllocationIds(HashSet
), NewAllocationId(Address), - UpdateReceiptFees(Address, UnaggregatedReceipts), + UpdateReceiptFees(Address, ReceiptFees), UpdateInvalidReceiptFees(Address, UnaggregatedReceipts), UpdateRav(SignedRAV), #[cfg(test)] @@ -551,19 +557,21 @@ impl Actor for SenderAccount { state.add_to_denylist().await; } } - SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => { - UNAGGREGATED_FEES - .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]) - .set(unaggregated_fees.value as f64); - + SenderAccountMessage::UpdateReceiptFees(allocation_id, receipt_fees) => { // If we're here because of a new receipt, abort any scheduled UpdateReceiptFees if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() { scheduled_rav_request.abort(); } - state - .sender_fee_tracker - .update(allocation_id, unaggregated_fees.value); + if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees { + state + .sender_fee_tracker + .update(allocation_id, unaggregated_fees.value); + + UNAGGREGATED_FEES + .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]) + .set(unaggregated_fees.value as f64); + } // Eagerly deny the sender (if needed), before the RAV request. To be sure not to // delay the denial because of the RAV request, which could take some time. @@ -602,7 +610,7 @@ impl Actor for SenderAccount { Some(myself.send_after(state.retry_interval, move || { SenderAccountMessage::UpdateReceiptFees( allocation_id, - unaggregated_fees, + ReceiptFees::Retry, ) })); } @@ -760,7 +768,7 @@ impl Actor for SenderAccount { // update the receipt fees by reseting to 0 myself.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - UnaggregatedReceipts::default(), + ReceiptFees::NewValue(UnaggregatedReceipts::default()), ))?; // rav tracker is not updated because it's still not redeemed @@ -805,6 +813,7 @@ impl Actor for SenderAccount { #[cfg(test)] pub mod tests { use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage}; + use crate::agent::sender_account::ReceiptFees; use crate::agent::sender_accounts_manager::NewReceiptNotification; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; @@ -1014,16 +1023,18 @@ pub mod tests { } impl MockSenderAllocation { - pub fn new_with_triggered_rav_request() -> (Self, Arc) { + pub fn new_with_triggered_rav_request() -> (Self, Arc, Arc>) { let triggered_rav_request = Arc::new(AtomicU32::new(0)); + let unaggregated_fees = Arc::new(Mutex::new(0)); ( Self { triggered_rav_request: triggered_rav_request.clone(), receipts: Arc::new(Mutex::new(Vec::new())), next_rav_value: Arc::new(Mutex::new(0)), - next_unaggregated_fees_value: Arc::new(Mutex::new(0)), + next_unaggregated_fees_value: unaggregated_fees.clone(), }, triggered_rav_request, + unaggregated_fees, ) } @@ -1120,10 +1131,11 @@ pub mod tests { allocation: Address, ) -> ( Arc, + Arc>, ActorRef, JoinHandle<()>, ) { - let (mock_sender_allocation, triggered_rav_request) = + let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) = MockSenderAllocation::new_with_triggered_rav_request(); let name = format!("{}:{}:{}", prefix, sender, allocation); @@ -1131,7 +1143,12 @@ pub mod tests { MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ()) .await .unwrap(); - (triggered_rav_request, sender_account, join_handle) + ( + triggered_rav_request, + next_unaggregated_fees, + sender_account, + join_handle, + ) } #[sqlx::test(migrations = "../migrations")] @@ -1145,17 +1162,17 @@ pub mod tests { ) .await; - let (triggered_rav_request, allocation, allocation_handle) = + let (triggered_rav_request, _, allocation, allocation_handle) = create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; // create a fake sender allocation sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: TRIGGER_VALUE - 1, last_id: 10, - }, + }), )) .unwrap(); @@ -1184,17 +1201,17 @@ pub mod tests { ) .await; - let (triggered_rav_request, allocation, allocation_handle) = + let (triggered_rav_request, _, allocation, allocation_handle) = create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; // create a fake sender allocation sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: TRIGGER_VALUE, last_id: 10, - }, + }), )) .unwrap(); @@ -1292,23 +1309,24 @@ pub mod tests { ) .await; - let (triggered_rav_request, allocation, allocation_handle) = + let (triggered_rav_request, next_value, allocation, allocation_handle) = create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; assert_eq!( triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst), 0 ); + *next_value.lock().unwrap() = TRIGGER_VALUE; sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: TRIGGER_VALUE, last_id: 11, - }, + }), )) .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; let retry_value = triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst); assert!(retry_value > 1, "It didn't retry more than once"); @@ -1348,10 +1366,10 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: $value, last_id: 11, - }, + }), )) .unwrap(); @@ -1489,10 +1507,10 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: $value, last_id: 11, - }, + }), )) .unwrap(); @@ -1690,10 +1708,10 @@ pub mod tests { sender_account .cast(SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { value: TRIGGER_VALUE, last_id: 11, - }, + }), )) .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index a845ee8f..d8a85499 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -30,7 +30,7 @@ use tap_core::{ use thegraph::types::Address; use tracing::{error, warn}; -use crate::lazy_static; +use crate::{agent::sender_account::ReceiptFees, lazy_static}; use crate::agent::sender_account::SenderAccountMessage; use crate::agent::sender_accounts_manager::NewReceiptNotification; @@ -140,7 +140,7 @@ impl Actor for SenderAllocation { sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees( allocation_id, - state.unaggregated_fees.clone(), + ReceiptFees::NewValue(state.unaggregated_fees.clone()), ))?; // update rav tracker for sender account @@ -227,7 +227,7 @@ impl Actor for SenderAllocation { .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( state.allocation_id, - unaggregated_fees.clone(), + ReceiptFees::NewValue(unaggregated_fees.clone()), ))?; } } @@ -696,7 +696,8 @@ pub mod tests { }; use crate::{ agent::{ - sender_account::SenderAccountMessage, sender_accounts_manager::NewReceiptNotification, + sender_account::{ReceiptFees, SenderAccountMessage}, + sender_accounts_manager::NewReceiptNotification, unaggregated_receipts::UnaggregatedReceipts, }, config, @@ -889,10 +890,10 @@ pub mod tests { // Should emit a message to the sender account with the unaggregated fees. let expected_message = SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { last_id: 10, value: 55u128, - }, + }), ); let last_message_emitted = last_message_emitted.lock().unwrap(); assert_eq!(last_message_emitted.len(), 1); @@ -988,10 +989,10 @@ pub mod tests { // should emit update aggregate fees message to sender account let expected_message = SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { + ReceiptFees::NewValue(UnaggregatedReceipts { last_id: 1, value: 20, - }, + }), ); let last_message_emitted = last_message_emitted.lock().unwrap(); assert_eq!(last_message_emitted.len(), 2); @@ -1106,7 +1107,7 @@ pub mod tests { last_message_emitted.lock().unwrap().last(), Some(&SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts::default() + ReceiptFees::NewValue(UnaggregatedReceipts::default()) )) ); }