Skip to content

Commit 34b0e8e

Browse files
committed
Add persisten functionality
1 parent 0bc9cd1 commit 34b0e8e

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ It also supports extended AMQP features such as queue declaration and message de
77

88
The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client).
99

10+
To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `protected $persistent = true;`
1011
## Resources
1112

1213
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)

src/Queue.php

+19-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class Queue extends BaseQueue implements QueueContract
2323
*/
2424
protected $psrContext;
2525

26+
/**
27+
* @var boolean
28+
*/
29+
protected $persistent = false;
30+
2631
/**
2732
* @param PsrContext $psrContext
2833
* @param string $queueName
@@ -48,6 +53,8 @@ public function size($queue = null)
4853
*/
4954
public function push($job, $data = '', $queue = null)
5055
{
56+
$this->persistent = $job->persistent ?? false;
57+
5158
return $this->pushRaw($this->createPayload($job, $data), $queue);
5259
}
5360

@@ -56,9 +63,15 @@ public function push($job, $data = '', $queue = null)
5663
*/
5764
public function pushRaw($payload, $queue = null, array $options = [])
5865
{
66+
$message = $this->psrContext->createMessage($payload);
67+
68+
if ($this->persistent) {
69+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
70+
}
71+
5972
return $this->psrContext->createProducer()->send(
6073
$this->getQueue($queue),
61-
$this->psrContext->createMessage($payload)
74+
$message
6275
);
6376
}
6477

@@ -69,11 +82,13 @@ public function later($delay, $job, $data = '', $queue = null)
6982
{
7083
$message = $this->psrContext->createMessage($this->createPayload($job, $data));
7184

85+
if (isset($job->persistent) && $job->persistent) {
86+
$message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT);
87+
}
88+
7289
return $this->psrContext->createProducer()
7390
->setDeliveryDelay($this->secondsUntil($delay) * 1000)
74-
75-
->send($this->getQueue($queue), $message)
76-
;
91+
->send($this->getQueue($queue), $message);
7792
}
7893

7994
/**

0 commit comments

Comments
 (0)