diff --git a/README.md b/README.md index 5e4f9fa..338d533 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Set the `database` key under the `batching` section to `'redis'`. Without this a ~~~php 'batching' => [ 'database' => 'redis', // Change this from 'mysql' to 'redis' + 'redis_connection' => 'default', // here you can define what redis connection to store batch related data in. Defaults to 'default' if not set. 'table' => 'job_batches', ], ~~~ diff --git a/src/Repositories/RedisBatchRepository.php b/src/Repositories/RedisBatchRepository.php index cb56f14..654f31c 100644 --- a/src/Repositories/RedisBatchRepository.php +++ b/src/Repositories/RedisBatchRepository.php @@ -29,14 +29,14 @@ public function __construct( BatchFactory $factory ) public function get($limit = 50, $before = null): array { - if (!Redis::exists('batches_list')) { + if (!Redis::connection(config('queue.batching.redis_connection', 'default'))->exists('batches_list')) { return []; } - $totalBatches = Redis::llen('batches_list'); + $totalBatches = Redis::connection(config('queue.batching.redis_connection', 'default'))->llen('batches_list'); // If $before is specified, find its index - $beforeIndex = $before ? array_search($before, Redis::lrange('batches_list', 0, -1)) : null; + $beforeIndex = $before ? array_search($before, Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange('batches_list', 0, -1)) : null; // Determine the range to fetch if ($beforeIndex !== false && $beforeIndex !== null) { @@ -48,10 +48,10 @@ public function get($limit = 50, $before = null): array } // Fetch only the required batch IDs - $batchIds = Redis::lrange('batches_list', $startIndex, $rangeEnd); + $batchIds = Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange('batches_list', $startIndex, $rangeEnd); // Use Redis pipeline to bulk fetch batch data - $responses = Redis::pipeline(function ($pipe) use ($batchIds) { + $responses = Redis::connection(config('queue.batching.redis_connection', 'default'))->pipeline(function ($pipe) use ($batchIds) { foreach ($batchIds as $batchId) { $pipe->get("batch:$batchId"); } @@ -71,7 +71,7 @@ public function get($limit = 50, $before = null): array public function find( string $batchId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { // Return null or handle the case where the batch does not exist @@ -99,8 +99,8 @@ public function store( PendingBatch $batch ) 'finished_at' => null, ]; - Redis::set( "batch:$id", json_encode( $batchData ) ); - Redis::rpush( 'batches_list', $id ); // Add the batch ID to the list + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$id", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->rpush( 'batches_list', $id ); // Add the batch ID to the list return $this->find( $id ); } @@ -108,7 +108,7 @@ public function store( PendingBatch $batch ) public function incrementTotalJobs( string $batchId, int $amount ) { return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $amount ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { Log::error( "Batch not found for incrementTotalJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); @@ -116,14 +116,14 @@ public function incrementTotalJobs( string $batchId, int $amount ) $batchData = json_decode( $data, true ); $batchData['total_jobs'] += $amount; $batchData['pending_jobs'] += $amount; - Redis::set( "batch:$batchId", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); }, 100, 200 ); } protected function acquireLock( string $key ): bool { - $isAcquired = Redis::set( $key, true, 'EX', $this->lockTimeout, 'NX' ); + $isAcquired = Redis::connection(config('queue.batching.redis_connection', 'default'))->set( $key, true, 'EX', $this->lockTimeout, 'NX' ); return (bool)$isAcquired; } @@ -158,13 +158,13 @@ protected function executeWithLock( string $lockKey, Closure $callback, $retryCo protected function releaseLock( string $key ) { - Redis::del( $key ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->del( $key ); } public function incrementFailedJobs( string $batchId, string $jobId ) { return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { Log::error( "Batch not found for incrementFailedJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); @@ -172,7 +172,7 @@ public function incrementFailedJobs( string $batchId, string $jobId ) $batchData = json_decode( $data, true ); $batchData['failed_jobs']++; $batchData['failed_job_ids'][] = $jobId; - Redis::set( "batch:$batchId", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); }, 100, 200 ); } @@ -180,7 +180,7 @@ public function incrementFailedJobs( string $batchId, string $jobId ) public function decrementPendingJobs( string $batchId, string $jobId ) { return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { Log::error( "Batch not found for decrementPendingJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); @@ -195,7 +195,7 @@ public function decrementPendingJobs( string $batchId, string $jobId ) Log::warning("Attempted to decrement pending_jobs below 0 for batch: " . $batchId); } - Redis::set( "batch:$batchId", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); }, 100, 200 ); } @@ -204,7 +204,7 @@ public function decrementPendingJobs( string $batchId, string $jobId ) public function markAsFinished( string $batchId ) { return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { Log::debug( "Batch not found for markAsFinished: " . $batchId ); @@ -214,7 +214,7 @@ public function markAsFinished( string $batchId ) $batchData = json_decode( $data, true ); // Convert finished_at to a Unix timestamp before storing $batchData['finished_at'] = CarbonImmutable::now()->getTimestamp(); - Redis::set( "batch:$batchId", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) ); Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] ); }, 100, 200 ); @@ -223,13 +223,13 @@ public function markAsFinished( string $batchId ) public function delete( string $batchId ) { - if ( !Redis::exists( "batch:$batchId" ) ) { + if ( !Redis::connection(config('queue.batching.redis_connection', 'default'))->exists( "batch:$batchId" ) ) { // Handle the case where the batch does not exist return; } - Redis::del( "batch:$batchId" ); - Redis::lrem( 'batches_list', 0, $batchId ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->del( "batch:$batchId" ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId ); } protected function serialize( $value ) @@ -242,34 +242,34 @@ protected function unserialize( $serialized ) return unserialize( $serialized ); } - protected function toBatch( $data ): Batch + protected function toBatch( $batch ): Batch { return $this->factory->make( $this, - $data['id'], - $data['name'], - (int)$data['total_jobs'], - (int)$data['pending_jobs'], - (int)$data['failed_jobs'], - $data['failed_job_ids'], - $this->unserialize( $data['options'] ), - CarbonImmutable::createFromTimestamp( $data['created_at'] ), - isset( $data['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $data['cancelled_at'] ) : null, - isset( $data['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $data['finished_at'] ) : null + $batch['id'], + $batch['name'], + (int)$batch['total_jobs'], + (int)$batch['pending_jobs'], + (int)$batch['failed_jobs'], + $batch['failed_job_ids'], + $this->unserialize( $batch['options'] ), + CarbonImmutable::createFromTimestamp( $batch['created_at'] ), + isset( $batch['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $batch['cancelled_at'] ) : null, + isset( $batch['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batch['finished_at'] ) : null ); } public function cancel( string $batchId ) { $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { return; } $batchData = json_decode( $data, true ); // Convert cancelled_at to a Unix timestamp before storing $batchData['cancelled_at'] = CarbonImmutable::now()->getTimestamp(); - Redis::set( "batch:$batchId", json_encode( $batchData ) ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) ); }, 100, 200 ); // Retry 100 times with 200 milliseconds between retries } @@ -295,14 +295,14 @@ public function pruneCancelled( DateTimeInterface $before ) protected function pruneBatches( DateTimeInterface $before, $isFinished = null, $isCancelled = false ) { - $batchIds = Redis::lrange( 'batches_list', 0, -1 ); + $batchIds = Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange( 'batches_list', 0, -1 ); $totalDeleted = 0; foreach ( $batchIds as $batchId ) { - $data = Redis::get( "batch:$batchId" ); + $data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" ); if ( $data === false ) { - Redis::lrem( 'batches_list', 0, $batchId ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId ); continue; } @@ -323,8 +323,8 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null, } if ( $shouldBeDeleted ) { - Redis::del( "batch:$batchId" ); - Redis::lrem( 'batches_list', 0, $batchId ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->del( "batch:$batchId" ); + Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId ); $totalDeleted++; } }