diff --git a/composer.json b/composer.json index 774974658..13625164a 100644 --- a/composer.json +++ b/composer.json @@ -30,6 +30,7 @@ "psr/http-message": "^1.0|^2.0", "psr/log": "^3.0.0", "symfony/cache": "^7.2", + "symfony/process": "^7.1", "symfony/uid": "^7.1", "symfony/var-dumper": "^7.1", "symfony/var-exporter": "^7.1", diff --git a/src/Tempest/CommandBus/src/AsyncCommand.php b/src/Tempest/CommandBus/src/AsyncCommand.php new file mode 100644 index 000000000..a3af36271 --- /dev/null +++ b/src/Tempest/CommandBus/src/AsyncCommand.php @@ -0,0 +1,12 @@ +commandBusConfig->addMiddleware(self::class); + } + + public function __invoke(object $command, CommandBusMiddlewareCallable $next): void + { + $reflector = new ClassReflector($command); + + if ($reflector->hasAttribute(AsyncCommand::class)) { + $this->repository->store(Uuid::v7()->toString(), $command); + + return; + } + + $next($command); + } +} diff --git a/src/Tempest/CommandBus/src/AsyncCommandRepositories/FileCommandRepository.php b/src/Tempest/CommandBus/src/AsyncCommandRepositories/FileCommandRepository.php new file mode 100644 index 000000000..3c6a2cb04 --- /dev/null +++ b/src/Tempest/CommandBus/src/AsyncCommandRepositories/FileCommandRepository.php @@ -0,0 +1,60 @@ +mapWithKeys(function (string $path) { + $uuid = str_replace('.pending.txt', '', pathinfo($path, PATHINFO_BASENAME)); + + $payload = file_get_contents($path); + + yield $uuid => unserialize($payload); + }) + ->toArray(); + } +} diff --git a/src/Tempest/CommandBus/src/AsyncCommandRepositories/MemoryRepository.php b/src/Tempest/CommandBus/src/AsyncCommandRepositories/MemoryRepository.php new file mode 100644 index 000000000..23a178d73 --- /dev/null +++ b/src/Tempest/CommandBus/src/AsyncCommandRepositories/MemoryRepository.php @@ -0,0 +1,37 @@ +commands[$uuid] = $command; + } + + public function getPendingCommands(): array + { + return $this->commands; + } + + public function findPendingCommand(string $uuid): object + { + return $this->commands[$uuid]; + } + + public function markAsDone(string $uuid): void + { + unset($this->commands[$uuid]); + } + + public function markAsFailed(string $uuid): void + { + unset($this->commands[$uuid]); + } +} diff --git a/src/Tempest/CommandBus/src/CommandBusConfig.php b/src/Tempest/CommandBus/src/CommandBusConfig.php index a678cee48..4891d11dc 100644 --- a/src/Tempest/CommandBus/src/CommandBusConfig.php +++ b/src/Tempest/CommandBus/src/CommandBusConfig.php @@ -4,6 +4,7 @@ namespace Tempest\CommandBus; +use Tempest\CommandBus\AsyncCommandRepositories\FileCommandRepository; use Tempest\Reflection\MethodReflector; final class CommandBusConfig @@ -14,6 +15,9 @@ public function __construct( /** @var array> */ public array $middleware = [], + + /** @var class-string<\Tempest\CommandBus\CommandRepository> $commandRepositoryClass */ + public string $commandRepositoryClass = FileCommandRepository::class, ) { } diff --git a/src/Tempest/CommandBus/src/CommandRepository.php b/src/Tempest/CommandBus/src/CommandRepository.php new file mode 100644 index 000000000..1a0b35d1b --- /dev/null +++ b/src/Tempest/CommandBus/src/CommandRepository.php @@ -0,0 +1,19 @@ + */ + public function getPendingCommands(): array; + + public function findPendingCommand(string $uuid): object; + + public function markAsDone(string $uuid): void; + + public function markAsFailed(string $uuid): void; +} diff --git a/src/Tempest/CommandBus/src/CommandRepositoryInitializer.php b/src/Tempest/CommandBus/src/CommandRepositoryInitializer.php new file mode 100644 index 000000000..b2afa16a2 --- /dev/null +++ b/src/Tempest/CommandBus/src/CommandRepositoryInitializer.php @@ -0,0 +1,20 @@ +get(CommandBusConfig::class)->commandRepositoryClass; + + return $container->get($commandRepositoryClass); + } +} diff --git a/src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php b/src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php new file mode 100644 index 000000000..483b836e0 --- /dev/null +++ b/src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php @@ -0,0 +1,11 @@ +repository->findPendingCommand($uuid); + } else { + $command = arr($this->repository->getPendingCommands())->first(); + } + + if (! $command) { + $this->error('No pending command found'); + + return ExitCode::ERROR; + } + + $commandHandler = $this->commandBusConfig->handlers[$command::class] ?? null; + + if (! $commandHandler) { + $commandClass = $command::class; + $this->error("No handler found for command {$commandClass}"); + + return ExitCode::ERROR; + } + + $commandHandler->handler->invokeArgs( + $this->container->get($commandHandler->handler->getDeclaringClass()->getName()), + [$command], + ); + + $this->repository->markAsDone($uuid); + $this->success('Done'); + + return ExitCode::SUCCESS; + } catch (Throwable $throwable) { + $this->repository->markAsFailed($uuid); + $this->error($throwable->getMessage()); + + return ExitCode::ERROR; + } + } +} diff --git a/src/Tempest/CommandBus/src/MonitorAsyncCommands.php b/src/Tempest/CommandBus/src/MonitorAsyncCommands.php new file mode 100644 index 000000000..3508ab027 --- /dev/null +++ b/src/Tempest/CommandBus/src/MonitorAsyncCommands.php @@ -0,0 +1,95 @@ +success("Monitoring for new commands. Press ctrl+c to stop."); + + /** @var \Symfony\Component\Process\Process[] $processes */ + $processes = []; + + while (true) { // @phpstan-ignore-line + foreach ($processes as $uuid => $process) { + $time = new DateTimeImmutable(); + + if ($process->isTerminated()) { + if ($process->isSuccessful()) { + $this->writeln("{$uuid} finished at {$time->format('Y-m-d H:i:s')}"); + } else { + $this->writeln("{$uuid} failed at {$time->format('Y-m-d H:i:s')}"); + } + + if ($output = trim($process->getOutput())) { + $this->writeln($output); + } + + if ($errorOutput = trim($process->getErrorOutput())) { + $this->writeln($errorOutput); + } + + unset($processes[$uuid]); + } + } + + $availableCommands = arr($this->repository->getPendingCommands()) + ->filter(fn (object $command, string $uuid) => ! in_array($uuid, array_keys($processes))); + + if (count($processes) === 5) { + $this->sleep(0.5); + + continue; + } + + if ($availableCommands->isEmpty()) { + $this->sleep(0.5); + + continue; + } + + // Start a task + $uuid = $availableCommands->keys()->first(); + + $time = new DateTimeImmutable(); + $this->writeln("

{$uuid}

started at {$time->format('Y-m-d H:i:s')}"); + + $process = new Process([ + $this->argumentBag->getBinaryPath(), + $this->argumentBag->getCliName(), + 'command:handle', + $uuid, + ], getcwd()); + + $process->start(); + + $processes[$uuid] = $process; + } + } + + private function sleep(float $seconds): void + { + usleep((int) ($seconds * 1_000_000)); + } +} diff --git a/src/Tempest/CommandBus/src/stored-commands/.gitignore b/src/Tempest/CommandBus/src/stored-commands/.gitignore new file mode 100644 index 000000000..314f02b1b --- /dev/null +++ b/src/Tempest/CommandBus/src/stored-commands/.gitignore @@ -0,0 +1 @@ +*.txt \ No newline at end of file diff --git a/src/Tempest/Console/src/Actions/ExecuteConsoleCommand.php b/src/Tempest/Console/src/Actions/ExecuteConsoleCommand.php index 09f849b3c..a799c3105 100644 --- a/src/Tempest/Console/src/Actions/ExecuteConsoleCommand.php +++ b/src/Tempest/Console/src/Actions/ExecuteConsoleCommand.php @@ -43,12 +43,12 @@ private function getCallable(array $commandMiddleware): ConsoleMiddlewareCallabl $inputBuilder = new ConsoleInputBuilder($consoleCommand, $invocation->argumentBag); - $consoleCommand->handler->invokeArgs( + $exitCode = $consoleCommand->handler->invokeArgs( $consoleCommandClass, $inputBuilder->build(), ); - return ExitCode::SUCCESS; + return $exitCode ?? ExitCode::SUCCESS; }); $middlewareStack = [...$this->consoleConfig->middleware, ...$commandMiddleware]; diff --git a/src/Tempest/Console/src/Exceptions/ConsoleErrorHandler.php b/src/Tempest/Console/src/Exceptions/ConsoleErrorHandler.php index 0fbd29eef..dab363c1b 100644 --- a/src/Tempest/Console/src/Exceptions/ConsoleErrorHandler.php +++ b/src/Tempest/Console/src/Exceptions/ConsoleErrorHandler.php @@ -57,7 +57,7 @@ public function handleException(Throwable $throwable): void public function handleError(int $errNo, string $errstr, string $errFile, int $errLine): void { - ll('error'); + ll(error: $errstr); $this->console ->writeln() diff --git a/src/Tempest/Console/src/Input/ConsoleArgumentBag.php b/src/Tempest/Console/src/Input/ConsoleArgumentBag.php index 73764aa2a..a29abde15 100644 --- a/src/Tempest/Console/src/Input/ConsoleArgumentBag.php +++ b/src/Tempest/Console/src/Input/ConsoleArgumentBag.php @@ -159,6 +159,11 @@ public function add(ConsoleInputArgument $argument): self return $this; } + public function getBinaryPath(): string + { + return PHP_BINARY; + } + public function getCliName(): string { return $this->path[0] ?? ''; diff --git a/tests/Fixtures/Console/DispatchAsyncCommand.php b/tests/Fixtures/Console/DispatchAsyncCommand.php new file mode 100644 index 000000000..248313800 --- /dev/null +++ b/tests/Fixtures/Console/DispatchAsyncCommand.php @@ -0,0 +1,32 @@ +info('Dispatched commands'); + } +} diff --git a/tests/Fixtures/Handlers/MyAsyncCommandHandler.php b/tests/Fixtures/Handlers/MyAsyncCommandHandler.php new file mode 100644 index 000000000..b9466e14d --- /dev/null +++ b/tests/Fixtures/Handlers/MyAsyncCommandHandler.php @@ -0,0 +1,27 @@ +container->singleton( + CommandRepository::class, + fn () => $repository, + ); + + MyAsyncCommandHandler::$isHandled = false; + + command(new MyAsyncCommand('Brent')); + + $pendingCommands = arr($repository->getPendingCommands()); + + $this->assertCount(1, $pendingCommands); + $command = $pendingCommands->first(); + $this->assertSame('Brent', $command->name); + $this->assertFalse(MyAsyncCommandHandler::$isHandled); + + $this->console + ->call("command:handle " . $pendingCommands->keys()->first()) + ->assertSee('Done'); + + $this->assertTrue(MyAsyncCommandHandler::$isHandled); + } + + public function test_async_command_monitor(): void + { + $process = new Process(['php', 'tempest', 'command:monitor']); + $process->start(); + + $this->console->call("command:dispatch 1"); + + sleep(1); + + $output = $this->getOutput($process); + $this->assertStringContainsString('Monitoring for new commands', $output); + $this->assertStringContainsString('started at', $output); + $this->assertStringContainsString('finished at', $output); + $this->assertStringContainsString('Done', $output); + $process->stop(); + } + + public function test_async_failed_command_monitor(): void + { + $process = new Process(['php', 'tempest', 'command:monitor']); + $process->start(); + + $this->console->call("command:dispatch 1 --fail"); + + sleep(1); + + $output = $this->getOutput($process); + $this->assertStringContainsString('Monitoring for new commands', $output); + $this->assertStringContainsString('started at', $output); + $this->assertStringContainsString('failed at', $output); + $this->assertStringContainsString('Failed command', $output); + $process->stop(); + + arr(glob(__DIR__ . '/../../../src/Tempest/CommandBus/src/stored-commands/*.failed.txt')) + ->each(function (string $filename): void { + unlink($filename); + }); + } + + private function getOutput(Process $process): string + { + $pattern = array_map( + fn (TerminalStyle $consoleStyle) => TerminalStyle::ESC->value . $consoleStyle->value, + TerminalStyle::cases(), + ); + + return str_replace($pattern, '', $process->getOutput()); + } +} diff --git a/tests/Integration/CommandBus/Fixtures/MyAsyncCommand.php b/tests/Integration/CommandBus/Fixtures/MyAsyncCommand.php new file mode 100644 index 000000000..02bb32895 --- /dev/null +++ b/tests/Integration/CommandBus/Fixtures/MyAsyncCommand.php @@ -0,0 +1,16 @@ +