Skip to content

Commit

Permalink
Replace println calls with log crate macros (#43)
Browse files Browse the repository at this point in the history
* Add log crate

* Replace println with log macros

* Add env-logger

Add env-logger and configure with log level filter debug if the logger
is not configured via the RUST_LOG env variable.
  • Loading branch information
flxo authored Apr 28, 2022
1 parent 30a8c99 commit f4c3349
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 24 deletions.
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions mqtt-v5-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ edition = "2018"

[dependencies]
bytes = "1"
tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] }
tokio-util = { version = "0.6", features = ["codec"] }
futures = "0.3"
log = "0.4"
mqtt-v5 = { path = "../mqtt-v5", version = "0.2" }
nanoid = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] }
tokio-util = { version = "0.6", features = ["codec"] }
env_logger = "0.8.4"
9 changes: 5 additions & 4 deletions mqtt-v5-broker/src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{client::ClientMessage, tree::SubscriptionTree};
use log::{info, warn};
use mqtt_v5::{
topic::TopicFilter,
types::{
Expand Down Expand Up @@ -155,7 +156,7 @@ impl Session {
async fn send(&mut self, message: ClientMessage) {
if let Some(ref client_sender) = self.client_sender {
if client_sender.send(message).await.is_err() {
println!("Failed to send message to client. Dropping sender");
warn!("Failed to send message to client. Dropping sender");
self.client_sender.take();
}
}
Expand Down Expand Up @@ -216,7 +217,7 @@ impl Broker {
if let Err(e) = client_sender
.try_send(ClientMessage::Disconnect(DisconnectReason::SessionTakenOver))
{
println!("Failed to send disconnect packet to taken-over session - {:?}", e);
warn!("Failed to send disconnect packet to taken-over session - {:?}", e);
}
}

Expand Down Expand Up @@ -269,7 +270,7 @@ impl Broker {
existing_session.resend_packets().await;
}

println!(
info!(
"Client ID {} connected (Version: {:?})",
connect_packet.client_id, connect_packet.protocol_version
);
Expand Down Expand Up @@ -441,7 +442,7 @@ impl Broker {
}

fn handle_disconnect(&mut self, client_id: String, will_disconnect_logic: WillDisconnectLogic) {
println!("Client ID {} disconnected", client_id);
info!("Client ID {} disconnected", client_id);

let mut disconnect_will = None;
let mut session_expiry_duration = None;
Expand Down
15 changes: 8 additions & 7 deletions mqtt-v5-broker/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::{
future::{self, Either},
stream, Sink, SinkExt, Stream, StreamExt,
};
use log::{debug, info, trace, warn};
use mqtt_v5::types::{
DecodeError, DisconnectPacket, DisconnectReason, EncodeError, Packet, ProtocolError,
ProtocolVersion, QoS,
Expand Down Expand Up @@ -38,7 +39,7 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
.await
.map_err(|_| ProtocolError::ConnectTimedOut)?;

println!("got a packet: {:?}", first_packet);
trace!("Received packet: {:?}", first_packet);

match first_packet {
Some(Ok(Packet::Connect(mut connect_packet))) => {
Expand Down Expand Up @@ -252,7 +253,7 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
_ => {},
},
Err(err) => {
println!("Error while reading frame: {:?}", err);
warn!("Error while reading frame: {:?}", err);
break;
},
}
Expand Down Expand Up @@ -284,10 +285,10 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
};

if let Err(e) = sink.send(Packet::Disconnect(disconnect_packet)).await {
println!("Failed to send disconnect packet to framed socket: {:?}", e);
warn!("Failed to send disconnect packet to framed socket: {:?}", e);
}

println!("broker told the client to disconnect");
info!("Broker told the client to disconnect");

return;
},
Expand All @@ -299,11 +300,11 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
match tokio::time::timeout(SINK_SEND_TIMEOUT, send).await {
Ok(Ok(())) => (),
Ok(Err(e)) => {
println!("Failed to write to client client socket: {:?}", e);
warn!("Failed to write to client client socket: {:?}", e);
return;
},
Err(_) => {
println!("Timeout during client socket write. Disconnecting");
warn!("Timeout during client socket write. Disconnecting");
return;
},
}
Expand All @@ -329,6 +330,6 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
// expressions will be unable to continue. If parallelism is required, spawn
// each async expression using tokio::spawn and pass the join handle to select!.
future::join(task_rx, task_tx).await;
println!("Client ID {} task exit", self.id);
debug!("Client ID {} task exit", self.id);
}
}
33 changes: 23 additions & 10 deletions mqtt-v5-broker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::env;

use crate::{
broker::{Broker, BrokerMessage},
client::UnconnectedClient,
};
use bytes::BytesMut;
use futures::{stream, SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use mqtt_v5::{
codec::MqttCodec,
encoder,
Expand All @@ -25,15 +28,15 @@ mod client;
mod tree;

async fn client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {
println!("Handling a client");
debug!("Handling client {:?}", stream.peer_addr());

let (sink, stream) = Framed::new(stream, MqttCodec::new()).split();
let unconnected_client = UnconnectedClient::new(stream, sink, broker_tx);

let connected_client = match unconnected_client.handshake().await {
Ok(connected_client) => connected_client,
Err(err) => {
println!("Protocol error during connection handshake: {:?}", err);
warn!("Protocol error during connection handshake: {:?}", err);
return;
},
};
Expand All @@ -60,7 +63,7 @@ async fn upgrade_stream(stream: TcpStream) -> Framed<TcpStream, WsMessageCodec>
}

async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {
println!("Handling a WebSocket client");
debug!("Handling WebSocket client {:?}", stream.peer_addr());

let ws_framed = upgrade_stream(stream).await;

Expand Down Expand Up @@ -115,7 +118,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
}

if message.opcode() == Opcode::Ping {
println!("Got a websocket ping");
trace!("Got a websocket ping");
}

if message.opcode() != Opcode::Binary {
Expand All @@ -129,7 +132,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
read_buf.extend_from_slice(&message.into_data());
},
Some(Err(e)) => {
println!("Error while reading from WebSocket stream: {:?}", e);
debug!("Error while reading from WebSocket stream: {:?}", e);
// If we had a decode error in the WebSocket layer,
// propagate the it along the stream
return Some((
Expand All @@ -153,7 +156,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
let connected_client = match unconnected_client.handshake().await {
Ok(connected_client) => connected_client,
Err(err) => {
println!("Protocol error during connection handshake: {:?}", err);
warn!("Protocol error during connection handshake: {:?}", err);
return;
},
};
Expand All @@ -165,12 +168,12 @@ async fn server_loop(broker_tx: Sender<BrokerMessage>) {
let bind_addr = "0.0.0.0:1883";
let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 1883");

println!("Listening on {}", bind_addr);
info!("Listening on {}", bind_addr);

loop {
let (socket, addr) =
listener.accept().await.expect("Error in server_loop 'listener.accept()");
println!("Got a new socket from addr: {:?}", addr);
info!("Got a new socket from addr: {:?}", addr);

let handler = client_handler(socket, broker_tx.clone());

Expand All @@ -182,20 +185,30 @@ async fn websocket_server_loop(broker_tx: Sender<BrokerMessage>) {
let bind_addr = "0.0.0.0:8080";
let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 8080");

println!("Listening on {}", bind_addr);
info!("Listening on {}", bind_addr);

loop {
let (socket, addr) =
listener.accept().await.expect("Error in websocket_server_loop 'listener.accept()");
println!("Got a new socket from addr: {:?}", addr);
info!("Got a new socket from addr: {:?}", addr);

let handler = websocket_client_handler(socket, broker_tx.clone());

tokio::spawn(handler);
}
}

fn init_logging() {
if env::var("RUST_LOG").is_err() {
env_logger::builder().filter(None, log::LevelFilter::Debug).init();
} else {
env_logger::init();
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();

// Creating a Runtime does the following:
// * Spawn a background thread running a Reactor instance.
// * Start a ThreadPool for executing futures.
Expand Down

0 comments on commit f4c3349

Please sign in to comment.