Skip to content

Commit

Permalink
feat: add tracker for buffer unaggregated fees (#324)
Browse files Browse the repository at this point in the history
* feat: add tracker for buffer unaggregated fees

Signed-off-by: Gustavo Inacio <[email protected]>

* refactor: send just the value

Signed-off-by: Gustavo Inacio <[email protected]>

* test(tap-agent): buffer window fee

Signed-off-by: Gustavo Inacio <[email protected]>

* chore: update message for get heaviest allocation

Signed-off-by: Gustavo Inacio <[email protected]>

* chore: fix typo

Signed-off-by: Gustavo Inacio <[email protected]>

---------

Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored Oct 1, 2024
1 parent 57c89e2 commit 676a437
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 58 deletions.
96 changes: 63 additions & 33 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type Balance = U256;

#[derive(Debug, Eq, PartialEq)]
pub enum ReceiptFees {
NewValue(UnaggregatedReceipts),
NewReceipt(u128),
UpdateValue(UnaggregatedReceipts),
Retry,
}

Expand Down Expand Up @@ -198,8 +199,13 @@ impl State {
async fn rav_requester_single(&mut self) -> Result<()> {
let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else {
anyhow::bail!(
"Error while getting the heaviest allocation because \
no unblocked allocation has enough unaggregated fees tracked"
"Error while getting the heaviest allocation, \
this is due one of the following reasons: \n
1. allocations have too much fees under their buffer\n
2. allocations are blocked to be redeemed due to ongoing last rav. \n
If you keep seeing this message try to increase your `amount_willing_to_lose` \
and restart your `tap-agent`\n
If this doesn't work, open an issue on our Github."
);
};
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
Expand Down Expand Up @@ -478,7 +484,9 @@ impl Actor for SenderAccount {
.set(config.tap.rav_request_trigger_value as f64);

let state = State {
sender_fee_tracker: SenderFeeTracker::default(),
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
config.tap.rav_request_timestamp_buffer_ms,
)),
rav_tracker: SenderFeeTracker::default(),
invalid_receipts_tracker: SenderFeeTracker::default(),
allocation_ids: allocation_ids.clone(),
Expand Down Expand Up @@ -564,14 +572,30 @@ impl Actor for SenderAccount {
scheduled_rav_request.abort();
}

if let ReceiptFees::NewValue(unaggregated_fees) = receipt_fees {
state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);
match receipt_fees {
ReceiptFees::NewReceipt(value) => {
state.sender_fee_tracker.add(allocation_id, value);

UNAGGREGATED_FEES
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
.set(unaggregated_fees.value as f64);
UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.add(value as f64);
}
ReceiptFees::UpdateValue(unaggregated_fees) => {
state
.sender_fee_tracker
.update(allocation_id, unaggregated_fees.value);

UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(unaggregated_fees.value as f64);
}
ReceiptFees::Retry => {}
}

// Eagerly deny the sender (if needed), before the RAV request. To be sure not to
Expand All @@ -582,7 +606,7 @@ impl Actor for SenderAccount {
state.add_to_denylist().await;
}

if state.sender_fee_tracker.get_total_fee()
if state.sender_fee_tracker.get_total_fee_outside_buffer()
>= state.config.tap.rav_request_trigger_value
{
tracing::debug!(
Expand Down Expand Up @@ -769,7 +793,7 @@ impl Actor for SenderAccount {
// update the receipt fees by reseting to 0
myself.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::NewValue(UnaggregatedReceipts::default()),
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
))?;

// rav tracker is not updated because it's still not redeemed
Expand Down Expand Up @@ -884,6 +908,7 @@ pub mod tests {
const DUMMY_URL: &str = "http://localhost:1234";
const TRIGGER_VALUE: u128 = 500;
const ESCROW_VALUE: u128 = 1000;
const BUFFER_MS: u64 = 100;

async fn create_sender_account(
pgpool: PgPool,
Expand All @@ -904,7 +929,7 @@ pub mod tests {
},
tap: config::Tap {
rav_request_trigger_value,
rav_request_timestamp_buffer_ms: 1,
rav_request_timestamp_buffer_ms: BUFFER_MS,
rav_request_timeout_secs: 5,
max_unnaggregated_fees_per_sender,
..Default::default()
Expand Down Expand Up @@ -1191,14 +1216,11 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE - 1,
last_id: 10,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1),
))
.unwrap();

tokio::time::sleep(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
Expand Down Expand Up @@ -1230,10 +1252,24 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 10,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();

tokio::time::sleep(Duration::from_millis(20)).await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
0
);

// wait for it to be outside buffer
tokio::time::sleep(Duration::from_millis(BUFFER_MS)).await;

sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::Retry,
))
.unwrap();

Expand Down Expand Up @@ -1342,10 +1378,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
Expand Down Expand Up @@ -1388,7 +1421,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
}),
Expand Down Expand Up @@ -1529,7 +1562,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
value: $value,
last_id: 11,
}),
Expand Down Expand Up @@ -1730,10 +1763,7 @@ pub mod tests {
sender_account
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
value: TRIGGER_VALUE,
last_id: 11,
}),
ReceiptFees::NewReceipt(TRIGGER_VALUE),
))
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
2 changes: 1 addition & 1 deletion tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ lazy_static! {
.unwrap();
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, PartialEq, Eq)]
pub struct NewReceiptNotification {
pub id: u64,
pub allocation_id: Address,
Expand Down
20 changes: 9 additions & 11 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl Actor for SenderAllocation {

sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::NewValue(state.unaggregated_fees.clone()),
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
))?;

// update rav tracker for sender account
Expand Down Expand Up @@ -225,9 +225,10 @@ impl Actor for SenderAllocation {
);
let unaggregated_fees = &mut state.unaggregated_fees;
match message {
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
id, value: fees, ..
}) => {
SenderAllocationMessage::NewReceipt(notification) => {
let NewReceiptNotification {
id, value: fees, ..
} = notification;
if id > unaggregated_fees.last_id {
unaggregated_fees.last_id = id;
unaggregated_fees.value = unaggregated_fees
Expand All @@ -248,7 +249,7 @@ impl Actor for SenderAllocation {
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
ReceiptFees::NewValue(unaggregated_fees.clone()),
ReceiptFees::NewReceipt(fees),
))?;
}
}
Expand Down Expand Up @@ -1014,7 +1015,7 @@ pub mod tests {
// Should emit a message to the sender account with the unaggregated fees.
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
ReceiptFees::UpdateValue(UnaggregatedReceipts {
last_id: 10,
value: 55u128,
}),
Expand Down Expand Up @@ -1113,10 +1114,7 @@ pub mod tests {
// should emit update aggregate fees message to sender account
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts {
last_id: 1,
value: 20,
}),
ReceiptFees::NewReceipt(20u128),
);
let last_message_emitted = last_message_emitted.lock().unwrap();
assert_eq!(last_message_emitted.len(), 2);
Expand Down Expand Up @@ -1231,7 +1229,7 @@ pub mod tests {
last_message_emitted.lock().unwrap().last(),
Some(&SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewValue(UnaggregatedReceipts::default())
ReceiptFees::UpdateValue(UnaggregatedReceipts::default())
))
);
}
Expand Down
Loading

0 comments on commit 676a437

Please sign in to comment.