| | |
| | | channel.basicConsume(queue, false, new DefaultConsumer(channel) { |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | //对从MQ中取出的数据做转换,并且发送风速到客户端 |
| | | Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); |
| | | Device device = deviceService.getDeviceByMac(mac,false); |
| | | sendDeviceInfo(message, device); |
| | | //手动确认 |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | //判断socket是否已经断开 |
| | | if (!webSocketMap.containsKey(accountId)) { |
| | | try{ |
| | | //对从MQ中取出的数据做转换,并且发送风速到客户端 |
| | | Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); |
| | | Device device = deviceService.getDeviceByMac(mac,false); |
| | | sendDeviceInfo(message, device); |
| | | //手动确认 |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | //判断socket是否已经断开 |
| | | if (!webSocketMap.containsKey(accountId)) { |
| | | RabbitMQUtils.closeConnectionChannel(connection, channel); |
| | | } |
| | | }catch (Exception e){ |
| | | log.error(e.getMessage()); |
| | | RabbitMQUtils.closeConnectionChannel(connection, channel); |
| | | } |
| | | } |