diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml new file mode 100644 index 0000000..9137688 --- /dev/null +++ b/.github/workflows/php.yml @@ -0,0 +1,33 @@ +#.github/workflows/php.yml +name: Tests + +on: + push: ~ + pull_request: ~ + +jobs: + build: + runs-on: ${{ matrix.operating-system }} + name: PHP ${{ matrix.php }} and Symfony ${{ matrix.symfony }} + strategy: + matrix: + operating-system: [ ubuntu-latest, macos-latest ] + php: [ '8.3' ] + symfony: [ '6.4.*', '7.0.*' ] + + steps: + - uses: actions/checkout@v4 + + - name: Setup PHP ${{ matrix.php }} + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + tools: flex + + - name: Download dependencies (Symfony ${{ matrix.symfony }}) + env: + SYMFONY_REQUIRE: ${{ matrix.symfony }} + uses: ramsey/composer-install@v2 + + - name: Tests on ${{ matrix.operating-system }} + run: ./vendor/bin/phpunit \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..31431ba --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +vendor/ +tests/var/ +.phpunit.result.cache +composer.lock +tests/app/var/data.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..302f2eb --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ + +# JobBundle + +JobBundle provides a way to handle long-running tasks in symfony. + +## Resources + +- [Documentation](./docs/index.md) +- [Report issues](https://github.com/SoureCode/JobBundle/issues) +- [Send Pull Requests](https://github.com/SoureCode/JobBundle/pulls) diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..f538004 --- /dev/null +++ b/composer.json @@ -0,0 +1,40 @@ +{ + "name": "sourecode/job-bundle", + "type": "symfony-bundle", + "license": "MIT", + "autoload": { + "psr-4": { + "SoureCode\\Bundle\\Job\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "SoureCode\\Bundle\\Job\\Tests\\": "tests/" + } + }, + "authors": [ + { + "name": "chapterjason", + "email": "jason@sourecode.dev" + } + ], + "require": { + "php": ">=8.3", + "symfony/http-kernel": "^6.4 || ^7.0", + "symfony/console": "^6.4 || ^7.0", + "symfony/process": "^6.4 || ^7.0", + "symfony/config": "^6.4 || ^7.0", + "symfony/dependency-injection": "^6.4 || ^7.0", + "symfony/finder": "^6.4 || ^7.0", + "doctrine/orm": "^2.16 || ^3.0", + "doctrine/doctrine-bundle": "^2.10", + "symfony/messenger": "^6.4 || ^7.0", + "symfony/serializer": "^6.4 || ^7.0", + "symfony/property-access": "^6.4 || ^7.0" + }, + "require-dev": { + "nyholm/symfony-bundle-test": "^2.0", + "phpunit/phpunit": "^9.5", + "symfony/phpunit-bridge": "^6.4 || ^7.0" + } +} diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..a687e37 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,64 @@ + +# JobBundle + +## Requirements + +- PHP 8.2 or higher +- Symfony 6.3 or higher + +## Commands + +- [`job:run`](./job-run.md) - Runs a job. + +## Examples + +```php +use SoureCode\Bundle\Job\Manager\JobManager; + +$jobManager = $container->get(JobManager::class); + +// run a job bound to given entity +$jobManager->dispatch($entity, new MakeJob()); + +// Run a job bound to a string key +$jobManager->dispatch("test", new DoJob()); +``` + +## Installation + +Make sure Composer is installed globally, as explained in the +[installation chapter](https://getcomposer.org/doc/00-intro.md) +of the Composer documentation. + +### Applications that use Symfony Flex + +Open a command console, enter your project directory and execute: + +```console +composer require sourecode/job-bundle +``` + +### Applications that don't use Symfony Flex + +#### Step 1: Download the Bundle + +Open a command console, enter your project directory and execute the +following command to download the latest stable version of this bundle: + +```console +composer require sourecode/job-bundle +``` + +#### Step 2: Enable the Bundle + +Then, enable the bundle by adding it to the list of registered bundles +in the `config/bundles.php` file of your project: + +```php +// config/bundles.php + +return [ + // ... + \SoureCode\Bundle\Job\SoureCodeJobBundle::class => ['all' => true], +]; +``` \ No newline at end of file diff --git a/docs/job-run.md b/docs/job-run.md new file mode 100644 index 0000000..f865f69 --- /dev/null +++ b/docs/job-run.md @@ -0,0 +1,28 @@ + +# Command: job:run + +## Usage + +```shell +Description: + Run a job + +Usage: + job:run + +Arguments: + id The job id + +Options: + -h, --help Display help for the given command. When no command is given display help for the list command + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi|--no-ansi Force (or disable --no-ansi) ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The Environment name. [default: "dev"] + --no-debug Switch off debug mode. + --profile Enables profiling (requires debug). + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug +``` + + diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..548c30e --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + + ./ + + + vendor + tests + + + + + ./tests + + + diff --git a/scripts/generate-docs.sh b/scripts/generate-docs.sh new file mode 100755 index 0000000..2c5def7 --- /dev/null +++ b/scripts/generate-docs.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +CURRENT_DIRECTORY="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIRECTORY="$(dirname "$CURRENT_DIRECTORY")" +DOCS_DIRECTORY="$PROJECT_DIRECTORY/docs" +TESTS_DIRECTORY="$PROJECT_DIRECTORY/tests" +BIN_DIRECTORY="$TESTS_DIRECTORY/app/bin" +CONSOLE="$BIN_DIRECTORY/console" + +COMMANDS="job:run" + +NEWLINE=$'\n' + +for COMMAND in $COMMANDS; do + MD_FILE_NAME="$(echo "$COMMAND" | tr ':' '-')" + MARKDOWN="${NEWLINE}# Command: ${COMMAND}${NEWLINE}${NEWLINE}## Usage${NEWLINE}${NEWLINE}\`\`\`shell${NEWLINE}" + USAGE="$("$CONSOLE" "$COMMAND" --help -vvv)" + MARKDOWN="${MARKDOWN}${USAGE}${NEWLINE}\`\`\`${NEWLINE}${NEWLINE}" + + echo "$MARKDOWN" >"$DOCS_DIRECTORY/$MD_FILE_NAME.md" +done diff --git a/src/Attribute/AsJobHandler.php b/src/Attribute/AsJobHandler.php new file mode 100644 index 0000000..d5b851d --- /dev/null +++ b/src/Attribute/AsJobHandler.php @@ -0,0 +1,16 @@ +addArgument('id', InputArgument::REQUIRED, 'The job id'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $id = (int)$input->getArgument('id'); + $job = $this->jobManager->get($id); + + return $this->jobRunner->run($job); + } + + public function getSubscribedSignals(): array + { + return [ + 2, // SIGINT + 15, // SIGTERM + ]; + } + + public function handleSignal(int $signal, int|false $previousExitCode = 0): false|int + { + return $this->jobRunner->signal($signal); + } +} diff --git a/src/DependencyInjection/JobCompilerPass.php b/src/DependencyInjection/JobCompilerPass.php new file mode 100644 index 0000000..b9f25f5 --- /dev/null +++ b/src/DependencyInjection/JobCompilerPass.php @@ -0,0 +1,113 @@ +findTaggedServiceIds('soure_code.job_handler', true); + + $mapping = []; + + foreach ($jobHandlers as $serviceId => $tags) { + $className = $this->getServiceClass($container, $serviceId); + $classReflection = $this->getClassReflection($container, $className, $serviceId); + + if (!$classReflection->implementsInterface(JobHandlerInterface::class)) { + throw new RuntimeException(sprintf('Invalid handler service "%s": class "%s" does not implement "%s".', $serviceId, $className, JobHandlerInterface::class)); + } + + $payloadClassName = $this->getPayloadClassName($serviceId, $tags); + + if (array_key_exists($payloadClassName, $mapping)) { + throw new RuntimeException(sprintf('Handler "%s" is already registered for message "%s".', $mapping[$className], $className)); + } + + $mapping[$payloadClassName] = $className; + } + + $references = []; + + foreach ($mapping as $jobClass => $handlerId) { + $serviceId = $this->createDescriptorService($container, $jobClass, $handlerId); + $references[] = new Reference($serviceId); + } + + $definition = $container->getDefinition('soure_code.job.executor'); + + $definition->setArgument(0, new IteratorArgument($references)); + } + + private function getServiceClass(ContainerBuilder $container, string $serviceId): string + { + while (true) { + $definition = $container->findDefinition($serviceId); + + if (!$definition->getClass() && $definition instanceof ChildDefinition) { + $serviceId = $definition->getParent(); + + continue; + } + + return $definition->getClass(); + } + } + + private function createDescriptorService( + ContainerBuilder $containerBuilder, + string $jobClass, + string $handlerId, + ): string + { + $definition = new Definition(HandlerDescriptor::class); + $definition + ->addArgument(new Reference($handlerId)) + ->addArgument($jobClass); + + $definitionId = 'soure_code.job.descriptor.' . ContainerBuilder::hash($jobClass); + + $containerBuilder->setDefinition($definitionId, $definition); + + return $definitionId; + } + + private function getPayloadClassName(string $serviceId, array $tags): string + { + $handle = null; + + foreach ($tags as $tag) { + if (array_key_exists('handle', $tag)) { + if ($handle !== null) { + throw new RuntimeException(sprintf('Invalid handler service "%s": multiple handles are not allowed.', $serviceId)); + } + + $handle = $tag['handle']; + } + } + + return $handle; + } + + private function getClassReflection(ContainerBuilder $container, string $className, int|string $serviceId): ReflectionClass + { + $classReflection = $container->getReflectionClass($className); + + if (null === $classReflection) { + throw new RuntimeException(sprintf('Invalid handler service "%s": class "%s" does not exist.', $serviceId, $className)); + } + + return $classReflection; + } +} \ No newline at end of file diff --git a/src/Entity/Job.php b/src/Entity/Job.php new file mode 100644 index 0000000..276d5cd --- /dev/null +++ b/src/Entity/Job.php @@ -0,0 +1,188 @@ +id; + } + + public function getCreatedAt(): \DateTimeImmutable + { + return $this->createdAt; + } + + public function setCreatedAt(\DateTimeImmutable $createdAt): Job + { + $this->createdAt = $createdAt; + + return $this; + } + + public function getStartedAt(): ?\DateTimeImmutable + { + return $this->startedAt; + } + + public function setStartedAt(?\DateTimeImmutable $startedAt): Job + { + $this->startedAt = $startedAt; + + return $this; + } + + public function getFinishedAt(): ?\DateTimeImmutable + { + return $this->finishedAt; + } + + public function setFinishedAt(?\DateTimeImmutable $finishedAt): Job + { + $this->finishedAt = $finishedAt; + + return $this; + } + + public function getCancelledAt(): ?\DateTimeImmutable + { + return $this->cancelledAt; + } + + public function setCancelledAt(?\DateTimeImmutable $cancelledAt): Job + { + $this->cancelledAt = $cancelledAt; + + return $this; + } + + public function getFailedAt(): ?\DateTimeImmutable + { + return $this->failedAt; + } + + public function setFailedAt(?\DateTimeImmutable $failedAt): Job + { + $this->failedAt = $failedAt; + + return $this; + } + + public function getEncodedPayload(): EncodedPayload + { + return EncodedPayload::fromString($this->payload); + } + + public function setEncodedPayload(EncodedPayload $payload): Job + { + $this->payload = $payload->toString(); + + return $this; + } + + public function getResult(): ?string + { + return $this->result; + } + + public function setResult(?string $result): Job + { + $this->result = $result; + + return $this; + } + + public function getError(): ?string + { + return $this->error; + } + + public function setError(?string $error): Job + { + $this->error = $error; + return $this; + } + + public function isPending(): bool + { + return null === $this->startedAt && null === $this->finishedAt && null === $this->cancelledAt && null === $this->failedAt; + } + + public function isRunning(): bool + { + return null !== $this->startedAt && null === $this->finishedAt && null === $this->cancelledAt && null === $this->failedAt; + } + + public function isFinished(): bool + { + return null !== $this->startedAt && null !== $this->finishedAt && null === $this->cancelledAt && null === $this->failedAt; + } + + public function isCancelled(): bool + { + return null !== $this->startedAt && null !== $this->finishedAt && null !== $this->cancelledAt && null === $this->failedAt; + } + + public function isFailed(): bool + { + return null !== $this->startedAt && null === $this->finishedAt && null === $this->cancelledAt && null !== $this->failedAt; + } + + public function getPayload(): ?string + { + return $this->payload; + } + + public function setPayload(?string $payload): void + { + $this->payload = $payload; + } + + public function getIdentity(): string + { + return $this->identity; + } + + public function setIdentity(string $identity): void + { + $this->identity = $identity; + } +} diff --git a/src/Job/HandlerDescriptor.php b/src/Job/HandlerDescriptor.php new file mode 100644 index 0000000..ce98f04 --- /dev/null +++ b/src/Job/HandlerDescriptor.php @@ -0,0 +1,25 @@ +handler; + } + + public function getPayloadClassName(): string + { + return $this->payloadClassName; + } +} \ No newline at end of file diff --git a/src/Job/JobHandlerInterface.php b/src/Job/JobHandlerInterface.php new file mode 100644 index 0000000..7e910bc --- /dev/null +++ b/src/Job/JobHandlerInterface.php @@ -0,0 +1,16 @@ +handlers as $handler) { + if ($handler->getPayloadClassName() === $payloadClassName) { + return $handler->getHandler(); + } + } + + throw new \RuntimeException(sprintf('No handler found for payload class "%s".', $payloadClassName)); + } +} \ No newline at end of file diff --git a/src/Job/Runner.php b/src/Job/Runner.php new file mode 100644 index 0000000..4605754 --- /dev/null +++ b/src/Job/Runner.php @@ -0,0 +1,109 @@ +jobRepository->getPendingByJob($job); + + if (count($pendingJobs) > 0) { + throw new LogicException("Job with identity already queued."); + } + + $result = null; + + try { + $this->loadPayload($job); + + $result = $this->execute($job); + + return Command::SUCCESS; + } catch + (\Throwable $exception) { + $this->fail($job, $exception); + + return Command::FAILURE; + } finally { + if ($this->canceled) { + $this->cancel($job); + } + + $this->finish($job, $result); + } + } + + public function signal(int $signal): false|int + { + if ($this->handler instanceof SignalableJobHandlerInterface) { + $this->handler->handleSignal($signal); + } + + $this->canceled = true; + + return false; + } + + private function execute(Job $job): mixed + { + $job->setStartedAt($this->clock->now()); + + $this->jobRepository->save($job); + + return $this->handler->handle($this->payloadBody); + } + + private function cancel(Job $job): void + { + $job->setCancelledAt($this->clock->now()); + + $this->jobRepository->save($job); + } + + private function fail(Job $job, \Throwable $exception): void + { + $job->setFailedAt($this->clock->now()); + $job->setError(FlattenException::createFromThrowable($exception)->getAsString()); + + $this->jobRepository->save($job); + } + + private function finish(Job $job, mixed $result): void + { + $job->setFinishedAt($this->clock->now()); + $job->setResult($result); + + $this->jobRepository->save($job); + } + + private function loadPayload(Job $job): void + { + $encodedPayload = $job->getEncodedPayload(); + + $payload = $this->serializer->decode($encodedPayload); + + $this->payloadBody = $payload->getBody(); + $this->handler = $this->handlerLocator->findHandler($payload->getType()); + } +} \ No newline at end of file diff --git a/src/Job/Serializer.php b/src/Job/Serializer.php new file mode 100644 index 0000000..f36e329 --- /dev/null +++ b/src/Job/Serializer.php @@ -0,0 +1,65 @@ +getContextBuilder($context); + $body = $this->serializer->serialize($object, 'json', $contextBuilder->toArray()); + + return new EncodedPayload(get_class($object), $body); + } + + /** + * @param EncodedPayload $encodedPayload The encoded payload. + * @param array $context The context for deserialization. + * @return Payload Returns the decoded payload. + */ + public function decode(EncodedPayload $encodedPayload, array $context = []): Payload + { + $type = $encodedPayload->getType(); + + if (!class_exists($type)) { + $this->logger->warning('Class {class} does not exist.', [ + 'class' => $type, + ]); + + $type = stdClass::class; + } + + $contextBuilder = $this->getContextBuilder($context); + $body = $this->serializer->deserialize($encodedPayload->getBody(), $type, 'json', $contextBuilder->toArray()); + + return $encodedPayload->decode($body); + } + + private function getContextBuilder(array $context): JsonEncoderContextBuilder + { + $contextBuilder = (new ObjectNormalizerContextBuilder())->withContext($context); + $contextBuilder = (new JsonEncoderContextBuilder())->withContext($contextBuilder); + + return $contextBuilder; + } +} \ No newline at end of file diff --git a/src/Job/SignalableJobHandlerInterface.php b/src/Job/SignalableJobHandlerInterface.php new file mode 100644 index 0000000..c523685 --- /dev/null +++ b/src/Job/SignalableJobHandlerInterface.php @@ -0,0 +1,8 @@ +serializer->encode($payload); + + $job = new Job(); + + $identity = $this->jobRepository->createIdentity($entityOrIdentity, $payload); + + $job->setIdentity($identity); + $job->setCreatedAt($this->clock->now()); + $job->setEncodedPayload($encodedPayload); + + $this->jobRepository->save($job); + + return $job; + } + + + /** + * @param string|null $argument + * @return string + * @copyright From package symfony/process in class \Symfony\Component\Process\Process + */ + private function escapeArgument(?string $argument): string + { + if ('' === $argument || null === $argument) { + return '""'; + } + if ('\\' !== \DIRECTORY_SEPARATOR) { + return "'" . str_replace("'", "'\\''", $argument) . "'"; + } + if (str_contains($argument, "\0")) { + $argument = str_replace("\0", '?', $argument); + } + if (!preg_match('/[\/()%!^"<>&|\s]/', $argument)) { + return $argument; + } + $argument = preg_replace('/(\\\\+)$/', '$1$1', $argument); + + return '"' . str_replace(['"', '^', '%', '!', "\n"], ['""', '"^^"', '"^%"', '"^!"', '!LF!'], $argument) . '"'; + } + + public function dispatch(Job $job): void + { + $pendingJobs = $this->jobRepository->getPendingByJob($job); + + if (count($pendingJobs) > 0) { + throw new LogicException("Job with identity already queued."); + } + + $command = [ + ...$this->getPhpBinary(), + Path::join($this->projectDirectory, 'bin', 'console'), + 'job:run', + $job->getId(), + ]; + + $command = array_map($this->escapeArgument(...), $command); + $command = implode(" ", $command); + + // stdin: null, stdout: null, stderr: null and run in background + $command .= " /dev/null 2>/dev/null &"; + + $process = Process::fromShellCommandline( + $command, + cwd: $this->projectDirectory, + env: null, + input: null, + timeout: 0, + ); + + // $process->disableOutput(); + $process->start(); + } + + protected function getPhpBinary(): ?array + { + $executableFinder = new PhpExecutableFinder(); + $php = $executableFinder->find(false); + + if (false === $php) { + return null; + } + + return array_merge([$php], $executableFinder->findArguments()); + } + + public function get(int $id): Job + { + $job = $this->jobRepository->find($id); + + if (!$job) { + throw new \InvalidArgumentException(sprintf('Job with id "%s" not found.', $id)); + } + + return $job; + } + +} \ No newline at end of file diff --git a/src/Model/EncodedPayload.php b/src/Model/EncodedPayload.php new file mode 100644 index 0000000..2ad7b23 --- /dev/null +++ b/src/Model/EncodedPayload.php @@ -0,0 +1,97 @@ +type; + } + + public function setType(string $type): EncodedPayload + { + $this->type = $type; + return $this; + } + + public function getBody(): string + { + return $this->body; + } + + public function setBody(string $body): EncodedPayload + { + $this->body = $body; + return $this; + } + + /** + * @throws JsonException + */ + public static function fromString(string $encoded): EncodedPayload + { + /** + * @var EncodedPayload $payload + */ + $data = json_decode($encoded, true, 512, JSON_THROW_ON_ERROR); + + if (!is_array($data)) { + throw new InvalidArgumentException('Invalid payload.'); + } + + if (!isset($data['type'])) { + throw new InvalidArgumentException('Invalid payload. Missing type.'); + } + + if (!isset($data['body'])) { + throw new InvalidArgumentException('Invalid payload. Missing body.'); + } + + return new EncodedPayload( + $data['type'], + $data['body'], + ); + } + + public function decode(object $body): Payload + { + return new Payload( + $this->type, + $body, + ); + } + + /** + * @throws JsonException + */ + public function toString(): string + { + return json_encode([ + 'type' => $this->type, + 'body' => $this->body, + ], JSON_THROW_ON_ERROR); + } + + /** + * @throws JsonException + */ + public function __toString() + { + return $this->toString(); + } +} \ No newline at end of file diff --git a/src/Model/Payload.php b/src/Model/Payload.php new file mode 100644 index 0000000..839d0ef --- /dev/null +++ b/src/Model/Payload.php @@ -0,0 +1,38 @@ +type; + } + + public function setType(string $type): Payload + { + $this->type = $type; + return $this; + } + + public function getBody(): object + { + return $this->body; + } + + public function setBody(object $body): Payload + { + $this->body = $body; + return $this; + } +} \ No newline at end of file diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php new file mode 100644 index 0000000..7943e3b --- /dev/null +++ b/src/Repository/JobRepository.php @@ -0,0 +1,164 @@ + + * + * @method Job|null find($id, $lockMode = null, $lockVersion = null) + * @method Job|null findOneBy(array $criteria, array $orderBy = null) + * @method Job[] findAll() + * @method Job[] findBy(array $criteria, array $orderBy = null, $limit = null, $offset = null) + */ +class JobRepository extends ServiceEntityRepository +{ + public function __construct(ManagerRegistry $registry) + { + parent::__construct($registry, Job::class); + } + + public function save(Job $job): void + { + $entityManager = $this->getEntityManager(); + + if (!$entityManager->contains($job)) { + $entityManager->persist($job); + } + + $entityManager->flush(); + } + + public function remove(Job $job): void + { + $entityManager = $this->getEntityManager(); + $entityManager->refresh($job); + + if ($job->isRunning()) { + throw new \InvalidArgumentException(sprintf('Can not remove running job "%s".', $job->getId())); + } + + $entityManager->remove($job); + $entityManager->flush(); + } + + + /** + * Returns the identifier of an entity using the given ClassMetadata and entity or identity. + * + * @param ClassMetadata $classMetadata The metadata of the entity class. + * @param object $entityOrIdentity The entity or identity for which to retrieve the identifier. + * + * @return string The identifier of the entity as a string, using hyphens to separate multiple identifiers if applicable. + */ + private function getIdentifier(ClassMetadata $classMetadata, object $entityOrIdentity): string + { + $identifiers = $classMetadata->getIdentifierValues($entityOrIdentity); + + return implode('-', array_map(static fn($value) => (string)$value, $identifiers)); + } + + + /** + * Resolves the ClassMetadata for the given entity. + * + * @param object $entity The entity for which to resolve the ClassMetadata. + * + * @return ClassMetadata The ClassMetadata for the given entity. + * + * @throws InvalidArgumentException if the entity is not persisted or scheduled for persistence. + */ + private function resolveClassMetadata(object $entity): ClassMetadata + { + $entityManger = $this->getEntityManager(); + $unitOfWork = $entityManger->getUnitOfWork(); + + // TODO: test entity states + if (!$entityManger->contains($entity) || $unitOfWork->isEntityScheduled($entity)) { + throw new \InvalidArgumentException(sprintf( + 'Entity "%s" must be persisted before creating an identity', + $entity::class + )); + } + + return $entityManger->getClassMetadata($entity::class); + } + + /** + * Generates an identity string for the given entity or identity object. + * + * @param object|string $entityOrIdentity The entity or identity object to generate the identity for. + * @param object $payload The payload object associated with the identity. + * + * @return string The generated identity string. + */ + public function createIdentity(object|string $entityOrIdentity, object $payload): string + { + $payloadClass = $payload::class; + + if (is_string($entityOrIdentity)) { + return implode("-", [ + $payloadClass, + $entityOrIdentity, + ]); + } + + $classMetadata = $this->resolveClassMetadata($entityOrIdentity); + $identifier = $this->getIdentifier($classMetadata, $entityOrIdentity); + + return implode([ + $payloadClass, + $classMetadata->getName(), + $identifier + ]); + } + + public function getPending(object|string $entityOrIdentity, object $payload): array + { + $identity = $this->createIdentity($entityOrIdentity, $payload); + + return $this->getPendingByIdentity($identity); + } + + /** + * Retrieves a list of pending jobs by their identity. + * + * @param string $identity The identity of the jobs to retrieve. + * + * @return array Returns an array of pending Job objects that match the given identity. + */ + public function getPendingByIdentity(string $identity): array + { + $queryBuilder = $this->createQueryBuilder('job'); + + $queryBuilder + ->where($queryBuilder->expr()->eq('job.identity', ':identity')) + ->andWhere($queryBuilder->expr()->isNull('job.finishedAt')) + ->setParameter('identity', $identity); + + $query = $queryBuilder->getQuery(); + + return $query->getResult(); + } + + /** + * Gets all pending items associated with a given Job. + * + * @param Job $job The Job object to retrieve pending items for. + * + * @return array An array of pending items. + */ + public function getPendingByJob(Job $job): array + { + $jobs = $this->getPendingByIdentity($job->getIdentity()); + + return array_filter($jobs, static fn(Job $pendingJob) => $pendingJob->getId() !== $job->getId()); + } +} diff --git a/src/SoureCodeJobBundle.php b/src/SoureCodeJobBundle.php new file mode 100644 index 0000000..f978454 --- /dev/null +++ b/src/SoureCodeJobBundle.php @@ -0,0 +1,100 @@ +rootNode() + ->children() + ->end(); + // @formatter:on + } + + public function loadExtension(array $config, ContainerConfigurator $container, ContainerBuilder $builder): void + { + $services = $container->services(); + + $services->set('soure_code.job.repository.job', JobRepository::class) + ->args([ + service('doctrine'), + ]) + ->tag('doctrine.repository_service'); + + $services + ->alias(JobRepository::class, 'soure_code.job.repository.job') + ->public(); + + $services->set('soure_code.job.manager', JobManager::class) + ->args([ + service('soure_code.job.repository.job'), + service('soure_code.job.serializer'), + service('clock'), + param('kernel.project_dir'), + ]); + + $services->alias(JobManager::class, 'soure_code.job.manager') + ->public(); + + $services->set('soure_code.job.command.job.run', JobRunCommand::class) + ->args([ + service('soure_code.job.manager'), + service('soure_code.job.runner'), + ]) + ->public() + ->tag('console.command', ['command' => 'job:run']); + + $services->set('soure_code.job.serializer', Serializer::class) + ->args([ + service('logger'), + service('serializer'), + ]); + + $services->set('soure_code.job.executor', JobHandlerLocator::class) + ->args([ + abstract_arg('handlers'), + service('soure_code.job.serializer'), + ]); + + $services->set('soure_code.job.runner', Runner::class) + ->args([ + service('soure_code.job.repository.job'), + service('soure_code.job.executor'), + service('soure_code.job.serializer'), + service('clock'), + ]); + + $builder->registerAttributeForAutoconfiguration(AsJobHandler::class, + static function (ChildDefinition $definition, AsJobHandler $attribute): void { + $definition->addTag('soure_code.job_handler', [ + 'handle' => $attribute->handle, + ]); + }); + } + + public function build(ContainerBuilder $container): void + { + $container->addCompilerPass(new JobCompilerPass()); + } + +} \ No newline at end of file diff --git a/tests/AbstractBaseFunctionalTestCase.php b/tests/AbstractBaseFunctionalTestCase.php new file mode 100644 index 0000000..e54eb7c --- /dev/null +++ b/tests/AbstractBaseFunctionalTestCase.php @@ -0,0 +1,37 @@ +get(EntityManagerInterface::class); + + $schemaTool = new SchemaTool($entityManager); + $schemaTool->updateSchema([ + $entityManager->getClassMetadata(Bug::class), + $entityManager->getClassMetadata(Job::class), + ]); + } + + protected function tearDown(): void + { + $container = self::getContainer(); + + $entityManager = $container->get(EntityManagerInterface::class); + + $schemaTool = new SchemaTool($entityManager); + $schemaTool->dropDatabase(); + + parent::tearDown(); + } +} diff --git a/tests/AbstractBaseTestCase.php b/tests/AbstractBaseTestCase.php new file mode 100644 index 0000000..c3be01b --- /dev/null +++ b/tests/AbstractBaseTestCase.php @@ -0,0 +1,33 @@ +addTestBundle(DoctrineBundle::class); + $kernel->addTestBundle(SoureCodeJobBundle::class); + $kernel->setTestProjectDir(Path::join(__DIR__, 'app')); + $kernel->addTestConfig(Path::join($kernel->getProjectDir(), 'config', 'config.yaml')); + $kernel->handleOptions($options); + + return $kernel; + } +} \ No newline at end of file diff --git a/tests/BundleInitializationTest.php b/tests/BundleInitializationTest.php new file mode 100644 index 0000000..ad96d9e --- /dev/null +++ b/tests/BundleInitializationTest.php @@ -0,0 +1,17 @@ +has(JobManager::class)); + self::assertTrue($container->has(JobRepository::class)); + } +} diff --git a/tests/CancelTest.php b/tests/CancelTest.php new file mode 100644 index 0000000..9858eb1 --- /dev/null +++ b/tests/CancelTest.php @@ -0,0 +1,100 @@ +get(JobManager::class); + /** + * @var EntityManagerInterface $entityManager + */ + $entityManager = $container->get(EntityManagerInterface::class); + + $entity = new Bug(); + $entity->setDescription('Yeet'); + $entity->setCreated(new \DateTime()); + $entity->setStatus('pending'); + + $entityManager->persist($entity); + $entityManager->flush(); + + $job = $jobManager->create($entity, new WaitJob(10)); + + $now = new \DateTime(); + $jobManager->dispatch($job); + $after = new \DateTime(); + $diff = $after->getTimestamp() - $now->getTimestamp(); + + self::assertLessThan(1, $diff); + + sleep(2); + + $pid = $this->getProcessPid('job:run'); + exec('kill -s 2 ' . $pid); + + sleep(2); + + $entityManager->refresh($job); + + self::assertNull($job->getError()); + self::assertNull($job->getResult()); + self::assertNotNull($job->getFinishedAt()); + self::assertNotNull($job->getStartedAt()); + self::assertNull($job->getFailedAt()); + self::assertNotNull($job->getCancelledAt()); + self::assertTrue($job->isCancelled()); + } + + private function getProcesses(): array + { + $output = []; + exec('ps -ax', $output); + + $processes = []; + + foreach ($output as $line) { + $processes[] = $line; + } + + return $processes; + } + + private function findProcess(string $needle): ?string + { + $processes = $this->getProcesses(); + + foreach ($processes as $process) { + if (str_contains($process, $needle)) { + return $process; + } + } + + return null; + } + + private function getProcessPid(string $needle): ?int + { + $process = $this->findProcess($needle); + + if (null === $process) { + return null; + } + + $parts = preg_split('/\s+/', $process); + $parts = array_filter($parts, static fn($value) => !empty($value)); + $parts = array_values($parts); + + return (int)$parts[0]; + } +} diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php new file mode 100644 index 0000000..d336e2a --- /dev/null +++ b/tests/FunctionalTest.php @@ -0,0 +1,123 @@ +get(JobManager::class); + /** + * @var EntityManagerInterface $entityManager + */ + $entityManager = $container->get(EntityManagerInterface::class); + + $entity = new Bug(); + $entity->setDescription('Yeet'); + $entity->setCreated(new \DateTime()); + $entity->setStatus('pending'); + + $entityManager->persist($entity); + $entityManager->flush(); + + $job = $jobManager->create($entity, new WaitJob()); + + self::assertTrue($job->isPending()); + + $now = new \DateTime(); + $jobManager->dispatch($job); + $after = new \DateTime(); + $diff = $after->getTimestamp() - $now->getTimestamp(); + + self::assertLessThan(1, $diff); + + sleep(1); + + $entityManager->refresh($job); + + self::assertNull($job->getError()); + self::assertNull($job->getResult()); + + self::assertNull($job->getFinishedAt()); + self::assertFalse($job->isFinished()); + + self::assertNotNull($job->getStartedAt()); + self::assertTrue($job->isRunning()); + + self::assertNull($job->getFailedAt()); + self::assertFalse($job->isFailed()); + + self::assertNull($job->getCancelledAt()); + self::assertFalse($job->isCancelled()); + + sleep(2); + + $entityManager->refresh($job); + + self::assertNull($job->getError()); + self::assertNull($job->getResult()); + + self::assertNotNull($job->getFinishedAt()); + self::assertTrue($job->isFinished()); + + self::assertNotNull($job->getStartedAt()); + self::assertFalse($job->isRunning()); + + self::assertNull($job->getFailedAt()); + self::assertFalse($job->isFailed()); + + self::assertNull($job->getCancelledAt()); + self::assertFalse($job->isCancelled()); + } + + public function testFastJobDispatchExecutionAndResult(): void + { + $container = self::getContainer(); + /** + * @var JobManager $jobManager + */ + $jobManager = $container->get(JobManager::class); + /** + * @var EntityManagerInterface $entityManager + */ + $entityManager = $container->get(EntityManagerInterface::class); + + $entity = new Bug(); + $entity->setDescription('Yeet'); + $entity->setCreated(new \DateTime()); + $entity->setStatus('pending'); + + $entityManager->persist($entity); + $entityManager->flush(); + + $job = $jobManager->create($entity, new FastJob("yeet")); + + $now = new \DateTime(); + $jobManager->dispatch($job); + $after = new \DateTime(); + $diff = $after->getTimestamp() - $now->getTimestamp(); + + self::assertLessThan(1, $diff); + + sleep(1); + + $entityManager->refresh($job); + + self::assertNull($job->getError()); + self::assertSame("yeetbaz", $job->getResult()); + self::assertNotNull($job->getFinishedAt()); + self::assertNotNull($job->getStartedAt()); + self::assertNull($job->getFailedAt()); + self::assertNull($job->getCancelledAt()); + } +} diff --git a/tests/app/bin/console b/tests/app/bin/console new file mode 100755 index 0000000..fdc501f --- /dev/null +++ b/tests/app/bin/console @@ -0,0 +1,19 @@ +#!/usr/bin/env php +addTestBundle(DoctrineBundle::class); +$kernel->addTestBundle(SoureCodeJobBundle::class); +$kernel->addTestConfig(Path::join(__DIR__, '..', 'config', 'config.yaml')); +$kernel->setTestProjectDir(Path::join(__DIR__, '..')); + +$app = new Application($kernel); +$app->run(); diff --git a/tests/app/config/config.yaml b/tests/app/config/config.yaml new file mode 100644 index 0000000..d89c3e3 --- /dev/null +++ b/tests/app/config/config.yaml @@ -0,0 +1,61 @@ +framework: + http_method_override: false + session: + handler_id: null + cookie_secure: auto + cookie_samesite: lax + storage_factory_id: session.storage.factory.native + messenger: + failure_transport: failed + default_bus: messenger.bus.default + + buses: + messenger.bus.default: + middleware: + - doctrine_ping_connection + - doctrine_close_connection + messenger.bus.high: + middleware: + - doctrine_ping_connection + - doctrine_close_connection + transports: + async: 'doctrine://default?auto_setup=0' + failed: 'doctrine://default?queue_name=failed' + + routing: + #SoureCode\Bundle\Worker\Tests\app\src\Message\SleepMessage: async + #SoureCode\Bundle\Worker\Tests\app\src\Message\StopMessage: async + +doctrine: + dbal: + url: 'sqlite:///%kernel.project_dir%/var/data.db' + orm: + auto_generate_proxy_classes: true + enable_lazy_ghost_objects: true + report_fields_where_declared: true + validate_xml_mapping: true + naming_strategy: doctrine.orm.naming_strategy.underscore_number_aware + auto_mapping: true + mappings: + SoureCodeJob: + is_bundle: false + type: attribute + dir: '%kernel.project_dir%/../../src/Entity' + prefix: 'SoureCode\Bundle\Job\Entity' + alias: SoureCodeJob + SoureCodeTest: + is_bundle: false + type: attribute + dir: '%kernel.project_dir%/src/Entity' + prefix: 'SoureCode\Bundle\Job\Tests\app\src\Entity' + alias: SoureCodeTest + +services: + _defaults: + autowire: true + autoconfigure: true + SoureCode\Bundle\Job\Tests\app\src\: + resource: '../src/' + exclude: + - '../src/DependencyInjection/' + - '../src/Entity/' \ No newline at end of file diff --git a/tests/app/src/Entity/Bug.php b/tests/app/src/Entity/Bug.php new file mode 100644 index 0000000..ad50632 --- /dev/null +++ b/tests/app/src/Entity/Bug.php @@ -0,0 +1,60 @@ +id; + } + + public function getDescription(): string + { + return $this->description; + } + + public function setDescription(string $description): void + { + $this->description = $description; + } + + public function setCreated(DateTime $created) + { + $this->created = $created; + } + + public function getCreated(): DateTime + { + return $this->created; + } + + public function setStatus($status): void + { + $this->status = $status; + } + + public function getStatus():string + { + return $this->status; + } +} \ No newline at end of file diff --git a/tests/app/src/Job/FastJob.php b/tests/app/src/Job/FastJob.php new file mode 100644 index 0000000..e62649a --- /dev/null +++ b/tests/app/src/Job/FastJob.php @@ -0,0 +1,12 @@ + + */ +class FastJobHandler implements JobHandlerInterface +{ + public function handle($payload): string + { + usleep(500); + + return $payload->foo . 'baz'; + } +} \ No newline at end of file diff --git a/tests/app/src/JobHandler/WaitJobHandler.php b/tests/app/src/JobHandler/WaitJobHandler.php new file mode 100644 index 0000000..4c000cb --- /dev/null +++ b/tests/app/src/JobHandler/WaitJobHandler.php @@ -0,0 +1,20 @@ + + */ +class WaitJobHandler implements JobHandlerInterface +{ + public function handle($payload): void + { + sleep($payload->seconds); + } +} \ No newline at end of file diff --git a/tests/app/var/.gitkeep b/tests/app/var/.gitkeep new file mode 100644 index 0000000..e69de29