diff --git a/README.md b/README.md index 1b05ca6..c8f7de7 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,13 @@ Cassandra client library for PHP, using the native binary protocol. * decimal and timestamps have bugs especially in collections (map,set,list) * connection handling e.g. timeouts +## New feature for this fork +* on Database class constructor, now you can choose whether you want to connect to your nodes randomically or sequentially + ## Installation PHP 5.4+ is required. There is no need for additional libraries. +PHP Sockets extension is required to use Cassandra's binary protocol. Append dependency into composer.json diff --git a/src/Cluster.php b/src/Cluster.php index 8e579c9..45e5788 100644 --- a/src/Cluster.php +++ b/src/Cluster.php @@ -10,37 +10,76 @@ class Cluster { */ private $nodes; + /** + * @var array + */ + private $usedNodes; + /** * @param array $nodes */ - public function __construct(array $nodes = []) { + public function __construct(array $nodes = []) + { $this->nodes = $nodes; } /** * @param string $host */ - public function appendNode($host) { + public function appendNode($host) + { $this->nodes[] = $host; } + /** + * Reset used nodes list. + */ + public function resetNodes() + { + $this->nodes = $this->usedNodes; + $this->usedNodes = array(); + } + /** * @return Node * @throws Exception\ClusterException */ - public function getRandomNode() { - if (empty($this->nodes)) throw new ClusterException('Node list is empty.'); - $nodeKey = array_rand($this->nodes); + public function getNode($random = FALSE) + { + if (empty($this->nodes)) + { + throw new ClusterException('Node list is empty.'); + } + + if ($random) + { + $nodeKey = array_rand($this->nodes); + } + else + { + $nodeKey = array_keys($this->nodes)[0]; + } + $node = $this->nodes[$nodeKey]; - try { - if ((array)$node === $node) { + + try + { + if ((array)$node === $node) + { + $this->usedNodes[$nodeKey] = $node; $node = new Node($nodeKey, $node); unset($this->nodes[$nodeKey]); - } else { + } + else + { + $this->usedNodes[$nodeKey] = $node; $node = new Node($node); unset($this->nodes[$nodeKey]); } - } catch (\InvalidArgumentException $e) { + + } + catch (\InvalidArgumentException $e) + { trigger_error($e->getMessage()); } diff --git a/src/Cluster/Node.php b/src/Cluster/Node.php index dbe8808..2773f67 100644 --- a/src/Cluster/Node.php +++ b/src/Cluster/Node.php @@ -3,9 +3,10 @@ use evseevnn\Cassandra\Exception\ConnectionException; -class Node { +class Node +{ - const STREAM_TIMEOUT = 10; + const STREAM_TIMEOUT = 2; /** * @var string @@ -35,15 +36,19 @@ class Node { * @param array $options * @throws \InvalidArgumentException */ - public function __construct($host, array $options = []) { + public function __construct($host, array $options = []) + { $this->host = $host; - if (strstr($this->host, ':')) { + if (strstr($this->host, ':')) + { $this->port = (int)substr(strstr($this->host, ':'), 1); $this->host = substr($this->host, 0, -1 - strlen($this->port)); - if (!$this->port) { + if (!$this->port) + { throw new \InvalidArgumentException('Invalid port number'); } } + $this->options = array_merge($this->options, $options); } @@ -51,23 +56,68 @@ public function __construct($host, array $options = []) { * @return resource * @throws \Exception */ - public function getConnection() { - if (!empty($this->socket)) return $this->socket; + public function getConnection() + { + if ( ! empty($this->socket)) return $this->socket; $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + socket_set_option($this->socket, getprotobyname('TCP'), TCP_NODELAY, 1); socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, ["sec" => self::STREAM_TIMEOUT, "usec" => 0]); - if (!socket_connect($this->socket, $this->host, $this->port)) { - throw new ConnectionException("Unable to connect to Cassandra node: {$this->host}:{$this->port}"); + socket_set_nonblock($this->socket); + + $maxAttempts = 3; + $attempts = 0; + + while ( ! @socket_connect($this->socket, $this->host, $this->port)) + { + $attempts++; + $err = substr(socket_last_error($this->socket), -2); + + // Connection OK! + if ($err === "56") + { + break; + } + + // 61: server found but port unavailable (connection refused) + // 37: ip does not exist, i wait + // 01: host not found + if ($err === "61" || $err === "01") + { + socket_close($this->socket); + throw new ConnectionException('Unable to connect. Socket last error code : ' . $err); + } + + // if timeout reaches then call exit(); + if ($attempts > $maxAttempts) + { + socket_close($this->socket); + throw new ConnectionException('Unable to connect. Socket last error code (connection timeout) : ' . $err); + } + + usleep(100000); } + // Re-block the socket if needed + socket_set_block($this->socket); + return $this->socket; } /** * @return array */ - public function getOptions() { + public function getOptions() + { return $this->options; } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } } \ No newline at end of file diff --git a/src/Connection.php b/src/Connection.php index 741ad9c..deaae7d 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -14,6 +14,21 @@ class Connection { */ private $cluster; + /** + * @var int + */ + private $connMaxAttempts; + + /** + * @var int + */ + private $connAttempts; + + /** + * @var UseRandomNOdes + */ + private $useRandomNodes; + /** * @var Node */ @@ -27,16 +42,33 @@ class Connection { /** * @param Cluster $cluster */ - public function __construct(Cluster $cluster) { + public function __construct(Cluster $cluster, $useRandomNodes) { $this->cluster = $cluster; + $this->useRandomNodes = $useRandomNodes; + $this->connMaxAttempts = 2; + $this->connAttempts = 0; } public function connect() { + try { - $this->node = $this->cluster->getRandomNode(); + + $this->node = $this->cluster->getNode($this->useRandomNodes); $this->connection = $this->node->getConnection(); + } catch (ConnectionException $e) { $this->connect(); + + } catch (Exception\ClusterException $e) { + + if ($this->connAttempts >= $this->connMaxAttempts) + { + throw new ConnectionException('I tried to connect to Database ' . $this->connMaxAttempts . ' times with no response.'); + } + + $this->connAttempts++; + $this->cluster->resetNodes(); + $this->connect(); } } @@ -44,7 +76,9 @@ public function connect() { * @return bool */ public function disconnect() { - return socket_shutdown($this->connection); + $socketShutdown = socket_shutdown($this->connection); + socket_close($this->connection); + return $socketShutdown; } /** @@ -70,9 +104,9 @@ public function sendRequest(Request $request) { * @return string */ private function fetchData($length) { - $data = socket_read($this->connection, $length); + $data = @socket_read($this->connection, $length); while (strlen($data) < $length) { - $data .= socket_read($this->connection, $length); + $data .= @socket_read($this->connection, $length); } if (socket_last_error($this->connection) == 110) { throw new ConnectionException('Connection timed out'); diff --git a/src/Database.php b/src/Database.php index 086d4a0..f3ac7a1 100644 --- a/src/Database.php +++ b/src/Database.php @@ -50,9 +50,9 @@ class Database { * @param string $keyspace * @param array $options */ - public function __construct(array $nodes, $keyspace = '', array $options = []) { + public function __construct(array $nodes, $keyspace = '', array $options = [], $useRandomNodes = TRUE) { $this->cluster = new Cluster($nodes); - $this->connection = new Connection($this->cluster); + $this->connection = new Connection($this->cluster, $useRandomNodes); $this->options = array_merge($this->options, $options); $this->keyspace = $keyspace; } @@ -65,7 +65,14 @@ public function __construct(array $nodes, $keyspace = '', array $options = []) { */ public function connect() { if ($this->connection->isConnected()) return true; + $this->connection->connect(); + + if (!$this->connection->isConnected()) + { + return false; + } + $response = $this->connection->sendRequest( RequestFactory::startup($this->options) ); diff --git a/src/Enum/ConsistencyEnum.php b/src/Enum/ConsistencyEnum.php index d80ccb7..d9a4dbb 100644 --- a/src/Enum/ConsistencyEnum.php +++ b/src/Enum/ConsistencyEnum.php @@ -10,5 +10,5 @@ class ConsistencyEnum { const CONSISTENCY_ALL = 0x0005; const CONSISTENCY_LOCAL_QUORUM = 0x0006; const CONSISTENCY_EACH_QUORUM = 0x0007; - const CONSISTENCY_LOCAL_ONE = 0x0010; + const CONSISTENCY_LOCAL_ONE = 0xA; } \ No newline at end of file diff --git a/src/Protocol/Response/DataStream.php b/src/Protocol/Response/DataStream.php index 55a8c31..1be5568 100644 --- a/src/Protocol/Response/DataStream.php +++ b/src/Protocol/Response/DataStream.php @@ -36,6 +36,10 @@ public function __construct($binary) { * @return string */ protected function read($length) { + if ( ! $length) { + return null; + } + if ($this->length < $length) { throw new \Exception('Reading while at end of stream'); } @@ -174,6 +178,12 @@ public function readBytes($isCollectionElement = false) { if ($isCollectionElement) $this->readShort(); $length = $this->readInt(); + + if ($length === -1) + { + return null; + } + return $this->read($length); }