diff --git a/src/api.js b/src/api.js index 5ecc29a..16079cd 100644 --- a/src/api.js +++ b/src/api.js @@ -385,12 +385,13 @@ export class Worker { } async checkAndRecoverOrphanStreams () { - const consumers = ( - await this.client.redis.xInfoConsumers( - this.client.redisWorkerStreamName, - this.client.redisWorkerGroupName - ) - ).map(({ name }) => name) + const rawConsumers = await this.client.redis.xInfoConsumers( + this.client.redisWorkerStreamName, + this.client.redisWorkerGroupName + ) + const consumers = rawConsumers + .filter((c) => c.pending > 0 || c.inactive < this.client.redisWorkerTimeout) + .map(({ name }) => name) const leader = consumers.sort()[0] if (this.client.consumername !== leader) return