Skip to content

Commit 7c536cc

Browse files
authored
Update AmqpConnectionFactory.php
1 parent 531cb02 commit 7c536cc

File tree

1 file changed

+50
-113
lines changed

1 file changed

+50
-113
lines changed

AmqpConnectionFactory.php

Lines changed: 50 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
1212
use Interop\Queue\Context;
1313
use PhpAmqpLib\Connection\AbstractConnection;
14+
use PhpAmqpLib\Connection\AMQPConnectionConfig;
15+
use PhpAmqpLib\Connection\AMQPConnectionFactory as PhpAmqpLibConnectionFactory;
1416
use PhpAmqpLib\Connection\AMQPLazyConnection;
1517
use PhpAmqpLib\Connection\AMQPLazySocketConnection;
1618
use PhpAmqpLib\Connection\AMQPSocketConnection;
@@ -32,9 +34,9 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
3234
private $connection;
3335

3436
/**
37+
* @param array|string|null $config
3538
* @see ConnectionConfig for possible config formats and values.
3639
*
37-
* @param array|string|null $config
3840
*/
3941
public function __construct($config = 'amqp:')
4042
{
@@ -49,8 +51,7 @@ public function __construct($config = 'amqp:')
4951
->addDefaultOption('keepalive', false)
5052
->addDefaultOption('channel_rpc_timeout', 0.)
5153
->addDefaultOption('heartbeat_on_tick', true)
52-
->parse()
53-
;
54+
->parse();
5455

5556
if (in_array('rabbitmq', $this->config->getSchemeExtensions(), true)) {
5657
$this->setDelayStrategy(new RabbitMqDlxDelayStrategy());
@@ -73,119 +74,55 @@ public function getConfig(): ConnectionConfig
7374
return $this->config;
7475
}
7576

77+
public function getAmqpConnectionConfig(): AMQPConnectionConfig
78+
{
79+
$config = new AMQPConnectionConfig();
80+
$config->setHost($this->config->getHost());
81+
$config->setPort($this->config->getPort());
82+
$config->setUser($this->config->getUser());
83+
$config->setPassword($this->config->getPass());
84+
$config->setVhost($this->config->getVHost());
85+
86+
if ($this->config->isSslOn()) {
87+
$config->setIsSecure(true);
88+
$config->setSslCaPath($this->config->getSslCaCert());
89+
$config->setSslCert($this->config->getSslCert());
90+
$config->setSslKey($this->config->getSslKey());
91+
$config->setSslVerify($this->config->isSslVerify());
92+
$config->setSslVerifyName($this->config->isSslVerify());
93+
$config->setSslPassPhrase($this->config->getSslPassPhrase());
94+
$config->setSslCiphers($this->config->getOption('ciphers', ''));
95+
}
96+
97+
$config->setInsist($this->config->getOption('insist'));
98+
$config->setLoginMethod($this->config->getOption('login_method'));
99+
100+
if ($this->config->getOption('login_response')) {
101+
$config->setLoginResponse($this->config->getOption('login_response'));
102+
}
103+
104+
$config->setLocale($this->config->getOption('locale'));
105+
$config->setConnectionTimeout($this->config->getConnectionTimeout());
106+
$config->setChannelRPCTimeout($this->config->getOption('channel_rpc_timeout'));
107+
$config->setReadTimeout((int)$this->config->getReadTimeout());
108+
$config->setWriteTimeout((int)$this->config->getWriteTimeout());
109+
$config->setKeepalive($this->config->getOption('keepalive'));
110+
$config->setHeartbeat((int)round($this->config->getHeartbeat()));
111+
$config->setIsLazy($this->config->isLazy());
112+
113+
$config->setIoType(
114+
$this->config->getOption('stream') ?
115+
AMQPConnectionConfig::IO_TYPE_STREAM :
116+
AMQPConnectionConfig::IO_TYPE_SOCKET
117+
);
118+
119+
return $config;
120+
}
121+
76122
private function establishConnection(): AbstractConnection
77123
{
78124
if (false == $this->connection) {
79-
if ($this->config->getOption('stream')) {
80-
if ($this->config->isSslOn()) {
81-
$sslOptions = array_filter([
82-
'cafile' => $this->config->getSslCaCert(),
83-
'local_cert' => $this->config->getSslCert(),
84-
'local_pk' => $this->config->getSslKey(),
85-
'verify_peer' => $this->config->isSslVerify(),
86-
'verify_peer_name' => $this->config->isSslVerify(),
87-
'passphrase' => $this->getConfig()->getSslPassPhrase(),
88-
'ciphers' => $this->config->getOption('ciphers', ''),
89-
], function ($value) { return '' !== $value; });
90-
91-
$con = new AMQPSSLConnection(
92-
$this->config->getHost(),
93-
$this->config->getPort(),
94-
$this->config->getUser(),
95-
$this->config->getPass(),
96-
$this->config->getVHost(),
97-
$sslOptions,
98-
[
99-
'insist' => $this->config->getOption('insist'),
100-
'login_method' => $this->config->getOption('login_method'),
101-
'login_response' => $this->config->getOption('login_response'),
102-
'locale' => $this->config->getOption('locale'),
103-
'connection_timeout' => $this->config->getConnectionTimeout(),
104-
'read_write_timeout' => (int) round(min($this->config->getReadTimeout(), $this->config->getWriteTimeout())),
105-
'keepalive' => $this->config->getOption('keepalive'),
106-
'heartbeat' => (int) round($this->config->getHeartbeat()),
107-
]
108-
);
109-
} elseif ($this->config->isLazy()) {
110-
$con = new AMQPLazyConnection(
111-
$this->config->getHost(),
112-
$this->config->getPort(),
113-
$this->config->getUser(),
114-
$this->config->getPass(),
115-
$this->config->getVHost(),
116-
$this->config->getOption('insist'),
117-
$this->config->getOption('login_method'),
118-
$this->config->getOption('login_response'),
119-
$this->config->getOption('locale'),
120-
$this->config->getConnectionTimeout(),
121-
(int) round(min($this->config->getReadTimeout(), $this->config->getWriteTimeout())),
122-
null,
123-
$this->config->getOption('keepalive'),
124-
(int) round($this->config->getHeartbeat()),
125-
$this->config->getOption('channel_rpc_timeout')
126-
);
127-
} else {
128-
$con = new AMQPStreamConnection(
129-
$this->config->getHost(),
130-
$this->config->getPort(),
131-
$this->config->getUser(),
132-
$this->config->getPass(),
133-
$this->config->getVHost(),
134-
$this->config->getOption('insist'),
135-
$this->config->getOption('login_method'),
136-
$this->config->getOption('login_response'),
137-
$this->config->getOption('locale'),
138-
$this->config->getConnectionTimeout(),
139-
(int) round(min($this->config->getReadTimeout(), $this->config->getWriteTimeout())),
140-
null,
141-
$this->config->getOption('keepalive'),
142-
(int) round($this->config->getHeartbeat()),
143-
$this->config->getOption('channel_rpc_timeout')
144-
);
145-
}
146-
} else {
147-
if ($this->config->isSslOn()) {
148-
throw new \LogicException('The socket connection implementation does not support ssl connections.');
149-
}
150-
151-
if ($this->config->isLazy()) {
152-
$con = new AMQPLazySocketConnection(
153-
$this->config->getHost(),
154-
$this->config->getPort(),
155-
$this->config->getUser(),
156-
$this->config->getPass(),
157-
$this->config->getVHost(),
158-
$this->config->getOption('insist'),
159-
$this->config->getOption('login_method'),
160-
$this->config->getOption('login_response'),
161-
$this->config->getOption('locale'),
162-
(int) round($this->config->getReadTimeout()),
163-
$this->config->getOption('keepalive'),
164-
(int) round($this->config->getWriteTimeout()),
165-
(int) round($this->config->getHeartbeat()),
166-
$this->config->getOption('channel_rpc_timeout')
167-
);
168-
} else {
169-
$con = new AMQPSocketConnection(
170-
$this->config->getHost(),
171-
$this->config->getPort(),
172-
$this->config->getUser(),
173-
$this->config->getPass(),
174-
$this->config->getVHost(),
175-
$this->config->getOption('insist'),
176-
$this->config->getOption('login_method'),
177-
$this->config->getOption('login_response'),
178-
$this->config->getOption('locale'),
179-
(int) round($this->config->getReadTimeout()),
180-
$this->config->getOption('keepalive'),
181-
(int) round($this->config->getWriteTimeout()),
182-
(int) round($this->config->getHeartbeat()),
183-
$this->config->getOption('channel_rpc_timeout')
184-
);
185-
}
186-
}
187-
188-
$this->connection = $con;
125+
$this->connection = PhpAmqpLibConnectionFactory::create($this->getAmqpConnectionConfig());
189126
}
190127

191128
return $this->connection;

0 commit comments

Comments
 (0)