
Commit

fix: minor clippy and fixes
geofmureithi committed Feb 9, 2024
1 parent bbfb916 commit 9b3eddf
Showing 4 changed files with 32 additions and 45 deletions.
8 changes: 2 additions & 6 deletions Cargo.toml
@@ -110,12 +110,8 @@ pprof = { version = "0.13", features = ["flamegraph"] }
paste = "1.0.14"
serde = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
apalis-redis = { version = "0.5.0-alpha.0", path = "./packages/apalis-redis" }
apalis-sql = { version = "0.5.0-alpha.0", path = "./packages/apalis-sql", features = [
"sqlite",
"postgres",
"mysql",
] }
apalis = { version = "0.5.0-alpha.0", path = ".", features = ["redis"] }


[[bench]]
name = "storages"
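This hunk replaces the separate apalis-redis and apalis-sql dev-dependencies with the umbrella apalis crate and its "redis" feature. A minimal sketch of the imports that bench/test code could then use, assuming the feature re-exports Redis support under apalis::redis (consistent with the src/lib.rs doc change later in this commit); the actual benchmark code is not part of this diff:

// Hedged sketch of imports available via the consolidated dev-dependency.
use apalis::prelude::*;
use apalis::redis::RedisStorage;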
12 changes: 6 additions & 6 deletions packages/apalis-core/src/worker/mod.rs
@@ -360,7 +360,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Start,
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
});
};
let worker_layers = ServiceBuilder::new()
@@ -374,7 +374,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Stop,
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
});
};
break;
@@ -384,7 +384,7 @@ impl<S, P> Worker<Ready<S, P>> {
let (sender, receiver) = async_oneshot::oneshot();
notifier
.notify(Worker {
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
state: FetchNext::new(sender),
})
.unwrap();
@@ -398,15 +398,15 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Error(Box::new(e)),
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
});
};
}
Ok(Ok(None)) => {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Idle,
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
});
};
}
@@ -419,7 +419,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Error(e.into()),
id: WorkerId::new_with_instance(&worker.id.name(), instance),
id: WorkerId::new_with_instance(worker.id.name(), instance),
});
};
}
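All of the hunks above are the same clippy fix: dropping a redundant borrow when passing the worker name to WorkerId::new_with_instance. A minimal sketch of the needless_borrow pattern, using a stand-in function; the real signature of WorkerId::new_with_instance and the return type of name() are assumptions not visible in this diff:

// clippy::needless_borrow in miniature (assumed types, not from this diff).
fn new_with_instance(name: &str, instance: usize) -> String {
    format!("{name}-{instance}")
}

fn main() {
    let name: &str = "email-worker";
    let _id = new_with_instance(&name, 0); // &&str still coerces, but clippy flags the extra &
    let _id = new_with_instance(name, 0); // the form this commit switches to
}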
55 changes: 23 additions & 32 deletions packages/apalis-sql/src/mysql.rs
@@ -1,15 +1,15 @@
use apalis_core::codec::json::JsonCodec;
use apalis_core::error::Error;
use apalis_core::layers::{Ack, AckLayer};
use apalis_core::notify::Notify;
use apalis_core::poller::controller::Controller;
use apalis_core::poller::stream::BackendStream;
use apalis_core::poller::Poller;
use apalis_core::request::{Request, RequestStream};
use apalis_core::storage::{Job, Storage};
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::{Backend, Codec};
use async_stream::try_stream;
use futures::{Stream, StreamExt, TryStreamExt};
use serde::{de::DeserializeOwned, Serialize};
@@ -96,7 +96,7 @@ impl<T: Serialize + DeserializeOwned> MysqlStorage<T> {
codec: Arc::new(Box::new(JsonCodec)),
ack_notify: Notify::new(),
}
}
}

/// Expose the pool for other functionality, eg custom migrations
pub fn pool(&self) -> &Pool<MySql> {
@@ -138,27 +138,23 @@ impl<T: DeserializeOwned + Send + Unpin + Job + Sync + 'static> MysqlStorage<T>
for i in &task_ids {
query = query.bind(i);
}

query.execute(&mut *tx).await?;
tx.commit().await?;

let fetch_query = format!("SELECT * FROM jobs
WHERE ID IN ({})", id_params);
let fetch_query = format!("SELECT * FROM jobs WHERE ID IN ({})", id_params);
let mut query = sqlx::query_as(&fetch_query);
for i in task_ids {
query = query.bind(i);
}
let jobs: Vec<SqlRequest<Value>> = query.fetch_all(&pool).await.unwrap();

for job in jobs {
yield Some(Into::into(SqlRequest {
context: job.context,
req: self.codec.decode(&job.req).unwrap(),
}))

}
}

}
}
}.boxed()
}
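The stream_jobs hunk above collapses the fetch query onto a single line while keeping the bind-in-a-loop pattern for the IN (...) clause. A hedged reconstruction of how that query is assembled; the id_params string is elided from the diff, so its construction here is an assumption rather than the crate's exact code:

// Hypothetical sketch: one "?" placeholder per task id, each id bound in order.
let id_params = task_ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let fetch_query = format!("SELECT * FROM jobs WHERE ID IN ({})", id_params);
let mut query = sqlx::query_as(&fetch_query);
for i in &task_ids {
    query = query.bind(i);
}
let jobs: Vec<SqlRequest<Value>> = query.fetch_all(&pool).await?;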
@@ -404,7 +400,11 @@ impl<T: Job> MysqlStorage<T> {
}

/// Puts the job instantly back into the queue
pub async fn retry(&mut self, worker_id: &WorkerId, job_id: &TaskId) -> Result<(), sqlx::Error> {
pub async fn retry(
&mut self,
worker_id: &WorkerId,
job_id: &TaskId,
) -> Result<(), sqlx::Error> {
let pool = self.pool.clone();

let mut tx = pool.acquire().await?;
@@ -460,9 +460,11 @@ mod tests {
// (different runtimes are created for each test),
// we don't share the storage and tests must be run sequentially.
let pool = MySqlPool::connect(db_url).await.unwrap();
MysqlStorage::setup(&pool).await.expect("failed to migrate DB");
MysqlStorage::setup(&pool)
.await
.expect("failed to migrate DB");
let storage = MysqlStorage::new(pool);

storage
}

@@ -485,8 +487,7 @@
.expect("failed to delete worker");
}

async fn consume_one(storage: &MysqlStorage<Email>, worker_id: &WorkerId) -> Request<Email>
{
async fn consume_one(storage: &MysqlStorage<Email>, worker_id: &WorkerId) -> Request<Email> {
let storage = storage.clone();
let mut stream = storage.stream_jobs(worker_id, std::time::Duration::from_secs(10), 1);
stream
@@ -507,7 +508,6 @@

struct DummyService {}


async fn register_worker_at(
storage: &mut MysqlStorage<Email>,
last_seen: DateTime<Utc>,
@@ -522,7 +522,6 @@
}

async fn register_worker(storage: &mut MysqlStorage<Email>) -> WorkerId {

let now = Utc::now();

register_worker_at(storage, now).await
@@ -598,7 +597,7 @@
let worker_id = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;

let ctx = job.get::<SqlContext>().unwrap();
let job_id = ctx.id();

@@ -631,7 +630,6 @@

let six_minutes_ago = Utc::now() - Duration::from_secs(60 * 6);


storage
.keep_alive_at::<Email>(&worker_id, six_minutes_ago)
.await
@@ -643,10 +641,7 @@

assert_eq!(*ctx.status(), State::Running);

storage
.reenqueue_orphaned(300)
.await
.unwrap();
storage.reenqueue_orphaned(300).await.unwrap();

// then, the job status has changed to Pending
let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap();
@@ -674,7 +669,6 @@
// register a worker responding at 4 minutes ago
let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);


let worker_id = WorkerId::new("test_worker");
storage
.keep_alive_at::<Email>(&worker_id, four_minutes_ago)
@@ -688,10 +682,7 @@
assert_eq!(*ctx.status(), State::Running);

// heartbeat with ReenqueueOrphaned pulse
storage
.reenqueue_orphaned(300)
.await
.unwrap();
storage.reenqueue_orphaned(300).await.unwrap();

// then, the job status is not changed
let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap();
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -19,7 +19,7 @@
//! ```rust, no_run
//! use apalis::prelude::*;
//! use serde::{Deserialize, Serialize};
//! use apalis_redis::RedisStorage;
//! use apalis::redis::RedisStorage;
//!
//! #[derive(Debug, Deserialize, Serialize)]
//! struct Email {
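The doc example now imports RedisStorage through the apalis facade rather than apalis_redis directly. The snippet is truncated above; a hedged sketch of how such an Email job type is typically declared, where the field names and the Job::NAME constant are assumptions, not taken from this commit:

// Hypothetical sketch (not verbatim from the crate docs).
#[derive(Debug, Deserialize, Serialize)]
struct Email {
    to: String,
    subject: String,
}

impl Job for Email {
    const NAME: &'static str = "apalis::Email";
}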
