array('S', 'AMQPLib'), 'platform' => array('S', 'PHP'), 'version' => array('S', '2.6'), 'information' => array('S', ''), 'copyright' => array('S', ''), 'capabilities' => array( 'F', array( 'authentication_failure_close' => array('t', true), 'publisher_confirms' => array('t', true), 'consumer_cancel_notify' => array('t', true), 'exchange_exchange_bindings' => array('t', true), 'basic.nack' => array('t', true), 'connection.blocked' => array('t', true) ) ) ); /** @var AMQPChannel[] */ public $channels = array(); /** @var int */ protected $version_major; /** @var int */ protected $version_minor; /** @var array */ protected $server_properties; /** @var array */ protected $mechanisms; /** @var array */ protected $locales; /** @var bool */ protected $wait_tune_ok; /** @var string */ protected $known_hosts; /** @var AMQPReader */ protected $input; /** @var string */ protected $vhost; /** @var bool */ protected $insist; /** @var string */ protected $login_method; /** @var string */ protected $login_response; /** @var string */ protected $locale; /** @var int */ protected $heartbeat; /** @var SocketIO */ protected $sock; /** @var int */ protected $channel_max = 65535; /** @var int */ protected $frame_max = 131072; /** @var array Constructor parameters for clone */ protected $construct_params; /** @var bool Close the connection in destructor */ protected $close_on_destruct = true; /** @var bool Maintain connection status */ protected $is_connected = false; /** @var \PhpAmqpLib\Wire\IO\AbstractIO */ protected $io; /** @var \PhpAmqpLib\Wire\AMQPReader */ protected $wait_frame_reader; /** @var callable Handles connection blocking from the server */ private $connection_block_handler; /** @var callable Handles connection unblocking from the server */ private $connection_unblock_handler; /** * Circular buffer to speed up prepare_content(). * Max size limited by $prepare_content_cache_max_size. * * @var array * @see prepare_content() */ private $prepare_content_cache; /** @var int Maximal size of $prepare_content_cache */ private $prepare_content_cache_max_size; /** * @param string $user * @param string $password * @param string $vhost * @param bool $insist * @param string $login_method * @param null $login_response * @param string $locale * @param AbstractIO $io * @param int $heartbeat * @throws \Exception */ public function __construct( $user, $password, $vhost = '/', $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', AbstractIO $io, $heartbeat = 0 ) { // save the params for the use of __clone $this->construct_params = func_get_args(); $this->wait_frame_reader = new AMQPReader(null); $this->vhost = $vhost; $this->insist = $insist; $this->login_method = $login_method; $this->login_response = $login_response; $this->locale = $locale; $this->io = $io; $this->heartbeat = $heartbeat; if ($user && $password) { $this->login_response = new AMQPWriter(); $this->login_response->write_table(array( 'LOGIN' => array('S', $user), 'PASSWORD' => array('S', $password) )); // Skip the length $responseValue = $this->login_response->getvalue(); $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII'); } else { $this->login_response = null; } $this->prepare_content_cache = array(); $this->prepare_content_cache_max_size = 100; // Lazy Connection waits on connecting if ($this->connectOnConstruct()) { $this->connect(); } } /** * Connects to the AMQP server */ protected function connect() { try { // Loop until we connect while (!$this->isConnected()) { // Assume we will connect, until we dont $this->setIsConnected(true); // Connect the socket $this->getIO()->connect(); $this->channels = array(); // The connection object itself is treated as channel 0 parent::__construct($this, 0); $this->input = new AMQPReader(null, $this->getIO()); $this->write($this->amqp_protocol_header); $this->wait(array($this->waitHelper->get_wait('connection.start'))); $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale); $this->wait_tune_ok = true; while ($this->wait_tune_ok) { $this->wait(array( $this->waitHelper->get_wait('connection.secure'), $this->waitHelper->get_wait('connection.tune') )); } $host = $this->x_open($this->vhost, '', $this->insist); if (!$host) { return null; // we weren't redirected } $this->setIsConnected(false); $this->closeChannels(); // we were redirected, close the socket, loop and try again $this->close_socket(); } } catch (\Exception $e) { // Something went wrong, set the connection status $this->setIsConnected(false); $this->closeChannels(); throw $e; // Rethrow exception } } /** * Reconnects using the original connection settings. * This will not recreate any channels that were established previously */ public function reconnect() { // Try to close the AMQP connection $this->safeClose(); // Reconnect the socket/stream then AMQP $this->getIO()->reconnect(); $this->setIsConnected(false); // getIO can initiate the connection setting via LazyConnection, set it here to be sure $this->connect(); } /** * Cloning will use the old properties to make a new connection to the same server */ public function __clone() { call_user_func_array(array($this, '__construct'), $this->construct_params); } public function __destruct() { if ($this->close_on_destruct) { $this->safeClose(); } } /** * Attempts to close the connection safely */ protected function safeClose() { try { if (isset($this->input) && $this->input) { $this->close(); } } catch (\Exception $e) { // Nothing here } } /** * @param int $sec * @param int $usec * @return mixed */ public function select($sec, $usec = 0) { return $this->getIO()->select($sec, $usec); } /** * Allows to not close the connection * it's useful after the fork when you don't want to close parent process connection * * @param bool $close */ public function set_close_on_destruct($close = true) { $this->close_on_destruct = (bool) $close; } protected function close_input() { $this->debug->debug_msg('closing input'); if (!is_null($this->input)) { $this->input->close(); $this->input = null; } } protected function close_socket() { $this->debug->debug_msg('closing socket'); if (!is_null($this->getIO())) { $this->getIO()->close(); } } /** * @param $data */ public function write($data) { $this->debug->debug_hexdump($data); try { $this->getIO()->write($data); } catch (AMQPRuntimeException $e) { $this->setIsConnected(false); throw $e; } } protected function do_close() { $this->setIsConnected(false); $this->close_input(); $this->close_socket(); } /** * @return int * @throws \PhpAmqpLib\Exception\AMQPRuntimeException */ public function get_free_channel_id() { for ($i = 1; $i <= $this->channel_max; $i++) { if (!isset($this->channels[$i])) { return $i; } } throw new AMQPRuntimeException('No free channel ids'); } /** * @param string $channel * @param int $class_id * @param int $weight * @param int $body_size * @param string $packed_properties * @param string $body * @param AMQPWriter $pkt */ public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) { $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt); $this->write($pkt->getvalue()); } /** * Returns a new AMQPWriter or mutates the provided $pkt * * @param string $channel * @param int $class_id * @param int $weight * @param int $body_size * @param string $packed_properties * @param string $body * @param AMQPWriter $pkt * @return AMQPWriter */ public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) { $pkt = $pkt ?: new AMQPWriter(); // Content already prepared ? $key_cache = sprintf( '%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight ); if (!isset($this->prepare_content_cache[$key_cache])) { $w = new AMQPWriter(); $w->write_octet(2); $w->write_short($channel); $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12); $w->write_short($class_id); $w->write_short($weight); $this->prepare_content_cache[$key_cache] = $w->getvalue(); if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) { reset($this->prepare_content_cache); $old_key = key($this->prepare_content_cache); unset($this->prepare_content_cache[$old_key]); } } $pkt->write($this->prepare_content_cache[$key_cache]); $pkt->write_longlong($body_size); $pkt->write($packed_properties); $pkt->write_octet(0xCE); // memory efficiency: walk the string instead of biting // it. good for very large packets (close in size to // memory_limit setting) $position = 0; $bodyLength = mb_strlen($body,'ASCII'); while ($position < $bodyLength) { $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII'); $position += $this->frame_max - 8; $pkt->write_octet(3); $pkt->write_short($channel); $pkt->write_long(mb_strlen($payload, 'ASCII')); $pkt->write($payload); $pkt->write_octet(0xCE); } return $pkt; } /** * @param string $channel * @param array $method_sig * @param AMQPWriter|string $args * @param null $pkt */ protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) { $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt); $this->write($pkt->getvalue()); $this->debug->debug_method_signature1($method_sig); } /** * Returns a new AMQPWriter or mutates the provided $pkt * * @param string $channel * @param array $method_sig * @param AMQPWriter|string $args * @param AMQPWriter $pkt * @return AMQPWriter */ protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) { if ($args instanceof AMQPWriter) { $args = $args->getvalue(); } $pkt = $pkt ?: new AMQPWriter(); $pkt->write_octet(1); $pkt->write_short($channel); $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id // in payload $pkt->write_short($method_sig[0]); // class_id $pkt->write_short($method_sig[1]); // method_id $pkt->write($args); $pkt->write_octet(0xCE); $this->debug->debug_method_signature1($method_sig); return $pkt; } /** * Waits for a frame from the server * * @param int $timeout * @return array * @throws \Exception * @throws \PhpAmqpLib\Exception\AMQPTimeoutException * @throws \PhpAmqpLib\Exception\AMQPRuntimeException */ protected function wait_frame($timeout = 0) { if (is_null($this->input)) { $this->setIsConnected(false); throw new AMQPRuntimeException('Broken pipe or closed connection'); } $currentTimeout = $this->input->getTimeout(); $this->input->setTimeout($timeout); try { // frame_type + channel_id + size $this->wait_frame_reader->reuse( $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG) ); $frame_type = $this->wait_frame_reader->read_octet(); $class = self::$PROTOCOL_CONSTANTS_CLASS; if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) { throw new AMQPRuntimeException('Invalid frame type ' . $frame_type); } $channel = $this->wait_frame_reader->read_short(); $size = $this->wait_frame_reader->read_long(); // payload + ch $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size)); $payload = $this->wait_frame_reader->read($size); $ch = $this->wait_frame_reader->read_octet(); } catch (AMQPTimeoutException $e) { $this->input->setTimeout($currentTimeout); throw $e; } $this->input->setTimeout($currentTimeout); if ($ch != 0xCE) { throw new AMQPRuntimeException(sprintf( 'Framing error, unexpected byte: %x', $ch )); } return array($frame_type, $channel, $payload); } /** * Waits for a frame from the server destined for a particular channel. * * @param string $channel_id * @param int $timeout * @return array */ protected function wait_channel($channel_id, $timeout = 0) { // Keeping the original timeout unchanged. $_timeout = $timeout; while (true) { $now = time(); list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout); if ($frame_channel === 0 && $frame_type === 8) { // skip heartbeat frames and reduce the timeout by the time passed if($_timeout > 0) { $_timeout -= time() - $now; if($_timeout <= 0) { // If timeout has been reached, throw the exception without calling wait_frame throw new AMQPTimeoutException("Timeout waiting on channel"); } } continue; } else { if ($frame_channel == $channel_id) { return array($frame_type, $payload); } // Not the channel we were looking for. Queue this frame //for later, when the other channel is looking for frames. // Make sure the channel still exists, it could have been // closed by a previous Exception. if (isset($this->channels[$frame_channel])) { array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload)); } // If we just queued up a method for channel 0 (the Connection // itself) it's probably a close method in reaction to some // error, so deal with it right away. if (($frame_type == 1) && ($frame_channel == 0)) { $this->wait(); } } } } /** * Fetches a channel object identified by the numeric channel_id, or * create that object if it doesn't already exist. * * @param string $channel_id * @return AMQPChannel */ public function channel($channel_id = null) { if (isset($this->channels[$channel_id])) { return $this->channels[$channel_id]; } $channel_id = $channel_id ? $channel_id : $this->get_free_channel_id(); $ch = new AMQPChannel($this->connection, $channel_id); $this->channels[$channel_id] = $ch; return $ch; } /** * Requests a connection close * * @param int $reply_code * @param string $reply_text * @param array $method_sig * @return mixed|null */ public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0)) { if ($this->io instanceof StreamIO) { $this->io->disableHeartbeat(); } if (!$this->protocolWriter || !$this->isConnected()) { return null; } $this->closeChannels(); list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose( $reply_code, $reply_text, $method_sig[0], $method_sig[1] ); $this->send_method_frame(array($class_id, $method_id), $args); $this->setIsConnected(false); return $this->wait(array( $this->waitHelper->get_wait('connection.close_ok') )); } /** * @param AMQPReader $reader * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException */ protected function connection_close(AMQPReader $reader) { $reply_code = $reader->read_short(); $reply_text = $reader->read_shortstr(); $class_id = $reader->read_short(); $method_id = $reader->read_short(); $this->x_close_ok(); throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id)); } /** * Confirms a connection close */ protected function x_close_ok() { $this->send_method_frame( explode(',', $this->waitHelper->get_wait('connection.close_ok')) ); $this->do_close(); } /** * Confirm a connection close * * @param AMQPReader $args */ protected function connection_close_ok($args) { $this->do_close(); } /** * @param string $virtual_host * @param string $capabilities * @param bool $insist * @return mixed */ protected function x_open($virtual_host, $capabilities = '', $insist = false) { $args = new AMQPWriter(); $args->write_shortstr($virtual_host); $args->write_shortstr($capabilities); $args->write_bits(array($insist)); $this->send_method_frame(array(10, 40), $args); $wait = array( $this->waitHelper->get_wait('connection.open_ok') ); if ($this->protocolVersion == '0.8') { $wait[] = $this->waitHelper->get_wait('connection.redirect'); } return $this->wait($wait); } /** * Signals that the connection is ready * * @param AMQPReader $args */ protected function connection_open_ok($args) { $this->known_hosts = $args->read_shortstr(); $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts); } /** * Asks the client to use a different server * * @param AMQPReader $args * @return string */ protected function connection_redirect($args) { $host = $args->read_shortstr(); $this->known_hosts = $args->read_shortstr(); $this->debug->debug_msg(sprintf( 'Redirected to [%s], known_hosts [%s]', $host, $this->known_hosts )); return $host; } /** * Security mechanism challenge * * @param AMQPReader $args */ protected function connection_secure($args) { $challenge = $args->read_longstr(); } /** * Security mechanism response * * @param string $response */ protected function x_secure_ok($response) { $args = new AMQPWriter(); $args->write_longstr($response); $this->send_method_frame(array(10, 21), $args); } /** * Starts connection negotiation * * @param AMQPReader $args */ protected function connection_start($args) { $this->version_major = $args->read_octet(); $this->version_minor = $args->read_octet(); $this->server_properties = $args->read_table(); $this->mechanisms = explode(' ', $args->read_longstr()); $this->locales = explode(' ', $args->read_longstr()); $this->debug->debug_connection_start( $this->version_major, $this->version_minor, $this->server_properties, $this->mechanisms, $this->locales ); } /** * @param AMQPTable|array $clientProperties * @param string $mechanism * @param string $response * @param string $locale */ protected function x_start_ok($clientProperties, $mechanism, $response, $locale) { $args = new AMQPWriter(); $args->write_table($clientProperties); $args->write_shortstr($mechanism); $args->write_longstr($response); $args->write_shortstr($locale); $this->send_method_frame(array(10, 11), $args); } /** * Proposes connection tuning parameters * * @param AMQPReader $args */ protected function connection_tune($args) { $v = $args->read_short(); if ($v) { $this->channel_max = $v; } $v = $args->read_long(); if ($v) { $this->frame_max = $v; } // use server proposed value if not set if ($this->heartbeat === null) { $this->heartbeat = $args->read_short(); } $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat); } /** * Negotiates connection tuning parameters * * @param int $channel_max * @param int $frame_max * @param int $heartbeat */ protected function x_tune_ok($channel_max, $frame_max, $heartbeat) { $args = new AMQPWriter(); $args->write_short($channel_max); $args->write_long($frame_max); $args->write_short($heartbeat); $this->send_method_frame(array(10, 31), $args); $this->wait_tune_ok = false; } /** * @return SocketIO */ public function getSocket() { return $this->io->getSocket(); } /** * @return \PhpAmqpLib\Wire\IO\AbstractIO */ protected function getIO() { return $this->io; } /** * Handles connection blocked notifications * * @param AMQPReader $args */ protected function connection_blocked(AMQPReader $args) { // Call the block handler and pass in the reason $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr())); } /** * Handles connection unblocked notifications * * @param AMQPReader $args */ protected function connection_unblocked(AMQPReader $args) { // No args to an unblock event $this->dispatch_to_handler($this->connection_unblock_handler, array()); } /** * Sets a handler which is called whenever a connection.block is sent from the server * * @param callable $callback */ public function set_connection_block_handler($callback) { $this->connection_block_handler = $callback; } /** * Sets a handler which is called whenever a connection.block is sent from the server * * @param callable $callback */ public function set_connection_unblock_handler($callback) { $this->connection_unblock_handler = $callback; } /** * Gets the connection status * * @return bool */ public function isConnected() { return (bool) $this->is_connected; } /** * Set the connection status * * @param bool $is_connected */ protected function setIsConnected($is_connected) { $this->is_connected = (bool) $is_connected; } /** * Closes all available channels */ protected function closeChannels() { foreach ($this->channels as $key => $channel) { // channels[0] is this connection object, so don't close it yet if ($key === 0) { continue; } try { $channel->close(); } catch (\Exception $e) { /* Ignore closing errors */ } } } /** * Should the connection be attempted during construction? * * @return bool */ public function connectOnConstruct() { return true; } /** * @return array */ public function getServerProperties() { return $this->server_properties; } }