Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.

Commit 7d2959b

Browse files
author
Sven Speckmaier
committed
retry connect on queue exception
1 parent 2a3c0cb commit 7d2959b

File tree

1 file changed

+39
-3
lines changed

1 file changed

+39
-3
lines changed

src/Sender/RabbitMQ.php

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
<?php namespace Ipunkt\RabbitMQ\Sender;
22

33
use Closure;
4+
use Enqueue\AmqpExt\AmqpContext;
45
use Illuminate\Support\Facades\Log;
56
use Interop\Amqp\AmqpConnectionFactory;
67
use Interop\Amqp\Impl\AmqpMessage;
78
use Interop\Queue\Context;
89
use Interop\Queue\Destination;
10+
use Interop\Queue\Exception\Exception;
911
use Interop\Queue\Message;
12+
use Interop\Queue\Topic;
1013
use Ipunkt\RabbitMQ\Events\MessageSending;
1114
use Ipunkt\RabbitMQ\Events\MessageSent;
1215
use Ipunkt\RabbitMQ\Rpc\Rpc;
@@ -20,7 +23,7 @@ class RabbitMQ
2023
{
2124

2225
/**
23-
* @var array
26+
* @var array|Topic[]
2427
*/
2528
protected $topics = [];
2629

@@ -110,7 +113,15 @@ public function onExchange($exchangeName, $routingKey)
110113
'routing-key' => $routingKey,
111114
'data' => $this->data
112115
]);
113-
$this->send($exchange, $message);
116+
try {
117+
$this->send($exchange, $message);
118+
} catch(Exception $e) {
119+
$this->reconnect();
120+
121+
$exchange = $this->buildExchange($exchangeName);
122+
123+
$this->send($exchange, $message);
124+
}
114125
}
115126

116127
public function asRpc() {
@@ -167,7 +178,15 @@ public function onQueue($queueName)
167178
'queue' => $queueName,
168179
'data' => $this->data
169180
]);
170-
$this->send($queue);
181+
try {
182+
$this->send($queue);
183+
} catch(Exception $e) {
184+
$this->reconnect();
185+
186+
$queue = $this->buildQueue($queueName);
187+
188+
$this->send($queue);
189+
}
171190
}
172191

173192
protected function send( Destination $to, Message $message = null ) {
@@ -187,13 +206,30 @@ protected function send( Destination $to, Message $message = null ) {
187206
$this->resetRpc();
188207
}
189208

209+
protected function reconnect() {
210+
$this->disconnect();
211+
$this->connect();
212+
}
213+
214+
protected function disconnect() {
215+
if ( $this->context instanceof AmqpContext )
216+
$this->context->close();
217+
218+
$this->context = null;
219+
$this->clearExchanges();
220+
}
221+
190222
protected function connect() {
191223
if( $this->context instanceof Context )
192224
return;
193225

194226
$this->context = $this->connectionFactory->createContext();
195227
}
196228

229+
protected function clearExchanges() {
230+
$this->topics = [];
231+
}
232+
197233
protected function buildExchange($exchange)
198234
{
199235
if( !$this->exchangeExists($exchange) ) {

0 commit comments

Comments
 (0)