Skip to content

Commit

Permalink
refactor(tap-agent): use single http client
Browse files Browse the repository at this point in the history
We want to reuse the same connection or wait for the older
one in case we hit 2 rav requests in a row

Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored and aasseman committed Aug 1, 2024
1 parent cfaf02f commit 4c85b2d
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ pub struct SenderAllocationState {
tap_manager: TapManager,
allocation_id: Address,
sender: Address,
sender_aggregator_endpoint: String,
config: &'static config::Config,
escrow_accounts: Eventual<EscrowAccounts>,
domain_separator: Eip712Domain,
sender_account_ref: ActorRef<SenderAccountMessage>,

http_client: jsonrpsee::http_client::HttpClient,
}

pub struct SenderAllocationArgs {
Expand Down Expand Up @@ -151,7 +152,7 @@ impl Actor for SenderAllocation {
) -> std::result::Result<Self::State, ActorProcessingErr> {
let sender_account_ref = args.sender_account_ref.clone();
let allocation_id = args.allocation_id;
let mut state = SenderAllocationState::new(args).await;
let mut state = SenderAllocationState::new(args).await?;

// update invalid receipts
state.invalid_receipts_fees = state.calculate_invalid_receipts_fee().await?;
Expand Down Expand Up @@ -306,7 +307,7 @@ impl SenderAllocationState {
sender_aggregator_endpoint,
sender_account_ref,
}: SenderAllocationArgs,
) -> Self {
) -> anyhow::Result<Self> {
let required_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![
Arc::new(AllocationId::new(
sender,
Expand All @@ -333,20 +334,24 @@ impl SenderAllocationState {
Checks::new(required_checks),
);

Self {
let http_client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
.build(&sender_aggregator_endpoint)?;

Ok(Self {
pgpool,
tap_manager,
allocation_id,
sender,
sender_aggregator_endpoint,
config,
escrow_accounts,
domain_separator,
sender_account_ref: sender_account_ref.clone(),
unaggregated_fees: UnaggregatedReceipts::default(),
invalid_receipts_fees: UnaggregatedReceipts::default(),
latest_rav,
}
http_client,
})
}

/// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager
Expand Down Expand Up @@ -503,13 +508,9 @@ impl SenderAllocationState {
),
_ => e.into(),
})?;
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(
self.config.tap.rav_request_timeout_secs,
))
.build(&self.sender_aggregator_endpoint)?;
let rav_response_time_start = Instant::now();
let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = client
let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = self
.http_client
.request(
"aggregate_receipts",
rpc_params!(
Expand Down Expand Up @@ -1242,7 +1243,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let state = SenderAllocationState::new(args).await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
for i in 1..10 {
Expand All @@ -1264,7 +1265,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let state = SenderAllocationState::new(args).await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
for i in 1..10 {
Expand Down Expand Up @@ -1292,7 +1293,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let state = SenderAllocationState::new(args).await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add the RAV to the database.
// This RAV has timestamp 4. The sender_allocation should only consider receipts
Expand All @@ -1319,7 +1320,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let state = SenderAllocationState::new(args).await;
let state = SenderAllocationState::new(args).await.unwrap();

let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);

Expand All @@ -1345,7 +1346,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let mut state = SenderAllocationState::new(args).await;
let mut state = SenderAllocationState::new(args).await.unwrap();

let checks = Checks::new(vec![Arc::new(FailingCheck)]);

Expand Down Expand Up @@ -1376,7 +1377,7 @@ pub mod tests {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let state = SenderAllocationState::new(args).await;
let state = SenderAllocationState::new(args).await.unwrap();

// mark rav as final
let result = state.mark_rav_last().await;
Expand Down

0 comments on commit 4c85b2d

Please sign in to comment.