diff --git a/Command/CleanUpCommand.php b/Command/CleanUpCommand.php index e9fe9f88..d604e868 100644 --- a/Command/CleanUpCommand.php +++ b/Command/CleanUpCommand.php @@ -6,7 +6,7 @@ use Doctrine\DBAL\Connection; use Doctrine\ORM\EntityManager; use JMS\JobQueueBundle\Entity\Job; -use JMS\JobQueueBundle\Entity\Repository\JobRepository; +use JMS\JobQueueBundle\Entity\Repository\JobManager; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; @@ -14,10 +14,22 @@ class CleanUpCommand extends ContainerAwareCommand { + protected static $defaultName = 'jms-job-queue:clean-up'; + + private $jobManager; + private $registry; + + public function __construct(ManagerRegistry $registry, JobManager $jobManager) + { + parent::__construct(); + + $this->jobManager = $jobManager; + $this->registry = $registry; + } + protected function configure() { $this - ->setName('jms-job-queue:clean-up') ->setDescription('Cleans up jobs which exceed the maximum retention time.') ->addOption('max-retention', null, InputOption::VALUE_REQUIRED, 'The maximum retention time (value must be parsable by DateTime).', '7 days') ->addOption('max-retention-succeeded', null, InputOption::VALUE_REQUIRED, 'The maximum retention time for succeeded jobs (value must be parsable by DateTime).', '1 hour') @@ -27,11 +39,8 @@ protected function configure() protected function execute(InputInterface $input, OutputInterface $output) { - /** @var ManagerRegistry $registry */ - $registry = $this->getContainer()->get('doctrine'); - /** @var EntityManager $em */ - $em = $registry->getManagerForClass('JMSJobQueueBundle:Job'); + $em = $this->registry->getManagerForClass(Job::class); $con = $em->getConnection(); $this->cleanUpExpiredJobs($em, $con, $input); @@ -40,15 +49,12 @@ protected function execute(InputInterface $input, OutputInterface $output) private function collectStaleJobs(EntityManager $em) { - /** @var JobRepository $repository */ - $repository = $em->getRepository(Job::class); - foreach ($this->findStaleJobs($em) as $job) { if ($job->isRetried()) { continue; } - $repository->closeJob($job, Job::STATE_INCOMPLETE); + $this->jobManager->closeJob($job, Job::STATE_INCOMPLETE); } } @@ -115,9 +121,7 @@ private function resolveDependencies(EntityManager $em, Job $job) // If this job has failed, or has otherwise not succeeded, we need to set the // incoming dependencies to failed if that has not been done already. if ( ! $job->isFinished()) { - /** @var JobRepository $repository */ - $repository = $em->getRepository(Job::class); - foreach ($repository->findIncomingDependencies($job) as $incomingDep) { + foreach ($this->jobManager->findIncomingDependencies($job) as $incomingDep) { if ($incomingDep->isInFinalState()) { continue; } @@ -127,7 +131,7 @@ private function resolveDependencies(EntityManager $em, Job $job) $finalState = Job::STATE_FAILED; } - $repository->closeJob($incomingDep, $finalState); + $this->jobManager->closeJob($incomingDep, $finalState); } } diff --git a/Command/MarkJobIncompleteCommand.php b/Command/MarkJobIncompleteCommand.php index 0af75c9d..085dcea2 100644 --- a/Command/MarkJobIncompleteCommand.php +++ b/Command/MarkJobIncompleteCommand.php @@ -2,7 +2,10 @@ namespace JMS\JobQueueBundle\Command; +use Doctrine\Common\Persistence\ManagerRegistry; +use Doctrine\ORM\EntityManager; use JMS\JobQueueBundle\Entity\Job; +use JMS\JobQueueBundle\Entity\Repository\JobManager; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputArgument; @@ -10,10 +13,22 @@ class MarkJobIncompleteCommand extends ContainerAwareCommand { + protected static $defaultName = 'jms-job-queue:mark-incomplete'; + + private $registry; + private $jobManager; + + public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager) + { + parent::__construct(); + + $this->registry = $managerRegistry; + $this->jobManager = $jobManager; + } + protected function configure() { $this - ->setName('jms-job-queue:mark-incomplete') ->setDescription('Internal command (do not use). It marks jobs as incomplete.') ->addArgument('job-id', InputArgument::REQUIRED, 'The ID of the Job.') ; @@ -21,11 +36,22 @@ protected function configure() protected function execute(InputInterface $input, OutputInterface $output) { - $c = $this->getContainer(); + /** @var EntityManager $em */ + $em = $this->registry->getManagerForClass(Job::class); + + /** @var Job|null $job */ + $job = $em->createQuery("SELECT j FROM ".Job::class." j WHERE j.id = :id") + ->setParameter('id', $input->getArgument('job-id')) + ->getOneOrNullResult(); + + if ($job === null) { + $output->writeln('Job was not found.'); + + return 1; + } - $em = $c->get('doctrine')->getManagerForClass('JMSJobQueueBundle:Job'); - $repo = $em->getRepository('JMSJobQueueBundle:Job'); + $this->jobManager->closeJob($job, Job::STATE_INCOMPLETE); - $repo->closeJob($em->find('JMSJobQueueBundle:Job', $input->getArgument('job-id')), Job::STATE_INCOMPLETE); + return 0; } } \ No newline at end of file diff --git a/Command/RunCommand.php b/Command/RunCommand.php index 6fa046f9..fc6d5310 100644 --- a/Command/RunCommand.php +++ b/Command/RunCommand.php @@ -19,23 +19,23 @@ namespace JMS\JobQueueBundle\Command; use Doctrine\ORM\EntityManager; -use JMS\JobQueueBundle\Entity\Repository\JobRepository; -use Symfony\Component\EventDispatcher\EventDispatcher; -use Symfony\Bridge\Doctrine\ManagerRegistry; -use Symfony\Component\Process\Exception\ProcessFailedException; -use JMS\JobQueueBundle\Exception\LogicException; -use JMS\JobQueueBundle\Exception\InvalidArgumentException; -use JMS\JobQueueBundle\Event\NewOutputEvent; -use Symfony\Component\Process\ProcessBuilder; -use Symfony\Component\Process\Process; use JMS\JobQueueBundle\Entity\Job; +use JMS\JobQueueBundle\Entity\Repository\JobManager; +use JMS\JobQueueBundle\Event\NewOutputEvent; use JMS\JobQueueBundle\Event\StateChangeEvent; -use Symfony\Component\Console\Input\InputOption; +use JMS\JobQueueBundle\Exception\InvalidArgumentException; +use Symfony\Bridge\Doctrine\ManagerRegistry; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\Process\Exception\ProcessFailedException; +use Symfony\Component\Process\Process; class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand { + protected static $defaultName = 'jms-job-queue:run'; + /** @var string */ private $env; @@ -57,12 +57,9 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC /** @var bool */ private $shouldShutdown = false; - private $consoleFile; - protected function configure() { $this - ->setName('jms-job-queue:run') ->setDescription('Runs jobs from the queue.') ->addOption('max-runtime', 'r', InputOption::VALUE_REQUIRED, 'The maximum runtime in seconds.', 900) ->addOption('max-concurrent-jobs', 'j', InputOption::VALUE_REQUIRED, 'The maximum number of concurrent jobs.', 4) @@ -76,8 +73,6 @@ protected function execute(InputInterface $input, OutputInterface $output) { $startTime = time(); - $this->consoleFile = $this->findConsoleFile(); - $maxRuntime = (integer) $input->getOption('max-runtime'); if ($maxRuntime <= 0) { throw new InvalidArgumentException('The maximum runtime must be greater than zero.'); @@ -199,7 +194,7 @@ private function startJobs($workerName, $idleTime, $maxJobs, array $restrictedQu { $excludedIds = array(); while (count($this->runningJobs) < $maxJobs) { - $pendingJob = $this->getRepository()->findStartableJob( + $pendingJob = $this->getJobManager()->findStartableJob( $workerName, $excludedIds, $this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs), @@ -296,7 +291,7 @@ private function checkRunningJobs() $data['process']->stop(5); $this->output->writeln($data['job'].' terminated; maximum runtime exceeded.'); - $this->getRepository()->closeJob($data['job'], Job::STATE_TERMINATED); + $this->getJobManager()->closeJob($data['job'], Job::STATE_TERMINATED); unset($this->runningJobs[$i]); continue; @@ -326,7 +321,7 @@ private function checkRunningJobs() $data['job']->setRuntime(time() - $data['start_time']); $newState = 0 === $data['process']->getExitCode() ? Job::STATE_FINISHED : Job::STATE_FAILED; - $this->getRepository()->closeJob($data['job'], $newState); + $this->getJobManager()->closeJob($data['job'], $newState); unset($this->runningJobs[$i]); } @@ -340,7 +335,7 @@ private function startJob(Job $job) $newState = $event->getNewState(); if (Job::STATE_CANCELED === $newState) { - $this->getRepository()->closeJob($job, Job::STATE_CANCELED); + $this->getJobManager()->closeJob($job, Job::STATE_CANCELED); return; } @@ -354,17 +349,15 @@ private function startJob(Job $job) $em->persist($job); $em->flush($job); - $pb = $this->getCommandProcessBuilder(); - $pb - ->add($job->getCommand()) - ->add('--jms-job-id='.$job->getId()) - ; + $args = $this->getBasicCommandLineArgs(); + $args[] = $job->getCommand(); + $args[] = '--jms-job-id='.$job->getId(); foreach ($job->getArgs() as $arg) { - $pb->add($arg); + $args[] = $arg; } - - $proc = new Process($pb->getProcess()->getCommandLine()); + + $proc = new Process($args); $proc->start(); $this->output->writeln(sprintf('Started %s.', $job)); @@ -402,16 +395,12 @@ private function cleanUpStaleJobs($workerName) continue; } - $pb = $this->getCommandProcessBuilder(); - $pb - ->add('jms-job-queue:mark-incomplete') - ->add($job->getId()) - ->add('--env='.$this->env) - ->add('--verbose') - ; + $args = $this->getBasicCommandLineArgs(); + $args[] = 'jms-job-queue:mark-incomplete'; + $args[] = $job->getId(); // We use a separate process to clean up. - $proc = new Process($pb->getProcess()->getCommandLine()); + $proc = new Process($args); if (0 !== $proc->run()) { $ex = new ProcessFailedException($proc); @@ -420,60 +409,31 @@ private function cleanUpStaleJobs($workerName) } } - /** - * @return ProcessBuilder - */ - private function getCommandProcessBuilder() + private function getBasicCommandLineArgs(): array { - $pb = new ProcessBuilder(); - - // PHP wraps the process in "sh -c" by default, but we need to control - // the process directly. - if ( ! defined('PHP_WINDOWS_VERSION_MAJOR')) { - $pb->add('exec'); - } - - $pb - ->add(PHP_BINARY) - ->add($this->consoleFile) - ->add('--env='.$this->env) - ; + $args = array( + PHP_BINARY, + $_SERVER['SYMFONY_CONSOLE_FILE'] ?? $_SERVER['argv'][0], + '--env='.$this->env + ); if ($this->verbose) { - $pb->add('--verbose'); - } - - return $pb; - } - - private function findConsoleFile() - { - $kernelDir = $this->getContainer()->getParameter('kernel.root_dir'); - - if (file_exists($kernelDir.'/console')) { - return $kernelDir.'/console'; + $args[] = '--verbose'; } - if (file_exists($kernelDir.'/../bin/console')) { - return $kernelDir.'/../bin/console'; - } - - throw new \RuntimeException('Could not locate console file.'); + return $args; } - /** - * @return EntityManager - */ - private function getEntityManager() + private function getEntityManager(): EntityManager { - return $this->registry->getManagerForClass('JMSJobQueueBundle:Job'); + return /** @var EntityManager */ $this->registry->getManagerForClass('JMSJobQueueBundle:Job'); } /** - * @return JobRepository + * @return JobManager */ - private function getRepository() + private function getJobManager() { - return $this->getEntityManager()->getRepository('JMSJobQueueBundle:Job'); + return $this->getContainer()->get('jms_job_queue.job_manager'); } } diff --git a/Command/ScheduleCommand.php b/Command/ScheduleCommand.php index fa6fe57a..bc8d4ba4 100644 --- a/Command/ScheduleCommand.php +++ b/Command/ScheduleCommand.php @@ -10,17 +10,31 @@ use JMS\JobQueueBundle\Cron\JobScheduler; use JMS\JobQueueBundle\Entity\CronJob; use JMS\JobQueueBundle\Entity\Job; -use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; +use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -class ScheduleCommand extends ContainerAwareCommand +class ScheduleCommand extends Command { + protected static $defaultName = 'jms-job-queue:schedule'; + + private $registry; + private $schedulers; + private $cronCommands; + + public function __construct(ManagerRegistry $managerRegistry, iterable $schedulers, iterable $cronCommands) + { + parent::__construct(); + + $this->registry = $managerRegistry; + $this->schedulers = $schedulers; + $this->cronCommands = $cronCommands; + } + protected function configure() { $this - ->setName('jms-job-queue:schedule') ->setDescription('Schedules jobs at defined intervals') ->addOption('max-runtime', null, InputOption::VALUE_REQUIRED, 'The maximum runtime of this command.', 3600) ->addOption('min-job-interval', null, InputOption::VALUE_REQUIRED, 'The minimum time between schedules jobs in seconds.', 5) @@ -29,9 +43,6 @@ protected function configure() protected function execute(InputInterface $input, OutputInterface $output) { - /** @var ManagerRegistry $registry */ - $registry = $this->getContainer()->get('doctrine'); - $maxRuntime = $input->getOption('max-runtime'); if ($maxRuntime > 300) { $maxRuntime += mt_rand(0, (integer)($input->getOption('max-runtime') * 0.05)); @@ -52,7 +63,7 @@ protected function execute(InputInterface $input, OutputInterface $output) return 0; } - $jobsLastRunAt = $this->populateJobsLastRunAt($registry->getManagerForClass(CronJob::class), $jobSchedulers); + $jobsLastRunAt = $this->populateJobsLastRunAt($this->registry->getManagerForClass(CronJob::class), $jobSchedulers); $startedAt = time(); while (true) { @@ -63,7 +74,7 @@ protected function execute(InputInterface $input, OutputInterface $output) break; } - $this->scheduleJobs($output, $registry, $jobSchedulers, $jobsLastRunAt); + $this->scheduleJobs($output, $jobSchedulers, $jobsLastRunAt); $timeToWait = microtime(true) - $lastRunAt + $minJobInterval; if ($timeToWait > 0) { @@ -78,7 +89,7 @@ protected function execute(InputInterface $input, OutputInterface $output) * @param JobScheduler[] $jobSchedulers * @param \DateTime[] $jobsLastRunAt */ - private function scheduleJobs(OutputInterface $output, ManagerRegistry $registry, array $jobSchedulers, array &$jobsLastRunAt) + private function scheduleJobs(OutputInterface $output, array $jobSchedulers, array &$jobsLastRunAt) { foreach ($jobSchedulers as $name => $scheduler) { $lastRunAt = $jobsLastRunAt[$name]; @@ -87,23 +98,23 @@ private function scheduleJobs(OutputInterface $output, ManagerRegistry $registry continue; } - list($success, $newLastRunAt) = $this->acquireLock($registry, $name, $lastRunAt); + list($success, $newLastRunAt) = $this->acquireLock($name, $lastRunAt); $jobsLastRunAt[$name] = $newLastRunAt; if ($success) { $output->writeln('Scheduling command '.$name); $job = $scheduler->createJob($name, $lastRunAt); - $em = $registry->getManagerForClass(Job::class); + $em = $this->registry->getManagerForClass(Job::class); $em->persist($job); $em->flush($job); } } } - private function acquireLock(ManagerRegistry $registry, $commandName, \DateTime $lastRunAt) + private function acquireLock($commandName, \DateTime $lastRunAt) { /** @var EntityManager $em */ - $em = $registry->getManagerForClass(CronJob::class); + $em = $this->registry->getManagerForClass(CronJob::class); $con = $em->getConnection(); $now = new \DateTime(); @@ -135,14 +146,21 @@ private function acquireLock(ManagerRegistry $registry, $commandName, \DateTime private function populateJobSchedulers() { - $schedulers = $this->getContainer()->get('jms_job_queue.scheduler_registry')->getSchedulers(); + $schedulers = []; + foreach ($this->schedulers as $scheduler) { + /** @var JobScheduler $scheduler */ + foreach ($scheduler->getCommands() as $name) { + $schedulers[$name] = $scheduler; + } + } - foreach ($this->getApplication()->all() as $name => $command) { - if ( ! $command instanceof CronCommand) { - continue; + foreach ($this->cronCommands as $command) { + /** @var CronCommand $command */ + if ( ! $command instanceof Command) { + throw new \RuntimeException('CronCommand should only be used on Symfony commands.'); } - $schedulers[$name] = new CommandScheduler($command); + $schedulers[$command->getName()] = new CommandScheduler($command->getName(), $command); } return $schedulers; diff --git a/Console/CronCommand.php b/Console/CronCommand.php index 8dc952ea..6ea3decd 100644 --- a/Console/CronCommand.php +++ b/Console/CronCommand.php @@ -1,5 +1,7 @@ getTimestamp() >= $this->getScheduleInterval(); } - public function createCronJob(\DateTime $_) + public function createCronJob(\DateTime $_): Job { if ( ! $this instanceof Command) { throw new \LogicException('This trait must be used in Symfony console commands only.'); @@ -27,5 +29,5 @@ public function createCronJob(\DateTime $_) /** * @return integer */ - abstract protected function getScheduleInterval(); + abstract protected function getScheduleInterval(): int; } \ No newline at end of file diff --git a/Controller/JobController.php b/Controller/JobController.php index 39d5fddf..0719ccf1 100644 --- a/Controller/JobController.php +++ b/Controller/JobController.php @@ -3,29 +3,20 @@ namespace JMS\JobQueueBundle\Controller; use Doctrine\Common\Util\ClassUtils; -use JMS\DiExtraBundle\Annotation as DI; +use Doctrine\ORM\EntityManager; use JMS\JobQueueBundle\Entity\Job; +use JMS\JobQueueBundle\Entity\Repository\JobManager; use JMS\JobQueueBundle\View\JobFilter; use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route; -use Sensio\Bundle\FrameworkExtraBundle\Configuration\Template; +use Symfony\Bundle\FrameworkBundle\Controller\Controller; use Symfony\Component\HttpFoundation\RedirectResponse; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpKernel\Exception\HttpException; -class JobController +class JobController extends Controller { - /** @DI\Inject("doctrine") */ - private $registry; - - /** @DI\Inject */ - private $router; - - /** @DI\Inject("%jms_job_queue.statistics%") */ - private $statisticsEnabled; - /** * @Route("/", name = "jms_jobs_overview") - * @Template("JMSJobQueueBundle:Job:overview.html.twig") */ public function overviewAction(Request $request) { @@ -63,18 +54,17 @@ public function overviewAction(Request $request) $jobs = $query->getResult(); - return array( + return $this->render('@JMSJobQueue/Job/overview.html.twig', array( 'jobsWithError' => $lastJobsWithError, 'jobs' => array_slice($jobs, 0, $perPage), 'jobFilter' => $jobFilter, 'hasMore' => count($jobs) > $perPage, 'jobStates' => Job::getStates(), - ); + )); } /** * @Route("/{id}", name = "jms_jobs_details") - * @Template("JMSJobQueueBundle:Job:details.html.twig") */ public function detailsAction(Job $job) { @@ -83,15 +73,15 @@ public function detailsAction(Job $job) $class = ClassUtils::getClass($entity); $relatedEntities[] = array( 'class' => $class, - 'id' => json_encode($this->registry->getManagerForClass($class)->getClassMetadata($class)->getIdentifierValues($entity)), + 'id' => json_encode($this->get('doctrine')->getManagerForClass($class)->getClassMetadata($class)->getIdentifierValues($entity)), 'raw' => $entity, ); } $statisticData = $statisticOptions = array(); - if ($this->statisticsEnabled) { + if ($this->getParameter('jms_job_queue.statistics')) { $dataPerCharacteristic = array(); - foreach ($this->registry->getManagerForClass('JMSJobQueueBundle:Job')->getConnection()->query("SELECT * FROM jms_job_statistics WHERE job_id = ".$job->getId()) as $row) { + foreach ($this->get('doctrine')->getManagerForClass(Job::class)->getConnection()->query("SELECT * FROM jms_job_statistics WHERE job_id = ".$job->getId()) as $row) { $dataPerCharacteristic[$row['characteristic']][] = array( // hack because postgresql lower-cases all column names. array_key_exists('createdAt', $row) ? $row['createdAt'] : $row['createdat'], @@ -125,13 +115,13 @@ public function detailsAction(Job $job) } } - return array( + return $this->render('@JMSJobQueue/Job/details.html.twig', array( 'job' => $job, 'relatedEntities' => $relatedEntities, 'incomingDependencies' => $this->getRepo()->getIncomingDependencies($job), 'statisticData' => $statisticData, 'statisticOptions' => $statisticOptions, - ); + )); } /** @@ -154,20 +144,18 @@ public function retryJobAction(Job $job) $this->getEm()->persist($retryJob); $this->getEm()->flush(); - $url = $this->router->generate('jms_jobs_details', array('id' => $retryJob->getId()), false); + $url = $this->generateUrl('jms_jobs_details', array('id' => $retryJob->getId())); return new RedirectResponse($url, 201); } - /** @return \Doctrine\ORM\EntityManager */ - private function getEm() + private function getEm(): EntityManager { - return $this->registry->getManagerForClass('JMSJobQueueBundle:Job'); + return $this->get('doctrine')->getManagerForClass(Job::class); } - /** @return \JMS\JobQueueBundle\Entity\Repository\JobRepository */ - private function getRepo() + private function getRepo(): JobManager { - return $this->getEm()->getRepository('JMSJobQueueBundle:Job'); + return $this->get('jms_job_queue.job_manager'); } } diff --git a/Cron/CommandScheduler.php b/Cron/CommandScheduler.php index 106485b0..48403c84 100644 --- a/Cron/CommandScheduler.php +++ b/Cron/CommandScheduler.php @@ -3,22 +3,30 @@ namespace JMS\JobQueueBundle\Cron; use JMS\JobQueueBundle\Console\CronCommand; +use JMS\JobQueueBundle\Entity\Job; class CommandScheduler implements JobScheduler { + private $name; private $command; - public function __construct(CronCommand $command) + public function __construct(string $name, CronCommand $command) { + $this->name = $name; $this->command = $command; } - public function shouldSchedule($_, \DateTime $lastRunAt) + public function getCommands(): array + { + return [$this->name]; + } + + public function shouldSchedule($_, \DateTime $lastRunAt): bool { return $this->command->shouldBeScheduled($lastRunAt); } - public function createJob($_, \DateTime $lastRunAt) + public function createJob($_, \DateTime $lastRunAt): Job { return $this->command->createCronJob($lastRunAt); } diff --git a/Cron/JobScheduler.php b/Cron/JobScheduler.php index f4a55caf..7495e7e7 100644 --- a/Cron/JobScheduler.php +++ b/Cron/JobScheduler.php @@ -7,12 +7,25 @@ interface JobScheduler { /** + * Returns an array of commands managed by this scheduler. + * + * @return string[] + */ + public function getCommands(): array; + + /** + * Returns whether to schedule the given command again. + * * @return boolean */ - public function shouldSchedule($command, \DateTime $lastRunAt); + public function shouldSchedule(string $command, \DateTime $lastRunAt): bool; /** + * Creates the given command when it is scheduled. + * + * @param string $command + * @param \DateTime $lastRunAt * @return Job */ - public function createJob($command, \DateTime $lastRunAt); + public function createJob(string $command, \DateTime $lastRunAt): Job; } \ No newline at end of file diff --git a/Cron/SchedulerRegistry.php b/Cron/SchedulerRegistry.php deleted file mode 100644 index f2e164fa..00000000 --- a/Cron/SchedulerRegistry.php +++ /dev/null @@ -1,21 +0,0 @@ -schedulers = $schedulers; - } - - public function getSchedulers() - { - return $this->schedulers; - } -} \ No newline at end of file diff --git a/DependencyInjection/CompilerPass/JobSchedulersPass.php b/DependencyInjection/CompilerPass/JobSchedulersPass.php deleted file mode 100644 index 9eeb2933..00000000 --- a/DependencyInjection/CompilerPass/JobSchedulersPass.php +++ /dev/null @@ -1,27 +0,0 @@ -findTaggedServiceIds('jms_job_queue.scheduler') as $id => $attributes) { - foreach ($attributes as $attributeData) { - if (!isset($attributeData['command'])) { - throw new \RuntimeException(sprintf('The tag "jms_job_queue.schedulers" of service "%s" must have a "command" attribute.', $id)); - } - - $schedulers[$attributeData['command']] = new Reference($id); - } - } - - $container->getDefinition('jms_job_queue.scheduler_registry') - ->addArgument($schedulers); - } -} \ No newline at end of file diff --git a/DependencyInjection/JMSJobQueueExtension.php b/DependencyInjection/JMSJobQueueExtension.php index 2d9a9a42..b8be3fab 100644 --- a/DependencyInjection/JMSJobQueueExtension.php +++ b/DependencyInjection/JMSJobQueueExtension.php @@ -18,6 +18,8 @@ namespace JMS\JobQueueBundle\DependencyInjection; +use JMS\JobQueueBundle\Console\CronCommand; +use JMS\JobQueueBundle\Cron\JobScheduler; use JMS\JobQueueBundle\Entity\Type\SafeObjectType; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\Config\FileLocator; @@ -42,12 +44,18 @@ public function load(array $configs, ContainerBuilder $container) $loader = new Loader\XmlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.xml'); + $loader->load('console.xml'); $container->setParameter('jms_job_queue.statistics', $config['statistics']); if ($config['statistics']) { $loader->load('statistics.xml'); } + $container->registerForAutoconfiguration(JobScheduler::class) + ->addTag('jms_job_queue.scheduler'); + $container->registerForAutoconfiguration(CronCommand::class) + ->addTag('jms_job_queue.cron_command'); + $container->setParameter('jms_job_queue.queue_options_defaults', $config['queue_options_defaults']); $container->setParameter('jms_job_queue.queue_options', $config['queue_options']); } diff --git a/Entity/Job.php b/Entity/Job.php index ccfd6aa9..49d98dc4 100644 --- a/Entity/Job.php +++ b/Entity/Job.php @@ -25,7 +25,7 @@ use Symfony\Component\Debug\Exception\FlattenException; /** - * @ORM\Entity(repositoryClass = "JMS\JobQueueBundle\Entity\Repository\JobRepository") + * @ORM\Entity * @ORM\Table(name = "jms_jobs", indexes = { * @ORM\Index("cmd_search_index", columns = {"command"}), * @ORM\Index("sorting_index", columns = {"state", "priority", "id"}), diff --git a/Entity/Listener/PersistentRelatedEntitiesCollection.php b/Entity/Listener/PersistentRelatedEntitiesCollection.php index 74ee4336..470e883d 100644 --- a/Entity/Listener/PersistentRelatedEntitiesCollection.php +++ b/Entity/Listener/PersistentRelatedEntitiesCollection.php @@ -11,7 +11,6 @@ use Doctrine\Common\Collections\Selectable; use JMS\JobQueueBundle\Entity\Job; use Symfony\Bridge\Doctrine\RegistryInterface; -use Symfony\Component\Validator\Constraints\Collection as Collection2; /** * Collection for persistent related entities. @@ -366,7 +365,7 @@ public function getIterator() * a new collection with the elements returned by the function. * * @param Closure $func - * @return Collection2 + * @return Collection */ public function map(Closure $func) { @@ -380,7 +379,7 @@ public function map(Closure $func) * The order of the elements is preserved. * * @param Closure $p The predicate used for filtering. - * @return Collection2 A collection with the results of the filter operation. + * @return Collection A collection with the results of the filter operation. */ public function filter(Closure $p) { @@ -474,7 +473,7 @@ public function slice($offset, $length = null) * return a new collection containing these elements. * * @param Criteria $criteria - * @return Collection2 + * @return Collection */ public function matching(Criteria $criteria) { diff --git a/Entity/Repository/JobRepository.php b/Entity/Repository/JobManager.php similarity index 80% rename from Entity/Repository/JobRepository.php rename to Entity/Repository/JobManager.php index f9edc9ec..63c3e351 100644 --- a/Entity/Repository/JobRepository.php +++ b/Entity/Repository/JobManager.php @@ -20,59 +20,34 @@ use Doctrine\Common\Collections\ArrayCollection; use Doctrine\Common\Util\ClassUtils; +use Doctrine\DBAL\Connection; use Doctrine\DBAL\Types\Type; -use Doctrine\ORM\EntityRepository; +use Doctrine\ORM\EntityManager; use Doctrine\ORM\Query\Parameter; use Doctrine\ORM\Query\ResultSetMappingBuilder; -use JMS\DiExtraBundle\Annotation as DI; use JMS\JobQueueBundle\Entity\Job; use JMS\JobQueueBundle\Event\StateChangeEvent; use JMS\JobQueueBundle\Retry\ExponentialRetryScheduler; use JMS\JobQueueBundle\Retry\RetryScheduler; -use Symfony\Bridge\Doctrine\RegistryInterface; +use Symfony\Bridge\Doctrine\ManagerRegistry; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -use DateTime; -use Doctrine\DBAL\Connection; -class JobRepository extends EntityRepository +class JobManager { private $dispatcher; private $registry; private $retryScheduler; - - /** - * @DI\InjectParams({ - * "dispatcher" = @DI\Inject("event_dispatcher"), - * }) - */ - public function setDispatcher(EventDispatcherInterface $dispatcher) - { - $this->dispatcher = $dispatcher; - } - - /** - * @DI\InjectParams({ - * "retryScheduler" = @DI\Inject("jms_job_queue.retry_scheduler"), - * }) - */ - public function setRetryScheduler(RetryScheduler $retryScheduler) + + public function __construct(ManagerRegistry $managerRegistry, EventDispatcherInterface $eventDispatcher, RetryScheduler $retryScheduler) { + $this->registry = $managerRegistry; + $this->dispatcher = $eventDispatcher; $this->retryScheduler = $retryScheduler; } - /** - * @DI\InjectParams({ - * "registry" = @DI\Inject("doctrine"), - * }) - */ - public function setRegistry(RegistryInterface $registry) - { - $this->registry = $registry; - } - public function findJob($command, array $args = array()) { - return $this->_em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args") + return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args") ->setParameter('command', $command) ->setParameter('args', $args, Type::JSON_ARRAY) ->setMaxResults(1) @@ -95,10 +70,10 @@ public function getOrCreateIfNotExists($command, array $args = array()) } $job = new Job($command, $args, false); - $this->_em->persist($job); - $this->_em->flush($job); + $this->getJobManager()->persist($job); + $this->getJobManager()->flush($job); - $firstJob = $this->_em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args ORDER BY j.id ASC") + $firstJob = $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args ORDER BY j.id ASC") ->setParameter('command', $command) ->setParameter('args', $args, 'json_array') ->setMaxResults(1) @@ -106,14 +81,14 @@ public function getOrCreateIfNotExists($command, array $args = array()) if ($firstJob === $job) { $job->setState(Job::STATE_PENDING); - $this->_em->persist($job); - $this->_em->flush($job); + $this->getJobManager()->persist($job); + $this->getJobManager()->flush($job); return $job; } - $this->_em->remove($job); - $this->_em->flush($job); + $this->getJobManager()->remove($job); + $this->getJobManager()->flush($job); return $firstJob; } @@ -130,7 +105,7 @@ public function findStartableJob($workerName, array &$excludedIds = array(), $ex // We do not want to have non-startable jobs floating around in // cache as they might be changed by another process. So, better // re-fetch them when they are not excluded anymore. - $this->_em->detach($job); + $this->getJobManager()->detach($job); } return null; @@ -138,7 +113,7 @@ public function findStartableJob($workerName, array &$excludedIds = array(), $ex private function acquireLock($workerName, Job $job) { - $affectedRows = $this->_em->getConnection()->executeUpdate( + $affectedRows = $this->getJobManager()->getConnection()->executeUpdate( "UPDATE jms_jobs SET workerName = :worker WHERE id = :id AND workerName IS NULL", array( 'worker' => $workerName, @@ -159,10 +134,10 @@ public function findAllForRelatedEntity($relatedEntity) { list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity); - $rsm = new ResultSetMappingBuilder($this->_em); + $rsm = new ResultSetMappingBuilder($this->getJobManager()); $rsm->addRootEntityFromClassMetadata('JMSJobQueueBundle:Job', 'j'); - return $this->_em->createNativeQuery("SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId", $rsm) + return $this->getJobManager()->createNativeQuery("SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId", $rsm) ->setParameter('relClass', $relClass) ->setParameter('relId', $relId) ->getResult(); @@ -177,7 +152,7 @@ public function findJobForRelatedEntity($command, $relatedEntity, array $states { list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity); - $rsm = new ResultSetMappingBuilder($this->_em); + $rsm = new ResultSetMappingBuilder($this->getJobManager()); $rsm->addRootEntityFromClassMetadata('JMSJobQueueBundle:Job', 'j'); $sql = "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId AND j.command = :command"; @@ -191,7 +166,7 @@ public function findJobForRelatedEntity($command, $relatedEntity, array $states $params->add(new Parameter('states', $states, Connection::PARAM_STR_ARRAY)); } - return $this->_em->createNativeQuery($sql, $rsm) + return $this->getJobManager()->createNativeQuery($sql, $rsm) ->setParameters($params) ->getOneOrNullResult(); } @@ -218,7 +193,7 @@ private function getRelatedEntityIdentifier($entity) public function findPendingJob(array $excludedIds = array(), array $excludedQueues = array(), array $restrictedQueues = array()) { - $qb = $this->_em->createQueryBuilder(); + $qb = $this->getJobManager()->createQueryBuilder(); $qb->select('j')->from('JMSJobQueueBundle:Job', 'j') ->orderBy('j.priority', 'ASC') ->addOrderBy('j.id', 'ASC'); @@ -255,12 +230,12 @@ public function findPendingJob(array $excludedIds = array(), array $excludedQueu public function closeJob(Job $job, $finalState) { - $this->_em->getConnection()->beginTransaction(); + $this->getJobManager()->getConnection()->beginTransaction(); try { $visited = array(); $this->closeJobInternal($job, $finalState, $visited); - $this->_em->flush(); - $this->_em->getConnection()->commit(); + $this->getJobManager()->flush(); + $this->getJobManager()->getConnection()->commit(); // Clean-up entity manager to allow for garbage collection to kick in. foreach ($visited as $job) { @@ -270,10 +245,10 @@ public function closeJob(Job $job, $finalState) continue; } - $this->_em->detach($job); + $this->getJobManager()->detach($job); } } catch (\Exception $ex) { - $this->_em->getConnection()->rollback(); + $this->getJobManager()->getConnection()->rollback(); throw $ex; } @@ -299,7 +274,7 @@ private function closeJobInternal(Job $job, $finalState, array &$visited = array switch ($finalState) { case Job::STATE_CANCELED: $job->setState(Job::STATE_CANCELED); - $this->_em->persist($job); + $this->getJobManager()->persist($job); if ($job->isRetryJob()) { $this->closeJobInternal($job->getOriginalJob(), Job::STATE_CANCELED, $visited); @@ -318,7 +293,7 @@ private function closeJobInternal(Job $job, $finalState, array &$visited = array case Job::STATE_INCOMPLETE: if ($job->isRetryJob()) { $job->setState($finalState); - $this->_em->persist($job); + $this->getJobManager()->persist($job); $this->closeJobInternal($job->getOriginalJob(), $finalState); @@ -337,14 +312,14 @@ private function closeJobInternal(Job $job, $finalState, array &$visited = array $retryJob->setExecuteAfter($this->retryScheduler->scheduleNextRetry($job)); $job->addRetryJob($retryJob); - $this->_em->persist($retryJob); - $this->_em->persist($job); + $this->getJobManager()->persist($retryJob); + $this->getJobManager()->persist($job); return; } $job->setState($finalState); - $this->_em->persist($job); + $this->getJobManager()->persist($job); // The original job has failed, and no retries are allowed. foreach ($this->findIncomingDependencies($job) as $dep) { @@ -361,10 +336,10 @@ private function closeJobInternal(Job $job, $finalState, array &$visited = array case Job::STATE_FINISHED: if ($job->isRetryJob()) { $job->getOriginalJob()->setState($finalState); - $this->_em->persist($job->getOriginalJob()); + $this->getJobManager()->persist($job->getOriginalJob()); } $job->setState($finalState); - $this->_em->persist($job); + $this->getJobManager()->persist($job); return; @@ -383,7 +358,7 @@ public function findIncomingDependencies(Job $job) return array(); } - return $this->_em->createQuery("SELECT j, d FROM JMSJobQueueBundle:Job j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)") + return $this->getJobManager()->createQuery("SELECT j, d FROM JMSJobQueueBundle:Job j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)") ->setParameter('ids', $jobIds) ->getResult(); } @@ -398,14 +373,14 @@ public function getIncomingDependencies(Job $job) return array(); } - return $this->_em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.id IN (:ids)") + return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.id IN (:ids)") ->setParameter('ids', $jobIds) ->getResult(); } private function getJobIdsOfIncomingDependencies(Job $job) { - $jobIds = $this->_em->getConnection() + $jobIds = $this->getJobManager()->getConnection() ->executeQuery("SELECT source_job_id FROM jms_job_dependencies WHERE dest_job_id = :id", array('id' => $job->getId())) ->fetchAll(\PDO::FETCH_COLUMN); @@ -414,7 +389,7 @@ private function getJobIdsOfIncomingDependencies(Job $job) public function findLastJobsWithError($nbJobs = 10) { - return $this->_em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC") + return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC") ->setParameter('errorStates', array(Job::STATE_TERMINATED, Job::STATE_FAILED)) ->setMaxResults($nbJobs) ->getResult(); @@ -422,7 +397,7 @@ public function findLastJobsWithError($nbJobs = 10) public function getAvailableQueueList() { - $queues = $this->_em->createQuery("SELECT DISTINCT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) GROUP BY j.queue") + $queues = $this->getJobManager()->createQuery("SELECT DISTINCT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) GROUP BY j.queue") ->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING)) ->getResult(); @@ -439,7 +414,7 @@ public function getAvailableQueueList() public function getAvailableJobsForQueueCount($jobQueue) { - $result = $this->_em->createQuery("SELECT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) AND j.queue = :queue") + $result = $this->getJobManager()->createQuery("SELECT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) AND j.queue = :queue") ->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING)) ->setParameter('queue', $jobQueue) ->setMaxResults(1) @@ -447,4 +422,9 @@ public function getAvailableJobsForQueueCount($jobQueue) return count($result); } + + private function getJobManager(): EntityManager + { + return $this->registry->getManagerForClass(Job::class); + } } diff --git a/JMSJobQueueBundle.php b/JMSJobQueueBundle.php index f30c9dc2..48065b18 100644 --- a/JMSJobQueueBundle.php +++ b/JMSJobQueueBundle.php @@ -28,6 +28,5 @@ class JMSJobQueueBundle extends Bundle public function build(ContainerBuilder $container) { $container->addCompilerPass(new LinkGeneratorsPass()); - $container->addCompilerPass(new JobSchedulersPass()); } } diff --git a/Resources/config/console.xml b/Resources/config/console.xml new file mode 100644 index 00000000..6ace5aa0 --- /dev/null +++ b/Resources/config/console.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Resources/config/services.xml b/Resources/config/services.xml index 4122a544..ff60aa4e 100644 --- a/Resources/config/services.xml +++ b/Resources/config/services.xml @@ -8,12 +8,11 @@ JMS\JobQueueBundle\Entity\Listener\ManyToAnyListener JMS\JobQueueBundle\Twig\JobQueueExtension JMS\JobQueueBundle\Retry\ExponentialRetryScheduler - JMS\JobQueueBundle\Cron\SchedulerRegistry + JMS\JobQueueBundle\Entity\Repository\JobManager - @@ -26,5 +25,11 @@ + + + + + + diff --git a/Resources/doc/scheduled_jobs.rst b/Resources/doc/scheduled_jobs.rst index 19b9d63d..66287094 100644 --- a/Resources/doc/scheduled_jobs.rst +++ b/Resources/doc/scheduled_jobs.rst @@ -46,14 +46,13 @@ This is useful if you want to run a third-party command or a Symfony command as .. code-block :: php - use JMS\DiExtraBundle\Annotation as DI; - - /** - * @DI\Service - * @DI\Tag("jms_job_queue.scheduler", attributes = {"command": "my-command"}) - */ class MyJobScheduler implements JobScheduler { + public function getCommands(): array + { + return ['my-command']; + } + public function shouldSchedule($commandName, \DateTime $lastRunAt) { return time() - $lastRunAt->getTimestamp() >= 60; // Executed at most every minute. diff --git a/Resources/views/Job/details.html.twig b/Resources/views/Job/details.html.twig index 0a3b534f..046aa380 100644 --- a/Resources/views/Job/details.html.twig +++ b/Resources/views/Job/details.html.twig @@ -1,8 +1,18 @@ -{% extends "JMSJobQueueBundle::base.html.twig" %} -{% import "JMSJobQueueBundle:Job:macros.html.twig" as macros %} +{% extends "@JMSJobQueue/base.html.twig" %} +{% import "@JMSJobQueue/Job/macros.html.twig" as macros %} {% block title %}Job "{{ job.command }}" (ID: {{ job.id }})" - {{ parent() }}{% endblock %} +{% block stylesheets %} + +{% endblock %} + {% block content %}