| | |
| | | |
| | | |
| | | try { |
| | | //打开连接和创建频道,与发送端一样 |
| | | /*ConnectionFactory factory = new ConnectionFactory(); |
| | | //设置RabbitMQ所在主机ip或者主机名 |
| | | factory.setHost("47.96.15.25"); |
| | | factory.setPort(5672); |
| | | factory.setUsername("guest"); |
| | | factory.setPassword("guest_pass");*/ |
| | | //Connection connection = RabbitMQUtils.getConnection(); |
| | | String routingKey; |
| | | this.connection = RabbitMQUtils.getConnection(); |
| | | channel = this.connection.createChannel(); |
| | |
| | | routingKey = orgId + "." + d.getMac(); |
| | | channel.queueBind(QUEUE_NAME, "screens_data", routingKey); |
| | | } |
| | | //创建队列消费者 |
| | | /*java废弃方法*/ |
| | | /*QueueingConsumer consumer = new QueueingConsumer(channel); |
| | | //指定消费队列 |
| | | channel.basicConsume(QUEUE_NAME, true, consumer); |
| | | |
| | | while (true) { |
| | | //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) |
| | | QueueingConsumer.Delivery delivery = consumer.nextDelivery(); |
| | | String message = new String(delivery.getBody()); |
| | | sendMessage(message); |
| | | }*/ |
| | | channel.basicQos(30);//预先读取数 |
| | | channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | String msg = new String(body, "utf-8"); |
| | | sendMessage(msg); |
| | | channel.basicAck(envelope.getDeliveryTag(), false); |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | } |
| | | }); |
| | | } catch (Exception e) { |