From 4b5ecfa61fa9401ae80c0e0a9c705b28b0b6a4ab Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 19 Jul 2017 12:38:26 +0300 Subject: [PATCH] Add queue interop based driver. --- composer.json | 2 + .../Queue/Connectors/InteropConnector.php | 41 ++++++ src/Illuminate/Queue/InteropQueue.php | 132 ++++++++++++++++++ src/Illuminate/Queue/Jobs/InteropJob.php | 94 +++++++++++++ src/Illuminate/Queue/QueueServiceProvider.php | 16 ++- 5 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 src/Illuminate/Queue/Connectors/InteropConnector.php create mode 100644 src/Illuminate/Queue/InteropQueue.php create mode 100644 src/Illuminate/Queue/Jobs/InteropJob.php diff --git a/composer.json b/composer.json index 88f81b3a84f8..90a23d45810d 100644 --- a/composer.json +++ b/composer.json @@ -72,6 +72,7 @@ }, "require-dev": { "aws/aws-sdk-php": "~3.0", + "queue-interop/queue-interop": "^0.5", "doctrine/dbal": "~2.5", "mockery/mockery": "~0.9.4", "pda/pheanstalk": "~3.0", @@ -104,6 +105,7 @@ }, "suggest": { "aws/aws-sdk-php": "Required to use the SQS queue driver and SES mail driver (~3.0).", + "queue-interop/queue-interop": "Required to use the queue interop driver compatible transports", "doctrine/dbal": "Required to rename columns and drop SQLite columns (~2.5).", "fzaninotto/faker": "Required to use the eloquent factory builder (~1.4).", "guzzlehttp/guzzle": "Required to use the Mailgun and Mandrill mail drivers and the ping methods on schedules (~6.0).", diff --git a/src/Illuminate/Queue/Connectors/InteropConnector.php b/src/Illuminate/Queue/Connectors/InteropConnector.php new file mode 100644 index 000000000000..57287d1fc27f --- /dev/null +++ b/src/Illuminate/Queue/Connectors/InteropConnector.php @@ -0,0 +1,41 @@ + null, + 'dsn' => null, + 'queue' => 'default', + 'time_to_run' => 0, + ], $config); + + if (empty($config['connection_factory_class'])) { + throw new \LogicException('The "connection_factory_class" option is required'); + } + + $factoryClass = $config['connection_factory_class']; + if (false == class_exists($factoryClass)) { + throw new \LogicException(sprintf('The "connection_factory_class" option "%s" is not a class', $factoryClass)); + } + + $rc = new \ReflectionClass($factoryClass); + if (false == $rc->implementsInterface(PsrConnectionFactory::class)) { + throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class)); + } + + /** @var PsrConnectionFactory $factory */ + $factory = new $factoryClass($config['dsn'] ? $config['dsn'] : $config); + + return new InteropQueue($factory->createContext(), $config['queue'], $config['time_to_run']); + } +} diff --git a/src/Illuminate/Queue/InteropQueue.php b/src/Illuminate/Queue/InteropQueue.php new file mode 100644 index 000000000000..22a49b4c9c3b --- /dev/null +++ b/src/Illuminate/Queue/InteropQueue.php @@ -0,0 +1,132 @@ +psrContext = $psrContext; + $this->queueName = $queueName; + $this->timeToRun = $timeToRun; + } + + /** + * {@inheritdoc} + */ + public function size($queue = null) + { + return 0; + } + + /** + * {@inheritdoc} + */ + public function push($job, $data = '', $queue = null) + { + return $this->pushRaw($this->createPayload($job, $data), $queue); + } + + /** + * Push a new job onto the queue. + * + * @param string $queue + * @param string $job + * @param mixed $data + * + * @return mixed + */ + public function pushOn($queue, $job, $data = '') + { + new \LogicException('to be implemented'); + } + + /** + * {@inheritdoc} + */ + public function pushRaw($payload, $queue = null, array $options = []) + { + return $this->psrContext->createProducer()->send( + $this->getQueue($queue), + $this->psrContext->createMessage($payload) + ); + } + + /** + * {@inheritdoc} + */ + public function later($delay, $job, $data = '', $queue = null) + { + new \LogicException('to be implemented'); + } + + /** + * {@inheritdoc} + */ + public function pop($queue = null) + { + $queue = $this->getQueue($queue); + + $psrConsumer = $this->psrContext->createConsumer($queue); + if ($psrMessage = $psrConsumer->receive(1000)) { // 1 sec + return new InteropJob( + $this->container, + $this->psrContext, + $psrConsumer, + $psrMessage, + $this->connectionName + ); + } + } + + /** + * Get the queue or return the default. + * + * @param string|null $queue + * + * @return \Interop\Queue\PsrQueue + */ + public function getQueue($queue = null) + { + return $this->psrContext->createQueue($queue ?: $this->queueName); + } + + /** + * @return PsrContext + */ + public function getPsrContext() + { + return $this->psrContext; + } + + /** + * @return int + */ + public function getTimeToRun() + { + return $this->timeToRun; + } +} diff --git a/src/Illuminate/Queue/Jobs/InteropJob.php b/src/Illuminate/Queue/Jobs/InteropJob.php new file mode 100644 index 000000000000..826c78c77a07 --- /dev/null +++ b/src/Illuminate/Queue/Jobs/InteropJob.php @@ -0,0 +1,94 @@ +container = $container; + $this->psrContext = $psrContext; + $this->psrConsumer = $psrConsumer; + $this->psrMessage = $psrMessage; + $this->connectionName = $connectionName; + } + + /** + * {@inheritdoc} + */ + public function delete() + { + parent::delete(); + + $this->psrConsumer->acknowledge($this->psrMessage); + } + + /** + * {@inheritdoc} + */ + public function release($delay = 0) + { + if ($delay) { + throw new \LogicException('To be implemented'); + } + + $requeueMessage = clone $this->psrMessage; + $requeueMessage->setProperty('x-attempts', $this->attempts() + 1); + + $this->psrContext->createProducer()->send($this->psrConsumer->getQueue(), $requeueMessage); + + $this->psrConsumer->acknowledge($this->psrMessage); + } + + /** + * {@inheritdoc} + */ + public function getQueue() + { + return $this->psrConsumer->getQueue()->getQueueName(); + } + + /** + * {@inheritdoc} + */ + public function attempts() + { + return $this->psrMessage->getProperty('x-attempts', 1); + } + + /** + * {@inheritdoc} + */ + public function getRawBody() + { + return $this->psrMessage->getBody(); + } +} diff --git a/src/Illuminate/Queue/QueueServiceProvider.php b/src/Illuminate/Queue/QueueServiceProvider.php index 2ad923ec409a..e6ce2c7b8ad0 100755 --- a/src/Illuminate/Queue/QueueServiceProvider.php +++ b/src/Illuminate/Queue/QueueServiceProvider.php @@ -2,6 +2,7 @@ namespace Illuminate\Queue; +use Illuminate\Queue\Connectors\InteropConnector; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Connectors\SqsConnector; use Illuminate\Queue\Connectors\NullConnector; @@ -77,7 +78,7 @@ protected function registerConnection() */ public function registerConnectors($manager) { - foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) { + foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs', 'Interop'] as $connector) { $this->{"register{$connector}Connector"}($manager); } } @@ -160,6 +161,19 @@ protected function registerSqsConnector($manager) }); } + /** + * Register the interop queue connector. + * + * @param \Illuminate\Queue\QueueManager $manager + * @return void + */ + protected function registerInteropConnector($manager) + { + $manager->addConnector('interop', function () { + return new InteropConnector(); + }); + } + /** * Register the queue worker. *