kaiyu
2020-09-30 786577d75f2221e95d5d53e60338cc20cc82df71
添加新的测试websocket接口
1 files added
3 files modified
199 ■■■■■ changed files
src/main/java/com/moral/config/WebSocketConfig.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/controller/WebController.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java 194 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/config/WebSocketConfig.java
@@ -2,6 +2,7 @@
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.moral.webSocketServer.BSTestWebsocketServer;
import com.moral.webSocketServer.BSWebsocketServer;
import com.moral.webSocketServer.WebSocketServer;
@@ -32,9 +33,11 @@
        WebSocketServer.deviceService=deviceService;
        WebSocketServerNew.deviceService=deviceService;
        BSWebsocketServer.deviceService=deviceService;
        BSTestWebsocketServer.deviceService=deviceService;
    }
    @Autowired
    public void setSensorService( SensorService sensorService){
        BSTestWebsocketServer.sensorService=sensorService;
        BSWebsocketServer.sensorService=sensorService;
    }
src/main/java/com/moral/controller/WebController.java
@@ -49,6 +49,7 @@
    @Resource
    DeviceService deviceService;
    @UserLoginToken
    @GetMapping("test")
    public String add() {
src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java
New file
@@ -0,0 +1,194 @@
package com.moral.webSocketServer;
import com.alibaba.fastjson.JSON;
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.entity.Sensor;
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.moral.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ServerEndpoint("/web/testWebSocket/{param}")
@Component
public class BSTestWebsocketServer {
    public static DeviceService deviceService;
    public static SensorService sensorService;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, BSTestWebsocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    private String orgId;
    private String accountId;
    private String regionCode;
    private String mac;
    private final String exchange = "screens_data";
    private static Map<String, Sensor> sensors;
    @PostConstruct
    public void init() {
        sensors = new HashMap<>();
        List<Sensor> allSensors = sensorService.getAllSensors();
        for (Sensor sensor : allSensors) {
            sensors.put(sensor.getSensorKey(), sensor);
        }
    }
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("param") String param) {
        this.session = session;
        String[] params = param.split("&");
        this.accountId = params[0];
        this.orgId = params[1];
        this.regionCode = params[2];
        if (webSocketMap.containsKey(accountId)) {
            webSocketMap.remove(accountId);
            webSocketMap.put(accountId, this);
        } else {
            webSocketMap.put(accountId, this);
        }
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        try {
            Connection connection = RabbitMQUtils.getConnection();
            final Channel channel = connection.createChannel();
            //生成临时队列
            String queue = channel.queueDeclare().getQueue();
            //交换机与队列通过routingKey进行绑定
            String routingKey = "";
            for (Device d : deviceList) {
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(queue, exchange, routingKey);
            }
            //消费消息,手动确认模式。
            channel.basicQos(1);//每次只消费一条数据
            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                    sendWindInfo(message);
                    if(mac!=null&&(!mac.equals(0)))
                        sendDeviceInfo(message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    //判断socket是否已经断开
                    if (!webSocketMap.containsKey(accountId)) {
                        RabbitMQUtils.closeConnectionChannel(connection, channel);
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(accountId)) {
            webSocketMap.remove(accountId);
        }
    }
    //接收到客户端消息操作
    @OnMessage
    public void onMessage(String message, Session session) {
        if (!ObjectUtils.isEmpty(message)) {
            System.out.println(message);
            Map<String, Object> map = JSON.parseObject(message);
            this.mac = (String) map.get("mac");
        }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error(error.getMessage());
    }
    public void sendMessage(String message) throws IOException {
        try {
            if (session.isOpen()) {
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            //log.error(e.getMessage());
        }
    }
    //发送风向标数据
    private void sendWindInfo(Map<String, Object> param) {
        try {
            Map<String, Object> map = new HashMap<>();
            if (param.get("e23") != null && param.get("mac") != null) {
                map.put("风向", param.get("e23"));
                map.put("mac", param.get("mac"));
                sendMessage(JSON.toJSONString(map));
            }
        } catch (IOException e) {
            log.error("发送风向标数据异常");
        }
    }
    //根据mac发送device数据
    private void sendDeviceInfo(Map<String, Object> param) {
        String deviceMac = (String) param.get("mac");
        if (mac.equals(deviceMac)) {
            try {
                Map<String, Object> map = new HashMap<>();
                param.forEach((key, value) -> {
                    Sensor sensor = sensors.get(key);
                    if (!ObjectUtils.isEmpty(sensor)) {
                        String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit();
                        map.put(sensor.getName(), value + unit);
                    }
                });
                String mac = (String) param.get("mac");
                map.put("mac", mac);
                sendMessage(JSON.toJSONString(map));
            } catch (IOException e) {
                log.error("根据mac发送设备数据异常");
            }
        }
    }
}
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
@@ -142,6 +142,7 @@
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            //log.error(e.getMessage());
        }
    }