From f4c3349ef526beeca688123d02052bc5b558cd7d Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Thu, 28 Apr 2022 08:28:56 +0200 Subject: [PATCH] Replace println calls with log crate macros (#43) * Add log crate * Replace println with log macros * Add env-logger Add env-logger and configure with log level filter debug if the logger is not configured via the RUST_LOG env variable. --- Cargo.lock | 76 ++++++++++++++++++++++++++++++++++++ mqtt-v5-broker/Cargo.toml | 8 ++-- mqtt-v5-broker/src/broker.rs | 9 +++-- mqtt-v5-broker/src/client.rs | 15 +++---- mqtt-v5-broker/src/main.rs | 33 +++++++++++----- 5 files changed, 117 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d87ae27..678bf87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,26 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "0.1.7" @@ -70,6 +90,19 @@ dependencies = [ "syn", ] +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -196,6 +229,12 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "libc" version = "0.2.101" @@ -256,7 +295,9 @@ name = "mqtt-v5-broker" version = "0.1.0" dependencies = [ "bytes", + "env_logger", "futures", + "log", "mqtt-v5", "nanoid", "tokio", @@ -526,6 +567,23 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "regex" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "serde" version = "1.0.129" @@ -555,6 +613,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + [[package]] name = "tokio" version = "1.10.1" @@ -647,6 +714,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/mqtt-v5-broker/Cargo.toml b/mqtt-v5-broker/Cargo.toml index a32943e..6204d8d 100644 --- a/mqtt-v5-broker/Cargo.toml +++ b/mqtt-v5-broker/Cargo.toml @@ -10,8 +10,10 @@ edition = "2018" [dependencies] bytes = "1" -tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] } -tokio-util = { version = "0.6", features = ["codec"] } +futures = "0.3" +log = "0.4" mqtt-v5 = { path = "../mqtt-v5", version = "0.2" } nanoid = "0.3" -futures = "0.3" +tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] } +tokio-util = { version = "0.6", features = ["codec"] } +env_logger = "0.8.4" diff --git a/mqtt-v5-broker/src/broker.rs b/mqtt-v5-broker/src/broker.rs index bbc5131..b9826b6 100644 --- a/mqtt-v5-broker/src/broker.rs +++ b/mqtt-v5-broker/src/broker.rs @@ -1,4 +1,5 @@ use crate::{client::ClientMessage, tree::SubscriptionTree}; +use log::{info, warn}; use mqtt_v5::{ topic::TopicFilter, types::{ @@ -155,7 +156,7 @@ impl Session { async fn send(&mut self, message: ClientMessage) { if let Some(ref client_sender) = self.client_sender { if client_sender.send(message).await.is_err() { - println!("Failed to send message to client. Dropping sender"); + warn!("Failed to send message to client. Dropping sender"); self.client_sender.take(); } } @@ -216,7 +217,7 @@ impl Broker { if let Err(e) = client_sender .try_send(ClientMessage::Disconnect(DisconnectReason::SessionTakenOver)) { - println!("Failed to send disconnect packet to taken-over session - {:?}", e); + warn!("Failed to send disconnect packet to taken-over session - {:?}", e); } } @@ -269,7 +270,7 @@ impl Broker { existing_session.resend_packets().await; } - println!( + info!( "Client ID {} connected (Version: {:?})", connect_packet.client_id, connect_packet.protocol_version ); @@ -441,7 +442,7 @@ impl Broker { } fn handle_disconnect(&mut self, client_id: String, will_disconnect_logic: WillDisconnectLogic) { - println!("Client ID {} disconnected", client_id); + info!("Client ID {} disconnected", client_id); let mut disconnect_will = None; let mut session_expiry_duration = None; diff --git a/mqtt-v5-broker/src/client.rs b/mqtt-v5-broker/src/client.rs index b9ec07c..c4c5f77 100644 --- a/mqtt-v5-broker/src/client.rs +++ b/mqtt-v5-broker/src/client.rs @@ -3,6 +3,7 @@ use futures::{ future::{self, Either}, stream, Sink, SinkExt, Stream, StreamExt, }; +use log::{debug, info, trace, warn}; use mqtt_v5::types::{ DecodeError, DisconnectPacket, DisconnectReason, EncodeError, Packet, ProtocolError, ProtocolVersion, QoS, @@ -38,7 +39,7 @@ impl + Unpin, SI: Sink { @@ -252,7 +253,7 @@ impl + Unpin, SI: Sink {}, }, Err(err) => { - println!("Error while reading frame: {:?}", err); + warn!("Error while reading frame: {:?}", err); break; }, } @@ -284,10 +285,10 @@ impl + Unpin, SI: Sink + Unpin, SI: Sink (), Ok(Err(e)) => { - println!("Failed to write to client client socket: {:?}", e); + warn!("Failed to write to client client socket: {:?}", e); return; }, Err(_) => { - println!("Timeout during client socket write. Disconnecting"); + warn!("Timeout during client socket write. Disconnecting"); return; }, } @@ -329,6 +330,6 @@ impl + Unpin, SI: Sink) { - println!("Handling a client"); + debug!("Handling client {:?}", stream.peer_addr()); let (sink, stream) = Framed::new(stream, MqttCodec::new()).split(); let unconnected_client = UnconnectedClient::new(stream, sink, broker_tx); @@ -33,7 +36,7 @@ async fn client_handler(stream: TcpStream, broker_tx: Sender) { let connected_client = match unconnected_client.handshake().await { Ok(connected_client) => connected_client, Err(err) => { - println!("Protocol error during connection handshake: {:?}", err); + warn!("Protocol error during connection handshake: {:?}", err); return; }, }; @@ -60,7 +63,7 @@ async fn upgrade_stream(stream: TcpStream) -> Framed } async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender) { - println!("Handling a WebSocket client"); + debug!("Handling WebSocket client {:?}", stream.peer_addr()); let ws_framed = upgrade_stream(stream).await; @@ -115,7 +118,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender { - println!("Error while reading from WebSocket stream: {:?}", e); + debug!("Error while reading from WebSocket stream: {:?}", e); // If we had a decode error in the WebSocket layer, // propagate the it along the stream return Some(( @@ -153,7 +156,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender connected_client, Err(err) => { - println!("Protocol error during connection handshake: {:?}", err); + warn!("Protocol error during connection handshake: {:?}", err); return; }, }; @@ -165,12 +168,12 @@ async fn server_loop(broker_tx: Sender) { let bind_addr = "0.0.0.0:1883"; let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 1883"); - println!("Listening on {}", bind_addr); + info!("Listening on {}", bind_addr); loop { let (socket, addr) = listener.accept().await.expect("Error in server_loop 'listener.accept()"); - println!("Got a new socket from addr: {:?}", addr); + info!("Got a new socket from addr: {:?}", addr); let handler = client_handler(socket, broker_tx.clone()); @@ -182,12 +185,12 @@ async fn websocket_server_loop(broker_tx: Sender) { let bind_addr = "0.0.0.0:8080"; let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 8080"); - println!("Listening on {}", bind_addr); + info!("Listening on {}", bind_addr); loop { let (socket, addr) = listener.accept().await.expect("Error in websocket_server_loop 'listener.accept()"); - println!("Got a new socket from addr: {:?}", addr); + info!("Got a new socket from addr: {:?}", addr); let handler = websocket_client_handler(socket, broker_tx.clone()); @@ -195,7 +198,17 @@ async fn websocket_server_loop(broker_tx: Sender) { } } +fn init_logging() { + if env::var("RUST_LOG").is_err() { + env_logger::builder().filter(None, log::LevelFilter::Debug).init(); + } else { + env_logger::init(); + } +} + fn main() -> Result<(), Box> { + init_logging(); + // Creating a Runtime does the following: // * Spawn a background thread running a Reactor instance. // * Start a ThreadPool for executing futures.