Skip to content

Commit

Permalink
Merge branch 'symfony4'
Browse files Browse the repository at this point in the history
  • Loading branch information
schmittjoh committed Nov 1, 2018
2 parents 384bcab + e8f7cba commit 04f5b8e
Show file tree
Hide file tree
Showing 39 changed files with 2,209 additions and 1,788 deletions.
32 changes: 18 additions & 14 deletions Command/CleanUpCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,30 @@
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobRepository;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class CleanUpCommand extends ContainerAwareCommand
{
protected static $defaultName = 'jms-job-queue:clean-up';

private $jobManager;
private $registry;

public function __construct(ManagerRegistry $registry, JobManager $jobManager)
{
parent::__construct();

$this->jobManager = $jobManager;
$this->registry = $registry;
}

protected function configure()
{
$this
->setName('jms-job-queue:clean-up')
->setDescription('Cleans up jobs which exceed the maximum retention time.')
->addOption('max-retention', null, InputOption::VALUE_REQUIRED, 'The maximum retention time (value must be parsable by DateTime).', '7 days')
->addOption('max-retention-succeeded', null, InputOption::VALUE_REQUIRED, 'The maximum retention time for succeeded jobs (value must be parsable by DateTime).', '1 hour')
Expand All @@ -27,11 +39,8 @@ protected function configure()

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ManagerRegistry $registry */
$registry = $this->getContainer()->get('doctrine');

/** @var EntityManager $em */
$em = $registry->getManagerForClass('JMSJobQueueBundle:Job');
$em = $this->registry->getManagerForClass(Job::class);
$con = $em->getConnection();

$this->cleanUpExpiredJobs($em, $con, $input);
Expand All @@ -40,15 +49,12 @@ protected function execute(InputInterface $input, OutputInterface $output)

private function collectStaleJobs(EntityManager $em)
{
/** @var JobRepository $repository */
$repository = $em->getRepository(Job::class);

foreach ($this->findStaleJobs($em) as $job) {
if ($job->isRetried()) {
continue;
}

$repository->closeJob($job, Job::STATE_INCOMPLETE);
$this->jobManager->closeJob($job, Job::STATE_INCOMPLETE);
}
}

Expand Down Expand Up @@ -115,9 +121,7 @@ private function resolveDependencies(EntityManager $em, Job $job)
// If this job has failed, or has otherwise not succeeded, we need to set the
// incoming dependencies to failed if that has not been done already.
if ( ! $job->isFinished()) {
/** @var JobRepository $repository */
$repository = $em->getRepository(Job::class);
foreach ($repository->findIncomingDependencies($job) as $incomingDep) {
foreach ($this->jobManager->findIncomingDependencies($job) as $incomingDep) {
if ($incomingDep->isInFinalState()) {
continue;
}
Expand All @@ -127,7 +131,7 @@ private function resolveDependencies(EntityManager $em, Job $job)
$finalState = Job::STATE_FAILED;
}

$repository->closeJob($incomingDep, $finalState);
$this->jobManager->closeJob($incomingDep, $finalState);
}
}

Expand Down
36 changes: 31 additions & 5 deletions Command/MarkJobIncompleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,56 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;

class MarkJobIncompleteCommand extends ContainerAwareCommand
{
protected static $defaultName = 'jms-job-queue:mark-incomplete';

private $registry;
private $jobManager;

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager)
{
parent::__construct();

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
}

protected function configure()
{
$this
->setName('jms-job-queue:mark-incomplete')
->setDescription('Internal command (do not use). It marks jobs as incomplete.')
->addArgument('job-id', InputArgument::REQUIRED, 'The ID of the Job.')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$c = $this->getContainer();
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);

/** @var Job|null $job */
$job = $em->createQuery("SELECT j FROM ".Job::class." j WHERE j.id = :id")
->setParameter('id', $input->getArgument('job-id'))
->getOneOrNullResult();

if ($job === null) {
$output->writeln('<error>Job was not found.</error>');

return 1;
}

$em = $c->get('doctrine')->getManagerForClass('JMSJobQueueBundle:Job');
$repo = $em->getRepository('JMSJobQueueBundle:Job');
$this->jobManager->closeJob($job, Job::STATE_INCOMPLETE);

$repo->closeJob($em->find('JMSJobQueueBundle:Job', $input->getArgument('job-id')), Job::STATE_INCOMPLETE);
return 0;
}
}
114 changes: 37 additions & 77 deletions Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
namespace JMS\JobQueueBundle\Command;

use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Repository\JobRepository;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Bridge\Doctrine\ManagerRegistry;
use Symfony\Component\Process\Exception\ProcessFailedException;
use JMS\JobQueueBundle\Exception\LogicException;
use JMS\JobQueueBundle\Exception\InvalidArgumentException;
use JMS\JobQueueBundle\Event\NewOutputEvent;
use Symfony\Component\Process\ProcessBuilder;
use Symfony\Component\Process\Process;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use JMS\JobQueueBundle\Event\NewOutputEvent;
use JMS\JobQueueBundle\Event\StateChangeEvent;
use Symfony\Component\Console\Input\InputOption;
use JMS\JobQueueBundle\Exception\InvalidArgumentException;
use Symfony\Bridge\Doctrine\ManagerRegistry;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand
{
protected static $defaultName = 'jms-job-queue:run';

/** @var string */
private $env;

Expand All @@ -57,12 +57,9 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC
/** @var bool */
private $shouldShutdown = false;

private $consoleFile;

protected function configure()
{
$this
->setName('jms-job-queue:run')
->setDescription('Runs jobs from the queue.')
->addOption('max-runtime', 'r', InputOption::VALUE_REQUIRED, 'The maximum runtime in seconds.', 900)
->addOption('max-concurrent-jobs', 'j', InputOption::VALUE_REQUIRED, 'The maximum number of concurrent jobs.', 4)
Expand All @@ -76,8 +73,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$startTime = time();

$this->consoleFile = $this->findConsoleFile();

$maxRuntime = (integer) $input->getOption('max-runtime');
if ($maxRuntime <= 0) {
throw new InvalidArgumentException('The maximum runtime must be greater than zero.');
Expand Down Expand Up @@ -199,7 +194,7 @@ private function startJobs($workerName, $idleTime, $maxJobs, array $restrictedQu
{
$excludedIds = array();
while (count($this->runningJobs) < $maxJobs) {
$pendingJob = $this->getRepository()->findStartableJob(
$pendingJob = $this->getJobManager()->findStartableJob(
$workerName,
$excludedIds,
$this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs),
Expand Down Expand Up @@ -296,7 +291,7 @@ private function checkRunningJobs()
$data['process']->stop(5);

$this->output->writeln($data['job'].' terminated; maximum runtime exceeded.');
$this->getRepository()->closeJob($data['job'], Job::STATE_TERMINATED);
$this->getJobManager()->closeJob($data['job'], Job::STATE_TERMINATED);
unset($this->runningJobs[$i]);

continue;
Expand Down Expand Up @@ -326,7 +321,7 @@ private function checkRunningJobs()
$data['job']->setRuntime(time() - $data['start_time']);

$newState = 0 === $data['process']->getExitCode() ? Job::STATE_FINISHED : Job::STATE_FAILED;
$this->getRepository()->closeJob($data['job'], $newState);
$this->getJobManager()->closeJob($data['job'], $newState);
unset($this->runningJobs[$i]);
}

Expand All @@ -340,7 +335,7 @@ private function startJob(Job $job)
$newState = $event->getNewState();

if (Job::STATE_CANCELED === $newState) {
$this->getRepository()->closeJob($job, Job::STATE_CANCELED);
$this->getJobManager()->closeJob($job, Job::STATE_CANCELED);

return;
}
Expand All @@ -354,17 +349,15 @@ private function startJob(Job $job)
$em->persist($job);
$em->flush($job);

$pb = $this->getCommandProcessBuilder();
$pb
->add($job->getCommand())
->add('--jms-job-id='.$job->getId())
;
$args = $this->getBasicCommandLineArgs();
$args[] = $job->getCommand();
$args[] = '--jms-job-id='.$job->getId();

foreach ($job->getArgs() as $arg) {
$pb->add($arg);
$args[] = $arg;
}
$proc = new Process($pb->getProcess()->getCommandLine());

$proc = new Process($args);
$proc->start();
$this->output->writeln(sprintf('Started %s.', $job));

Expand Down Expand Up @@ -402,16 +395,12 @@ private function cleanUpStaleJobs($workerName)
continue;
}

$pb = $this->getCommandProcessBuilder();
$pb
->add('jms-job-queue:mark-incomplete')
->add($job->getId())
->add('--env='.$this->env)
->add('--verbose')
;
$args = $this->getBasicCommandLineArgs();
$args[] = 'jms-job-queue:mark-incomplete';
$args[] = $job->getId();

// We use a separate process to clean up.
$proc = new Process($pb->getProcess()->getCommandLine());
$proc = new Process($args);
if (0 !== $proc->run()) {
$ex = new ProcessFailedException($proc);

Expand All @@ -420,60 +409,31 @@ private function cleanUpStaleJobs($workerName)
}
}

/**
* @return ProcessBuilder
*/
private function getCommandProcessBuilder()
private function getBasicCommandLineArgs(): array
{
$pb = new ProcessBuilder();

// PHP wraps the process in "sh -c" by default, but we need to control
// the process directly.
if ( ! defined('PHP_WINDOWS_VERSION_MAJOR')) {
$pb->add('exec');
}

$pb
->add(PHP_BINARY)
->add($this->consoleFile)
->add('--env='.$this->env)
;
$args = array(
PHP_BINARY,
$_SERVER['SYMFONY_CONSOLE_FILE'] ?? $_SERVER['argv'][0],
'--env='.$this->env
);

if ($this->verbose) {
$pb->add('--verbose');
}

return $pb;
}

private function findConsoleFile()
{
$kernelDir = $this->getContainer()->getParameter('kernel.root_dir');

if (file_exists($kernelDir.'/console')) {
return $kernelDir.'/console';
$args[] = '--verbose';
}

if (file_exists($kernelDir.'/../bin/console')) {
return $kernelDir.'/../bin/console';
}

throw new \RuntimeException('Could not locate console file.');
return $args;
}

/**
* @return EntityManager
*/
private function getEntityManager()
private function getEntityManager(): EntityManager
{
return $this->registry->getManagerForClass('JMSJobQueueBundle:Job');
return /** @var EntityManager */ $this->registry->getManagerForClass('JMSJobQueueBundle:Job');
}

/**
* @return JobRepository
* @return JobManager
*/
private function getRepository()
private function getJobManager()
{
return $this->getEntityManager()->getRepository('JMSJobQueueBundle:Job');
return $this->getContainer()->get('jms_job_queue.job_manager');
}
}
Loading

0 comments on commit 04f5b8e

Please # to comment.