<?php
|
namespace PhpAmqpLib\Wire;
|
|
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
|
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
|
use PhpAmqpLib\Exception\AMQPRuntimeException;
|
use PhpAmqpLib\Exception\AMQPTimeoutException;
|
use PhpAmqpLib\Exception\AMQPIOWaitException;
|
use PhpAmqpLib\Helper\MiscHelper;
|
use PhpAmqpLib\Wire\IO\AbstractIO;
|
|
/**
 * Decodes AMQP wire-format values from either an in-memory string buffer
 * or a stream exposed through an AbstractIO instance.
 *
 * When an IO object is supplied, every read is delegated to it; otherwise
 * bytes are consumed from the front of the string given to the constructor
 * (or to reuse()).
 *
 * TODO : split this class: AMQPStreamReader and a AMQPBufferReader
 */
class AMQPReader extends AbstractClient
{
    // Sizes (in bytes) of the fixed-width AMQP data types.
    const BIT = 1;
    const OCTET = 1;
    const SHORTSTR = 1;
    const SHORT = 2;
    const LONG = 4;
    const SIGNED_LONG = 4;
    const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
    const LONGLONG = 8;
    const TIMESTAMP = 8;

    /** @var string Remaining, not yet consumed part of the string buffer */
    protected $str;

    /** @var int Number of bytes left in $str */
    protected $str_length;

    /** @var int Total number of bytes consumed so far */
    protected $offset;

    /** @var int Number of unread bits left in $bits */
    protected $bitcount;

    /** @var bool */
    protected $is64bits;

    /** @var int Read timeout in seconds; 0 disables waiting */
    protected $timeout;

    /** @var int Byte currently being consumed bit by bit by read_bit() */
    protected $bits;

    /** @var \PhpAmqpLib\Wire\IO\AbstractIO|null */
    protected $io;

    /**
     * @param string $str Buffer to read from when no IO object is given
     * @param AbstractIO|null $io Stream to read from; takes precedence over $str
     * @param int $timeout Seconds to wait for incoming data before each stream read (0 = do not wait)
     */
    public function __construct($str, AbstractIO $io = null, $timeout = 0)
    {
        parent::__construct();

        $this->str = $str;
        $this->str_length = mb_strlen($this->str, 'ASCII');
        $this->io = $io;
        $this->offset = 0;
        $this->bitcount = $this->bits = 0;
        $this->timeout = $timeout;
    }

    /**
     * Resets the object from the injected param
     *
     * Lets callers reuse one AMQPReader instance instead of creating a new
     * one for every string: the buffer is replaced and the offset and bit
     * state are cleared.
     * NOTE: since we are working with strings we don't need to pass an AbstractIO
     * or a timeout; the IO object and timeout already set are left untouched.
     *
     * @param string $str
     */
    public function reuse($str)
    {
        $this->str = $str;
        $this->str_length = mb_strlen($this->str, 'ASCII');
        $this->offset = 0;
        $this->bitcount = $this->bits = 0;
    }

    /**
     * Closes the underlying stream, if there is one
     */
    public function close()
    {
        if ($this->io) {
            $this->io->close();
        }
    }

    /**
     * Reads $n raw bytes, discarding any partially consumed bit field.
     *
     * @param int $n
     * @return string
     */
    public function read($n)
    {
        $this->bitcount = $this->bits = 0;

        return $this->rawread($n);
    }

    /**
     * Waits until some data is retrieved from the socket.
     *
     * Does nothing when the timeout is 0. Otherwise blocks in the IO
     * object's select() for at most the configured timeout.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOWaitException when select() reports an error
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when no data arrived within the timeout
     */
    protected function wait()
    {
        // Loose comparison on purpose: any value equal to 0 (0, 0.0, null, '0') disables waiting.
        if ($this->getTimeout() == 0) {
            return null;
        }

        // wait ..
        list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
        $result = $this->io->select($sec, $usec);

        if ($result === false) {
            throw new AMQPIOWaitException('A network error occured while awaiting for incoming data');
        }

        if ($result === 0) {
            throw new AMQPTimeoutException(sprintf(
                'The connection timed out after %s sec while awaiting incoming data',
                $this->getTimeout()
            ));
        }
    }

    /**
     * Reads $n bytes from the stream (when an IO object is set) or from the
     * string buffer, and advances the offset by $n. Does not touch the bit state.
     *
     * @param int $n
     * @return string
     * @throws \RuntimeException presumably raised by the IO layer - not visible from this class
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the string buffer holds fewer than $n bytes
     */
    protected function rawread($n)
    {
        if ($this->io) {
            $this->wait();
            $res = $this->io->read($n);
            $this->offset += $n;

            return $res;
        }

        if ($this->str_length < $n) {
            throw new AMQPRuntimeException(sprintf(
                'Error reading data. Requested %s bytes while string buffer has only %s',
                $n,
                $this->str_length
            ));
        }

        // 'ASCII' makes the mb_* functions operate on single bytes.
        $res = mb_substr($this->str, 0, $n, 'ASCII');
        // $str_length already tracks the buffer size, so it is not measured again.
        $this->str = mb_substr($this->str, $n, $this->str_length - $n, 'ASCII');
        $this->str_length -= $n;
        $this->offset += $n;

        return $res;
    }

    /**
     * Reads a single bit. A new byte is fetched whenever the previous one
     * is exhausted; bits are handed out starting with the least significant.
     *
     * @return bool
     */
    public function read_bit()
    {
        if (!$this->bitcount) {
            $this->bits = ord($this->rawread(1));
            $this->bitcount = 8;
        }

        $result = ($this->bits & 1) == 1;
        $this->bits >>= 1;
        $this->bitcount -= 1;

        return $result;
    }

    /**
     * Reads an unsigned 8 bit integer.
     *
     * @return mixed
     */
    public function read_octet()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('C', $this->rawread(1));

        return $res;
    }

    /**
     * Reads a signed 8 bit integer.
     *
     * @return mixed
     */
    public function read_signed_octet()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('c', $this->rawread(1));

        return $res;
    }

    /**
     * Reads an unsigned 16 bit integer in big-endian byte order.
     *
     * @return mixed
     */
    public function read_short()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('n', $this->rawread(2));

        return $res;
    }

    /**
     * Reads a signed 16 bit integer.
     *
     * unpack('s') uses machine byte order, hence the bytes are passed
     * through correctEndianness() first.
     *
     * @return mixed
     */
    public function read_signed_short()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('s', $this->correctEndianness($this->rawread(2)));

        return $res;
    }

    /**
     * Reads 32 bit integer in big-endian byte order.
     *
     * On 64 bit systems it will return always unsigned int
     * value in 0..2^32 range.
     *
     * On 32 bit systems it will return signed int value in
     * -2^31...+2^31 range.
     *
     * Use with caution!
     *
     * @return int
     */
    public function read_php_int()
    {
        // Like every other non-bit read, drop any partially consumed bit field.
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('N', $this->rawread(4));

        if ($this->is64bits) {
            return (int) sprintf('%u', $res);
        }

        return $res;
    }

    /**
     * Reads an unsigned 32 bit integer in big-endian byte order.
     *
     * PHP does not have unsigned 32 bit int, so on 32 bit systems a value
     * with the most significant bit set is returned as a numeric string;
     * in every other case an int is returned.
     *
     * @return int|string
     */
    public function read_long()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('N', $this->rawread(4));

        return !$this->is64bits && self::getLongMSB($res) ? sprintf('%u', $res) : $res;
    }

    /**
     * Reads a signed 32 bit integer.
     *
     * @return integer
     */
    private function read_signed_long()
    {
        $this->bitcount = $this->bits = 0;
        list(, $res) = unpack('l', $this->correctEndianness($this->rawread(4)));

        return $res;
    }

    /**
     * Reads an unsigned 64 bit integer.
     *
     * Even on 64 bit systems PHP integers are signed.
     * Since we need an unsigned value here we return it
     * as a string.
     *
     * @return string
     */
    public function read_longlong()
    {
        $this->bitcount = $this->bits = 0;

        list(, $hi, $lo) = unpack('N2', $this->rawread(8));
        $msb = self::getLongMSB($hi);

        if (!$this->is64bits) {
            // 32 bit PHP yields negative ints for halves >= 2^31; turn them into unsigned strings.
            if ($msb) {
                $hi = sprintf('%u', $hi);
            }
            if (self::getLongMSB($lo)) {
                $lo = sprintf('%u', $lo);
            }
        }

        // A native shift is only safe when the result still fits a signed 64 bit int.
        if ($this->is64bits && !$msb) {
            $high = $hi << 32;
        } else {
            $high = bcmul($hi, '4294967296', 0);
        }

        return bcadd($high, $lo, 0);
    }

    /**
     * Reads a signed 64 bit integer and returns it as a numeric string.
     *
     * @return string
     */
    public function read_signed_longlong()
    {
        $this->bitcount = $this->bits = 0;

        list(, $hi, $lo) = unpack('N2', $this->rawread(8));

        if ($this->is64bits) {
            return bcadd($hi << 32, $lo, 0);
        }

        $low = self::getLongMSB($lo) ? sprintf('%u', $lo) : $lo;

        return bcadd(bcmul($hi, '4294967296', 0), $low, 0);
    }

    /**
     * Tells whether bit 31 (the most significant bit of a 32 bit value) is set.
     *
     * @param int $longInt
     * @return bool
     */
    private static function getLongMSB($longInt)
    {
        return (bool) ($longInt & 0x80000000);
    }

    /**
     * Read a string that's stored in up to 255 bytes: one length octet
     * followed by that many bytes. The bytes are returned as they are,
     * no decoding is applied.
     *
     * @return string
     */
    public function read_shortstr()
    {
        $this->bitcount = $this->bits = 0;
        list(, $slen) = unpack('C', $this->rawread(1));

        return $this->rawread($slen);
    }

    /**
     * Read a string that's up to 2**32 bytes, the encoding
     * isn't specified in the AMQP spec, so just return it as
     * a plain PHP string.
     *
     * @return string
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the length does not fit a PHP int
     */
    public function read_longstr()
    {
        $this->bitcount = $this->bits = 0;
        $slen = $this->read_php_int();

        if ($slen < 0) {
            throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
        }

        return $this->rawread($slen);
    }

    /**
     * Read an AMQP timestamp, which is a 64-bit integer representing
     * seconds since the Unix epoch in 1-second resolution.
     *
     * @return string
     */
    public function read_timestamp()
    {
        return $this->read_longlong();
    }

    /**
     * Read an AMQP table. With $returnObject = false a PHP array is
     * returned: keys are strings, values are (type symbol, value) tuples.
     *
     * @param bool $returnObject Whether to return AMQPTable instance instead of plain array
     * @return array|AMQPTable
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the table length does not fit a PHP int
     */
    public function read_table($returnObject = false)
    {
        $this->bitcount = $this->bits = 0;
        $tlen = $this->read_php_int();

        if ($tlen < 0) {
            throw new AMQPOutOfBoundsException('Table is longer than supported');
        }

        // The whole table is pulled into its own buffer-backed reader and parsed from there.
        $table_data = new AMQPReader($this->rawread($tlen), null);
        $result = $returnObject ? new AMQPTable() : array();

        while ($table_data->tell() < $tlen) {
            $name = $table_data->read_shortstr();
            $ftypeSym = $table_data->rawread(1);
            $ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym);
            $val = $table_data->read_value($ftype, $returnObject);

            if ($returnObject) {
                $result->set($name, $val, $ftype);
            } else {
                $result[$name] = array($ftypeSym, $val);
            }
        }

        return $result;
    }

    /**
     * Shortcut for read_table(true).
     *
     * @return array|AMQPTable
     */
    public function read_table_object()
    {
        return $this->read_table(true);
    }

    /**
     * Reads the array in the next value.
     *
     * @param bool $returnObject Whether to return AMQPArray instance instead of plain array
     * @return array|AMQPArray
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the array length does not fit a PHP int
     */
    public function read_array($returnObject = false)
    {
        $this->bitcount = $this->bits = 0;

        // Determine array length and its end position
        $arrayLength = $this->read_php_int();

        // Same guard as read_longstr()/read_table(): a negative length would
        // otherwise skip the loop and silently return an empty array.
        if ($arrayLength < 0) {
            throw new AMQPOutOfBoundsException('Array is longer than supported');
        }

        $endOffset = $this->offset + $arrayLength;

        $result = $returnObject ? new AMQPArray() : array();

        // Read values until we reach the end of the array
        while ($this->offset < $endOffset) {
            $fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
            $fieldValue = $this->read_value($fieldType, $returnObject);

            if ($returnObject) {
                $result->push($fieldValue, $fieldType);
            } else {
                $result[] = $fieldValue;
            }
        }

        return $result;
    }

    /**
     * Shortcut for read_array(true).
     *
     * @return array|AMQPArray
     */
    public function read_array_object()
    {
        return $this->read_array(true);
    }

    /**
     * Reads the next value as the provided field type.
     *
     * @param int $fieldType One of AMQPAbstractCollection::T_* constants
     * @param bool $collectionsAsObjects Whether nested arrays/tables are returned
     *                                   as AMQPArray/AMQPTable instead of plain arrays
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException when the field type is not supported
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the string buffer runs out of data
     */
    public function read_value($fieldType, $collectionsAsObjects = false)
    {
        $this->bitcount = $this->bits = 0;

        $val = null;

        switch ($fieldType) {
            case AMQPAbstractCollection::T_INT_SHORTSHORT:
                //according to AMQP091 spec, 'b' is not bit, it is short-short-int, also valid for rabbit/qpid
                $val = $this->read_signed_octet();
                break;
            case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
                $val = $this->read_octet();
                break;
            case AMQPAbstractCollection::T_INT_SHORT:
                $val = $this->read_signed_short();
                break;
            case AMQPAbstractCollection::T_INT_SHORT_U:
                $val = $this->read_short();
                break;
            case AMQPAbstractCollection::T_INT_LONG:
                $val = $this->read_signed_long();
                break;
            case AMQPAbstractCollection::T_INT_LONG_U:
                $val = $this->read_long();
                break;
            case AMQPAbstractCollection::T_INT_LONGLONG:
                $val = $this->read_signed_longlong();
                break;
            case AMQPAbstractCollection::T_INT_LONGLONG_U:
                $val = $this->read_longlong();
                break;
            case AMQPAbstractCollection::T_DECIMAL:
                // Wire order: one octet read first, then a signed long.
                $e = $this->read_octet();
                $n = $this->read_signed_long();
                $val = new AMQPDecimal($n, $e);
                break;
            case AMQPAbstractCollection::T_TIMESTAMP:
                $val = $this->read_timestamp();
                break;
            case AMQPAbstractCollection::T_BOOL:
                // Returned as the raw octet value, not cast to bool.
                $val = $this->read_octet();
                break;
            case AMQPAbstractCollection::T_STRING_SHORT:
                $val = $this->read_shortstr();
                break;
            case AMQPAbstractCollection::T_STRING_LONG:
                $val = $this->read_longstr();
                break;
            case AMQPAbstractCollection::T_ARRAY:
                $val = $this->read_array($collectionsAsObjects);
                break;
            case AMQPAbstractCollection::T_TABLE:
                $val = $this->read_table($collectionsAsObjects);
                break;
            case AMQPAbstractCollection::T_VOID:
                // No payload; $val stays null.
                break;
            default:
                throw new AMQPInvalidArgumentException(sprintf(
                    'Unsupported type "%s"',
                    $fieldType
                ));
        }

        return $val;
    }

    /**
     * Returns the number of bytes consumed so far.
     *
     * @return int
     */
    protected function tell()
    {
        return $this->offset;
    }

    /**
     * Sets the timeout (second)
     *
     * @param int $timeout
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * Returns the timeout (second)
     *
     * @return int
     */
    public function getTimeout()
    {
        return $this->timeout;
    }
}
|