Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add keep_alive() call when an error occurs in fetch_next(), to prevent permanent failure when redis restarts #492

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,11 @@
}
Err(e) => {
warn!("An error occurred during streaming jobs: {e}");
if matches!(e.kind(), ErrorKind::ResponseError)
&& e.to_string().contains("consumer not registered script")
{
self.keep_alive(worker_id).await?;
}
Err(e)
}
}
Expand Down Expand Up @@ -686,7 +691,7 @@
type Error = RedisError;
type Context = RedisContext;

async fn push_request(

Check warning on line 694 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`

Check warning on line 694 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Clippy

this function depends on never type fallback being `()`

Check warning on line 694 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 694 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`
&mut self,
req: Request<T, RedisContext>,
) -> Result<Parts<Self::Context>, RedisError> {
Expand All @@ -709,7 +714,7 @@
Ok(req.parts)
}

async fn schedule_request(

Check warning on line 717 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`

Check warning on line 717 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 717 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`
&mut self,
req: Request<Self::Job, RedisContext>,
on: i64,
Expand Down Expand Up @@ -772,7 +777,7 @@
Ok(())
}

async fn reschedule(

Check warning on line 780 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`

Check warning on line 780 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 780 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`
&mut self,
job: Request<T, RedisContext>,
wait: Duration,
Expand Down Expand Up @@ -831,7 +836,7 @@
C: Codec<Compact = Vec<u8>> + Send + 'static,
{
/// Attempt to retry a job
pub async fn retry(&mut self, worker_id: &WorkerId, task_id: &TaskId) -> Result<i32, RedisError>

Check warning on line 839 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`

Check warning on line 839 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 839 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`
where
T: Send + DeserializeOwned + Serialize + Unpin + Sync + 'static,
{
Expand Down
Loading