kaiyu
2020-11-24 bafbd7130529e2a2c1e4cb6461eb5649c53df027
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
@@ -6,8 +6,8 @@
import com.moral.entity.Sensor;
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.rabbitmq.client.*;
import com.moral.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -18,13 +18,15 @@
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ServerEndpoint("/web/webSocket/{param}")
@ServerEndpoint("/web/WebSocket/{param}")
@Component
public class BSWebsocketServer {
    public static DeviceService deviceService;
@@ -32,7 +34,7 @@
    public static SensorService sensorService;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
     */
    private static ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
@@ -46,9 +48,12 @@
    private String regionCode;
    private String mac;
    private final String exchange = "screens_data";
    private static Map<String, Sensor> sensors;
    @PostConstruct
@@ -83,32 +88,42 @@
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        Map<String,Device> deviceMap = new HashMap<>();
        try {
            Connection connection = RabbitMQUtils.getConnection();
            final Channel channel = connection.createChannel();
            //生成临时队列
            String queue = channel.queueDeclare().getQueue();
            //交换机与队列通过routingKey进行绑定
            String routingKey = "";
            for (Device d : deviceList) {
                deviceMap.put(d.getMac(),d);
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(queue, exchange, routingKey);
            }
            //消费消息,手动确认模式。
            channel.basicQos(1);//每次只消费一条数据
            channel.basicQos(30);//预先读取数
            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //对从MQ中取出的数据做转换,并且发送风速到客户端
                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                    message = messageFormat(message,deviceList);
                    sendMessage(JSON.toJSONString(message));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    sendWindInfo(message);
                    //判断是否接收到客户端发送的mac,如果接收到则返回指定mac设备信息
                    if(mac!=null&&(!mac.equals(0)))
                        sendDeviceInfo(message,deviceMap);
                    //手动确认
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    //判断socket是否已经断开
                    if (!webSocketMap.containsKey(accountId)) {
                        RabbitMQUtils.closeConnectionChannel(connection,channel);
                        RabbitMQUtils.closeConnectionChannel(connection, channel);
                    }
                }
            });
@@ -128,7 +143,10 @@
    //接收到客户端消息操作
    @OnMessage
    public void onMessage(String message, Session session) {
        if (!ObjectUtils.isEmpty(message)) {
            Map<String, Object> map = JSON.parseObject(message);
            this.mac = (String) map.get("mac");
        }
    }
    @OnError
@@ -142,31 +160,65 @@
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            //log.error(e.getMessage());
        }
    }
    //将MQ取出的数据进行格式化处理,并且将经纬度注入
    private Map<String,Object> messageFormat( Map<String,Object> param,List<Device> deviceList) {
        Map<String, Object> map = new HashMap<>();
        param.forEach((key, value) -> {
            Sensor sensor = sensors.get(key);
            if (!ObjectUtils.isEmpty(sensor)) {
                String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit();
                map.put(sensor.getName(),value+unit);
    /**
     * @Description: Keeps sending wind-direction data to the client from the moment it connects to the socket
     * @Param: [param]
     * @return: void
     * @Author: 陈凯裕
     * @Date: 2020/9/30
     */
    private void sendWindInfo(Map<String, Object> param) {
        try {
            Map<String, Object> map = new HashMap<>();
            if (param.get("e23") != null && param.get("mac") != null) {
                map.put("风向", param.get("e23"));
                map.put("mac", param.get("mac"));
                sendMessage(JSON.toJSONString(map));
            }
        });
        String mac = (String) param.get("mac");
        for (Device device : deviceList) {
            if(mac.equals(device.getMac())){
                map.put("纬度",device.getLatitude());
                map.put("经度",device.getLongitude());
                map.put("状态",device.getState());
                break;
        } catch (IOException e) {
            log.error("发送风向标数据异常");
        }
    }
    /**
     * @Description: Receives the mac and accountid from the front end and sends back the information of the device with the specified mac
     * @Param: [param, deviceMap]
     * @return: void
     * @Author: 陈凯裕
     * @Date: 2020/9/30
     */
    private void sendDeviceInfo(Map<String, Object> param,Map<String,Device> deviceMap) {
        String deviceMac = (String) param.get("mac");
        if (mac.equals(deviceMac)) {
            try {
                Map<String,Object> sortMap = new LinkedHashMap<>();
                //先放名称地址,用于排序
                    Device device =deviceMap.get(mac);
                    sortMap.put("名称",device.getName());
                    sortMap.put("地址",device.getAddress());
                //将传感器代号转化为汉字
                param.forEach((key, value) -> {
                    Sensor sensor = sensors.get(key);
                    if (!ObjectUtils.isEmpty(sensor)) {
                        String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit();
                        sortMap.put(sensor.getName(), value + unit);
                    }
                });
                sendMessage(JSON.toJSONString(sortMap));
            } catch (IOException e) {
                log.error("根据mac发送设备数据异常");
            }
        }
        map.put("mac",mac);
        return map;
    }
}