| | |
| | | import com.moral.common.util.ParameterUtils; |
| | | import com.moral.entity.Device; |
| | | import com.moral.service.DeviceService; |
| | | import com.moral.util.RabbitMQUtils; |
| | | import com.rabbitmq.client.AMQP; |
| | | import com.rabbitmq.client.Channel; |
| | | import com.rabbitmq.client.Connection; |
| | | import com.rabbitmq.client.ConnectionFactory; |
| | | import com.rabbitmq.client.Consumer; |
| | | import com.rabbitmq.client.DefaultConsumer; |
| | | import com.rabbitmq.client.Envelope; |
| | | import com.rabbitmq.client.QueueingConsumer; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |
| | | this.session = session; |
| | | //这个一定要写,第一次很容易忽略! |
| | | webSocketSet.add(this); |
| | | |
| | | int flag = param.indexOf("&"); |
| | | int regionCodeIndex = param.indexOf("_"); |
| | | orgId = param.substring(0, flag); |
| | | accountId = param.substring(flag + 1,regionCodeIndex); |
| | | accountId = param.substring(flag + 1, regionCodeIndex); |
| | | regionCode = param.substring(regionCodeIndex + 1); |
| | | String QUEUE_NAME = "deviceInfo_" + accountId; |
| | | |
| | | Map<String, Object> paramMap = new HashMap<String, Object>(); |
| | | paramMap.put("orgId", orgId); |
| | | paramMap.put("regionCode", regionCode); |
| | | ParameterUtils.getRegionType4RegionCode(paramMap); |
| | | List<Device> deviceList = deviceService.queryDevice(paramMap); |
| | | |
| | | |
| | | try { |
| | | //打开连接和创建频道,与发送端一样 |
| | | ConnectionFactory factory = new ConnectionFactory(); |
| | | //Set the IP address or hostname of the host where RabbitMQ is running |
| | | factory.setHost("47.96.15.25"); |
| | | factory.setPort(5672); |
| | | factory.setUsername("guest"); |
| | | factory.setPassword("guest_pass"); |
| | | String routingKey; |
| | | connection = factory.newConnection(); |
| | | channel = connection.createChannel(); |
| | | this.connection = RabbitMQUtils.getConnection(); |
| | | channel = this.connection.createChannel(); |
| | | //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 |
| | | channel.queueDeclare(QUEUE_NAME, false, false, true, null); |
| | | for (Device d : deviceList) { |
| | | routingKey = orgId + "." + d.getMac(); |
| | | channel.queueBind(QUEUE_NAME, "screens_data", routingKey); |
| | | } |
| | | //创建队列消费者 |
| | | QueueingConsumer consumer = new QueueingConsumer(channel); |
| | | //指定消费队列 |
| | | channel.basicConsume(QUEUE_NAME, true, consumer); |
| | | |
| | | while (true) { |
| | | //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) |
| | | QueueingConsumer.Delivery delivery = consumer.nextDelivery(); |
| | | String message = new String(delivery.getBody()); |
| | | sendMessage(message); |
| | | } |
| | | channel.basicQos(30);//预先读取数 |
| | | channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | String msg = new String(body, "utf-8"); |
| | | sendMessage(msg); |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | } |
| | | }); |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage()); |
| | | } |
| | |
| | | /**从安全Set中 移除当前连接对象*/ |
| | | webSocketSet.remove(this); |
| | | try { |
| | | channel.queueDelete("deviceInfo_" + accountId); |
| | | connection.close(); |
| | | } catch (IOException e) { |
| | | log.error(e.getMessage()); |