|  |  | 
 |  |  | import com.moral.common.util.ParameterUtils; | 
 |  |  | import com.moral.entity.Device; | 
 |  |  | import com.moral.service.DeviceService; | 
 |  |  | import com.moral.util.RabbitMQUtils; | 
 |  |  | import com.rabbitmq.client.AMQP; | 
 |  |  | import com.rabbitmq.client.Channel; | 
 |  |  | import com.rabbitmq.client.Connection; | 
 |  |  | import com.rabbitmq.client.ConnectionFactory; | 
 |  |  | import com.rabbitmq.client.Consumer; | 
 |  |  | import com.rabbitmq.client.DefaultConsumer; | 
 |  |  | import com.rabbitmq.client.Envelope; | 
 |  |  | import com.rabbitmq.client.QueueingConsumer; | 
 |  |  |  | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  |  | 
 |  |  | import org.springframework.stereotype.Component; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  | 
 |  |  |         this.session = session; | 
 |  |  |         //这个一定要写,第一次很容易忽略! | 
 |  |  |         webSocketSet.add(this); | 
 |  |  |  | 
 |  |  |         int flag = param.indexOf("&"); | 
 |  |  |         int regionCodeIndex = param.indexOf("_"); | 
 |  |  |         orgId = param.substring(0, flag); | 
 |  |  |         accountId = param.substring(flag + 1,regionCodeIndex); | 
 |  |  |         accountId = param.substring(flag + 1, regionCodeIndex); | 
 |  |  |         regionCode = param.substring(regionCodeIndex + 1); | 
 |  |  |         String QUEUE_NAME = "deviceInfo_" + accountId; | 
 |  |  |  | 
 |  |  |         Map<String, Object> paramMap = new HashMap<String, Object>(); | 
 |  |  |         paramMap.put("orgId", orgId); | 
 |  |  |         paramMap.put("regionCode", regionCode); | 
 |  |  |         ParameterUtils.getRegionType4RegionCode(paramMap); | 
 |  |  |         List<Device> deviceList = deviceService.queryDevice(paramMap); | 
 |  |  |  | 
 |  |  |  | 
 |  |  |         try { | 
 |  |  |             //打开连接和创建频道,与发送端一样 | 
 |  |  |             ConnectionFactory factory = new ConnectionFactory(); | 
 |  |  |             // Set the IP address or host name of the RabbitMQ server | 
 |  |  |             factory.setHost("47.96.15.25"); | 
 |  |  |             factory.setPort(5672); | 
 |  |  |             factory.setUsername("guest"); | 
 |  |  |             factory.setPassword("guest_pass"); | 
 |  |  |             String routingKey; | 
 |  |  |             connection = factory.newConnection(); | 
 |  |  |             channel = connection.createChannel(); | 
 |  |  |             this.connection = RabbitMQUtils.getConnection(); | 
 |  |  |             channel = this.connection.createChannel(); | 
 |  |  |             //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 | 
 |  |  |             channel.queueDeclare(QUEUE_NAME, false, false, true, null); | 
 |  |  |             for (Device d : deviceList) { | 
 |  |  |                 routingKey = orgId + "." + d.getMac(); | 
 |  |  |                 channel.queueBind(QUEUE_NAME, "screens_data", routingKey); | 
 |  |  |             } | 
 |  |  |             //创建队列消费者 | 
 |  |  |             QueueingConsumer consumer = new QueueingConsumer(channel); | 
 |  |  |             //指定消费队列 | 
 |  |  |             channel.basicConsume(QUEUE_NAME, true, consumer); | 
 |  |  |  | 
 |  |  |             while (true) { | 
 |  |  |                 //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) | 
 |  |  |                 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); | 
 |  |  |                 String message = new String(delivery.getBody()); | 
 |  |  |                 sendMessage(message); | 
 |  |  |             } | 
 |  |  |             channel.basicQos(30);//预先读取数 | 
 |  |  |             channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ | 
 |  |  |                 @Override | 
 |  |  |                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | 
 |  |  |                     String msg = new String(body, "utf-8"); | 
 |  |  |                     sendMessage(msg); | 
 |  |  |                     channel.basicAck(envelope.getDeliveryTag(), true); | 
 |  |  |                 } | 
 |  |  |             }); | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             log.error(e.getMessage()); | 
 |  |  |         } | 
 |  |  | 
 |  |  |  | 
 |  |  |     @OnMessage | 
 |  |  |     public void onMessage(String message) { | 
 |  |  |         System.out.println(message); | 
 |  |  |         for (WebSocketServer webSocketServer : webSocketSet) { | 
 |  |  |             webSocketServer.sendMessage(message); | 
 |  |  |         } |