Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace println calls with log crate macros #43

Merged
merged 3 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions mqtt-v5-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ edition = "2018"

[dependencies]
bytes = "1"
tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] }
tokio-util = { version = "0.6", features = ["codec"] }
futures = "0.3"
log = "0.4"
mqtt-v5 = { path = "../mqtt-v5", version = "0.2" }
nanoid = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["net", "rt-multi-thread", "sync", "time", "macros"] }
tokio-util = { version = "0.6", features = ["codec"] }
env_logger = "0.8.4"
9 changes: 5 additions & 4 deletions mqtt-v5-broker/src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{client::ClientMessage, tree::SubscriptionTree};
use log::{info, warn};
use mqtt_v5::{
topic::TopicFilter,
types::{
Expand Down Expand Up @@ -155,7 +156,7 @@ impl Session {
async fn send(&mut self, message: ClientMessage) {
if let Some(ref client_sender) = self.client_sender {
if client_sender.send(message).await.is_err() {
println!("Failed to send message to client. Dropping sender");
warn!("Failed to send message to client. Dropping sender");
self.client_sender.take();
}
}
Expand Down Expand Up @@ -216,7 +217,7 @@ impl Broker {
if let Err(e) = client_sender
.try_send(ClientMessage::Disconnect(DisconnectReason::SessionTakenOver))
{
println!("Failed to send disconnect packet to taken-over session - {:?}", e);
warn!("Failed to send disconnect packet to taken-over session - {:?}", e);
}
}

Expand Down Expand Up @@ -269,7 +270,7 @@ impl Broker {
existing_session.resend_packets().await;
}

println!(
info!(
"Client ID {} connected (Version: {:?})",
connect_packet.client_id, connect_packet.protocol_version
);
Expand Down Expand Up @@ -441,7 +442,7 @@ impl Broker {
}

fn handle_disconnect(&mut self, client_id: String, will_disconnect_logic: WillDisconnectLogic) {
println!("Client ID {} disconnected", client_id);
info!("Client ID {} disconnected", client_id);

let mut disconnect_will = None;
let mut session_expiry_duration = None;
Expand Down
15 changes: 8 additions & 7 deletions mqtt-v5-broker/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::{
future::{self, Either},
stream, Sink, SinkExt, Stream, StreamExt,
};
use log::{debug, info, trace, warn};
use mqtt_v5::types::{
DecodeError, DisconnectPacket, DisconnectReason, EncodeError, Packet, ProtocolError,
ProtocolVersion, QoS,
Expand Down Expand Up @@ -38,7 +39,7 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
.await
.map_err(|_| ProtocolError::ConnectTimedOut)?;

println!("got a packet: {:?}", first_packet);
trace!("Received packet: {:?}", first_packet);

match first_packet {
Some(Ok(Packet::Connect(mut connect_packet))) => {
Expand Down Expand Up @@ -252,7 +253,7 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
_ => {},
},
Err(err) => {
println!("Error while reading frame: {:?}", err);
warn!("Error while reading frame: {:?}", err);
break;
},
}
Expand Down Expand Up @@ -284,10 +285,10 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
};

if let Err(e) = sink.send(Packet::Disconnect(disconnect_packet)).await {
println!("Failed to send disconnect packet to framed socket: {:?}", e);
warn!("Failed to send disconnect packet to framed socket: {:?}", e);
}

println!("broker told the client to disconnect");
info!("Broker told the client to disconnect");

return;
},
Expand All @@ -299,11 +300,11 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
match tokio::time::timeout(SINK_SEND_TIMEOUT, send).await {
Ok(Ok(())) => (),
Ok(Err(e)) => {
println!("Failed to write to client client socket: {:?}", e);
warn!("Failed to write to client client socket: {:?}", e);
return;
},
Err(_) => {
println!("Timeout during client socket write. Disconnecting");
warn!("Timeout during client socket write. Disconnecting");
return;
},
}
Expand All @@ -329,6 +330,6 @@ impl<ST: Stream<Item = PacketResult> + Unpin, SI: Sink<Packet, Error = EncodeErr
// expressions will be unable to continue. If parallelism is required, spawn
// each async expression using tokio::spawn and pass the join handle to select!.
future::join(task_rx, task_tx).await;
println!("Client ID {} task exit", self.id);
debug!("Client ID {} task exit", self.id);
}
}
33 changes: 23 additions & 10 deletions mqtt-v5-broker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::env;

use crate::{
broker::{Broker, BrokerMessage},
client::UnconnectedClient,
};
use bytes::BytesMut;
use futures::{stream, SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use mqtt_v5::{
codec::MqttCodec,
encoder,
Expand All @@ -25,15 +28,15 @@ mod client;
mod tree;

async fn client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {
println!("Handling a client");
debug!("Handling client {:?}", stream.peer_addr());

let (sink, stream) = Framed::new(stream, MqttCodec::new()).split();
let unconnected_client = UnconnectedClient::new(stream, sink, broker_tx);

let connected_client = match unconnected_client.handshake().await {
Ok(connected_client) => connected_client,
Err(err) => {
println!("Protocol error during connection handshake: {:?}", err);
warn!("Protocol error during connection handshake: {:?}", err);
return;
},
};
Expand All @@ -60,7 +63,7 @@ async fn upgrade_stream(stream: TcpStream) -> Framed<TcpStream, WsMessageCodec>
}

async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {
println!("Handling a WebSocket client");
debug!("Handling WebSocket client {:?}", stream.peer_addr());

let ws_framed = upgrade_stream(stream).await;

Expand Down Expand Up @@ -115,7 +118,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
}

if message.opcode() == Opcode::Ping {
println!("Got a websocket ping");
trace!("Got a websocket ping");
}

if message.opcode() != Opcode::Binary {
Expand All @@ -129,7 +132,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
read_buf.extend_from_slice(&message.into_data());
},
Some(Err(e)) => {
println!("Error while reading from WebSocket stream: {:?}", e);
debug!("Error while reading from WebSocket stream: {:?}", e);
// If we had a decode error in the WebSocket layer,
// propagate the it along the stream
return Some((
Expand All @@ -153,7 +156,7 @@ async fn websocket_client_handler(stream: TcpStream, broker_tx: Sender<BrokerMes
let connected_client = match unconnected_client.handshake().await {
Ok(connected_client) => connected_client,
Err(err) => {
println!("Protocol error during connection handshake: {:?}", err);
warn!("Protocol error during connection handshake: {:?}", err);
return;
},
};
Expand All @@ -165,12 +168,12 @@ async fn server_loop(broker_tx: Sender<BrokerMessage>) {
let bind_addr = "0.0.0.0:1883";
let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 1883");

println!("Listening on {}", bind_addr);
info!("Listening on {}", bind_addr);

loop {
let (socket, addr) =
listener.accept().await.expect("Error in server_loop 'listener.accept()");
println!("Got a new socket from addr: {:?}", addr);
info!("Got a new socket from addr: {:?}", addr);

let handler = client_handler(socket, broker_tx.clone());

Expand All @@ -182,20 +185,30 @@ async fn websocket_server_loop(broker_tx: Sender<BrokerMessage>) {
let bind_addr = "0.0.0.0:8080";
let listener = TcpListener::bind(bind_addr).await.expect("Couldn't bind to port 8080");

println!("Listening on {}", bind_addr);
info!("Listening on {}", bind_addr);

loop {
let (socket, addr) =
listener.accept().await.expect("Error in websocket_server_loop 'listener.accept()");
println!("Got a new socket from addr: {:?}", addr);
info!("Got a new socket from addr: {:?}", addr);

let handler = websocket_client_handler(socket, broker_tx.clone());

tokio::spawn(handler);
}
}

fn init_logging() {
if env::var("RUST_LOG").is_err() {
env_logger::builder().filter(None, log::LevelFilter::Debug).init();
} else {
env_logger::init();
}
flxo marked this conversation as resolved.
Show resolved Hide resolved
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();

// Creating a Runtime does the following:
// * Spawn a background thread running a Reactor instance.
// * Start a ThreadPool for executing futures.
Expand Down