| <?php | 
| namespace PhpAmqpLib\Connection; | 
|   | 
| use PhpAmqpLib\Channel\AMQPChannel; | 
| use PhpAmqpLib\Channel\AbstractChannel; | 
| use PhpAmqpLib\Exception\AMQPProtocolConnectionException; | 
| use PhpAmqpLib\Exception\AMQPRuntimeException; | 
| use PhpAmqpLib\Exception\AMQPTimeoutException; | 
| use PhpAmqpLib\Wire\AMQPReader; | 
| use PhpAmqpLib\Wire\AMQPTable; | 
| use PhpAmqpLib\Wire\AMQPWriter; | 
| use PhpAmqpLib\Wire\IO\AbstractIO; | 
| use PhpAmqpLib\Wire\IO\SocketIO; | 
| use PhpAmqpLib\Wire\IO\StreamIO; | 
|   | 
| class AbstractConnection extends AbstractChannel | 
| { | 
|     /** @var array Client properties that connect() hands to x_start_ok() */ | 
|     public static $LIBRARY_PROPERTIES = array( | 
|         'product' => array('S', 'AMQPLib'), | 
|         'platform' => array('S', 'PHP'), | 
|         'version' => array('S', '2.6'), | 
|         'information' => array('S', ''), | 
|         'copyright' => array('S', ''), | 
|         'capabilities' => array( | 
|             'F', | 
|             array( | 
|                 'authentication_failure_close' => array('t', true), | 
|                 'publisher_confirms' => array('t', true), | 
|                 'consumer_cancel_notify' => array('t', true), | 
|                 'exchange_exchange_bindings' => array('t', true), | 
|                 'basic.nack' => array('t', true), | 
|                 'connection.blocked' => array('t', true) | 
|             ) | 
|         ) | 
|     ); | 
|   | 
|     /** @var AMQPChannel[] Channel objects indexed by channel id */ | 
|     public $channels = array(); | 
|   | 
|     /** @var int Major version octet read in connection_start() */ | 
|     protected $version_major; | 
|   | 
|     /** @var int Minor version octet read in connection_start() */ | 
|     protected $version_minor; | 
|   | 
|     /** @var array Server properties table read in connection_start() */ | 
|     protected $server_properties; | 
|   | 
|     /** @var array Mechanism names read in connection_start(), split on spaces */ | 
|     protected $mechanisms; | 
|   | 
|     /** @var array Locale names read in connection_start(), split on spaces */ | 
|     protected $locales; | 
|   | 
|     /** @var bool True while connect() still waits for tuning; cleared by x_tune_ok() */ | 
|     protected $wait_tune_ok; | 
|   | 
|     /** @var string Known-hosts string read in connection_open_ok() / connection_redirect() */ | 
|     protected $known_hosts; | 
|   | 
|     /** @var AMQPReader|null Reader over the IO object; null before connect() and after close_input() */ | 
|     protected $input; | 
|   | 
|     /** @var string Virtual host passed to x_open() */ | 
|     protected $vhost; | 
|   | 
|     /** @var bool Insist flag passed to x_open() */ | 
|     protected $insist; | 
|   | 
|     /** @var string Mechanism name passed to x_start_ok() */ | 
|     protected $login_method; | 
|   | 
|     /** @var string|null Response passed to x_start_ok(); null when no credentials were given */ | 
|     protected $login_response; | 
|   | 
|     /** @var string Locale passed to x_start_ok() */ | 
|     protected $locale; | 
|   | 
|     /** @var int Heartbeat value sent by x_tune_ok() */ | 
|     protected $heartbeat; | 
|   | 
|     /** @var SocketIO Not read or written anywhere in this class */ | 
|     protected $sock; | 
|   | 
|     /** @var int Default; replaced by a non-zero value proposed in connection_tune() */ | 
|     protected $channel_max = 65535; | 
|   | 
|     /** @var int Default; replaced by a non-zero value proposed in connection_tune() */ | 
|     protected $frame_max = 131072; | 
|   | 
|     /** @var array Constructor parameters for clone */ | 
|     protected $construct_params; | 
|   | 
|     /** @var bool Close the connection in destructor */ | 
|     protected $close_on_destruct = true; | 
|   | 
|     /** @var bool Maintain connection status */ | 
|     protected $is_connected = false; | 
|   | 
|     /** @var \PhpAmqpLib\Wire\IO\AbstractIO Transport supplied to the constructor */ | 
|     protected $io; | 
|   | 
|     /** @var \PhpAmqpLib\Wire\AMQPReader Reader reused by wait_frame() to parse each frame */ | 
|     protected $wait_frame_reader; | 
|   | 
|     /** @var callable Handles connection blocking from the server */ | 
|     private $connection_block_handler; | 
|   | 
|     /** @var callable Handles connection unblocking from the server */ | 
|     private $connection_unblock_handler; | 
|   | 
|     /** | 
|      * Cache of the header prefixes built by prepare_content(). | 
|      * The oldest entry is dropped once it exceeds $prepare_content_cache_max_size. | 
|      * | 
|      * @var array | 
|      * @see prepare_content() | 
|      */ | 
|     private $prepare_content_cache; | 
|   | 
|     /** @var int Maximal size of $prepare_content_cache */ | 
|     private $prepare_content_cache_max_size; | 
|   | 
|     /** | 
|      * Stores the connection settings, builds the login response from the | 
|      * credentials and, when connectOnConstruct() returns true, connects immediately. | 
|      * | 
|      * $io is mandatory. It is declared with a null default and validated at | 
|      * runtime because it follows optional parameters: a required parameter | 
|      * after optional ones is deprecated since PHP 8.0. | 
|      * | 
|      * @param string $user | 
|      * @param string $password | 
|      * @param string $vhost | 
|      * @param bool $insist | 
|      * @param string $login_method | 
|      * @param null $login_response | 
|      * @param string $locale | 
|      * @param AbstractIO $io | 
|      * @param int $heartbeat | 
|      * @throws \InvalidArgumentException when $io is not an AbstractIO instance | 
|      * @throws \Exception | 
|      */ | 
|     public function __construct( | 
|         $user, | 
|         $password, | 
|         $vhost = '/', | 
|         $insist = false, | 
|         $login_method = 'AMQPLAIN', | 
|         $login_response = null, | 
|         $locale = 'en_US', | 
|         $io = null, | 
|         $heartbeat = 0 | 
|     ) { | 
|         // Replaces the former `AbstractIO $io` type hint (see docblock) | 
|         if (!($io instanceof AbstractIO)) { | 
|             throw new \InvalidArgumentException('$io must be an instance of PhpAmqpLib\Wire\IO\AbstractIO'); | 
|         } | 
|   | 
|         // save the params for the use of __clone | 
|         $this->construct_params = func_get_args(); | 
|   | 
|         $this->wait_frame_reader = new AMQPReader(null); | 
|         $this->vhost = $vhost; | 
|         $this->insist = $insist; | 
|         $this->login_method = $login_method; | 
|         $this->locale = $locale; | 
|         $this->io = $io; | 
|         $this->heartbeat = $heartbeat; | 
|   | 
|         // NOTE(review): $login_response is never used as passed in - it is either | 
|         // replaced by the credentials table below or reset to null. Behaviour kept as is. | 
|         if ($user && $password) { | 
|             $credentials = new AMQPWriter(); | 
|             $credentials->write_table(array( | 
|                 'LOGIN' => array('S', $user), | 
|                 'PASSWORD' => array('S', $password) | 
|             )); | 
|   | 
|             // Skip the length (first 4 bytes of the written table) | 
|             $responseValue = $credentials->getvalue(); | 
|             $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII'); | 
|         } else { | 
|             $this->login_response = null; | 
|         } | 
|   | 
|         $this->prepare_content_cache = array(); | 
|         $this->prepare_content_cache_max_size = 100; | 
|   | 
|         // Lazy Connection waits on connecting | 
|         if ($this->connectOnConstruct()) { | 
|             $this->connect(); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Connects the IO object and runs the AMQP handshake: protocol header, | 
|      * connection.start / start-ok, connection.secure or connection.tune, then x_open(). | 
|      * | 
|      * The whole sequence is repeated for as long as x_open() returns a redirect host. | 
|      * On any exception the connection is flagged as down, channels are closed | 
|      * and the exception is rethrown. | 
|      * | 
|      * @return null | 
|      * @throws \Exception | 
|      */ | 
|     protected function connect() | 
|     { | 
|         try { | 
|             while (!$this->isConnected()) { | 
|                 // Optimistically flag the connection as up; failure paths reset it | 
|                 $this->setIsConnected(true); | 
|   | 
|                 // Bring up the socket/stream | 
|                 $this->getIO()->connect(); | 
|   | 
|                 // Fresh channel table; the connection itself acts as channel 0 | 
|                 $this->channels = array(); | 
|                 parent::__construct($this, 0); | 
|   | 
|                 $this->input = new AMQPReader(null, $this->getIO()); | 
|   | 
|                 $this->write($this->amqp_protocol_header); | 
|   | 
|                 $expect_start = $this->waitHelper->get_wait('connection.start'); | 
|                 $this->wait(array($expect_start)); | 
|                 $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale); | 
|   | 
|                 // x_tune_ok() clears this flag once tuning has been answered | 
|                 $this->wait_tune_ok = true; | 
|                 do { | 
|                     $expect_secure = $this->waitHelper->get_wait('connection.secure'); | 
|                     $expect_tune = $this->waitHelper->get_wait('connection.tune'); | 
|                     $this->wait(array($expect_secure, $expect_tune)); | 
|                 } while ($this->wait_tune_ok); | 
|   | 
|                 $redirect_host = $this->x_open($this->vhost, '', $this->insist); | 
|                 if ($redirect_host) { | 
|                     // Redirected: tear everything down, then loop and try again. | 
|                     // NOTE(review): the returned host is not passed on to the IO object here. | 
|                     $this->setIsConnected(false); | 
|                     $this->closeChannels(); | 
|                     $this->close_socket(); | 
|                     continue; | 
|                 } | 
|   | 
|                 return null; // we weren't redirected | 
|             } | 
|         } catch (\Exception $failure) { | 
|             // Something went wrong: record the status, release channels, rethrow | 
|             $this->setIsConnected(false); | 
|             $this->closeChannels(); | 
|             throw $failure; | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Closes the current AMQP connection (best effort), reconnects the IO | 
|      * object and runs connect() again with the original settings. | 
|      * Channels that existed before are not recreated. | 
|      */ | 
|     public function reconnect() | 
|     { | 
|         // Best-effort AMQP close; errors are swallowed by safeClose() | 
|         $this->safeClose(); | 
|   | 
|         // Socket/stream first, AMQP handshake afterwards | 
|         $this->getIO()->reconnect(); | 
|   | 
|         // getIO can initiate the connection setting via LazyConnection, so force the flag down here | 
|         $this->setIsConnected(false); | 
|         $this->connect(); | 
|     } | 
|   | 
|     /** | 
|      * Re-runs the constructor on the clone with the arguments saved in | 
|      * $construct_params, so the copy opens its own connection to the same server. | 
|      */ | 
|     public function __clone() | 
|     { | 
|         $constructor = array($this, '__construct'); | 
|         call_user_func_array($constructor, $this->construct_params); | 
|     } | 
|   | 
|     /** | 
|      * Closes the connection on destruction unless set_close_on_destruct(false) was called. | 
|      */ | 
|     public function __destruct() | 
|     { | 
|         if (!$this->close_on_destruct) { | 
|             return; | 
|         } | 
|   | 
|         $this->safeClose(); | 
|     } | 
|   | 
|     /** | 
|      * Calls close() when an input reader exists and swallows any exception it raises. | 
|      */ | 
|     protected function safeClose() | 
|     { | 
|         try { | 
|             // Without an input reader there is nothing to close | 
|             if (!empty($this->input)) { | 
|                 $this->close(); | 
|             } | 
|         } catch (\Exception $ignored) { | 
|             // Deliberately ignored: closing is best effort | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Forwards to select() on the IO object. | 
|      * | 
|      * @param int $sec | 
|      * @param int $usec | 
|      * @return mixed Whatever the IO object's select() returns | 
|      */ | 
|     public function select($sec, $usec = 0) | 
|     { | 
|         $io = $this->getIO(); | 
|   | 
|         return $io->select($sec, $usec); | 
|     } | 
|   | 
|     /** | 
|      * Controls whether __destruct() closes the connection. | 
|      * Useful after a fork, when the parent process connection must stay open. | 
|      * | 
|      * @param bool $close | 
|      */ | 
|     public function set_close_on_destruct($close = true) | 
|     { | 
|         $flag = (bool) $close; | 
|         $this->close_on_destruct = $flag; | 
|     } | 
|   | 
|     /** | 
|      * Closes the input reader, if there is one, and drops the reference to it. | 
|      */ | 
|     protected function close_input() | 
|     { | 
|         $this->debug->debug_msg('closing input'); | 
|   | 
|         if ($this->input === null) { | 
|             return; | 
|         } | 
|   | 
|         $this->input->close(); | 
|         $this->input = null; | 
|     } | 
|   | 
|     /** | 
|      * Closes the IO object when getIO() returns one. | 
|      */ | 
|     protected function close_socket() | 
|     { | 
|         $this->debug->debug_msg('closing socket'); | 
|   | 
|         // getIO() is queried twice on purpose, exactly as before | 
|         if ($this->getIO() !== null) { | 
|             $this->getIO()->close(); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Hex-dumps $data to the debug helper and writes it to the IO object. | 
|      * A failed write flags the connection as down before the exception is rethrown. | 
|      * | 
|      * @param $data | 
|      * @throws AMQPRuntimeException | 
|      */ | 
|     public function write($data) | 
|     { | 
|         $this->debug->debug_hexdump($data); | 
|   | 
|         try { | 
|             $this->getIO()->write($data); | 
|         } catch (AMQPRuntimeException $write_error) { | 
|             // The transport failed, so the connection can no longer be considered up | 
|             $this->setIsConnected(false); | 
|             throw $write_error; | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Final teardown: flags the connection as down, then closes the input | 
|      * reader and the IO object. | 
|      */ | 
|     protected function do_close() | 
|     { | 
|         // Status first, so the connection reads as closed while the handles go away | 
|         $this->setIsConnected(false); | 
|   | 
|         $this->close_input(); | 
|         $this->close_socket(); | 
|     } | 
|   | 
|     /** | 
|      * Returns the lowest channel id between 1 and $channel_max that has no | 
|      * entry in $channels. | 
|      * | 
|      * @return int | 
|      * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when every id is taken | 
|      */ | 
|     public function get_free_channel_id() | 
|     { | 
|         $candidate = 1; | 
|   | 
|         while ($candidate <= $this->channel_max) { | 
|             if (!isset($this->channels[$candidate])) { | 
|                 return $candidate; | 
|             } | 
|             $candidate++; | 
|         } | 
|   | 
|         throw new AMQPRuntimeException('No free channel ids'); | 
|     } | 
|   | 
|     /** | 
|      * Builds the content frames with prepare_content() and writes them out. | 
|      * | 
|      * The writer returned by prepare_content() is the one that gets sent, so the | 
|      * call also works when $pkt is omitted (previously getvalue() was called on null). | 
|      * | 
|      * @param string $channel | 
|      * @param int $class_id | 
|      * @param int $weight | 
|      * @param int $body_size | 
|      * @param string $packed_properties | 
|      * @param string $body | 
|      * @param AMQPWriter $pkt Optional writer to append to; a new one is created when null | 
|      */ | 
|     public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) | 
|     { | 
|         // prepare_content() returns $pkt itself when one was given, or a new writer otherwise | 
|         $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt); | 
|         $this->write($pkt->getvalue()); | 
|     } | 
|   | 
|     /** | 
|      * Appends one type-2 frame (the properties) and as many type-3 frames as | 
|      * the body needs to $pkt, creating the writer when none is given. | 
|      * | 
|      * The first part of the type-2 frame depends only on channel, packed | 
|      * properties, class id and weight, so it is kept in $prepare_content_cache. | 
|      * | 
|      * @param string $channel | 
|      * @param int $class_id | 
|      * @param int $weight | 
|      * @param int $body_size | 
|      * @param string $packed_properties | 
|      * @param string $body | 
|      * @param AMQPWriter $pkt | 
|      * @return AMQPWriter The provided $pkt, or a new writer | 
|      */ | 
|     public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) | 
|     { | 
|         if (!$pkt) { | 
|             $pkt = new AMQPWriter(); | 
|         } | 
|   | 
|         // Content already prepared ? | 
|         $key_cache = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight); | 
|   | 
|         if (!isset($this->prepare_content_cache[$key_cache])) { | 
|             $prefix = new AMQPWriter(); | 
|             $prefix->write_octet(2); | 
|             $prefix->write_short($channel); | 
|             $prefix->write_long(mb_strlen($packed_properties, 'ASCII') + 12); | 
|             $prefix->write_short($class_id); | 
|             $prefix->write_short($weight); | 
|             $this->prepare_content_cache[$key_cache] = $prefix->getvalue(); | 
|   | 
|             // Over the size limit: drop the first (oldest inserted) entry | 
|             if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) { | 
|                 reset($this->prepare_content_cache); | 
|                 $oldest = key($this->prepare_content_cache); | 
|                 unset($this->prepare_content_cache[$oldest]); | 
|             } | 
|         } | 
|   | 
|         $pkt->write($this->prepare_content_cache[$key_cache]); | 
|         $pkt->write_longlong($body_size); | 
|         $pkt->write($packed_properties); | 
|         $pkt->write_octet(0xCE); | 
|   | 
|         // memory efficiency: walk the body in frame-sized slices instead of | 
|         // splitting it up front. good for very large packets (close in size | 
|         // to memory_limit setting) | 
|         $slice_size = $this->frame_max - 8; | 
|         $bodyLength = mb_strlen($body, 'ASCII'); | 
|   | 
|         for ($position = 0; $position < $bodyLength; $position += $slice_size) { | 
|             $payload = mb_substr($body, $position, $slice_size, 'ASCII'); | 
|   | 
|             $pkt->write_octet(3); | 
|             $pkt->write_short($channel); | 
|             $pkt->write_long(mb_strlen($payload, 'ASCII')); | 
|             $pkt->write($payload); | 
|             $pkt->write_octet(0xCE); | 
|         } | 
|   | 
|         return $pkt; | 
|     } | 
|   | 
|     /** | 
|      * Builds a method frame with prepare_channel_method_frame() and writes it out. | 
|      * | 
|      * @param string $channel | 
|      * @param array $method_sig | 
|      * @param AMQPWriter|string $args | 
|      * @param AMQPWriter|null $pkt Optional writer to append the frame to | 
|      */ | 
|     protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) | 
|     { | 
|         $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt); | 
|         $this->write($frame->getvalue()); | 
|   | 
|         // NOTE(review): prepare_channel_method_frame() already logs the signature, | 
|         // so it is reported a second time here | 
|         $this->debug->debug_method_signature1($method_sig); | 
|     } | 
|   | 
|     /** | 
|      * Appends a type-1 frame for $method_sig to $pkt, creating the writer | 
|      * when none is given. | 
|      * | 
|      * @param string $channel | 
|      * @param array $method_sig Pair of class id and method id | 
|      * @param AMQPWriter|string $args Method arguments, as a writer or already serialized | 
|      * @param AMQPWriter $pkt | 
|      * @return AMQPWriter The provided $pkt, or a new writer | 
|      */ | 
|     protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) | 
|     { | 
|         // Normalise the arguments to their serialized form | 
|         $serialized = ($args instanceof AMQPWriter) ? $args->getvalue() : $args; | 
|   | 
|         if (!$pkt) { | 
|             $pkt = new AMQPWriter(); | 
|         } | 
|   | 
|         $pkt->write_octet(1); | 
|         $pkt->write_short($channel); | 
|   | 
|         // 4 = length of class_id and method_id in payload | 
|         $pkt->write_long(mb_strlen($serialized, 'ASCII') + 4); | 
|   | 
|         $pkt->write_short($method_sig[0]); // class_id | 
|         $pkt->write_short($method_sig[1]); // method_id | 
|         $pkt->write($serialized); | 
|   | 
|         $pkt->write_octet(0xCE); | 
|   | 
|         $this->debug->debug_method_signature1($method_sig); | 
|   | 
|         return $pkt; | 
|     } | 
|   | 
|     /** | 
|      * Waits for a frame from the server | 
|      * | 
|      * The reader timeout is switched to $timeout for the duration of the read | 
|      * and put back afterwards - on success and on every exception, not only on | 
|      * AMQPTimeoutException as before. | 
|      * | 
|      * @param int $timeout | 
|      * @return array Frame type, channel id and payload, in that order | 
|      * @throws \Exception | 
|      * @throws \PhpAmqpLib\Exception\AMQPTimeoutException | 
|      * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when there is no input reader, | 
|      *         the frame type is unknown or the closing byte is not 0xCE | 
|      */ | 
|     protected function wait_frame($timeout = 0) | 
|     { | 
|         if (is_null($this->input)) { | 
|             $this->setIsConnected(false); | 
|             throw new AMQPRuntimeException('Broken pipe or closed connection'); | 
|         } | 
|   | 
|         $currentTimeout = $this->input->getTimeout(); | 
|         $this->input->setTimeout($timeout); | 
|   | 
|         try { | 
|             // frame_type + channel_id + size | 
|             $this->wait_frame_reader->reuse( | 
|                 $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG) | 
|             ); | 
|   | 
|             $frame_type = $this->wait_frame_reader->read_octet(); | 
|             $class = self::$PROTOCOL_CONSTANTS_CLASS; | 
|             if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) { | 
|                 throw new AMQPRuntimeException('Invalid frame type ' . $frame_type); | 
|             } | 
|             $channel = $this->wait_frame_reader->read_short(); | 
|             $size = $this->wait_frame_reader->read_long(); | 
|   | 
|             // payload + ch | 
|             $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size)); | 
|   | 
|             $payload = $this->wait_frame_reader->read($size); | 
|             $ch = $this->wait_frame_reader->read_octet(); | 
|   | 
|         } catch (\Exception $e) { | 
|             // Restore the caller's timeout on every failure path. Written as | 
|             // catch-and-rethrow to stay within the syntax this file already uses. | 
|             if (!is_null($this->input)) { | 
|                 $this->input->setTimeout($currentTimeout); | 
|             } | 
|             throw $e; | 
|         } | 
|   | 
|         $this->input->setTimeout($currentTimeout); | 
|   | 
|         if ($ch != 0xCE) { | 
|             throw new AMQPRuntimeException(sprintf( | 
|                 'Framing error, unexpected byte: %x', | 
|                 $ch | 
|             )); | 
|         } | 
|   | 
|         return array($frame_type, $channel, $payload); | 
|     } | 
|   | 
|     /** | 
|      * Reads frames until one arrives for $channel_id and returns it. | 
|      * | 
|      * Heartbeat frames (type 8 on channel 0) are skipped, and the seconds spent | 
|      * waiting for them are deducted from the timeout. Frames for other known | 
|      * channels are appended to that channel's frame_queue. | 
|      * | 
|      * @param string $channel_id | 
|      * @param int $timeout | 
|      * @return array Frame type and payload, in that order | 
|      * @throws \PhpAmqpLib\Exception\AMQPTimeoutException | 
|      */ | 
|     protected function wait_channel($channel_id, $timeout = 0) | 
|     { | 
|         // Work on a copy so the original timeout stays unchanged | 
|         $remaining = $timeout; | 
|   | 
|         for (;;) { | 
|             $started_at = time(); | 
|             list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining); | 
|   | 
|             if ($frame_channel === 0 && $frame_type === 8) { | 
|                 // Heartbeat: skip it, but charge the elapsed time against the timeout | 
|                 if ($remaining > 0) { | 
|                     $remaining -= time() - $started_at; | 
|                     if ($remaining <= 0) { | 
|                         // Out of time: fail here rather than calling wait_frame again | 
|                         throw new AMQPTimeoutException("Timeout waiting on channel"); | 
|                     } | 
|                 } | 
|                 continue; | 
|             } | 
|   | 
|             if ($frame_channel == $channel_id) { | 
|                 return array($frame_type, $payload); | 
|             } | 
|   | 
|             // Frame belongs to another channel: park it on that channel's queue. | 
|             // The channel may be gone already, closed by a previous exception. | 
|             if (isset($this->channels[$frame_channel])) { | 
|                 array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload)); | 
|             } | 
|   | 
|             // A method frame on channel 0 (the connection itself) is probably a | 
|             // close in reaction to some error, so deal with it right away. | 
|             if ($frame_type == 1 && $frame_channel == 0) { | 
|                 $this->wait(); | 
|             } | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Returns the channel registered under $channel_id, creating and | 
|      * registering a new AMQPChannel when there is none. A falsy id selects the | 
|      * next free channel id. | 
|      * | 
|      * @param string $channel_id | 
|      * @return AMQPChannel | 
|      */ | 
|     public function channel($channel_id = null) | 
|     { | 
|         if (isset($this->channels[$channel_id])) { | 
|             return $this->channels[$channel_id]; | 
|         } | 
|   | 
|         $channel_id = $channel_id ?: $this->get_free_channel_id(); | 
|   | 
|         $created = new AMQPChannel($this->connection, $channel_id); | 
|         $this->channels[$channel_id] = $created; | 
|   | 
|         return $created; | 
|     } | 
|   | 
|     /** | 
|      * Closes all channels, sends connection.close and waits for connection.close_ok. | 
|      * | 
|      * Returns null straight away when there is no protocol writer or the | 
|      * connection is not flagged as connected. | 
|      * | 
|      * @param int $reply_code | 
|      * @param string $reply_text | 
|      * @param array $method_sig | 
|      * @return mixed|null | 
|      */ | 
|     public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0)) | 
|     { | 
|         if ($this->io instanceof StreamIO) { | 
|             $this->io->disableHeartbeat(); | 
|         } | 
|   | 
|         if (!($this->protocolWriter && $this->isConnected())) { | 
|             return null; | 
|         } | 
|   | 
|         $this->closeChannels(); | 
|   | 
|         $close_frame = $this->protocolWriter->connectionClose( | 
|             $reply_code, | 
|             $reply_text, | 
|             $method_sig[0], | 
|             $method_sig[1] | 
|         ); | 
|         list($class_id, $method_id, $args) = $close_frame; | 
|         $this->send_method_frame(array($class_id, $method_id), $args); | 
|   | 
|         // Flag the connection as down before waiting for the server's reply | 
|         $this->setIsConnected(false); | 
|   | 
|         $expect_close_ok = $this->waitHelper->get_wait('connection.close_ok'); | 
|   | 
|         return $this->wait(array($expect_close_ok)); | 
|     } | 
|   | 
|     /** | 
|      * Handles a connection.close from the server: confirms it with close-ok, | 
|      * tears the connection down and reports the reason as an exception. | 
|      * | 
|      * @param AMQPReader $reader | 
|      * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException always | 
|      */ | 
|     protected function connection_close(AMQPReader $reader) | 
|     { | 
|         $reply_code = $reader->read_short(); | 
|         $reply_text = $reader->read_shortstr(); | 
|   | 
|         // Class and method id of the method the close refers to | 
|         $failed_method = array(); | 
|         $failed_method[] = $reader->read_short(); | 
|         $failed_method[] = $reader->read_short(); | 
|   | 
|         $this->x_close_ok(); | 
|   | 
|         throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method); | 
|     } | 
|   | 
|     /** | 
|      * Sends connection.close_ok and then tears the connection down. | 
|      */ | 
|     protected function x_close_ok() | 
|     { | 
|         // get_wait() yields a comma-separated signature; split it into its parts | 
|         $signature = $this->waitHelper->get_wait('connection.close_ok'); | 
|         $this->send_method_frame(explode(',', $signature)); | 
|   | 
|         $this->do_close(); | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.close_ok from the server by tearing the connection down. | 
|      * | 
|      * @param AMQPReader $args Not read | 
|      */ | 
|     protected function connection_close_ok($args) | 
|     { | 
|         // The server confirmed the close; release the reader and the IO object | 
|         $this->do_close(); | 
|     } | 
|   | 
|     /** | 
|      * Sends connection.open for $virtual_host and waits for the reply. | 
|      * Under protocol version 0.8 a connection.redirect is accepted as well. | 
|      * | 
|      * @param string $virtual_host | 
|      * @param string $capabilities | 
|      * @param bool $insist | 
|      * @return mixed Result of wait(); connect() treats a truthy value as a redirect host | 
|      */ | 
|     protected function x_open($virtual_host, $capabilities = '', $insist = false) | 
|     { | 
|         $frame_args = new AMQPWriter(); | 
|         $frame_args->write_shortstr($virtual_host); | 
|         $frame_args->write_shortstr($capabilities); | 
|         $frame_args->write_bits(array($insist)); | 
|         $this->send_method_frame(array(10, 40), $frame_args); | 
|   | 
|         $expected = array(); | 
|         $expected[] = $this->waitHelper->get_wait('connection.open_ok'); | 
|   | 
|         if ($this->protocolVersion == '0.8') { | 
|             $expected[] = $this->waitHelper->get_wait('connection.redirect'); | 
|         } | 
|   | 
|         return $this->wait($expected); | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.open_ok: stores the known-hosts string and logs it. | 
|      * | 
|      * @param AMQPReader $args | 
|      */ | 
|     protected function connection_open_ok($args) | 
|     { | 
|         $known_hosts = $args->read_shortstr(); | 
|         $this->known_hosts = $known_hosts; | 
|   | 
|         $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts); | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.redirect: the server asks the client to use a | 
|      * different host. Stores the known-hosts string and logs both values. | 
|      * | 
|      * @param AMQPReader $args | 
|      * @return string The host to redirect to | 
|      */ | 
|     protected function connection_redirect($args) | 
|     { | 
|         $host = $args->read_shortstr(); | 
|         $this->known_hosts = $args->read_shortstr(); | 
|   | 
|         $message = sprintf('Redirected to [%s], known_hosts [%s]', $host, $this->known_hosts); | 
|         $this->debug->debug_msg($message); | 
|   | 
|         return $host; | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.secure (security mechanism challenge). | 
|      * | 
|      * The challenge is read from $args and discarded; no response is sent | 
|      * from this method. | 
|      * | 
|      * @param AMQPReader $args | 
|      */ | 
|     protected function connection_secure($args) | 
|     { | 
|         // Consume the challenge string; nothing is done with it | 
|         $args->read_longstr(); | 
|     } | 
|   | 
|     /** | 
|      * Sends the security mechanism response as method (10, 21). | 
|      * | 
|      * @param string $response | 
|      */ | 
|     protected function x_secure_ok($response) | 
|     { | 
|         $frame_args = new AMQPWriter(); | 
|         $frame_args->write_longstr($response); | 
|   | 
|         $method_sig = array(10, 21); | 
|         $this->send_method_frame($method_sig, $frame_args); | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.start: records the version, server properties, | 
|      * mechanisms and locales, then passes them to the debug helper. | 
|      * | 
|      * @param AMQPReader $args | 
|      */ | 
|     protected function connection_start($args) | 
|     { | 
|         // Fields are read in the order they appear in $args | 
|         $this->version_major = $args->read_octet(); | 
|         $this->version_minor = $args->read_octet(); | 
|         $this->server_properties = $args->read_table(); | 
|   | 
|         // Both lists arrive as space-separated strings | 
|         $mechanism_list = $args->read_longstr(); | 
|         $this->mechanisms = explode(' ', $mechanism_list); | 
|         $locale_list = $args->read_longstr(); | 
|         $this->locales = explode(' ', $locale_list); | 
|   | 
|         $this->debug->debug_connection_start( | 
|             $this->version_major, | 
|             $this->version_minor, | 
|             $this->server_properties, | 
|             $this->mechanisms, | 
|             $this->locales | 
|         ); | 
|     } | 
|   | 
|     /** | 
|      * Sends method (10, 11) with the client properties, the chosen mechanism, | 
|      * the login response and the locale. | 
|      * | 
|      * @param AMQPTable|array $clientProperties | 
|      * @param string $mechanism | 
|      * @param string $response | 
|      * @param string $locale | 
|      */ | 
|     protected function x_start_ok($clientProperties, $mechanism, $response, $locale) | 
|     { | 
|         $frame_args = new AMQPWriter(); | 
|         $frame_args->write_table($clientProperties); | 
|         $frame_args->write_shortstr($mechanism); | 
|         $frame_args->write_longstr($response); | 
|         $frame_args->write_shortstr($locale); | 
|   | 
|         $method_sig = array(10, 11); | 
|         $this->send_method_frame($method_sig, $frame_args); | 
|     } | 
|   | 
|     /** | 
|      * Handles connection.tune: adopts the server's non-zero channel_max and | 
|      * frame_max, then answers with x_tune_ok(). | 
|      * | 
|      * The server's heartbeat is only read when the local value is null; the | 
|      * constructor defaults it to 0. | 
|      * | 
|      * @param AMQPReader $args | 
|      */ | 
|     protected function connection_tune($args) | 
|     { | 
|         $proposed_channel_max = $args->read_short(); | 
|         if ($proposed_channel_max) { | 
|             $this->channel_max = $proposed_channel_max; | 
|         } | 
|   | 
|         $proposed_frame_max = $args->read_long(); | 
|         if ($proposed_frame_max) { | 
|             $this->frame_max = $proposed_frame_max; | 
|         } | 
|   | 
|         // use server proposed value if not set | 
|         if ($this->heartbeat === null) { | 
|             $this->heartbeat = $args->read_short(); | 
|         } | 
|   | 
|         $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat); | 
|     } | 
|   | 
|     /** | 
|      * Sends method (10, 31) with the negotiated tuning values and clears | 
|      * $wait_tune_ok so connect() stops waiting for tuning. | 
|      * | 
|      * @param int $channel_max | 
|      * @param int $frame_max | 
|      * @param int $heartbeat | 
|      */ | 
|     protected function x_tune_ok($channel_max, $frame_max, $heartbeat) | 
|     { | 
|         $frame_args = new AMQPWriter(); | 
|         $frame_args->write_short($channel_max); | 
|         $frame_args->write_long($frame_max); | 
|         $frame_args->write_short($heartbeat); | 
|   | 
|         $method_sig = array(10, 31); | 
|         $this->send_method_frame($method_sig, $frame_args); | 
|   | 
|         $this->wait_tune_ok = false; | 
|     } | 
|   | 
|     /** | 
|      * Returns whatever getSocket() on the IO object returns. | 
|      * | 
|      * NOTE(review): previously documented as returning SocketIO - confirm | 
|      * against the IO classes. | 
|      * | 
|      * @return mixed | 
|      */ | 
|     public function getSocket() | 
|     { | 
|         // Reads the $io property directly rather than going through getIO() | 
|         $socket = $this->io->getSocket(); | 
|   | 
|         return $socket; | 
|     } | 
|   | 
|     /** | 
|      * Accessor for the IO object this connection reads from and writes to. | 
|      * | 
|      * @return \PhpAmqpLib\Wire\IO\AbstractIO | 
|      */ | 
|     protected function getIO() | 
|     { | 
|         $io = $this->io; | 
|   | 
|         return $io; | 
|     } | 
|   | 
|     /** | 
|      * Handles a connection blocked notification by dispatching to the | 
|      * registered block handler, passing the reason as its only argument. | 
|      * | 
|      * @param AMQPReader $args | 
|      */ | 
|     protected function connection_blocked(AMQPReader $args) | 
|     { | 
|         $reason = $args->read_shortstr(); | 
|   | 
|         $this->dispatch_to_handler($this->connection_block_handler, array($reason)); | 
|     } | 
|   | 
|     /** | 
|      * Handles a connection unblocked notification by dispatching to the | 
|      * registered unblock handler, with no arguments. | 
|      * | 
|      * @param AMQPReader $args Not read | 
|      */ | 
|     protected function connection_unblocked(AMQPReader $args) | 
|     { | 
|         // An unblock event carries nothing to pass along | 
|         $no_arguments = array(); | 
|   | 
|         $this->dispatch_to_handler($this->connection_unblock_handler, $no_arguments); | 
|     } | 
|   | 
|     /** | 
|      * Registers the callback that connection_blocked() dispatches to when the | 
|      * server blocks the connection. | 
|      * | 
|      * @param callable $callback | 
|      */ | 
|     public function set_connection_block_handler($callback) | 
|     { | 
|         // Stored as given; no validation happens here | 
|         $this->connection_block_handler = $callback; | 
|     } | 
|   | 
|     /** | 
|      * Registers the callback that connection_unblocked() dispatches to when | 
|      * the server unblocks the connection. | 
|      * | 
|      * @param callable $callback | 
|      */ | 
|     public function set_connection_unblock_handler($callback) | 
|     { | 
|         // Stored as given; no validation happens here | 
|         $this->connection_unblock_handler = $callback; | 
|     } | 
|   | 
|     /** | 
|      * Reports the connection status flag kept by setIsConnected(). | 
|      * | 
|      * @return bool | 
|      */ | 
|     public function isConnected() | 
|     { | 
|         return $this->is_connected ? true : false; | 
|     } | 
|   | 
|     /** | 
|      * Updates the connection status flag, coercing the value to bool. | 
|      * | 
|      * @param bool $is_connected | 
|      */ | 
|     protected function setIsConnected($is_connected) | 
|     { | 
|         $status = (bool) $is_connected; | 
|         $this->is_connected = $status; | 
|     } | 
|   | 
|     /** | 
|      * Closes every registered channel except the entry under key 0, ignoring | 
|      * any exception raised while closing. | 
|      */ | 
|     protected function closeChannels() | 
|     { | 
|         foreach ($this->channels as $id => $open_channel) { | 
|             // channels[0] is this connection object, so it is left alone here | 
|             if ($id !== 0) { | 
|                 try { | 
|                     $open_channel->close(); | 
|                 } catch (\Exception $ignored) { | 
|                     /* Ignore closing errors */ | 
|                 } | 
|             } | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * Tells the constructor whether to call connect() straight away. | 
|      * Always true here. | 
|      * | 
|      * @return bool | 
|      */ | 
|     public function connectOnConstruct() | 
|     { | 
|         // Connect eagerly | 
|         return true; | 
|     } | 
|   | 
|     /** | 
|      * Returns the server properties table stored by connection_start(). | 
|      * | 
|      * @return array | 
|      */ | 
|     public function getServerProperties() | 
|     { | 
|         $properties = $this->server_properties; | 
|   | 
|         return $properties; | 
|     } | 
| } |