Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(commandbus): async commands #685

Merged
merged 19 commits into from
Nov 14, 2024
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"psr/log": "^3.0.0",
"ramsey/uuid": "^4.7",
"symfony/cache": "^7.2",
"symfony/process": "^7.1",
"symfony/var-dumper": "^7.1",
"symfony/var-exporter": "^7.1",
"tempest/highlight": "^2.0",
Expand Down
12 changes: 12 additions & 0 deletions src/Tempest/CommandBus/src/AsyncCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Attribute;

#[Attribute]
final readonly class AsyncCommand
{
}
38 changes: 38 additions & 0 deletions src/Tempest/CommandBus/src/AsyncCommandMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Ramsey\Uuid\Uuid;
use Tempest\Core\KernelEvent;
use Tempest\EventBus\EventHandler;
use Tempest\Reflection\ClassReflector;

final readonly class AsyncCommandMiddleware implements CommandBusMiddleware
{
public function __construct(
private CommandBusConfig $commandBusConfig,
private CommandRepository $repository,
) {
}

#[EventHandler(KernelEvent::BOOTED)]
public function onBooted(): void
{
$this->commandBusConfig->addMiddleware(self::class);
}

public function __invoke(object $command, CommandBusMiddlewareCallable $next): void
{
$reflector = new ClassReflector($command);

if ($reflector->hasAttribute(AsyncCommand::class)) {
$this->repository->store(Uuid::uuid7()->toString(), $command);

return;
}

$next($command);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\AsyncCommandRepositories;

use Tempest\CommandBus\CommandRepository;
use Tempest\CommandBus\Exceptions\CouldNotResolveCommand;
use function Tempest\Support\arr;

final readonly class FileCommandRepository implements CommandRepository
{
public function store(string $uuid, object $command): void
{
$payload = serialize($command);

file_put_contents(__DIR__ . "/../stored-commands/{$uuid}.pending.txt", $payload);
}

public function findPendingCommand(string $uuid): object
{
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt";

if (! file_exists($path)) {
throw new CouldNotResolveCommand($uuid);
}

$payload = file_get_contents($path);

return unserialize($payload);
}

public function markAsDone(string $uuid): void
{
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt";

unlink($path);
}

public function markAsFailed(string $uuid): void
{
rename(
from: __DIR__ . "/../stored-commands/{$uuid}.pending.txt",
to: __DIR__ . "/../stored-commands/{$uuid}.failed.txt",
);
}

public function getPendingCommands(): array
brendt marked this conversation as resolved.
Show resolved Hide resolved
{
return arr(glob(__DIR__ . "/../stored-commands/*.pending.txt"))
->mapWithKeys(function (string $path) {
$uuid = str_replace('.pending.txt', '', pathinfo($path, PATHINFO_BASENAME));

$payload = file_get_contents($path);

yield $uuid => unserialize($payload);
})
->toArray();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\AsyncCommandRepositories;

use Tempest\CommandBus\CommandRepository;

final class MemoryRepository implements CommandRepository
{
private array $commands = [];

public function store(string $uuid, object $command): void
{
$this->commands[$uuid] = $command;
}

public function getPendingCommands(): array
{
return $this->commands;
}

public function findPendingCommand(string $uuid): object
{
return $this->commands[$uuid];
}

public function markAsDone(string $uuid): void
{
unset($this->commands[$uuid]);
}

public function markAsFailed(string $uuid): void
{
unset($this->commands[$uuid]);
}
}
4 changes: 4 additions & 0 deletions src/Tempest/CommandBus/src/CommandBusConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Tempest\CommandBus;

use Tempest\CommandBus\AsyncCommandRepositories\FileCommandRepository;
use Tempest\Reflection\MethodReflector;

final class CommandBusConfig
Expand All @@ -14,6 +15,9 @@ public function __construct(

/** @var array<array-key, class-string<\Tempest\CommandBus\CommandBusMiddleware>> */
public array $middleware = [],

/** @var class-string<\Tempest\CommandBus\CommandRepository> $commandRepositoryClass */
public string $commandRepositoryClass = FileCommandRepository::class,
brendt marked this conversation as resolved.
Show resolved Hide resolved
) {
}

Expand Down
19 changes: 19 additions & 0 deletions src/Tempest/CommandBus/src/CommandRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

interface CommandRepository
{
public function store(string $uuid, object $command): void;

/** @return array<string, object> */
public function getPendingCommands(): array;

public function findPendingCommand(string $uuid): object;

public function markAsDone(string $uuid): void;

public function markAsFailed(string $uuid): void;
}
20 changes: 20 additions & 0 deletions src/Tempest/CommandBus/src/CommandRepositoryInitializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Tempest\Container\Container;
use Tempest\Container\Initializer;
use Tempest\Container\Singleton;

final readonly class CommandRepositoryInitializer implements Initializer
{
#[Singleton]
public function initialize(Container $container): CommandRepository
{
$commandRepositoryClass = $container->get(CommandBusConfig::class)->commandRepositoryClass;

return $container->get($commandRepositoryClass);
}
}
11 changes: 11 additions & 0 deletions src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\Exceptions;

use Exception;

final class CouldNotResolveCommand extends Exception
{
}
68 changes: 68 additions & 0 deletions src/Tempest/CommandBus/src/HandleAsyncCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Tempest\Console\Console;
use Tempest\Console\ConsoleCommand;
use Tempest\Console\ExitCode;
use Tempest\Console\HasConsole;
use Tempest\Container\Container;
use function Tempest\Support\arr;
use Throwable;

final readonly class HandleAsyncCommand
{
use HasConsole;

public function __construct(
private CommandBusConfig $commandBusConfig,
private Container $container,
private Console $console,
private CommandRepository $repository,
) {
}

#[ConsoleCommand(name: 'command:handle')]
public function __invoke(?string $uuid = null): ExitCode
{
try {
if ($uuid) {
$command = $this->repository->findPendingCommand($uuid);
} else {
$command = arr($this->repository->getPendingCommands())->first();
}

if (! $command) {
$this->error('No pending command found');

return ExitCode::ERROR;
}

$commandHandler = $this->commandBusConfig->handlers[$command::class] ?? null;

if (! $commandHandler) {
$commandClass = $command::class;
$this->error("No handler found for command {$commandClass}");

return ExitCode::ERROR;
}

$commandHandler->handler->invokeArgs(
$this->container->get($commandHandler->handler->getDeclaringClass()->getName()),
[$command],
);

$this->repository->markAsDone($uuid);
$this->success('Done');

return ExitCode::SUCCESS;
} catch (Throwable $throwable) {
$this->repository->markAsFailed($uuid);
$this->error($throwable->getMessage());

return ExitCode::ERROR;
}
}
}
95 changes: 95 additions & 0 deletions src/Tempest/CommandBus/src/MonitorAsyncCommands.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use DateTimeImmutable;
use Symfony\Component\Process\Process;
use Tempest\Console\Console;
use Tempest\Console\ConsoleCommand;
use Tempest\Console\HasConsole;
use Tempest\Console\Input\ConsoleArgumentBag;
use function Tempest\Support\arr;

final readonly class MonitorAsyncCommands
{
use HasConsole;

public function __construct(
private CommandRepository $repository,
private ConsoleArgumentBag $argumentBag,
private Console $console,
) {
}

#[ConsoleCommand(name: 'command:monitor')]
public function __invoke(): void
{
$this->success("Monitoring for new commands. Press ctrl+c to stop.");

/** @var \Symfony\Component\Process\Process[] $processes */
$processes = [];

while (true) { // @phpstan-ignore-line
foreach ($processes as $uuid => $process) {
$time = new DateTimeImmutable();

if ($process->isTerminated()) {
if ($process->isSuccessful()) {
$this->writeln("<success>{$uuid}</success> finished at {$time->format('Y-m-d H:i:s')}");
} else {
$this->writeln("<error>{$uuid}</error> failed at {$time->format('Y-m-d H:i:s')}");
}

if ($output = trim($process->getOutput())) {
$this->writeln($output);
}

if ($errorOutput = trim($process->getErrorOutput())) {
$this->writeln($errorOutput);
}

unset($processes[$uuid]);
}
}

$availableCommands = arr($this->repository->getPendingCommands())
->filter(fn (object $command, string $uuid) => ! in_array($uuid, array_keys($processes)));

if (count($processes) === 5) {
$this->sleep(0.5);

continue;
}

if ($availableCommands->isEmpty()) {
$this->sleep(0.5);

continue;
}

// Start a task
$uuid = $availableCommands->keys()->first();

$time = new DateTimeImmutable();
$this->writeln("<h2>{$uuid}</h2> started at {$time->format('Y-m-d H:i:s')}");

$process = new Process([
$this->argumentBag->getBinaryPath(),
$this->argumentBag->getCliName(),
'command:handle',
$uuid,
], getcwd());

$process->start();

$processes[$uuid] = $process;
}
}

private function sleep(float $seconds): void
{
usleep((int) ($seconds * 1_000_000));
}
}
1 change: 1 addition & 0 deletions src/Tempest/CommandBus/src/stored-commands/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.txt
Loading
Loading