diff --git a/src/Repositories/RedisBatchRepository.php b/src/Repositories/RedisBatchRepository.php index ca994d8..a9d55f9 100644 --- a/src/Repositories/RedisBatchRepository.php +++ b/src/Repositories/RedisBatchRepository.php @@ -1,6 +1,6 @@ lockTimeout = 120; - $this->factory = $factory; + $this->factory = $factory; } - public function get( $limit = 50, $before = null ) + public function get( $limit = 50, $before = null ): array { if ( !Redis::exists( 'batches_list' ) ) { - // Handle the case where the batches_list does not exist return []; } - $allBatchIds = Redis::lrange( 'batches_list', 0, -1 ); - $batches = []; + $totalBatches = Redis::llen( 'batches_list' ); - foreach ( $allBatchIds as $batchId ) { - $data = Redis::get( "batch:$batchId" ); - if ( $data !== false ) { - $batchData = unserialize( $data ); - $batches[$batchId] = $batchData; + // Determine the starting index + $startIndex = 0; + if ( $before !== null ) { + // Fetch a range of batch IDs surrounding the 'before' value + // Adjust the range size as needed for efficiency + $rangeSize = 100; + $allBatchIds = Redis::lrange( 'batches_list', 0, $rangeSize - 1 ); + $beforeIndex = array_search( $before, $allBatchIds ); + + if ( $beforeIndex !== false ) { + $startIndex = $beforeIndex + 1; + } else { + // TODO: + // Handle case where 'before' is not found in the initial range + // Consider fetching additional ranges or handling this scenario differently } } - // Sort batches by 'created_at' in descending order - uasort( $batches, function ( $a, $b ) { - return $b['created_at'] <=> $a['created_at']; - } ); + // Calculate the range to fetch + $rangeEnd = min( $startIndex + $limit - 1, $totalBatches - 1 ); - // If 'before' is specified, find the start position - $startIndex = 0; - if ( $before !== null ) { - $startIndex = array_search( $before, array_keys( $batches ) ); - $startIndex = $startIndex === false ? 0 : $startIndex + 1; - } + // Fetch only the required batch IDs + $batchIds = Redis::lrange( 'batches_list', $startIndex, $rangeEnd ); - // Slice the array to apply limit and offset - $batches = array_slice( $batches, $startIndex, $limit, true ); + // Use Redis pipeline to bulk fetch batch data + $responses = Redis::pipeline( function ( $pipe ) use ( $batchIds ) { + foreach ( $batchIds as $batchId ) { + $pipe->get( "batch:$batchId" ); + } + } ); - return array_map( function ( $batchId ) { - return $this->find( $batchId ); - }, array_keys( $batches ) ); - } + // Filter, unserialize, and map to Batch objects + $batches = array_map( function ( $response ) { + return $response ? $this->toBatch( json_decode( $response, true ) ) : null; + }, $responses ); + return array_filter( $batches ); + } public function find( string $batchId ) { @@ -75,7 +82,7 @@ public function find( string $batchId ) return null; } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); return $this->toBatch( $batchData ); } @@ -96,7 +103,7 @@ public function store( PendingBatch $batch ) 'finished_at' => null, ]; - Redis::set( "batch:$id", serialize( $batchData ) ); + Redis::set( "batch:$id", json_encode( $batchData ) ); Redis::rpush( 'batches_list', $id ); // Add the batch ID to the list return $this->find( $id ); @@ -110,15 +117,14 @@ public function incrementTotalJobs( string $batchId, int $amount ) Log::error( "Batch not found for incrementTotalJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); } - $batchData = unserialize( $data ); - $batchData['total_jobs'] += $amount; + $batchData = json_decode( $data, true ); + $batchData['total_jobs'] += $amount; $batchData['pending_jobs'] += $amount; - Redis::set( "batch:$batchId", serialize( $batchData ) ); + Redis::set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); - }, 100, 200 ); + }, 100, 200 ); } - protected function acquireLock( string $key ): bool { $isAcquired = Redis::set( $key, true, 'EX', $this->lockTimeout, 'NX' ); @@ -132,10 +138,10 @@ protected function executeWithLock( string $lockKey, Closure $callback, $retryCo if ( $this->acquireLock( $lockKey ) ) { try { if ( $attempts > 2 ) { - // Log::info( "Finally got lock. Attempt: " . $attempts ); + //Log::info( "Finally got lock. Attempt: " . $attempts ); } return $callback(); - } catch ( \Exception $e ) { + } catch ( \Throwable $e ) { Log::error( "Error in executeWithLock: " . $e->getMessage() ); throw $e; } finally { @@ -167,15 +173,14 @@ public function incrementFailedJobs( string $batchId, string $jobId ) Log::error( "Batch not found for incrementFailedJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); $batchData['failed_jobs']++; $batchData['failed_job_ids'][] = $jobId; - Redis::set( "batch:$batchId", serialize( $batchData ) ); + Redis::set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); - }, 100, 200 ); + }, 100, 200 ); } - public function decrementPendingJobs( string $batchId, string $jobId ) { return $this->executeWithLock( "lock:batch:$batchId", function () use ( $batchId, $jobId ) { @@ -184,11 +189,11 @@ public function decrementPendingJobs( string $batchId, string $jobId ) Log::error( "Batch not found for decrementPendingJobs: " . $batchId ); return new UpdatedBatchJobCounts( 0, 0 ); } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); $batchData['pending_jobs']--; - Redis::set( "batch:$batchId", serialize( $batchData ) ); + Redis::set( "batch:$batchId", json_encode( $batchData ) ); return new UpdatedBatchJobCounts( $batchData['pending_jobs'], $batchData['failed_jobs'] ); - }, 100, 200 ); + }, 100, 200 ); } @@ -202,13 +207,13 @@ public function markAsFinished( string $batchId ) return; } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); // Convert finished_at to a Unix timestamp before storing $batchData['finished_at'] = CarbonImmutable::now()->getTimestamp(); - Redis::set( "batch:$batchId", serialize( $batchData ) ); + Redis::set( "batch:$batchId", json_encode( $batchData ) ); - //Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] ); - }, 100, 200 ); + Log::debug( "Batch marked as finished: " . $batchId . " with finished_at: " . $batchData['finished_at'] ); + }, 100, 200 ); } @@ -257,21 +262,18 @@ public function cancel( string $batchId ) if ( $data === false ) { return; } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); // Convert cancelled_at to a Unix timestamp before storing $batchData['cancelled_at'] = CarbonImmutable::now()->getTimestamp(); - Redis::set( "batch:$batchId", serialize( $batchData ) ); - }, 100, 200 ); // Retry 100 times with 200 milliseconds between retries + Redis::set( "batch:$batchId", json_encode( $batchData ) ); + }, 100, 200 ); // Retry 100 times with 200 milliseconds between retries } - public function transaction( Closure $callback ) { return $callback(); } - - // Work in progress public function prune( DateTimeInterface $before ) { return $this->pruneBatches( $before, true ); @@ -289,7 +291,7 @@ public function pruneCancelled( DateTimeInterface $before ) protected function pruneBatches( DateTimeInterface $before, $isFinished = null, $isCancelled = false ) { - $batchIds = Redis::lrange( 'batches_list', 0, -1 ); + $batchIds = Redis::lrange( 'batches_list', 0, -1 ); $totalDeleted = 0; foreach ( $batchIds as $batchId ) { @@ -300,12 +302,12 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null, continue; } - $batchData = unserialize( $data ); + $batchData = json_decode( $data, true ); $shouldBeDeleted = false; - $createdAt = CarbonImmutable::createFromTimestamp( $batchData['created_at'] ); - $finishedAt = isset( $batchData['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['finished_at'] ) : null; + $createdAt = CarbonImmutable::createFromTimestamp( $batchData['created_at'] ); + $finishedAt = isset( $batchData['finished_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['finished_at'] ) : null; $cancelledAt = isset( $batchData['cancelled_at'] ) ? CarbonImmutable::createFromTimestamp( $batchData['cancelled_at'] ) : null; if ( $isFinished === true && $finishedAt && $finishedAt < $before ) { @@ -316,7 +318,6 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null, $shouldBeDeleted = true; } - if ( $shouldBeDeleted ) { Redis::del( "batch:$batchId" ); Redis::lrem( 'batches_list', 0, $batchId ); @@ -326,6 +327,4 @@ protected function pruneBatches( DateTimeInterface $before, $isFinished = null, return $totalDeleted; } - - -} \ No newline at end of file +}