Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clippy warnings #23

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/src/bin/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<()> {
.create_producer(output.stream_key()?, Default::default())
.await?;

for batch in 0..std::usize::MAX {
for batch in 0..usize::MAX {
// Take all messages currently buffered in the queue, but do not wait
let mut messages: Vec<SharedMessage> = receiver.drain().collect();
if messages.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/consumer/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Streamers {
let (sender, receiver) = unbounded();
self.max_sid += 1;
let sid = self.max_sid;
if self.streamers.get(&file_id).is_none() {
if !self.streamers.contains_key(&file_id) {
self.streamers.insert(file_id.clone(), Vec::new());
}
let handles = self.streamers.get_mut(&file_id).unwrap();
Expand Down
2 changes: 0 additions & 2 deletions sea-streamer-file/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ where

#[cfg(test)]
mod test {
use sea_streamer_types::Buffer;

use super::*;

#[test]
Expand Down
7 changes: 4 additions & 3 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl MessageSource {
#[allow(clippy::never_loop)]
let res = 'outer: loop {
// survey the beacons to narrow down the scope of search
let surveyor = match Surveyor::new(self, |b: &Beacon| {
let surveyor = Surveyor::new(self, |b: &Beacon| {
for item in b.items.iter() {
if (stream_key, shard_id) == (item.header.stream_key(), item.header.shard_id())
{
Expand All @@ -210,8 +210,9 @@ impl MessageSource {
}
SurveyResult::Undecided
})
.await
{
.await;

let surveyor = match surveyor {
Ok(s) => s,
Err(e) => {
break Err(e);
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/producer/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Writers {
options: &FileConnectOptions,
_pro_options: &FileProducerOptions,
) -> Result<FileProducer, FileErr> {
if self.writers.get(&file_id).is_none() {
if !self.writers.contains_key(&file_id) {
self.writers.insert(
file_id.clone(),
Writer::new(file_id.clone(), options).await?,
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Watchers {
/// `Sender` should be unbounded, and never blocks.
fn add(&mut self, file_id: FileId, sender: Sender<FileEvent>) -> Result<Watcher, FileErr> {
assert!(sender.capacity().is_none());
if self.watchers.get(&file_id).is_none() {
if !self.watchers.contains_key(&file_id) {
let watcher = Self::new_watcher(file_id.clone(), self.sender.clone())?;
self.watchers.insert(file_id.clone(), watcher);
}
Expand Down
7 changes: 4 additions & 3 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,17 @@ impl KafkaProducer {
) -> KafkaResult<()> {
self.get();
let client = self.inner.take().unwrap();
match spawn_blocking(move || {
let producer = spawn_blocking(move || {
let s = client;
match func(&s) {
Ok(()) => Ok(s),
Err(e) => Err((s, e)),
}
})
.await
.map_err(runtime_error)?
{
.map_err(runtime_error)?;

match producer {
Ok(inner) => {
self.inner = Some(inner);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-redis/src/consumer/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Cluster {
}
StatusMsg::Moved { shard, from, to } => {
log::info!("Shard {shard:?} moving from {from} to {to}");
let conn = if self.nodes.get(&to).is_none() {
let conn = if !self.nodes.contains_key(&to) {
Some(
Connection::create_or_reconnect(
to.clone(),
Expand Down Expand Up @@ -217,7 +217,7 @@ impl Cluster {
}

fn add_node(&mut self, node_id: NodeId, event_sender: Sender<StatusMsg>) -> &Sender<CtrlMsg> {
if self.nodes.get(&node_id).is_none() {
if !self.nodes.contains_key(&node_id) {
let (ctrl_sender, receiver) = bounded(128);
self.nodes.insert(node_id.clone(), ctrl_sender);
let node = Node::add(
Expand Down
Loading