Skip to content

Commit 55be581

Browse files
authored
Merge pull request #7 from darkin1/feature/persistent
Add persistent functionality
2 parents 0bc9cd1 + 8b8cce9 commit 55be581

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ It also supports extended AMQP features such as queue declaration and message de
77

88
The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client).
99

10+
To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `public $persistent = true;`
1011
## Resources
1112

1213
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)

src/Queue.php

+13-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Illuminate\Contracts\Queue\Queue as QueueContract;
66
use Illuminate\Queue\Queue as BaseQueue;
77
use Interop\Queue\PsrContext;
8+
use Interop\Amqp\Impl\AmqpMessage;
89

910
class Queue extends BaseQueue implements QueueContract
1011
{
@@ -56,9 +57,15 @@ public function push($job, $data = '', $queue = null)
5657
*/
5758
public function pushRaw($payload, $queue = null, array $options = [])
5859
{
60+
$message = $this->psrContext->createMessage($payload);
61+
62+
if ($message instanceof AmqpMessage) {
63+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
64+
}
65+
5966
return $this->psrContext->createProducer()->send(
6067
$this->getQueue($queue),
61-
$this->psrContext->createMessage($payload)
68+
$message
6269
);
6370
}
6471

@@ -69,11 +76,13 @@ public function later($delay, $job, $data = '', $queue = null)
6976
{
7077
$message = $this->psrContext->createMessage($this->createPayload($job, $data));
7178

79+
if ($message instanceof AmqpMessage) {
80+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
81+
}
82+
7283
return $this->psrContext->createProducer()
7384
->setDeliveryDelay($this->secondsUntil($delay) * 1000)
74-
75-
->send($this->getQueue($queue), $message)
76-
;
85+
->send($this->getQueue($queue), $message);
7786
}
7887

7988
/**

0 commit comments

Comments
 (0)