Skip to content

Commit

Permalink
Improve serialization performance 10x for data stored in redis. Makes…
Browse files Browse the repository at this point in the history
… the jobs run even faster. And huge difference in listing batches in Horizon performance.
  • Loading branch information
cyppe committed Jan 25, 2024
1 parent 7c0837c commit 28f53f7
Showing 1 changed file with 61 additions and 62 deletions.
123 changes: 61 additions & 62 deletions src/Repositories/RedisBatchRepository.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
<?php

namespace Cyppe\LaravelBatchJobsRedisDriver\Repositories;
namespace App\Repositories;

use Carbon\CarbonImmutable;
use Closure;
use DateTimeInterface;
use Illuminate\Bus\Batch;
use Illuminate\Bus\BatchFactory;
use Illuminate\Bus\BatchRepository;
use Illuminate\Bus\DatabaseBatchRepository;
use Illuminate\Bus\PendingBatch;
use Illuminate\Bus\PrunableBatchRepository;
use Illuminate\Bus\UpdatedBatchJobCounts;
Expand All @@ -24,47 +23,55 @@ class RedisBatchRepository extends DatabaseBatchRepository implements BatchRepos
public function __construct( BatchFactory $factory )
{
$this->lockTimeout = 120;
$this->factory = $factory;
$this->factory = $factory;
}

public function get( $limit = 50, $before = null )
public function get( $limit = 50, $before = null ): array
{
if ( !Redis::exists( 'batches_list' ) ) {
// Handle the case where the batches_list does not exist
return [];
}

$allBatchIds = Redis::lrange( 'batches_list', 0, -1 );
$batches = [];
$totalBatches = Redis::llen( 'batches_list' );

foreach ( $allBatchIds as $batchId ) {
$data = Redis::get( "batch:$batchId" );
if ( $data !== false ) {
$batchData = unserialize( $data );
$batches[$batchId] = $batchData;
// Determine the starting index
$startIndex = 0;
if ( $before !== null ) {
// Fetch a range of batch IDs surrounding the 'before' value
// Adjust the range size as needed for efficiency
$rangeSize = 100;
$allBatchIds = Redis::lrange( 'batches_list', 0, $rangeSize - 1 );
$beforeIndex = array_search( $before, $allBatchIds );

if ( $beforeIndex !== false ) {
$startIndex = $beforeIndex + 1;
} else {
// TODO:
// Handle case where 'before' is not found in the initial range
// Consider fetching additional ranges or handling this scenario differently
}
}

// Sort batches by 'created_at' in descending order
uasort( $batches, function ( $a, $b ) {
return $b['created_at'] <=> $a['created_at'];
} );
// Calculate the range to fetch
$rangeEnd = min( $startIndex + $limit - 1, $totalBatches - 1 );

// If 'before' is specified, find the start position
$startIndex = 0;
if ( $before !== null ) {
$startIndex = array_search( $before, array_keys( $batches ) );
$startIndex = $startIndex === false ? 0 : $startIndex + 1;
}
// Fetch only the required batch IDs
$batchIds = Redis::lrange( 'batches_list', $startIndex, $rangeEnd );

// Slice the array to apply limit and offset
$batches = array_slice( $batches, $startIndex, $limit, true );
// Use Redis pipeline to bulk fetch batch data
$responses = Redis::pipeline( function ( $pipe ) use ( $batchIds ) {
foreach ( $batchIds as $batchId ) {
$pipe->get( "batch:$batchId" );
}
} );

return array_map( function ( $batchId ) {
return $this->find( $batchId );
}, array_keys( $batches ) );
}
// Filter, unserialize, and map to Batch objects
$batches = array_map( function ( $response ) {
return $response ? $this->toBatch( json_decode( $response, true ) ) : null;
}, $responses );

return array_filter( $batches );
}

public function find( string $batchId )
{
Expand All @@ -75,7 +82,7 @@ public function find( string $batchId )
return null;
}

$batchData = unserialize( $data );
$batchData = json_decode( $data, true );
return $this->toBatch( $batchData );
}

Expand All @@ -96,7 +103,7 @@ public function store( PendingBatch $batch )
'finished_at' => null,
];

Redis::set( "batch:$id", serialize( $batchData ) );
Redis::set( "batch:$id", json_encode( $batchData ) );
Redis::rpush( 'batches_list', $id ); // Add the batch ID to the list

return $this->find( $id );
Expand All @@ -110,15 +117,14 @@ public function incrementTotalJobs( string $batchId, int $amount )
Log::error( "Batch not found for incrementTotalJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
}
$batchData = unserialize( $data );
$batchData['total_jobs'] += $amount;
$batchData = json_decode( $data, true );
$batchData['total_jobs'] += $amount;
$batchData['pending_jobs'] += $amount;
Redis::set( "batch:$batchId", serialize( $batchData ) );
Redis::set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}, 100, 200 );
}


protected function acquireLock( string $key ): bool
{
$isAcquired = Redis::set( $key, true, 'EX', $this->lockTimeout, 'NX' );
Expand All @@ -132,10 +138,10 @@ protected function executeWithLock( string $lockKey, Closure $callback, $retryCo
if ( $this->acquireLock( $lockKey ) ) {
try {
if ( $attempts > 2 ) {
// Log::info( "Finally got lock. Attempt: " . $attempts );
//Log::info( "Finally got lock. Attempt: " . $attempts );
}
return $callback();
} catch ( \Exception $e ) {
} catch ( \Throwable $e ) {
Log::error( "Error in executeWithLock: " . $e->getMessage() );
throw $e;
} finally {
Expand Down Expand Up @@ -167,15 +173,14 @@ public function incrementFailedJobs( string $batchId, string $jobId )
Log::error( "Batch not found for incrementFailedJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
}
$batchData = unserialize( $data );
$batchData = json_decode( $data, true );
$batchData['failed_jobs']++;
$batchData['failed_job_ids'][] = $jobId;
Redis::set( "batch:$batchId", serialize( $batchData ) );
Redis::set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}, 100, 200 );
}


public function decrementPendingJobs( string $batchId, string $jobId )
{
return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) {
Expand All @@ -184,11 +189,11 @@ public function decrementPendingJobs( string $batchId, string $jobId )
Log::error( "Batch not found for decrementPendingJobs: " . $batchId );
return new UpdatedBatchJobCounts( 0, 0 );
}
$batchData = unserialize( $data );
$batchData = json_decode( $data, true );
$batchData['pending_jobs']--;
Redis::set( "batch:$batchId", serialize( $batchData ) );
Redis::set( "batch:$batchId", json_encode( $batchData ) );
return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] );
}, 100, 200 );
}, 100, 200 );
}


Expand All @@ -202,13 +207,13 @@ public function markAsFinished( string $batchId )
return;
}

$batchData = unserialize( $data );
$batchData = json_decode( $data, true );
// Convert finished_at to a Unix timestamp before storing
$batchData['finished_at'] = CarbonImmutable::now()->getTimestamp();
Redis::set( "batch:$batchId", serialize( $batchData ) );
Redis::set( "batch:$batchId", json_encode( $batchData ) );

//Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] );
}, 100, 200 );
Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] );
}, 100, 200 );
}


Expand Down Expand Up @@ -257,21 +262,18 @@ public function cancel( string $batchId )
if ( $data === false ) {
return;
}
$batchData = unserialize( $data );
$batchData = json_decode( $data, true );
// Convert cancelled_at to a Unix timestamp before storing
$batchData['cancelled_at'] = CarbonImmutable::now()->getTimestamp();
Redis::set( "batch:$batchId", serialize( $batchData ) );
}, 100, 200 ); // Retry 100 times with 200 milliseconds between retries
Redis::set( "batch:$batchId", json_encode( $batchData ) );
}, 100, 200 ); // Retry 100 times with 200 milliseconds between retries
}


public function transaction( Closure $callback )
{
return $callback();
}


// Work in progress
public function prune( DateTimeInterface $before )
{
return $this->pruneBatches( $before, true );
Expand All @@ -289,7 +291,7 @@ public function pruneCancelled( DateTimeInterface $before )

protected function pruneBatches( DateTimeInterface $before, $isFinished = null, $isCancelled = false )
{
$batchIds = Redis::lrange( 'batches_list', 0, -1 );
$batchIds = Redis::lrange( 'batches_list', 0, -1 );
$totalDeleted = 0;

foreach ( $batchIds as $batchId ) {
Expand All @@ -300,12 +302,12 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null,
continue;
}

$batchData = unserialize( $data );
$batchData = json_decode( $data, true );

$shouldBeDeleted = false;

$createdAt = CarbonImmutable::createFromTimestamp( $batchData['created_at'] );
$finishedAt = isset( $batchData['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['finished_at'] ) : null;
$createdAt = CarbonImmutable::createFromTimestamp( $batchData['created_at'] );
$finishedAt = isset( $batchData['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['finished_at'] ) : null;
$cancelledAt = isset( $batchData['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['cancelled_at'] ) : null;

if ( $isFinished === true && $finishedAt && $finishedAt < $before ) {
Expand All @@ -316,7 +318,6 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null,
$shouldBeDeleted = true;
}


if ( $shouldBeDeleted ) {
Redis::del( "batch:$batchId" );
Redis::lrem( 'batches_list', 0, $batchId );
Expand All @@ -326,6 +327,4 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null,

return $totalDeleted;
}


}
}

0 comments on commit 28f53f7

Please sign in to comment.