Skip to content

Fix reading NULL Big Int data types. #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ Cassandra client library for PHP, using the native binary protocol.
* decimal and timestamps have bugs especially in collections (map,set,list)
* connection handling e.g. timeouts

## New feature for this fork
* on Database class constructor, now you can choose whether you want to connect to your nodes randomically or sequentially

## Installation

PHP 5.4+ is required. There is no need for additional libraries.
PHP Sockets extension is required to use Cassandra's binary protocol.

Append dependency into composer.json

Expand Down
57 changes: 48 additions & 9 deletions src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,76 @@ class Cluster {
*/
private $nodes;

/**
* @var array
*/
private $usedNodes;

/**
* @param array $nodes
*/
public function __construct(array $nodes = []) {
public function __construct(array $nodes = [])
{
$this->nodes = $nodes;
}

/**
* @param string $host
*/
public function appendNode($host) {
public function appendNode($host)
{
$this->nodes[] = $host;
}

/**
* Reset used nodes list.
*/
public function resetNodes()
{
$this->nodes = $this->usedNodes;
$this->usedNodes = array();
}

/**
* @return Node
* @throws Exception\ClusterException
*/
public function getRandomNode() {
if (empty($this->nodes)) throw new ClusterException('Node list is empty.');
$nodeKey = array_rand($this->nodes);
public function getNode($random = FALSE)
{
if (empty($this->nodes))
{
throw new ClusterException('Node list is empty.');
}

if ($random)
{
$nodeKey = array_rand($this->nodes);
}
else
{
$nodeKey = array_keys($this->nodes)[0];
}

$node = $this->nodes[$nodeKey];
try {
if ((array)$node === $node) {

try
{
if ((array)$node === $node)
{
$this->usedNodes[$nodeKey] = $node;
$node = new Node($nodeKey, $node);
unset($this->nodes[$nodeKey]);
} else {
}
else
{
$this->usedNodes[$nodeKey] = $node;
$node = new Node($node);
unset($this->nodes[$nodeKey]);
}
} catch (\InvalidArgumentException $e) {

}
catch (\InvalidArgumentException $e)
{
trigger_error($e->getMessage());
}

Expand Down
70 changes: 60 additions & 10 deletions src/Cluster/Node.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use evseevnn\Cassandra\Exception\ConnectionException;

class Node {
class Node
{

const STREAM_TIMEOUT = 10;
const STREAM_TIMEOUT = 2;

/**
* @var string
Expand Down Expand Up @@ -35,39 +36,88 @@ class Node {
* @param array $options
* @throws \InvalidArgumentException
*/
public function __construct($host, array $options = []) {
public function __construct($host, array $options = [])
{
$this->host = $host;
if (strstr($this->host, ':')) {
if (strstr($this->host, ':'))
{
$this->port = (int)substr(strstr($this->host, ':'), 1);
$this->host = substr($this->host, 0, -1 - strlen($this->port));
if (!$this->port) {
if (!$this->port)
{
throw new \InvalidArgumentException('Invalid port number');
}
}

$this->options = array_merge($this->options, $options);
}

/**
* @return resource
* @throws \Exception
*/
public function getConnection() {
if (!empty($this->socket)) return $this->socket;
public function getConnection()
{
if ( ! empty($this->socket)) return $this->socket;

$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

socket_set_option($this->socket, getprotobyname('TCP'), TCP_NODELAY, 1);
socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, ["sec" => self::STREAM_TIMEOUT, "usec" => 0]);
if (!socket_connect($this->socket, $this->host, $this->port)) {
throw new ConnectionException("Unable to connect to Cassandra node: {$this->host}:{$this->port}");
socket_set_nonblock($this->socket);

$maxAttempts = 3;
$attempts = 0;

while ( ! @socket_connect($this->socket, $this->host, $this->port))
{
$attempts++;
$err = substr(socket_last_error($this->socket), -2);

// Connection OK!
if ($err === "56")
{
break;
}

// 61: server found but port unavailable (connection refused)
// 37: ip does not exist, i wait
// 01: host not found
if ($err === "61" || $err === "01")
{
socket_close($this->socket);
throw new ConnectionException('Unable to connect. Socket last error code : ' . $err);
}

// if timeout reaches then call exit();
if ($attempts > $maxAttempts)
{
socket_close($this->socket);
throw new ConnectionException('Unable to connect. Socket last error code (connection timeout) : ' . $err);
}

usleep(100000);
}

// Re-block the socket if needed
socket_set_block($this->socket);

return $this->socket;
}

/**
* @return array
*/
public function getOptions() {
public function getOptions()
{
return $this->options;
}

/**
* @return string
*/
public function getHost()
{
return $this->host;
}
}
44 changes: 39 additions & 5 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ class Connection {
*/
private $cluster;

/**
* @var int
*/
private $connMaxAttempts;

/**
* @var int
*/
private $connAttempts;

/**
* @var UseRandomNOdes
*/
private $useRandomNodes;

/**
* @var Node
*/
Expand All @@ -27,24 +42,43 @@ class Connection {
/**
* @param Cluster $cluster
*/
public function __construct(Cluster $cluster) {
public function __construct(Cluster $cluster, $useRandomNodes) {
$this->cluster = $cluster;
$this->useRandomNodes = $useRandomNodes;
$this->connMaxAttempts = 2;
$this->connAttempts = 0;
}

public function connect() {

try {
$this->node = $this->cluster->getRandomNode();

$this->node = $this->cluster->getNode($this->useRandomNodes);
$this->connection = $this->node->getConnection();

} catch (ConnectionException $e) {
$this->connect();

} catch (Exception\ClusterException $e) {

if ($this->connAttempts >= $this->connMaxAttempts)
{
throw new ConnectionException('I tried to connect to Database ' . $this->connMaxAttempts . ' times with no response.');
}

$this->connAttempts++;
$this->cluster->resetNodes();
$this->connect();
}
}

/**
* @return bool
*/
public function disconnect() {
return socket_shutdown($this->connection);
$socketShutdown = socket_shutdown($this->connection);
socket_close($this->connection);
return $socketShutdown;
}

/**
Expand All @@ -70,9 +104,9 @@ public function sendRequest(Request $request) {
* @return string
*/
private function fetchData($length) {
$data = socket_read($this->connection, $length);
$data = @socket_read($this->connection, $length);
while (strlen($data) < $length) {
$data .= socket_read($this->connection, $length);
$data .= @socket_read($this->connection, $length);
}
if (socket_last_error($this->connection) == 110) {
throw new ConnectionException('Connection timed out');
Expand Down
11 changes: 9 additions & 2 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class Database {
* @param string $keyspace
* @param array $options
*/
public function __construct(array $nodes, $keyspace = '', array $options = []) {
public function __construct(array $nodes, $keyspace = '', array $options = [], $useRandomNodes = TRUE) {
$this->cluster = new Cluster($nodes);
$this->connection = new Connection($this->cluster);
$this->connection = new Connection($this->cluster, $useRandomNodes);
$this->options = array_merge($this->options, $options);
$this->keyspace = $keyspace;
}
Expand All @@ -65,7 +65,14 @@ public function __construct(array $nodes, $keyspace = '', array $options = []) {
*/
public function connect() {
if ($this->connection->isConnected()) return true;

$this->connection->connect();

if (!$this->connection->isConnected())
{
return false;
}

$response = $this->connection->sendRequest(
RequestFactory::startup($this->options)
);
Expand Down
2 changes: 1 addition & 1 deletion src/Enum/ConsistencyEnum.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ class ConsistencyEnum {
const CONSISTENCY_ALL = 0x0005;
const CONSISTENCY_LOCAL_QUORUM = 0x0006;
const CONSISTENCY_EACH_QUORUM = 0x0007;
const CONSISTENCY_LOCAL_ONE = 0x0010;
const CONSISTENCY_LOCAL_ONE = 0xA;
}
10 changes: 10 additions & 0 deletions src/Protocol/Response/DataStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public function __construct($binary) {
* @return string
*/
protected function read($length) {
if ( ! $length) {
return null;
}

if ($this->length < $length) {
throw new \Exception('Reading while at end of stream');
}
Expand Down Expand Up @@ -174,6 +178,12 @@ public function readBytes($isCollectionElement = false) {
if ($isCollectionElement)
$this->readShort();
$length = $this->readInt();

if ($length === -1)
{
return null;
}

return $this->read($length);
}

Expand Down