kaiyu
2020-12-15 1a406e3e206af1ab7e38042548a085b078b6ca09
src/main/java/com/moral/webSocketServer/BSAQIWebSocketServer.java
@@ -92,14 +92,19 @@
            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //对从MQ中取出的数据做转换,并且发送风速到客户端
                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                    Device device = deviceService.getDeviceByMac(mac,false);
                    sendDeviceInfo(message, device);
                    //手动确认
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    //判断socket是否已经断开
                    if (!webSocketMap.containsKey(accountId)) {
                    try{
                        //对从MQ中取出的数据做转换,并且发送风速到客户端
                        Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                        Device device = deviceService.getDeviceByMac(mac,false);
                        sendDeviceInfo(message, device);
                        //手动确认
                        channel.basicAck(envelope.getDeliveryTag(), true);
                        //判断socket是否已经断开
                        if (!webSocketMap.containsKey(accountId)) {
                            RabbitMQUtils.closeConnectionChannel(connection, channel);
                        }
                    }catch (Exception e){
                        log.error(e.getMessage());
                        RabbitMQUtils.closeConnectionChannel(connection, channel);
                    }
                }