Skip to content

Commit 878be9b

Browse files
committed
use AMQPConnectionConfig and AMQPConnectionFactory for connection handling
1 parent 2f60783 commit 878be9b

File tree

1 file changed

+18
-21
lines changed

1 file changed

+18
-21
lines changed

src/Handlers/RabbitMQHandler.php

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,40 @@
2424
use CodeIgniter\Queue\Payloads\PayloadMetadata;
2525
use CodeIgniter\Queue\QueuePushResult;
2626
use PhpAmqpLib\Channel\AMQPChannel;
27-
use PhpAmqpLib\Connection\AMQPStreamConnection;
27+
use PhpAmqpLib\Connection\AbstractConnection;
28+
use PhpAmqpLib\Connection\AMQPConnectionConfig;
29+
use PhpAmqpLib\Connection\AMQPConnectionFactory;
2830
use PhpAmqpLib\Message\AMQPMessage;
2931
use PhpAmqpLib\Wire\AMQPTable;
3032
use Throwable;
3133

3234
class RabbitMQHandler extends BaseHandler implements QueueInterface
3335
{
34-
private readonly AMQPStreamConnection $connection;
36+
private readonly AbstractConnection $connection;
3537
private readonly AMQPChannel $channel;
3638
private array $declaredQueues = [];
3739
private array $declaredExchanges = [];
3840

3941
public function __construct(protected QueueConfig $config)
4042
{
4143
try {
42-
$this->connection = new AMQPStreamConnection(
43-
$config->rabbitmq['host'],
44-
$config->rabbitmq['port'],
45-
$config->rabbitmq['user'],
46-
$config->rabbitmq['password'],
47-
$config->rabbitmq['vhost'] ?? '/',
48-
$config->rabbitmq['insist'] ?? false,
49-
$config->rabbitmq['loginMethod'] ?? 'AMQPLAIN',
50-
null,
51-
$config->rabbitmq['locale'] ?? 'en_US',
52-
$config->rabbitmq['connectionTimeout'] ?? 3.0,
53-
$config->rabbitmq['readWriteTimeout'] ?? 3.0,
54-
null,
55-
$config->rabbitmq['keepalive'] ?? false,
56-
$config->rabbitmq['heartbeat'] ?? 0,
57-
);
44+
$amqp = new AMQPConnectionConfig();
45+
$amqp->setHost($config->rabbitmq['host']);
46+
$amqp->setPort($config->rabbitmq['port']);
47+
$amqp->setUser($config->rabbitmq['user']);
48+
$amqp->setPassword($config->rabbitmq['password']);
49+
$amqp->setVhost($config->rabbitmq['vhost'] ?? '/');
50+
51+
// Enable SSL/TLS
52+
if ($config->rabbitmq['ssl'] ?? ($config->rabbitmq['port'] === 5671)) {
53+
$amqp->setIsSecure(true);
54+
}
5855

59-
$this->channel = $this->connection->channel();
56+
$this->connection = AMQPConnectionFactory::create($amqp);
57+
$this->channel = $this->connection->channel();
6058

6159
// Set QoS for consumer (prefetch limit)
62-
$prefetch = $config->rabbitmq['prefetch'] ?? 1;
63-
$this->channel->basic_qos(0, $prefetch, false);
60+
$this->channel->basic_qos(0, 1, false);
6461

6562
// Enable publisher confirms if configured
6663
if ($config->rabbitmq['publisherConfirms'] ?? false) {

0 commit comments

Comments
 (0)