diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs
index 76501629..0c4803fa 100644
--- a/tap-agent/src/agent/sender_account.rs
+++ b/tap-agent/src/agent/sender_account.rs
@@ -71,12 +71,18 @@ lazy_static! {
type RavMap = HashMap
;
type Balance = U256;
+#[derive(Debug, Eq, PartialEq)]
+pub enum ReceiptFees {
+ NewValue(UnaggregatedReceipts),
+ Retry,
+}
+
#[derive(Debug)]
pub enum SenderAccountMessage {
UpdateBalanceAndLastRavs(Balance, RavMap),
UpdateAllocationIds(HashSet),
NewAllocationId(Address),
- UpdateReceiptFees(Address, UnaggregatedReceipts),
+ UpdateReceiptFees(Address, ReceiptFees),
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
UpdateRav(SignedRAV),
#[cfg(test)]
@@ -551,19 +557,21 @@ impl Actor for SenderAccount {
state.add_to_denylist().await;
}
}
- SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => {
- UNAGGREGATED_FEES
- .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
- .set(unaggregated_fees.value as f64);
-
+ SenderAccountMessage::UpdateReceiptFees(allocation_id, receipt_fees) => {
// If we're here because of a new receipt, abort any scheduled UpdateReceiptFees
if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() {
scheduled_rav_request.abort();
}
- state
- .sender_fee_tracker
- .update(allocation_id, unaggregated_fees.value);
+ if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees {
+ state
+ .sender_fee_tracker
+ .update(allocation_id, unaggregated_fees.value);
+
+ UNAGGREGATED_FEES
+ .with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
+ .set(unaggregated_fees.value as f64);
+ }
// Eagerly deny the sender (if needed), before the RAV request. To be sure not to
// delay the denial because of the RAV request, which could take some time.
@@ -602,7 +610,7 @@ impl Actor for SenderAccount {
Some(myself.send_after(state.retry_interval, move || {
SenderAccountMessage::UpdateReceiptFees(
allocation_id,
- unaggregated_fees,
+ ReceiptFees::Retry,
)
}));
}
@@ -760,7 +768,7 @@ impl Actor for SenderAccount {
// update the receipt fees by reseting to 0
myself.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
- UnaggregatedReceipts::default(),
+ ReceiptFees::NewValue(UnaggregatedReceipts::default()),
))?;
// rav tracker is not updated because it's still not redeemed
@@ -805,6 +813,7 @@ impl Actor for SenderAccount {
#[cfg(test)]
pub mod tests {
use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
+ use crate::agent::sender_account::ReceiptFees;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
@@ -1014,16 +1023,18 @@ pub mod tests {
}
impl MockSenderAllocation {
- pub fn new_with_triggered_rav_request() -> (Self, Arc) {
+ pub fn new_with_triggered_rav_request() -> (Self, Arc, Arc>) {
let triggered_rav_request = Arc::new(AtomicU32::new(0));
+ let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
triggered_rav_request: triggered_rav_request.clone(),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
- next_unaggregated_fees_value: Arc::new(Mutex::new(0)),
+ next_unaggregated_fees_value: unaggregated_fees.clone(),
},
triggered_rav_request,
+ unaggregated_fees,
)
}
@@ -1120,10 +1131,11 @@ pub mod tests {
allocation: Address,
) -> (
Arc,
+ Arc>,
ActorRef,
JoinHandle<()>,
) {
- let (mock_sender_allocation, triggered_rav_request) =
+ let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
MockSenderAllocation::new_with_triggered_rav_request();
let name = format!("{}:{}:{}", prefix, sender, allocation);
@@ -1131,7 +1143,12 @@ pub mod tests {
MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())
.await
.unwrap();
- (triggered_rav_request, sender_account, join_handle)
+ (
+ triggered_rav_request,
+ next_unaggregated_fees,
+ sender_account,
+ join_handle,
+ )
}
#[sqlx::test(migrations = "../migrations")]
@@ -1145,17 +1162,17 @@ pub mod tests {
)
.await;
- let (triggered_rav_request, allocation, allocation_handle) =
+ let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
// create a fake sender allocation
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE - 1,
last_id: 10,
- },
+ }),
))
.unwrap();
@@ -1184,17 +1201,17 @@ pub mod tests {
)
.await;
- let (triggered_rav_request, allocation, allocation_handle) =
+ let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
// create a fake sender allocation
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 10,
- },
+ }),
))
.unwrap();
@@ -1292,23 +1309,24 @@ pub mod tests {
)
.await;
- let (triggered_rav_request, allocation, allocation_handle) =
+ let (triggered_rav_request, next_value, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
0
);
+ *next_value.lock().unwrap() = TRIGGER_VALUE;
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
- },
+ }),
))
.unwrap();
- tokio::time::sleep(Duration::from_millis(100)).await;
+ tokio::time::sleep(Duration::from_millis(200)).await;
let retry_value = triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst);
assert!(retry_value > 1, "It didn't retry more than once");
@@ -1348,10 +1366,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
- },
+ }),
))
.unwrap();
@@ -1489,10 +1507,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
- },
+ }),
))
.unwrap();
@@ -1690,10 +1708,10 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
- },
+ }),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs
index a845ee8f..d8a85499 100644
--- a/tap-agent/src/agent/sender_allocation.rs
+++ b/tap-agent/src/agent/sender_allocation.rs
@@ -30,7 +30,7 @@ use tap_core::{
use thegraph::types::Address;
use tracing::{error, warn};
-use crate::lazy_static;
+use crate::{agent::sender_account::ReceiptFees, lazy_static};
use crate::agent::sender_account::SenderAccountMessage;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
@@ -140,7 +140,7 @@ impl Actor for SenderAllocation {
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
- state.unaggregated_fees.clone(),
+ ReceiptFees::NewValue(state.unaggregated_fees.clone()),
))?;
// update rav tracker for sender account
@@ -227,7 +227,7 @@ impl Actor for SenderAllocation {
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
- unaggregated_fees.clone(),
+ ReceiptFees::NewValue(unaggregated_fees.clone()),
))?;
}
}
@@ -696,7 +696,8 @@ pub mod tests {
};
use crate::{
agent::{
- sender_account::SenderAccountMessage, sender_accounts_manager::NewReceiptNotification,
+ sender_account::{ReceiptFees, SenderAccountMessage},
+ sender_accounts_manager::NewReceiptNotification,
unaggregated_receipts::UnaggregatedReceipts,
},
config,
@@ -889,10 +890,10 @@ pub mod tests {
// Should emit a message to the sender account with the unaggregated fees.
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 10,
value: 55u128,
- },
+ }),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 1);
@@ -988,10 +989,10 @@ pub mod tests {
// should emit update aggregate fees message to sender account
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts {
+ ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 1,
value: 20,
- },
+ }),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 2);
@@ -1106,7 +1107,7 @@ pub mod tests {
last_message_emitted.lock().unwrap().last(),
Some(&SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
- UnaggregatedReceipts::default()
+ ReceiptFees::NewValue(UnaggregatedReceipts::default())
))
);
}