Skip to content

Commit 6710e58

Browse files
authored
Merge pull request #11 from php-enqueue/worker-based-on-queue-consumer
Advanced Laravel Worker
2 parents 6179026 + 9dfda8e commit 6710e58

File tree

4 files changed

+212
-8
lines changed

4 files changed

+212
-8
lines changed

README.md

+35-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,41 @@
55
You can use all transports built on top of [queue-interop](https://github.com/queue-interop/queue-interop) including [all supported](https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport) by Enqueue.
66
It also supports extended AMQP features such as queue declaration and message delaying.
77

8-
The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client).
8+
The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client).
9+
10+
11+
## Advantages
12+
13+
* Supports message delaying, priorities and expiration
14+
* Use DSN to configure transport. 12 factors friendly.
15+
* It brings support of a lot of MQ transport with few lines of integration code:
16+
17+
* [AMQP(s)](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp.md) based on [PHP AMQP extension](https://github.com/pdezwart/php-amqp).
18+
* [AMQP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_bunny.md) based on [bunny](https://github.com/jakubkulhan/bunny).
19+
* [AMQP(s)](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md) based on [php-amqplib](https://github.com/php-amqplib/php-amqplib).
20+
* [Beanstalk](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/pheanstalk.md).
21+
* [STOMP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/stomp.md)
22+
* [Amazon SQS](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/sqs.md)
23+
* [Google PubSub](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/gps.md)
24+
* [Kafka](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/kafka.md)
25+
* [Redis](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/redis.md)
26+
* [Gearman](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/gearman.md)
27+
* [Doctrine DBAL](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/dbal.md)
28+
* [Filesystem](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/filesystem.md)
29+
* [MongoDB](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/mongodb.md)
30+
* [WAMP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/wamp.md)
31+
* [PHP-FPM](https://github.com/makasim/php-fpm-queue)
32+
* [rabbitmq-cli-consumer-client](https://github.com/makasim/rabbitmq-cli-consumer-client)
33+
34+
* Consume messages as they arrive from multiple queues.
35+
* You can run fewer work processes and reduce memory usages.
36+
* It uses long pulling whenever possible. It results in zero CPU usages while waiting for messages.
37+
* You can [monitor](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/monitoring.md) any transport, not only redis
38+
* Adds extension points
39+
* AMQP friendly.
40+
* Popular soliution, big and active community around the project
41+
* Supported by a company - Forma-Pro
42+
943

1044
## Resources
1145

src/EnqueueServiceProvider.php

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Enqueue\LaravelQueue\Command\RoutesCommand;
88
use Enqueue\LaravelQueue\Command\SetupBrokerCommand;
99
use Enqueue\SimpleClient\SimpleClient;
10+
use Illuminate\Contracts\Debug\ExceptionHandler;
1011
use Illuminate\Queue\QueueManager;
1112
use Illuminate\Support\ServiceProvider;
1213

@@ -61,5 +62,11 @@ private function bootInteropQueueDriver()
6162
$manager->addConnector('amqp_interop', function () {
6263
return new AmqpConnector();
6364
});
65+
66+
$this->app->extend('queue.worker', function ($worker, $app) {
67+
return new Worker(
68+
$app['queue'], $app['events'], $app[ExceptionHandler::class]
69+
);
70+
});
6471
}
6572
}

src/Queue.php

+14-7
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
use Illuminate\Contracts\Queue\Queue as QueueContract;
66
use Illuminate\Queue\Queue as BaseQueue;
7+
use Interop\Queue\Consumer;
78
use Interop\Queue\Context;
89
use Interop\Amqp\Impl\AmqpMessage;
10+
use Interop\Queue\Message;
911

1012
class Queue extends BaseQueue implements QueueContract
1113
{
@@ -91,15 +93,20 @@ public function pop($queue = null)
9193

9294
$consumer = $this->context->createConsumer($queue);
9395
if ($message = $consumer->receive(1000)) { // 1 sec
94-
return new Job(
95-
$this->container,
96-
$this->context,
97-
$consumer,
98-
$message,
99-
$this->connectionName
100-
);
96+
return $this->convertMessageToJob($message, $consumer);
10197
}
10298
}
99+
100+
public function convertMessageToJob(Message $message, Consumer $consumer): Job
101+
{
102+
return new Job(
103+
$this->container,
104+
$this->context,
105+
$consumer,
106+
$message,
107+
$this->connectionName
108+
);
109+
}
103110

104111
/**
105112
* Get the queue or return the default.

src/Worker.php

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
<?php
2+
namespace Enqueue\LaravelQueue;
3+
4+
use Enqueue\Consumption\ChainExtension;
5+
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\PostMessageReceived;
7+
use Enqueue\Consumption\Context\PreConsume;
8+
use Enqueue\Consumption\Context\Start;
9+
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
10+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
12+
use Enqueue\Consumption\PreConsumeExtensionInterface;
13+
use Enqueue\Consumption\QueueConsumer;
14+
use Enqueue\Consumption\Result;
15+
use Enqueue\Consumption\StartExtensionInterface;
16+
use Enqueue\LaravelQueue\Queue;
17+
use Illuminate\Queue\WorkerOptions;
18+
19+
class Worker extends \Illuminate\Queue\Worker implements
20+
StartExtensionInterface,
21+
PreConsumeExtensionInterface,
22+
MessageReceivedExtensionInterface,
23+
PostMessageReceivedExtensionInterface
24+
{
25+
protected $connectionName;
26+
27+
protected $queueNames;
28+
29+
protected $queue;
30+
31+
protected $options;
32+
33+
protected $lastRestart;
34+
35+
protected $interop = false;
36+
37+
protected $stopped = false;
38+
39+
protected $job;
40+
41+
public function daemon($connectionName, $queueNames, WorkerOptions $options)
42+
{
43+
$this->connectionName = $connectionName;
44+
$this->queueNames = $queueNames;
45+
$this->options = $options;
46+
47+
/** @var Queue $queue */
48+
$this->queue = $this->getManager()->connection($connectionName);
49+
$this->interop = $this->queue instanceof Queue;
50+
51+
if (false == $this->interop) {
52+
parent::daemon($connectionName, $this->queue, $options);
53+
}
54+
55+
$context = $this->queue->getQueueInteropContext();
56+
$queueConsumer = new QueueConsumer($context, new ChainExtension([$this]));
57+
foreach (explode(',', $queueNames) as $queueName) {
58+
$queueConsumer->bindCallback($queueName, function() {
59+
$this->runJob($this->job, $this->connectionName, $this->options);
60+
61+
return Result::ALREADY_ACKNOWLEDGED;
62+
});
63+
}
64+
65+
$queueConsumer->consume();
66+
}
67+
68+
public function runNextJob($connectionName, $queueNames, WorkerOptions $options)
69+
{
70+
$this->connectionName = $connectionName;
71+
$this->queueNames = $queueNames;
72+
$this->options = $options;
73+
74+
/** @var Queue $queue */
75+
$this->queue = $this->getManager()->connection($connectionName);
76+
$this->interop = $this->queue instanceof Queue;
77+
78+
if (false == $this->interop) {
79+
parent::daemon($connectionName, $this->queue, $options);
80+
}
81+
82+
$context = $this->queue->getQueueInteropContext();
83+
84+
$queueConsumer = new QueueConsumer($context, new ChainExtension([
85+
$this,
86+
new LimitConsumedMessagesExtension(1),
87+
]));
88+
89+
foreach (explode(',', $queueNames) as $queueName) {
90+
$queueConsumer->bindCallback($queueName, function() {
91+
$this->runJob($this->job, $this->connectionName, $this->options);
92+
93+
return Result::ALREADY_ACKNOWLEDGED;
94+
});
95+
}
96+
97+
$queueConsumer->consume();
98+
}
99+
100+
public function onStart(Start $context): void
101+
{
102+
if ($this->supportsAsyncSignals()) {
103+
$this->listenForSignals();
104+
}
105+
106+
$this->lastRestart = $this->getTimestampOfLastQueueRestart();
107+
108+
if ($this->stopped) {
109+
$context->interruptExecution();
110+
}
111+
}
112+
113+
public function onPreConsume(PreConsume $context): void
114+
{
115+
if (! $this->daemonShouldRun($this->options, $this->connectionName, $this->queueNames)) {
116+
$this->pauseWorker($this->options, $this->lastRestart);
117+
}
118+
119+
if ($this->stopped) {
120+
$context->interruptExecution();
121+
}
122+
}
123+
124+
public function onMessageReceived(MessageReceived $context): void
125+
{
126+
$this->job = $this->queue->convertMessageToJob(
127+
$context->getMessage(),
128+
$context->getConsumer()
129+
);
130+
131+
if ($this->supportsAsyncSignals()) {
132+
$this->registerTimeoutHandler($this->job, $this->options);
133+
}
134+
}
135+
136+
public function onPostMessageReceived(PostMessageReceived $context): void
137+
{
138+
$this->stopIfNecessary($this->options, $this->lastRestart, $this->job);
139+
140+
if ($this->stopped) {
141+
$context->interruptExecution();
142+
}
143+
}
144+
145+
public function stop($status = 0)
146+
{
147+
if ($this->interop) {
148+
$this->stopped = true;
149+
150+
return;
151+
}
152+
153+
parent::stop($status);
154+
}
155+
}
156+

0 commit comments

Comments
 (0)