Skip to content

Commit

Permalink
fix(tap-agent): update retry logic (#281)
Browse files Browse the repository at this point in the history
* fix(tap-agent): update retry logic

Signed-off-by: Gustavo Inacio <[email protected]>

* test(tap-agent): fix retry test

Signed-off-by: Gustavo Inacio <[email protected]>

---------

Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored Aug 7, 2024
1 parent feda99e commit 55c4ea1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 40 deletions.
80 changes: 49 additions & 31 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,18 @@ lazy_static! {
type RavMap = HashMap<Address, u128>;
type Balance = U256;

#[derive(Debug, Eq, PartialEq)]
pub enum ReceiptFees {
NewValue(UnaggregatedReceipts),
Retry,
}

#[derive(Debug)]
pub enum SenderAccountMessage {
UpdateBalanceAndLastRavs(Balance, RavMap),
UpdateAllocationIds(HashSet<Address>),
NewAllocationId(Address),
UpdateReceiptFees(Address, UnaggregatedReceipts),
UpdateReceiptFees(Address, ReceiptFees),
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
UpdateRav(SignedRAV),
#[cfg(test)]
Expand Down Expand Up @@ -551,19 +557,21 @@ impl Actor for SenderAccount {
state.add_to_denylist().await;
}
}
SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => {
UNAGGREGATED_FEES
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
.set(unaggregated_fees.value as f64);

SenderAccountMessage::UpdateReceiptFees(allocation_id, receipt_fees) => {
// If we're here because of a new receipt, abort any scheduled UpdateReceiptFees
if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() {
scheduled_rav_request.abort();
}

state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);
if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees {
state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);

UNAGGREGATED_FEES
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
.set(unaggregated_fees.value as f64);
}

// Eagerly deny the sender (if needed), before the RAV request. To be sure not to
// delay the denial because of the RAV request, which could take some time.
Expand Down Expand Up @@ -602,7 +610,7 @@ impl Actor for SenderAccount {
Some(myself.send_after(state.retry_interval, move || {
SenderAccountMessage::UpdateReceiptFees(
allocation_id,
unaggregated_fees,
ReceiptFees::Retry,
)
}));
}
Expand Down Expand Up @@ -760,7 +768,7 @@ impl Actor for SenderAccount {
// update the receipt fees by reseting to 0
myself.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
UnaggregatedReceipts::default(),
ReceiptFees::NewValue(UnaggregatedReceipts::default()),
))?;

// rav tracker is not updated because it's still not redeemed
Expand Down Expand Up @@ -805,6 +813,7 @@ impl Actor for SenderAccount {
#[cfg(test)]
pub mod tests {
use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
use crate::agent::sender_account::ReceiptFees;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
Expand Down Expand Up @@ -1014,16 +1023,18 @@ pub mod tests {
}

impl MockSenderAllocation {
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>) {
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
let triggered_rav_request = Arc::new(AtomicU32::new(0));
let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
triggered_rav_request: triggered_rav_request.clone(),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
next_unaggregated_fees_value: Arc::new(Mutex::new(0)),
next_unaggregated_fees_value: unaggregated_fees.clone(),
},
triggered_rav_request,
unaggregated_fees,
)
}

Expand Down Expand Up @@ -1120,18 +1131,24 @@ pub mod tests {
allocation: Address,
) -> (
Arc<AtomicU32>,
Arc<Mutex<u128>>,
ActorRef<SenderAllocationMessage>,
JoinHandle<()>,
) {
let (mock_sender_allocation, triggered_rav_request) =
let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
MockSenderAllocation::new_with_triggered_rav_request();

let name = format!("{}:{}:{}", prefix, sender, allocation);
let (sender_account, join_handle) =
MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())
.await
.unwrap();
(triggered_rav_request, sender_account, join_handle)
(
triggered_rav_request,
next_unaggregated_fees,
sender_account,
join_handle,
)
}

#[sqlx::test(migrations = "../migrations")]
Expand All @@ -1145,17 +1162,17 @@ pub mod tests {
)
.await;

let (triggered_rav_request, allocation, allocation_handle) =
let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;

// create a fake sender allocation
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE - 1,
last_id: 10,
},
}),
))
.unwrap();

Expand Down Expand Up @@ -1184,17 +1201,17 @@ pub mod tests {
)
.await;

let (triggered_rav_request, allocation, allocation_handle) =
let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;

// create a fake sender allocation
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 10,
},
}),
))
.unwrap();

Expand Down Expand Up @@ -1292,23 +1309,24 @@ pub mod tests {
)
.await;

let (triggered_rav_request, allocation, allocation_handle) =
let (triggered_rav_request, next_value, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
0
);
*next_value.lock().unwrap() = TRIGGER_VALUE;
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
},
}),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(200)).await;

let retry_value = triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst);
assert!(retry_value > 1, "It didn't retry more than once");
Expand Down Expand Up @@ -1348,10 +1366,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
},
}),
))
.unwrap();

Expand Down Expand Up @@ -1489,10 +1507,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
},
}),
))
.unwrap();

Expand Down Expand Up @@ -1690,10 +1708,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
},
}),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
19 changes: 10 additions & 9 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tap_core::{
use thegraph::types::Address;
use tracing::{error, warn};

use crate::lazy_static;
use crate::{agent::sender_account::ReceiptFees, lazy_static};

use crate::agent::sender_account::SenderAccountMessage;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Actor for SenderAllocation {

sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
state.unaggregated_fees.clone(),
ReceiptFees::NewValue(state.unaggregated_fees.clone()),
))?;

// update rav tracker for sender account
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Actor for SenderAllocation {
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
unaggregated_fees.clone(),
ReceiptFees::NewValue(unaggregated_fees.clone()),
))?;
}
}
Expand Down Expand Up @@ -696,7 +696,8 @@ pub mod tests {
};
use crate::{
agent::{
sender_account::SenderAccountMessage, sender_accounts_manager::NewReceiptNotification,
sender_account::{ReceiptFees, SenderAccountMessage},
sender_accounts_manager::NewReceiptNotification,
unaggregated_receipts::UnaggregatedReceipts,
},
config,
Expand Down Expand Up @@ -889,10 +890,10 @@ pub mod tests {
// Should emit a message to the sender account with the unaggregated fees.
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 10,
value: 55u128,
},
}),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 1);
Expand Down Expand Up @@ -988,10 +989,10 @@ pub mod tests {
// should emit update aggregate fees message to sender account
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 1,
value: 20,
},
}),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 2);
Expand Down Expand Up @@ -1106,7 +1107,7 @@ pub mod tests {
last_message_emitted.lock().unwrap().last(),
Some(&SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts::default()
ReceiptFees::NewValue(UnaggregatedReceipts::default())
))
);
}
Expand Down

0 comments on commit 55c4ea1

Please sign in to comment.