laravel-rabbitmq/producer/app/Queue/RabbitMQQueue.php

57 lines
1.8 KiB
PHP

<?php
namespace App\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
class RabbitMQQueue extends BaseRabbitMQQueue
{
/**
* Create a payload array from the given job and data.
*
* @param string|object $job
* @param string $queue
* @param mixed $data
*/
protected function createPayloadArray($job, $queue, $data = ''): array
{
return array_merge(
parent::createPayloadArray($job, $queue, $data), [
'id' => $this->getRandomId(),
]
);
}
protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
{
try {
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
$this->reconnect();
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
}
}
protected function publishBatch($jobs, $data = '', $queue = null): void
{
try {
parent::publishBatch($jobs, $data, $queue);
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
$this->reconnect();
parent::publishBatch($jobs, $data, $queue);
}
}
protected function createChannel(): AMQPChannel
{
try {
return parent::createChannel();
} catch (AMQPConnectionClosedException) {
$this->reconnect();
return parent::createChannel();
}
}
}