-
-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add retry jobs to the end of the queue #497
Comments
Have you checked out #399 (comment) |
You can also implement your own policy: impl<T, Res, Ctx> Policy<Req<T, Ctx>, Res, Err> for RetryPolicy
where
T: Clone,
Ctx: Clone,
{
type Future = future::Ready<()>;
fn retry(
&mut self,
req: &mut Req<T, Ctx>,
result: &mut Result<Res, Err>,
) -> Option<Self::Future> {
let attempt = &req.parts.attempt;
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so don't retry...
None
}
Err(_) if self.retries == 0 => None,
Err(_) if (self.retries - attempt.current() > 0) => Some(future::ready(())),
Err(_) => None,
}
}
fn clone_request(&mut self, req: &Req<T, Ctx>) -> Option<Req<T, Ctx>> {
let req = req.clone();
req.parts.attempt.increment();
Some(req)
}
} You can replace with a future that represents your logic:
|
How would I re-enque the job inside the layer?
…On Sat, Jan 11, 2025, 2:03 PM Geoffrey Mureithi ***@***.***> wrote:
You can also implement your own policy:
impl<T, Res, Ctx> Policy<Req<T, Ctx>, Res, Err> for RetryPolicywhere
T: Clone,
Ctx: Clone,{
type Future = future::Ready<()>;
fn retry(
&mut self,
req: &mut Req<T, Ctx>,
result: &mut Result<Res, Err>,
) -> Option<Self::Future> {
let attempt = &req.parts.attempt;
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so don't retry...
None
}
Err(_) if self.retries == 0 => None,
Err(_) if (self.retries - attempt.current() > 0) => Some(future::ready(())),
Err(_) => None,
}
}
fn clone_request(&mut self, req: &Req<T, Ctx>) -> Option<Req<T, Ctx>> {
let req = req.clone();
req.parts.attempt.increment();
Some(req)
}}
You can replace with a future that represents your logic:
Err(_) if (self.retries - attempt.current() > 0) => Some(tokio::sleep(duration)),
—
Reply to this email directly, view it on GitHub
<#497 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AOQ5PNA3IKURKIQKFEKKHCL2KDJOXAVCNFSM6AAAAABU7X4R4KVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDKOBVGE2DSOBYGI>
.
You are receiving this because you authored the thread.Message ID:
***@***.***>
|
You mean push it back to the storage? This just makes the next execution to
wait in memory
|
Yes. Thats how the bullmq library works (https://docs.bullmq.io/guide/retrying-failing-jobs). |
Currently its not yet supported.
There are discussions being held here
#399
<#399 (comment)>
The limitation is based on how tower setups up its RetryLayer. I hope to
buid a custom solution for v0.7
|
Closing in favour of #399. Thanks! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Retries are done instantly when using the
RetryLayer
. I do not see any options inRetryPolicy
(https://docs.rs/apalis/latest/apalis/layers/retry/struct.RetryPolicy.html) so that the retry job goes to the end of the queue.The text was updated successfully, but these errors were encountered: