Skip to content

Commit

Permalink
Merge pull request #7 from TimoKoerber/feature/6-support-queue-name
Browse files Browse the repository at this point in the history
Queue for the dispatched jobs can be changed
  • Loading branch information
TimoKoerber authored Mar 24, 2023
2 parents c1948dc + 5e8e83c commit 69d339c
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 10 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ php artisan operations:make <operation_name> // create operation file
php artisan operations:process // process operation files
php artisan operations:process --sync // force syncronously execution
php artisan operations:process --async // force asyncronously execution
php artisan operations:process --queue=<name> // force queue, that the job will be dispatched to
php artisan operations:process --test // dont flag operations as processed
php artisan operations:process <operation_name> // re-run one specific operation
```
Expand Down Expand Up @@ -130,6 +131,11 @@ return new class extends OneTimeOperation
* Determine if the operation is being processed asyncronously.
*/
protected bool $async = true;

/**
* The queue that the job will be dispatched to.
*/
protected string $queue = 'default';

/**
* Process the operation.
Expand All @@ -155,10 +161,13 @@ public function process(): void
```

By default, the operation is being processed ***asyncronously*** (based on your configuration) by dispatching the job `OneTimeOperationProcessJob`.
By default, the operation is being dispatched to the `default` queue of your project. Change the `$queue` as you wish.

You can also execute the code syncronously by setting the `$async` flag to `false`.
_(this is only recommended for small operations, since the processing of these operations should be part of the deployment process)_

**Hint:** If you use syncronous processing, the `$queue` attribute will be ignored (duh!).

### Processing the operations

![One-Time Operations for Laravel - Processing the operations](https://user-images.githubusercontent.com/65356688/224434129-43082402-6077-4043-8e97-c44786e60a59.png)
Expand Down Expand Up @@ -192,6 +201,14 @@ php artisan operations:process --sync // force dispatchSync()
**Hint!** If `operation:process` is part of your deployment process, it is **not recommended** to process the operations syncronously,
since an error in your operation could make your whole deployment fail.

### Force different queue for all operations

You can provide the `--queue` option in the artisan call. The given queue will be used for all operations, ignoring the `$queue` attribute in the class.

```shell
php artisan operations:process --queue=redis // force redis queue
```

### Re-run an operation

![One-Time Operations for Laravel - Re-run an operation manually](https://user-images.githubusercontent.com/65356688/224440344-3d095730-12c3-4a2c-b4c3-42a8b6d60767.png)
Expand Down
31 changes: 23 additions & 8 deletions src/Commands/OneTimeOperationsProcessCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@ class OneTimeOperationsProcessCommand extends OneTimeOperationsCommand
{name? : Name of specific operation}
{--test : Process operation without tagging it as processed, so you can call it again}
{--async : Ignore setting in operation and process all operations asynchronously}
{--sync : Ignore setting in operation and process all operations synchronously}';
{--sync : Ignore setting in operation and process all operations synchronously}
{--queue= : Set the queue, that all jobs will be dispatched to}';

protected $description = 'Process all unprocessed one-time operations';

protected bool $forceAsync = false;

protected bool $forceSync = false;

protected ?string $queue = null;

public function handle(): int
{
$this->displayTestmodeWarning();

$this->forceAsync = (bool) $this->option('async');
$this->forceSync = (bool) $this->option('sync');
$this->queue = $this->option('queue');

if ($this->forceAsync && $this->forceSync) {
$this->components->error('Abort! Process either with --sync or --async.');
Expand Down Expand Up @@ -63,7 +67,7 @@ protected function proccessSingleOperation(string $providedOperationName): int
protected function processOperationFile(OneTimeOperationFile $operationFile): int
{
$this->components->task($operationFile->getOperationName(), function () use ($operationFile) {
$this->processOperation($operationFile);
$this->dispatchOperationJob($operationFile);
$this->storeOperation($operationFile);
});

Expand All @@ -86,7 +90,7 @@ protected function processOperationModel(Operation $operationModel): int
$this->components->task($operationModel->name, function () use ($operationModel) {
$operationFile = OneTimeOperationManager::getOperationFileByModel($operationModel);

$this->processOperation($operationFile);
$this->dispatchOperationJob($operationFile);
$this->storeOperation($operationFile);
});

Expand All @@ -110,7 +114,7 @@ protected function processNextOperations(): int

foreach ($unprocessedOperationFiles as $operationFile) {
$this->components->task($operationFile->getOperationName(), function () use ($operationFile) {
$this->processOperation($operationFile);
$this->dispatchOperationJob($operationFile);
$this->storeOperation($operationFile);
});
}
Expand All @@ -130,13 +134,15 @@ protected function storeOperation(OneTimeOperationFile $operationFile): void
Operation::storeOperation($operationFile->getOperationName(), $this->isAsyncMode($operationFile));
}

protected function processOperation(OneTimeOperationFile $operationFile)
protected function dispatchOperationJob(OneTimeOperationFile $operationFile)
{
if ($this->isAsyncMode($operationFile)) {
OneTimeOperationProcessJob::dispatch($operationFile->getOperationName());
} else {
OneTimeOperationProcessJob::dispatchSync($operationFile->getOperationName());
OneTimeOperationProcessJob::dispatch($operationFile->getOperationName())->onQueue($this->getQueue($operationFile));

return;
}

OneTimeOperationProcessJob::dispatchSync($operationFile->getOperationName());
}

protected function testModeEnabled(): bool
Expand All @@ -163,4 +169,13 @@ protected function isAsyncMode(OneTimeOperationFile $operationFile): bool

return $operationFile->getClassObject()->isAsync();
}

protected function getQueue(OneTimeOperationFile $operationFile): ?string
{
if ($this->queue) {
return $this->queue;
}

return $operationFile->getClassObject()->getQueue() ?: null;
}
}
10 changes: 10 additions & 0 deletions src/OneTimeOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ abstract class OneTimeOperation
*/
protected bool $async = true;

/**
* The queue that the job will be dispatched to.
*/
protected string $queue = 'default';

/**
* Process the operation.
*/
Expand All @@ -18,4 +23,9 @@ public function isAsync(): bool
{
return $this->async;
}

public function getQueue(): string
{
return $this->queue;
}
}
5 changes: 5 additions & 0 deletions stubs/one-time-operation.stub
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ return new class extends OneTimeOperation
*/
protected bool $async = true;

/**
* The queue that the job will be dispatched to.
*/
protected string $queue = 'default';

/**
* Process the operation.
*/
Expand Down
56 changes: 54 additions & 2 deletions tests/Feature/OneTimeOperationCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ public function test_sync_processing_with_file_attribute()

// Job was executed synchronously
Queue::assertPushed(OneTimeOperationProcessJob::class, function (OneTimeOperationProcessJob $job) {
return $job->operationName === '2015_10_21_072800_foo_bar_operation' && $job->connection === 'sync'; // sync
return $job->operationName === '2015_10_21_072800_foo_bar_operation'
&& $job->connection === 'sync' // sync
&& $job->queue === null; // no queue
});

$operation = Operation::first();
Expand All @@ -151,12 +153,62 @@ public function test_sync_processing_with_file_attribute()

// Job was executed asynchronously
Queue::assertPushed(OneTimeOperationProcessJob::class, function (OneTimeOperationProcessJob $job) {
return $job->operationName === '2015_10_21_072800_foo_bar_operation' && $job->connection === null; // async
return $job->operationName === '2015_10_21_072800_foo_bar_operation'
&& $job->connection === null // async
&& $job->queue === 'default'; // default queue
});

$operation = Operation::all()->last();
$this->assertEquals('2015_10_21_072800_foo_bar_operation', $operation->name);
$this->assertEquals('async', $operation->dispatched);

// process again - now on queue "foobar"
$this->artisan('operations:process 2015_10_21_072800_foo_bar_operation --async --queue=foobar')
->expectsConfirmation('Operation was processed before. Process it again?', 'yes')
->assertSuccessful();

// Job was executed asynchronously on queue "foobar"
Queue::assertPushed(OneTimeOperationProcessJob::class, function (OneTimeOperationProcessJob $job) {
return $job->operationName === '2015_10_21_072800_foo_bar_operation'
&& $job->connection === null // async
&& $job->queue === 'foobar'; // default queue
});
}

public function test_processing_with_queue()
{
$filepath = $this->filepath('2015_10_21_072800_foo_bar_operation.php');
Queue::assertNothingPushed();

// create file
$this->artisan('operations:make FooBarOperation')->assertSuccessful();

// edit file so it will use different queue
$fileContent = File::get($filepath);
$newContent = Str::replaceFirst('$queue = \'default\';', '$queue = \'narfpuit\';', $fileContent);
File::put($filepath, $newContent);

// process
$this->artisan('operations:process')->assertSuccessful();

// Job was executed synchronously
Queue::assertPushed(OneTimeOperationProcessJob::class, function (OneTimeOperationProcessJob $job) {
return $job->operationName === '2015_10_21_072800_foo_bar_operation'
&& $job->connection === null // async
&& $job->queue === 'narfpuit'; // queue narfpuit
});

// process again - overwrite queue with "foobar"
$this->artisan('operations:process 2015_10_21_072800_foo_bar_operation --queue=foobar')
->expectsConfirmation('Operation was processed before. Process it again?', 'yes')
->assertSuccessful();

// Job was executed asynchronously on queue "foobar"
Queue::assertPushed(OneTimeOperationProcessJob::class, function (OneTimeOperationProcessJob $job) {
return $job->operationName === '2015_10_21_072800_foo_bar_operation'
&& $job->connection === null // async
&& $job->queue === 'foobar'; // queue foobar
});
}

public function test_processing_with_test_flag()
Expand Down

0 comments on commit 69d339c

Please sign in to comment.