kaiyu
2020-09-25 14e69dcc83fcd1ebeefe1aaa7ccf485503a27e21
BS获取风向以及站点信息Webscoket接口
2 files added
6 files modified
284 ■■■■ changed files
src/main/java/com/moral/common/bean/ResultBean.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/config/WebSocketConfig.java 11 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/controller/ScreenController.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/controller/WebController.java 58 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/util/RabbitMQUtils.java 47 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java 159 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/WebSocketServer.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/WebSocketServerNew.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/common/bean/ResultBean.java
@@ -32,6 +32,7 @@
        this.message = e.toString();
        this.code = FAIL;
        this.errno = 1;
    }
    public ResultBean(T data) {
src/main/java/com/moral/config/WebSocketConfig.java
@@ -1,6 +1,8 @@
package com.moral.config;
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.moral.webSocketServer.BSWebsocketServer;
import com.moral.webSocketServer.WebSocketServer;
import com.moral.webSocketServer.WebSocketServerNew;
@@ -15,10 +17,10 @@
public class WebSocketConfig {
    //本地测试需要将注释解开
    /*@Bean
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }*/
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
@@ -29,6 +31,11 @@
    public void setMessageService(DeviceService deviceService){
        WebSocketServer.deviceService=deviceService;
        WebSocketServerNew.deviceService=deviceService;
        BSWebsocketServer.deviceService=deviceService;
    }
    @Autowired
    public void setSensorService( SensorService sensorService){
        BSWebsocketServer.sensorService=sensorService;
    }
}
src/main/java/com/moral/controller/ScreenController.java
@@ -1614,6 +1614,7 @@
    @RequestMapping(value = "/newMap-page", method = RequestMethod.GET)
    public ModelAndView newMap(ModelAndView model, @RequestParam("areaCode") long code,
                               @RequestParam("accountId") int accountId) {
        Account account = accountService.getAccountById(accountId);
        List<MonitorPoint> monitorPointList = monitorPointService.getMonitorPointListByAccountId(accountId);
        String regionName = areaService.queryFullNameByCode(code);
src/main/java/com/moral/controller/WebController.java
@@ -136,64 +136,6 @@
        return resultMap;
    }
    //弃用
    @UserLoginToken
    @RequestMapping("getAccountInfoTest")
    public Map<String, Object> getAccountInfoTest(@RequestBody Map<String, Object> parameters) {
        Map<String, Object> resultMap = new HashMap<String, Object>();
        System.out.println(parameters);
        if (!(parameters.containsKey("account") && parameters.containsKey("password"))) {
            resultMap.put("msg", "用户名及密码不允许为空!");
            resultMap.put("accountId", -1);
        } else {
            resultMap = accountService.screenLoginNew(parameters);
            // 添加返回行政区信息字符串操作
            Object orgId = resultMap.get("orgId");
            if (resultMap.get("orgId") != null && resultMap.get("orgId") instanceof Integer) {
                StringBuilder areaNamesBuilder = new StringBuilder("中国");
                //判断是否为本公司开发者
                if (!((Integer) orgId).equals(dictionaryDataService.querySupperOrgId())) {
                    //不是本公司开发者则获取用户所属地区
                    Organization organization = organizationService.getOrganizationById((Integer) orgId);
                    if (organization.getAreaNames() != null) {
                        Map<String, String> areaNameMap = BeanUtils.beanToMap(organization.getAreaNames());
                        List<String> names = areaNameMap.entrySet().stream().filter(item -> {
                            return item.getValue() != null;
                        }).map(item -> {
                            return item.getValue();
                        }).collect(Collectors.toList());
                        AreaNames areaNames = organization.getAreaNames();
                        areaNamesBuilder.append("/");
                        areaNamesBuilder.append(String.join("/", names));
                    }
                    // 企业用户
                    if (organization.getRank() != null && organization.getRank() == 0) {
                        resultMap.put("type", "enterprise");
                    } else {
                        resultMap.put("type", "government");
                    }
                    Number mapAreaCode = null;
                    if (organization.getVillageCode() != null) {
                        mapAreaCode = organization.getVillageCode();
                    } else if (organization.getTownCode() != null) {
                        mapAreaCode = organization.getTownCode();
                    } else if (organization.getAreaCode() != null) {
                        mapAreaCode = organization.getAreaCode();
                    } else if (organization.getCityCode() != null) {
                        mapAreaCode = organization.getCityCode();
                    } else if (organization.getProvinceCode() != null) {
                        mapAreaCode = organization.getProvinceCode();
                    }
                    resultMap.put("mapAreaCode", mapAreaCode.toString());
                }
                resultMap.put("mapPath", areaNamesBuilder.toString());
                String accountId = String.valueOf(resultMap.get("accountId"));
                resultMap.put("token", webTokenService.getToken(accountId));
            }
        }
        return resultMap;
    }
    @UserLoginToken
    @GetMapping("report_avg_datas")
src/main/java/com/moral/util/RabbitMQUtils.java
New file
@@ -0,0 +1,47 @@
package com.moral.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
    private  static ConnectionFactory factory ;
    static{
        factory = new ConnectionFactory();
        factory.setHost("47.96.15.25");//设置连接地址
        factory.setPort(5672);//设置端口
        factory.setUsername("guest");
        factory.setPassword("guest_pass");
        factory.setVirtualHost("/");//设置虚拟主机
    }
    public static Connection getConnection(){
        try{
            return factory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    public static void closeConnectionChannel(Connection connection, Channel channel){
        try{
            if(channel!=null)
                channel.close();
            if(connection!=null)
                connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void closeChannel( Channel channel){
        try{
            if(channel!=null)
                channel.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
New file
@@ -0,0 +1,159 @@
package com.moral.webSocketServer;
import com.alibaba.fastjson.JSON;
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.entity.Sensor;
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.rabbitmq.client.*;
import com.moral.util.RabbitMQUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ServerEndpoint("/web/webSocket/{param}")
@Component
public class BSWebsocketServer {
    public static DeviceService deviceService;
    public static SensorService sensorService;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    private String orgId;
    private String accountId;
    private String regionCode;
    private final String exchange = "screens_data";
    private static Map<String, Sensor> sensors;
    @PostConstruct
    public void init() {
        sensors = new HashMap<>();
        List<Sensor> allSensors = sensorService.getAllSensors();
        for (Sensor sensor : allSensors) {
            sensors.put(sensor.getSensorKey(), sensor);
        }
    }
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("param") String param) {
        this.session = session;
        String[] params = param.split("&");
        this.orgId = params[1];
        this.accountId = params[0];
        this.regionCode = params[2];
        if (webSocketMap.containsKey(accountId)) {
            webSocketMap.remove(accountId);
            webSocketMap.put(accountId, this);
        } else {
            webSocketMap.put(accountId, this);
        }
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        try {
            Connection connection = RabbitMQUtils.getConnection();
            final Channel channel = connection.createChannel();
            //生成临时队列
            String queue = channel.queueDeclare().getQueue();
            //交换机与队列通过routingKey进行绑定
            String routingKey = "";
            for (Device d : deviceList) {
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(queue, exchange, routingKey);
            }
            //消费消息,手动确认模式。
            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                    message = messageFormat(message);
                    sendMessage(JSON.toJSONString(message));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(accountId)) {
            webSocketMap.remove(accountId);
        }
    }
    //接收到客户端消息操作
    @OnMessage
    public void onMessage(String message, Session session) {
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error(error.getMessage());
    }
    public void sendMessage(String message) throws IOException {
        try {
            if (session.isOpen()) {
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            //log.error(e.getMessage());
        }
    }
    //将MQ取出的数据进行格式化处理
    private Map<String,Object> messageFormat( Map<String,Object> param) {
        Map<String, Object> map = new HashMap<>();
        param.forEach((key, value) -> { ;
            Sensor sensor = sensors.get(key);
            if (!ObjectUtils.isEmpty(sensor)) {
                String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit();
                map.put(sensor.getName(),value+unit);
            }
        });
        map.put("mac",param.get("mac"));
        return map;
    }
}
src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -62,17 +62,21 @@
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        webSocketSet.add(this);
        int flag = param.indexOf("&");
        int regionCodeIndex = param.indexOf("_");
        orgId = param.substring(0, flag);
        accountId = param.substring(flag + 1,regionCodeIndex);
        regionCode = param.substring(regionCodeIndex + 1);
        String QUEUE_NAME = "deviceInfo_" + accountId;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        try {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
src/main/java/com/moral/webSocketServer/WebSocketServerNew.java
@@ -52,12 +52,13 @@
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        webSocketSet.add(this);
        //取参
        String[] flag=param.split("&");
        String orgId = flag[0];
        String accountId = flag[1];
        String monitPointId = flag[2];
        String p=flag[3];
        String QUEUE_NAME = "deviceData_" + accountId;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        List<Device> deviceList = deviceService.getDeviceById2(Integer.parseInt(monitPointId));