-
-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(commandbus): async commands (#685)
- Loading branch information
Showing
20 changed files
with
562 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
use Attribute; | ||
|
||
#[Attribute] | ||
final readonly class AsyncCommand | ||
{ | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
use Symfony\Component\Uid\Uuid; | ||
use Tempest\Core\KernelEvent; | ||
use Tempest\EventBus\EventHandler; | ||
use Tempest\Reflection\ClassReflector; | ||
|
||
final readonly class AsyncCommandMiddleware implements CommandBusMiddleware | ||
{ | ||
public function __construct( | ||
private CommandBusConfig $commandBusConfig, | ||
private CommandRepository $repository, | ||
) { | ||
} | ||
|
||
#[EventHandler(KernelEvent::BOOTED)] | ||
public function onBooted(): void | ||
{ | ||
$this->commandBusConfig->addMiddleware(self::class); | ||
} | ||
|
||
public function __invoke(object $command, CommandBusMiddlewareCallable $next): void | ||
{ | ||
$reflector = new ClassReflector($command); | ||
|
||
if ($reflector->hasAttribute(AsyncCommand::class)) { | ||
$this->repository->store(Uuid::v7()->toString(), $command); | ||
|
||
return; | ||
} | ||
|
||
$next($command); | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
src/Tempest/CommandBus/src/AsyncCommandRepositories/FileCommandRepository.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus\AsyncCommandRepositories; | ||
|
||
use Tempest\CommandBus\CommandRepository; | ||
use Tempest\CommandBus\Exceptions\CouldNotResolveCommand; | ||
use function Tempest\Support\arr; | ||
|
||
final readonly class FileCommandRepository implements CommandRepository | ||
{ | ||
public function store(string $uuid, object $command): void | ||
{ | ||
$payload = serialize($command); | ||
|
||
file_put_contents(__DIR__ . "/../stored-commands/{$uuid}.pending.txt", $payload); | ||
} | ||
|
||
public function findPendingCommand(string $uuid): object | ||
{ | ||
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt"; | ||
|
||
if (! file_exists($path)) { | ||
throw new CouldNotResolveCommand($uuid); | ||
} | ||
|
||
$payload = file_get_contents($path); | ||
|
||
return unserialize($payload); | ||
} | ||
|
||
public function markAsDone(string $uuid): void | ||
{ | ||
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt"; | ||
|
||
unlink($path); | ||
} | ||
|
||
public function markAsFailed(string $uuid): void | ||
{ | ||
rename( | ||
from: __DIR__ . "/../stored-commands/{$uuid}.pending.txt", | ||
to: __DIR__ . "/../stored-commands/{$uuid}.failed.txt", | ||
); | ||
} | ||
|
||
public function getPendingCommands(): array | ||
{ | ||
return arr(glob(__DIR__ . "/../stored-commands/*.pending.txt")) | ||
->mapWithKeys(function (string $path) { | ||
$uuid = str_replace('.pending.txt', '', pathinfo($path, PATHINFO_BASENAME)); | ||
|
||
$payload = file_get_contents($path); | ||
|
||
yield $uuid => unserialize($payload); | ||
}) | ||
->toArray(); | ||
} | ||
} |
37 changes: 37 additions & 0 deletions
37
src/Tempest/CommandBus/src/AsyncCommandRepositories/MemoryRepository.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus\AsyncCommandRepositories; | ||
|
||
use Tempest\CommandBus\CommandRepository; | ||
|
||
final class MemoryRepository implements CommandRepository | ||
{ | ||
private array $commands = []; | ||
|
||
public function store(string $uuid, object $command): void | ||
{ | ||
$this->commands[$uuid] = $command; | ||
} | ||
|
||
public function getPendingCommands(): array | ||
{ | ||
return $this->commands; | ||
} | ||
|
||
public function findPendingCommand(string $uuid): object | ||
{ | ||
return $this->commands[$uuid]; | ||
} | ||
|
||
public function markAsDone(string $uuid): void | ||
{ | ||
unset($this->commands[$uuid]); | ||
} | ||
|
||
public function markAsFailed(string $uuid): void | ||
{ | ||
unset($this->commands[$uuid]); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
interface CommandRepository | ||
{ | ||
public function store(string $uuid, object $command): void; | ||
|
||
/** @return array<string, object> */ | ||
public function getPendingCommands(): array; | ||
|
||
public function findPendingCommand(string $uuid): object; | ||
|
||
public function markAsDone(string $uuid): void; | ||
|
||
public function markAsFailed(string $uuid): void; | ||
} |
20 changes: 20 additions & 0 deletions
20
src/Tempest/CommandBus/src/CommandRepositoryInitializer.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
use Tempest\Container\Container; | ||
use Tempest\Container\Initializer; | ||
use Tempest\Container\Singleton; | ||
|
||
final readonly class CommandRepositoryInitializer implements Initializer | ||
{ | ||
#[Singleton] | ||
public function initialize(Container $container): CommandRepository | ||
{ | ||
$commandRepositoryClass = $container->get(CommandBusConfig::class)->commandRepositoryClass; | ||
|
||
return $container->get($commandRepositoryClass); | ||
} | ||
} |
11 changes: 11 additions & 0 deletions
11
src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus\Exceptions; | ||
|
||
use Exception; | ||
|
||
final class CouldNotResolveCommand extends Exception | ||
{ | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
use Tempest\Console\Console; | ||
use Tempest\Console\ConsoleCommand; | ||
use Tempest\Console\ExitCode; | ||
use Tempest\Console\HasConsole; | ||
use Tempest\Container\Container; | ||
use function Tempest\Support\arr; | ||
use Throwable; | ||
|
||
final readonly class HandleAsyncCommand | ||
{ | ||
use HasConsole; | ||
|
||
public function __construct( | ||
private CommandBusConfig $commandBusConfig, | ||
private Container $container, | ||
private Console $console, | ||
private CommandRepository $repository, | ||
) { | ||
} | ||
|
||
#[ConsoleCommand(name: 'command:handle')] | ||
public function __invoke(?string $uuid = null): ExitCode | ||
{ | ||
try { | ||
if ($uuid) { | ||
$command = $this->repository->findPendingCommand($uuid); | ||
} else { | ||
$command = arr($this->repository->getPendingCommands())->first(); | ||
} | ||
|
||
if (! $command) { | ||
$this->error('No pending command found'); | ||
|
||
return ExitCode::ERROR; | ||
} | ||
|
||
$commandHandler = $this->commandBusConfig->handlers[$command::class] ?? null; | ||
|
||
if (! $commandHandler) { | ||
$commandClass = $command::class; | ||
$this->error("No handler found for command {$commandClass}"); | ||
|
||
return ExitCode::ERROR; | ||
} | ||
|
||
$commandHandler->handler->invokeArgs( | ||
$this->container->get($commandHandler->handler->getDeclaringClass()->getName()), | ||
[$command], | ||
); | ||
|
||
$this->repository->markAsDone($uuid); | ||
$this->success('Done'); | ||
|
||
return ExitCode::SUCCESS; | ||
} catch (Throwable $throwable) { | ||
$this->repository->markAsFailed($uuid); | ||
$this->error($throwable->getMessage()); | ||
|
||
return ExitCode::ERROR; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Tempest\CommandBus; | ||
|
||
use DateTimeImmutable; | ||
use Symfony\Component\Process\Process; | ||
use Tempest\Console\Console; | ||
use Tempest\Console\ConsoleCommand; | ||
use Tempest\Console\HasConsole; | ||
use Tempest\Console\Input\ConsoleArgumentBag; | ||
use function Tempest\Support\arr; | ||
|
||
final readonly class MonitorAsyncCommands | ||
{ | ||
use HasConsole; | ||
|
||
public function __construct( | ||
private CommandRepository $repository, | ||
private ConsoleArgumentBag $argumentBag, | ||
private Console $console, | ||
) { | ||
} | ||
|
||
#[ConsoleCommand(name: 'command:monitor')] | ||
public function __invoke(): void | ||
{ | ||
$this->success("Monitoring for new commands. Press ctrl+c to stop."); | ||
|
||
/** @var \Symfony\Component\Process\Process[] $processes */ | ||
$processes = []; | ||
|
||
while (true) { // @phpstan-ignore-line | ||
foreach ($processes as $uuid => $process) { | ||
$time = new DateTimeImmutable(); | ||
|
||
if ($process->isTerminated()) { | ||
if ($process->isSuccessful()) { | ||
$this->writeln("<success>{$uuid}</success> finished at {$time->format('Y-m-d H:i:s')}"); | ||
} else { | ||
$this->writeln("<error>{$uuid}</error> failed at {$time->format('Y-m-d H:i:s')}"); | ||
} | ||
|
||
if ($output = trim($process->getOutput())) { | ||
$this->writeln($output); | ||
} | ||
|
||
if ($errorOutput = trim($process->getErrorOutput())) { | ||
$this->writeln($errorOutput); | ||
} | ||
|
||
unset($processes[$uuid]); | ||
} | ||
} | ||
|
||
$availableCommands = arr($this->repository->getPendingCommands()) | ||
->filter(fn (object $command, string $uuid) => ! in_array($uuid, array_keys($processes))); | ||
|
||
if (count($processes) === 5) { | ||
$this->sleep(0.5); | ||
|
||
continue; | ||
} | ||
|
||
if ($availableCommands->isEmpty()) { | ||
$this->sleep(0.5); | ||
|
||
continue; | ||
} | ||
|
||
// Start a task | ||
$uuid = $availableCommands->keys()->first(); | ||
|
||
$time = new DateTimeImmutable(); | ||
$this->writeln("<h2>{$uuid}</h2> started at {$time->format('Y-m-d H:i:s')}"); | ||
|
||
$process = new Process([ | ||
$this->argumentBag->getBinaryPath(), | ||
$this->argumentBag->getCliName(), | ||
'command:handle', | ||
$uuid, | ||
], getcwd()); | ||
|
||
$process->start(); | ||
|
||
$processes[$uuid] = $process; | ||
} | ||
} | ||
|
||
private function sleep(float $seconds): void | ||
{ | ||
usleep((int) ($seconds * 1_000_000)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
*.txt |
Oops, something went wrong.