From 4c85b2d3865c1757faf1a60795220bfff97ec3b3 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 1 Aug 2024 10:20:56 -0300 Subject: [PATCH] refactor(tap-agent): use single http client We want to reuse the same connection or wait for the older one in case we hit 2 rav requests in a row Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_allocation.rs | 37 ++++++++++++------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 0ed23430..77a593c4 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -110,11 +110,12 @@ pub struct SenderAllocationState { tap_manager: TapManager, allocation_id: Address, sender: Address, - sender_aggregator_endpoint: String, config: &'static config::Config, escrow_accounts: Eventual, domain_separator: Eip712Domain, sender_account_ref: ActorRef, + + http_client: jsonrpsee::http_client::HttpClient, } pub struct SenderAllocationArgs { @@ -151,7 +152,7 @@ impl Actor for SenderAllocation { ) -> std::result::Result { let sender_account_ref = args.sender_account_ref.clone(); let allocation_id = args.allocation_id; - let mut state = SenderAllocationState::new(args).await; + let mut state = SenderAllocationState::new(args).await?; // update invalid receipts state.invalid_receipts_fees = state.calculate_invalid_receipts_fee().await?; @@ -306,7 +307,7 @@ impl SenderAllocationState { sender_aggregator_endpoint, sender_account_ref, }: SenderAllocationArgs, - ) -> Self { + ) -> anyhow::Result { let required_checks: Vec> = vec![ Arc::new(AllocationId::new( sender, @@ -333,12 +334,15 @@ impl SenderAllocationState { Checks::new(required_checks), ); - Self { + let http_client = HttpClientBuilder::default() + .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs)) + .build(&sender_aggregator_endpoint)?; + + Ok(Self { pgpool, tap_manager, allocation_id, sender, - sender_aggregator_endpoint, config, escrow_accounts, domain_separator, @@ -346,7 +350,8 @@ impl SenderAllocationState { unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), latest_rav, - } + http_client, + }) } /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager @@ -503,13 +508,9 @@ impl SenderAllocationState { ), _ => e.into(), })?; - let client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs( - self.config.tap.rav_request_timeout_secs, - )) - .build(&self.sender_aggregator_endpoint)?; let rav_response_time_start = Instant::now(); - let response: JsonRpcResponse> = client + let response: JsonRpcResponse> = self + .http_client .request( "aggregate_receipts", rpc_params!( @@ -1242,7 +1243,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let state = SenderAllocationState::new(args).await; + let state = SenderAllocationState::new(args).await.unwrap(); // Add receipts to the database. for i in 1..10 { @@ -1264,7 +1265,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let state = SenderAllocationState::new(args).await; + let state = SenderAllocationState::new(args).await.unwrap(); // Add receipts to the database. for i in 1..10 { @@ -1292,7 +1293,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let state = SenderAllocationState::new(args).await; + let state = SenderAllocationState::new(args).await.unwrap(); // Add the RAV to the database. // This RAV has timestamp 4. The sender_allocation should only consider receipts @@ -1319,7 +1320,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let state = SenderAllocationState::new(args).await; + let state = SenderAllocationState::new(args).await.unwrap(); let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10); @@ -1345,7 +1346,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let mut state = SenderAllocationState::new(args).await; + let mut state = SenderAllocationState::new(args).await.unwrap(); let checks = Checks::new(vec![Arc::new(FailingCheck)]); @@ -1376,7 +1377,7 @@ pub mod tests { let args = create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) .await; - let state = SenderAllocationState::new(args).await; + let state = SenderAllocationState::new(args).await.unwrap(); // mark rav as final let result = state.mark_rav_last().await;