Skip to content

Commit

Permalink
refactor: [#1326] extract bittorrent_udp_tracker_core::services::conn…
Browse files Browse the repository at this point in the history
…ect::ConnectService
  • Loading branch information
josecelano committed Feb 28, 2025
1 parent ddfbcd2 commit e1d9aa4
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 46 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
callgrind.out
codecov.json
lcov.info
perf.data*
perf.data*
rustc-ice-*.txt
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"routable",
"rstest",
"rusqlite",
"rustc",
"RUSTDOCFLAGS",
"RUSTFLAGS",
"rustfmt",
Expand Down
5 changes: 4 additions & 1 deletion packages/udp-tracker-core/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::RwLock;
use torrust_tracker_configuration::{Core, UdpTracker};

use crate::services::banning::BanService;
use crate::services::connect::ConnectService;
use crate::{statistics, MAX_CONNECTION_ID_ERRORS_PER_IP};

pub struct UdpTrackerCoreContainer {
Expand All @@ -21,6 +22,7 @@ pub struct UdpTrackerCoreContainer {
pub udp_core_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
pub udp_core_stats_repository: Arc<statistics::repository::Repository>,
pub ban_service: Arc<RwLock<BanService>>,
pub connect_service: Arc<ConnectService>,
}

impl UdpTrackerCoreContainer {
Expand All @@ -39,8 +41,8 @@ impl UdpTrackerCoreContainer {
statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
let udp_core_stats_repository = Arc::new(udp_core_stats_repository);

let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender.clone()));

Arc::new(UdpTrackerCoreContainer {
core_config: tracker_core_container.core_config.clone(),
Expand All @@ -52,6 +54,7 @@ impl UdpTrackerCoreContainer {
udp_core_stats_event_sender: udp_core_stats_event_sender.clone(),
udp_core_stats_repository: udp_core_stats_repository.clone(),
ban_service: ban_service.clone(),
connect_service: connect_service.clone(),
})
}
}
93 changes: 64 additions & 29 deletions packages/udp-tracker-core/src/services/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,43 @@ use aquatic_udp_protocol::ConnectionId;
use crate::connection_cookie::{gen_remote_fingerprint, make};
use crate::statistics;

/// # Panics
/// The `ConnectService` is responsible for handling the `connect` requests.
///
/// IT will panic if there was an error making the connection cookie.
pub async fn handle_connect(
remote_addr: SocketAddr,
opt_udp_stats_event_sender: &Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
cookie_issue_time: f64,
) -> ConnectionId {
let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value");

if let Some(udp_stats_event_sender) = opt_udp_stats_event_sender.as_deref() {
match remote_addr {
SocketAddr::V4(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await;
}
SocketAddr::V6(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await;
}
/// It is responsible for generating the connection cookie and sending the
/// appropriate statistics events.
pub struct ConnectService {
pub opt_udp_core_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
}

impl ConnectService {
#[must_use]
pub fn new(opt_udp_core_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>) -> Self {
Self {
opt_udp_core_stats_event_sender,
}
}

connection_id
/// Handles a `connect` request.
///
/// # Panics
///
/// It will panic if there was an error making the connection cookie.
pub async fn handle_connect(&self, remote_addr: SocketAddr, cookie_issue_time: f64) -> ConnectionId {
let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value");

if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
match remote_addr {
SocketAddr::V4(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await;
}
SocketAddr::V6(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await;
}
}
}

connection_id
}
}

#[cfg(test)]
Expand All @@ -44,10 +59,10 @@ mod tests {
use mockall::predicate::eq;

use crate::connection_cookie::make;
use crate::services::connect::handle_connect;
use crate::services::connect::ConnectService;
use crate::services::tests::{
sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv4_socket_address, sample_ipv6_remote_addr,
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpStatsEventSender,
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender,
};
use crate::statistics;

Expand All @@ -56,7 +71,11 @@ mod tests {
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let response = handle_connect(sample_ipv4_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await;
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
.await;

assert_eq!(
response,
Expand All @@ -69,7 +88,11 @@ mod tests {
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let response = handle_connect(sample_ipv4_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await;
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
.await;

assert_eq!(
response,
Expand All @@ -82,7 +105,11 @@ mod tests {
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let response = handle_connect(sample_ipv6_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await;
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
.await;

assert_eq!(
response,
Expand All @@ -92,32 +119,40 @@ mod tests {

#[tokio::test]
async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() {
let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new();
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
udp_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::Udp4Connect))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
let udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));

let client_socket_address = sample_ipv4_socket_address();

handle_connect(client_socket_address, &udp_stats_event_sender, sample_issue_time()).await;
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));

connect_service
.handle_connect(client_socket_address, sample_issue_time())
.await;
}

#[tokio::test]
async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() {
let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new();
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
udp_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::Udp6Connect))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
let udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));

handle_connect(sample_ipv6_remote_addr(), &udp_stats_event_sender, sample_issue_time()).await;
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));

connect_service
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
.await;
}
}
}
4 changes: 2 additions & 2 deletions packages/udp-tracker-core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub(crate) mod tests {
}

mock! {
pub(crate) UdpStatsEventSender {}
impl statistics::event::sender::Sender for UdpStatsEventSender {
pub(crate) UdpCoreStatsEventSender {}
impl statistics::event::sender::Sender for UdpCoreStatsEventSender {
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
}
}
Expand Down
29 changes: 20 additions & 9 deletions packages/udp-tracker-server/src/handlers/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, ConnectionId, Response};
use bittorrent_udp_tracker_core::{services, statistics as core_statistics};
use bittorrent_udp_tracker_core::services::connect::ConnectService;
use tracing::{instrument, Level};

use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;

/// It handles the `Connect` request.
#[instrument(fields(transaction_id), skip(opt_udp_core_stats_event_sender, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))]
#[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))]
pub async fn handle_connect(
remote_addr: SocketAddr,
request: &ConnectRequest,
opt_udp_core_stats_event_sender: &Arc<Option<Box<dyn core_statistics::event::sender::Sender>>>,
connect_service: &Arc<ConnectService>,
opt_udp_server_stats_event_sender: &Arc<Option<Box<dyn server_statistics::event::sender::Sender>>>,
cookie_issue_time: f64,
) -> Response {
Expand All @@ -40,7 +40,7 @@ pub async fn handle_connect(
}
}

let connection_id = services::connect::handle_connect(remote_addr, opt_udp_core_stats_event_sender, cookie_issue_time).await;
let connection_id = connect_service.handle_connect(remote_addr, cookie_issue_time).await;

build_response(*request, connection_id)
}
Expand All @@ -64,6 +64,7 @@ mod tests {

use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId};
use bittorrent_udp_tracker_core::connection_cookie::make;
use bittorrent_udp_tracker_core::services::connect::ConnectService;
use bittorrent_udp_tracker_core::statistics as core_statistics;
use mockall::predicate::eq;

Expand Down Expand Up @@ -94,10 +95,12 @@ mod tests {
transaction_id: TransactionId(0i32.into()),
};

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = handle_connect(
sample_ipv4_remote_addr(),
&request,
&udp_core_stats_event_sender,
&connect_service,
&udp_server_stats_event_sender,
sample_issue_time(),
)
Expand Down Expand Up @@ -125,10 +128,12 @@ mod tests {
transaction_id: TransactionId(0i32.into()),
};

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = handle_connect(
sample_ipv4_remote_addr(),
&request,
&udp_core_stats_event_sender,
&connect_service,
&udp_server_stats_event_sender,
sample_issue_time(),
)
Expand Down Expand Up @@ -156,10 +161,12 @@ mod tests {
transaction_id: TransactionId(0i32.into()),
};

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = handle_connect(
sample_ipv6_remote_addr(),
&request,
&udp_core_stats_event_sender,
&connect_service,
&udp_server_stats_event_sender,
sample_issue_time(),
)
Expand Down Expand Up @@ -198,10 +205,12 @@ mod tests {

let client_socket_address = sample_ipv4_socket_address();

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

handle_connect(
client_socket_address,
&sample_connect_request(),
&udp_core_stats_event_sender,
&connect_service,
&udp_server_stats_event_sender,
sample_issue_time(),
)
Expand Down Expand Up @@ -230,10 +239,12 @@ mod tests {
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

handle_connect(
sample_ipv6_remote_addr(),
&sample_connect_request(),
&udp_core_stats_event_sender,
&connect_service,
&udp_server_stats_event_sender,
sample_issue_time(),
)
Expand Down
2 changes: 1 addition & 1 deletion packages/udp-tracker-server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub async fn handle_request(
Request::Connect(connect_request) => Ok(handle_connect(
remote_addr,
&connect_request,
&udp_tracker_core_container.udp_core_stats_event_sender,
&udp_tracker_core_container.connect_service,
&udp_tracker_server_container.udp_server_stats_event_sender,
cookie_time_values.issue_time,
)
Expand Down
10 changes: 7 additions & 3 deletions src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bittorrent_tracker_core::whitelist::manager::WhitelistManager;
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
use bittorrent_udp_tracker_core::services::banning::BanService;
use bittorrent_udp_tracker_core::services::connect::ConnectService;
use bittorrent_udp_tracker_core::{self, MAX_CONNECTION_ID_ERRORS_PER_IP};
use tokio::sync::RwLock;
use torrust_rest_tracker_api_core::container::TrackerHttpApiCoreContainer;
Expand All @@ -40,9 +41,10 @@ pub struct AppContainer {
pub torrents_manager: Arc<TorrentsManager>,

// UDP Tracker Core Services
pub ban_service: Arc<RwLock<BanService>>,
pub udp_core_stats_event_sender: Arc<Option<Box<dyn bittorrent_udp_tracker_core::statistics::event::sender::Sender>>>,
pub udp_core_stats_repository: Arc<bittorrent_udp_tracker_core::statistics::repository::Repository>,
pub ban_service: Arc<RwLock<BanService>>,
pub connect_service: Arc<bittorrent_udp_tracker_core::services::connect::ConnectService>,

// HTTP Tracker Core Services
pub http_stats_event_sender: Arc<Option<Box<dyn bittorrent_http_tracker_core::statistics::event::sender::Sender>>>,
Expand Down Expand Up @@ -86,8 +88,8 @@ impl AppContainer {
bittorrent_udp_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
let udp_core_stats_repository = Arc::new(udp_core_stats_repository);

let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender.clone()));

// UDP Tracker Server Services
let (udp_server_stats_event_sender, udp_server_stats_repository) =
Expand All @@ -111,9 +113,10 @@ impl AppContainer {
torrents_manager: tracker_core_container.torrents_manager,

// UDP Tracker Core Services
ban_service,
udp_core_stats_event_sender,
udp_core_stats_repository,
ban_service,
connect_service,

// HTTP Tracker Core Services
http_stats_event_sender,
Expand Down Expand Up @@ -156,6 +159,7 @@ impl AppContainer {
udp_core_stats_event_sender: self.udp_core_stats_event_sender.clone(),
udp_core_stats_repository: self.udp_core_stats_repository.clone(),
ban_service: self.ban_service.clone(),
connect_service: self.connect_service.clone(),
}
}

Expand Down

0 comments on commit e1d9aa4

Please sign in to comment.