Skip to content

Commit

Permalink
Add support for optional configuration option redis_connection to def…
Browse files Browse the repository at this point in the history
…ine what redis connection to use to store batch related data in. Defaults to 'default' if not set.
  • Loading branch information
cyppe committed Feb 11, 2024
1 parent d7d36b0 commit 6db8d35
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 39 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Set the `database` key under the `batching` section to `'redis'`. Without this a
~~~php
'batching' => [
'database' => 'redis', // Change this from 'mysql' to 'redis'
'redis_connection' => 'default', // here you can define what redis connection to store batch related data in. Defaults to 'default' if not set.
'table' => 'job_batches',
],
~~~
Expand Down
78 changes: 39 additions & 39 deletions src/Repositories/RedisBatchRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ public function __construct( BatchFactory $factory )

public function get($limit = 50, $before = null): array
{
if (!Redis::exists('batches_list')) {
if (!Redis::connection(config('queue.batching.redis_connection', 'default'))->exists('batches_list')) {
return [];
}

$totalBatches = Redis::llen('batches_list');
$totalBatches = Redis::connection(config('queue.batching.redis_connection', 'default'))->llen('batches_list');

// If $before is specified, find its index
$beforeIndex = $before ? array_search($before, Redis::lrange('batches_list', 0, -1)) : null;
$beforeIndex = $before ? array_search($before, Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange('batches_list', 0, -1)) : null;

// Determine the range to fetch
if ($beforeIndex !== false && $beforeIndex !== null) {
Expand All @@ -48,10 +48,10 @@ public function get($limit = 50, $before = null): array
}

// Fetch only the required batch IDs
$batchIds = Redis::lrange('batches_list', $startIndex, $rangeEnd);
$batchIds = Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange('batches_list', $startIndex, $rangeEnd);

// Use Redis pipeline to bulk fetch batch data
$responses = Redis::pipeline(function ($pipe) use ($batchIds) {
$responses = Redis::connection(config('queue.batching.redis_connection', 'default'))->pipeline(function ($pipe) use ($batchIds) {
foreach ($batchIds as $batchId) {
$pipe->get("batch:$batchId");
}
Expand All @@ -71,7 +71,7 @@ public function get($limit = 50, $before = null): array

public function find( string $batchId )
{
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );

if ( $data === false ) {
// Return null or handle the case where the batch does not exist
Expand Down Expand Up @@ -99,31 +99,31 @@ public function store( PendingBatch $batch )
'finished_at' => null,
];

Redis::set( "batch:$id", json_encode( $batchData ) );
Redis::rpush( 'batches_list', $id ); // Add the batch ID to the list
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$id", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->rpush( 'batches_list', $id ); // Add the batch ID to the list

return $this->find( $id );
}

public function incrementTotalJobs( string $batchId, int $amount )
{
return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $amount ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );
if ( $data === false ) {
Log::error( "Batch not found for incrementTotalJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
}
$batchData = json_decode( $data, true );
$batchData['total_jobs'] += $amount;
$batchData['pending_jobs'] += $amount;
Redis::set( "batch:$batchId", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}

protected function acquireLock( string $key ): bool
{
$isAcquired = Redis::set( $key, true, 'EX', $this->lockTimeout, 'NX' );
$isAcquired = Redis::connection(config('queue.batching.redis_connection', 'default'))->set( $key, true, 'EX', $this->lockTimeout, 'NX' );
return (bool)$isAcquired;
}

Expand Down Expand Up @@ -158,29 +158,29 @@ protected function executeWithLock( string $lockKey, Closure $callback, $retryCo

protected function releaseLock( string $key )
{
Redis::del( $key );
Redis::connection(config('queue.batching.redis_connection', 'default'))->del( $key );
}

public function incrementFailedJobs( string $batchId, string $jobId )
{
return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );
if ( $data === false ) {
Log::error( "Batch not found for incrementFailedJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
}
$batchData = json_decode( $data, true );
$batchData['failed_jobs']++;
$batchData['failed_job_ids'][] = $jobId;
Redis::set( "batch:$batchId", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}

public function decrementPendingJobs( string $batchId, string $jobId )
{
return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );
if ( $data === false ) {
Log::error( "Batch not found for decrementPendingJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
Expand All @@ -195,7 +195,7 @@ public function decrementPendingJobs( string $batchId, string $jobId )
Log::warning("Attempted to decrement pending_jobs below 0 for batch: " . $batchId);
}

Redis::set( "batch:$batchId", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}
Expand All @@ -204,7 +204,7 @@ public function decrementPendingJobs( string $batchId, string $jobId )
public function markAsFinished( string $batchId )
{
return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );

if ( $data === false ) {
Log::debug( "Batch not found for markAsFinished: " . $batchId );
Expand All @@ -214,7 +214,7 @@ public function markAsFinished( string $batchId )
$batchData = json_decode( $data, true );
// Convert finished_at to a Unix timestamp before storing
$batchData['finished_at'] = CarbonImmutable::now()->getTimestamp();
Redis::set( "batch:$batchId", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) );

Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] );
}, 100, 200 );
Expand All @@ -223,13 +223,13 @@ public function markAsFinished( string $batchId )

public function delete( string $batchId )
{
if ( !Redis::exists( "batch:$batchId" ) ) {
if ( !Redis::connection(config('queue.batching.redis_connection', 'default'))->exists( "batch:$batchId" ) ) {
// Handle the case where the batch does not exist
return;
}

Redis::del( "batch:$batchId" );
Redis::lrem( 'batches_list', 0, $batchId );
Redis::connection(config('queue.batching.redis_connection', 'default'))->del( "batch:$batchId" );
Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId );
}

protected function serialize( $value )
Expand All @@ -242,34 +242,34 @@ protected function unserialize( $serialized )
return unserialize( $serialized );
}

protected function toBatch( $data ): Batch
protected function toBatch( $batch ): Batch
{
return $this->factory->make(
$this,
$data['id'],
$data['name'],
(int)$data['total_jobs'],
(int)$data['pending_jobs'],
(int)$data['failed_jobs'],
$data['failed_job_ids'],
$this->unserialize( $data['options'] ),
CarbonImmutable::createFromTimestamp( $data['created_at'] ),
isset( $data['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $data['cancelled_at'] ) : null,
isset( $data['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $data['finished_at'] ) : null
$batch['id'],
$batch['name'],
(int)$batch['total_jobs'],
(int)$batch['pending_jobs'],
(int)$batch['failed_jobs'],
$batch['failed_job_ids'],
$this->unserialize( $batch['options'] ),
CarbonImmutable::createFromTimestamp( $batch['created_at'] ),
isset( $batch['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $batch['cancelled_at'] ) : null,
isset( $batch['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batch['finished_at'] ) : null
);
}

public function cancel( string $batchId )
{
$this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );
if ( $data === false ) {
return;
}
$batchData = json_decode( $data, true );
// Convert cancelled_at to a Unix timestamp before storing
$batchData['cancelled_at'] = CarbonImmutable::now()->getTimestamp();
Redis::set( "batch:$batchId", json_encode( $batchData ) );
Redis::connection(config('queue.batching.redis_connection', 'default'))->set( "batch:$batchId", json_encode( $batchData ) );
}, 100, 200 ); // Retry 100 times with 200 milliseconds between retries
}

Expand All @@ -295,14 +295,14 @@ public function pruneCancelled( DateTimeInterface $before )

protected function pruneBatches( DateTimeInterface $before, $isFinished = null, $isCancelled = false )
{
$batchIds = Redis::lrange( 'batches_list', 0, -1 );
$batchIds = Redis::connection(config('queue.batching.redis_connection', 'default'))->lrange( 'batches_list', 0, -1 );
$totalDeleted = 0;

foreach ( $batchIds as $batchId ) {
$data = Redis::get( "batch:$batchId" );
$data = Redis::connection(config('queue.batching.redis_connection', 'default'))->get( "batch:$batchId" );

if ( $data === false ) {
Redis::lrem( 'batches_list', 0, $batchId );
Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId );
continue;
}

Expand All @@ -323,8 +323,8 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null,
}

if ( $shouldBeDeleted ) {
Redis::del( "batch:$batchId" );
Redis::lrem( 'batches_list', 0, $batchId );
Redis::connection(config('queue.batching.redis_connection', 'default'))->del( "batch:$batchId" );
Redis::connection(config('queue.batching.redis_connection', 'default'))->lrem( 'batches_list', 0, $batchId );
$totalDeleted++;
}
}
Expand Down

0 comments on commit 6db8d35

Please sign in to comment.