|  |  | 
 |  |  |  | 
 |  |  |     private String regionCode; | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     private final String exchange = "screens_data"; | 
 |  |  |  | 
 |  |  |     private static Map<String, Sensor> sensors; | 
 |  |  | 
 |  |  |     public void onOpen(Session session, @PathParam("param") String param) { | 
 |  |  |         this.session = session; | 
 |  |  |         String[] params = param.split("&"); | 
 |  |  |         this.orgId = params[1]; | 
 |  |  |         this.accountId = params[0]; | 
 |  |  |         this.orgId = params[1]; | 
 |  |  |         this.regionCode = params[2]; | 
 |  |  |  | 
 |  |  |         if (webSocketMap.containsKey(accountId)) { | 
 |  |  | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             //消费消息,手动确认模式。 | 
 |  |  |             channel.basicQos(1);//每次只消费一条数据 | 
 |  |  |             channel.basicConsume(queue, false, new DefaultConsumer(channel) { | 
 |  |  |                 @Override | 
 |  |  |                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | 
 |  |  |                     Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); | 
 |  |  |                     message = messageFormat(message); | 
 |  |  |                     message = messageFormat(message,deviceList); | 
 |  |  |                     sendMessage(JSON.toJSONString(message)); | 
 |  |  |                     channel.basicAck(envelope.getDeliveryTag(), false); | 
 |  |  |                     //判断socket是否已经断开 | 
 |  |  |                     if (!webSocketMap.containsKey(accountId)) { | 
 |  |  |                         RabbitMQUtils.closeConnectionChannel(connection,channel); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |             }); | 
 |  |  |         } catch (IOException e) { | 
 |  |  | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     //将MQ取出的数据进行格式化处理 | 
 |  |  |     private Map<String,Object> messageFormat( Map<String,Object> param) { | 
 |  |  |     //将MQ取出的数据进行格式化处理,并且将经纬度注入 | 
 |  |  |     private Map<String,Object> messageFormat( Map<String,Object> param,List<Device> deviceList) { | 
 |  |  |         Map<String, Object> map = new HashMap<>(); | 
 |  |  |         param.forEach((key, value) -> { ; | 
 |  |  |         param.forEach((key, value) -> { | 
 |  |  |             Sensor sensor = sensors.get(key); | 
 |  |  |             if (!ObjectUtils.isEmpty(sensor)) { | 
 |  |  |                 String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit(); | 
 |  |  |                 map.put(sensor.getName(),value+unit); | 
 |  |  |             } | 
 |  |  |         }); | 
 |  |  |         map.put("mac",param.get("mac")); | 
 |  |  |         String mac = (String) param.get("mac"); | 
 |  |  |         for (Device device : deviceList) { | 
 |  |  |             if(mac.equals(device.getMac())){ | 
 |  |  |                 map.put("纬度",device.getLatitude()); | 
 |  |  |                 map.put("经度",device.getLongitude()); | 
 |  |  |                 break; | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         map.put("mac",mac); | 
 |  |  |         return map; | 
 |  |  |     } | 
 |  |  |  |