From 9b3eddff77470fafda3d8f812cbe01475131373e Mon Sep 17 00:00:00 2001 From: Njuguna Mureithi Date: Fri, 9 Feb 2024 07:36:54 +0300 Subject: [PATCH] fix: minor clippy and fixes --- Cargo.toml | 8 +--- packages/apalis-core/src/worker/mod.rs | 12 +++--- packages/apalis-sql/src/mysql.rs | 55 +++++++++++--------------- src/lib.rs | 2 +- 4 files changed, 32 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46723f8e..8a595e65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,12 +110,8 @@ pprof = { version = "0.13", features = ["flamegraph"] } paste = "1.0.14" serde = "1" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -apalis-redis = { version = "0.5.0-alpha.0", path = "./packages/apalis-redis" } -apalis-sql = { version = "0.5.0-alpha.0", path = "./packages/apalis-sql", features = [ - "sqlite", - "postgres", - "mysql", -] } +apalis = { version = "0.5.0-alpha.0", path = ".", features = ["redis"] } + [[bench]] name = "storages" diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index f9f71f62..aac99fcc 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -360,7 +360,7 @@ impl Worker> { if let Some(ctx) = worker.state.context.as_ref() { ctx.notify(Worker { state: Event::Start, - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), }); }; let worker_layers = ServiceBuilder::new() @@ -374,7 +374,7 @@ impl Worker> { if let Some(ctx) = worker.state.context.as_ref() { ctx.notify(Worker { state: Event::Stop, - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), }); }; break; @@ -384,7 +384,7 @@ impl Worker> { let (sender, receiver) = async_oneshot::oneshot(); notifier .notify(Worker { - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), state: FetchNext::new(sender), }) .unwrap(); @@ -398,7 +398,7 @@ impl Worker> { if let Some(ctx) = worker.state.context.as_ref() { ctx.notify(Worker { state: Event::Error(Box::new(e)), - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), }); }; } @@ -406,7 +406,7 @@ impl Worker> { if let Some(ctx) = worker.state.context.as_ref() { ctx.notify(Worker { state: Event::Idle, - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), }); }; } @@ -419,7 +419,7 @@ impl Worker> { if let Some(ctx) = worker.state.context.as_ref() { ctx.notify(Worker { state: Event::Error(e.into()), - id: WorkerId::new_with_instance(&worker.id.name(), instance), + id: WorkerId::new_with_instance(worker.id.name(), instance), }); }; } diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index a2387c99..8a7fe219 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -1,15 +1,15 @@ +use apalis_core::codec::json::JsonCodec; use apalis_core::error::Error; +use apalis_core::layers::{Ack, AckLayer}; use apalis_core::notify::Notify; use apalis_core::poller::controller::Controller; use apalis_core::poller::stream::BackendStream; use apalis_core::poller::Poller; use apalis_core::request::{Request, RequestStream}; use apalis_core::storage::{Job, Storage}; +use apalis_core::task::task_id::TaskId; use apalis_core::worker::WorkerId; use apalis_core::{Backend, Codec}; -use 
apalis_core::codec::json::JsonCodec; -use apalis_core::layers::{Ack, AckLayer}; -use apalis_core::task::task_id::TaskId; use async_stream::try_stream; use futures::{Stream, StreamExt, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; @@ -96,7 +96,7 @@ impl MysqlStorage { codec: Arc::new(Box::new(JsonCodec)), ack_notify: Notify::new(), } - } + } /// Expose the pool for other functionality, eg custom migrations pub fn pool(&self) -> &Pool { @@ -138,27 +138,23 @@ impl MysqlStorage for i in &task_ids { query = query.bind(i); } - query.execute(&mut *tx).await?; tx.commit().await?; - let fetch_query = format!("SELECT * FROM jobs - WHERE ID IN ({})", id_params); + let fetch_query = format!("SELECT * FROM jobs WHERE ID IN ({})", id_params); let mut query = sqlx::query_as(&fetch_query); - for i in task_ids { - query = query.bind(i); - } - let jobs: Vec> = query.fetch_all(&pool).await.unwrap(); + for i in task_ids { + query = query.bind(i); + } + let jobs: Vec> = query.fetch_all(&pool).await.unwrap(); for job in jobs { yield Some(Into::into(SqlRequest { context: job.context, req: self.codec.decode(&job.req).unwrap(), })) - } - } - + } } }.boxed() } @@ -404,7 +400,11 @@ impl MysqlStorage { } /// Puts the job instantly back into the queue - pub async fn retry(&mut self, worker_id: &WorkerId, job_id: &TaskId) -> Result<(), sqlx::Error> { + pub async fn retry( + &mut self, + worker_id: &WorkerId, + job_id: &TaskId, + ) -> Result<(), sqlx::Error> { let pool = self.pool.clone(); let mut tx = pool.acquire().await?; @@ -460,9 +460,11 @@ mod tests { // (different runtimes are created for each test), // we don't share the storage and tests must be run sequentially. let pool = MySqlPool::connect(db_url).await.unwrap(); - MysqlStorage::setup(&pool).await.expect("failed to migrate DB"); + MysqlStorage::setup(&pool) + .await + .expect("failed to migrate DB"); let storage = MysqlStorage::new(pool); - + storage } @@ -485,8 +487,7 @@ mod tests { .expect("failed to delete worker"); } - async fn consume_one(storage: &MysqlStorage, worker_id: &WorkerId) -> Request - { + async fn consume_one(storage: &MysqlStorage, worker_id: &WorkerId) -> Request { let storage = storage.clone(); let mut stream = storage.stream_jobs(worker_id, std::time::Duration::from_secs(10), 1); stream @@ -507,7 +508,6 @@ mod tests { struct DummyService {} - async fn register_worker_at( storage: &mut MysqlStorage, last_seen: DateTime, @@ -522,7 +522,6 @@ mod tests { } async fn register_worker(storage: &mut MysqlStorage) -> WorkerId { - let now = Utc::now(); register_worker_at(storage, now).await @@ -598,7 +597,7 @@ mod tests { let worker_id = register_worker(&mut storage).await; let job = consume_one(&mut storage, &worker_id).await; - + let ctx = job.get::().unwrap(); let job_id = ctx.id(); @@ -631,7 +630,6 @@ mod tests { let six_minutes_ago = Utc::now() - Duration::from_secs(60 * 6); - storage .keep_alive_at::(&worker_id, six_minutes_ago) .await @@ -643,10 +641,7 @@ mod tests { assert_eq!(*ctx.status(), State::Running); - storage - .reenqueue_orphaned(300) - .await - .unwrap(); + storage.reenqueue_orphaned(300).await.unwrap(); // then, the job status has changed to Pending let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap(); @@ -674,7 +669,6 @@ mod tests { // register a worker responding at 4 minutes ago let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); - let worker_id = WorkerId::new("test_worker"); storage .keep_alive_at::(&worker_id, four_minutes_ago) @@ -688,10 +682,7 @@ mod tests { 
 assert_eq!(*ctx.status(), State::Running);
 
         // heartbeat with ReenqueueOrphaned pulse
-        storage
-            .reenqueue_orphaned(300)
-            .await
-            .unwrap();
+        storage.reenqueue_orphaned(300).await.unwrap();
 
         // then, the job status is not changed
         let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap();
diff --git a/src/lib.rs b/src/lib.rs
index ce4adad0..c55a7a70 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,7 +19,7 @@
 //! ```rust, no_run
 //! use apalis::prelude::*;
 //! use serde::{Deserialize, Serialize};
-//! use apalis_redis::RedisStorage;
+//! use apalis::redis::RedisStorage;
 //!
 //! #[derive(Debug, Deserialize, Serialize)]
 //! struct Email {
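
The repeated change in packages/apalis-core/src/worker/mod.rs is clippy's needless_borrow fix: `worker.id.name()` already returns a reference, so the extra `&` in `WorkerId::new_with_instance(&worker.id.name(), instance)` only created a double reference that the callee had to auto-deref. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types rather than the apalis_core API:

```rust
// Sketch of the `clippy::needless_borrow` pattern behind the worker/mod.rs hunks.
// `WorkerId` here is a hypothetical stand-in, not the apalis_core type.
#[derive(Debug)]
struct WorkerId {
    name: String,
}

impl WorkerId {
    // `name()` already hands out a reference to the stored name.
    fn name(&self) -> &str {
        &self.name
    }

    fn new_with_instance(name: &str, instance: usize) -> Self {
        WorkerId {
            name: format!("{name}.{instance}"),
        }
    }
}

fn main() {
    let worker = WorkerId {
        name: "email-worker".to_string(),
    };
    // Before: `WorkerId::new_with_instance(&worker.name(), 0)` builds a `&&str`
    // that is only auto-dereferenced again, so clippy flags the extra `&`.
    // After: pass the returned `&str` directly.
    let instance = WorkerId::new_with_instance(worker.name(), 0);
    println!("{instance:?}");
}
```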