kaiyu
2020-09-28 a13506f193f38720ae2729f9c2ceb35bf9fdb898
更新获取风向和设备信息,添加经纬度,修改 RabbitMQ 代码。
3 files modified
32 ■■■■■ changed files
src/main/java/com/moral/config/WebSocketConfig.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java 26 ●●●● patch | view | raw | blame | history
src/main/resources/mapper/DeviceMapper.xml 2 ●●● patch | view | raw | blame | history
src/main/java/com/moral/config/WebSocketConfig.java
@@ -17,10 +17,10 @@
public class WebSocketConfig {
    //本地测试需要将注释解开
   /* @Bean
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }*/
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
@@ -46,7 +46,6 @@
    private String regionCode;
    private final String exchange = "screens_data";
    private static Map<String, Sensor> sensors;
@@ -68,8 +67,8 @@
    public void onOpen(Session session, @PathParam("param") String param) {
        this.session = session;
        String[] params = param.split("&");
        this.orgId = params[1];
        this.accountId = params[0];
        this.orgId = params[1];
        this.regionCode = params[2];
        if (webSocketMap.containsKey(accountId)) {
@@ -99,13 +98,18 @@
            }
            //消费消息,手动确认模式。
            channel.basicQos(1);//每次只消费一条数据
            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                /**
                 * Handles one delivery from the queue: decodes the body, formats it,
                 * sends it through {@code sendMessage}, then acknowledges the delivery.
                 *
                 * @param consumerTag not used in this method
                 * @param envelope    supplies the delivery tag used for the acknowledgement
                 * @param properties  not used in this method
                 * @param body        raw message bytes
                 * @throws IOException declared by the overridden method
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // The body is parsed twice: the first parse result is cast to String and parsed
                    // again into a Map, so the payload is presumably a JSON string that itself
                    // contains JSON — TODO confirm against the producer.
                    // new String(body) decodes with the platform default charset.
                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
                    // NOTE(review): both the one-argument and the two-argument messageFormat call
                    // appear in this listing; this looks like the before/after pair of a diff — confirm.
                    message = messageFormat(message);
                    message = messageFormat(message,deviceList);
                    sendMessage(JSON.toJSONString(message));
                    // Second argument (multiple) is false: acknowledge only this delivery tag.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // Check whether the socket has already been disconnected
                    if (!webSocketMap.containsKey(accountId)) {
                        // accountId is no longer in webSocketMap, so close the connection and channel.
                        RabbitMQUtils.closeConnectionChannel(connection,channel);
                    }
                }
            });
        } catch (IOException e) {
@@ -142,17 +146,25 @@
        }
    }
    // Formats the data taken from MQ.
    // NOTE(review): this one-argument signature and the two-argument signature below both
    // appear in this listing; this looks like the before/after pair of a diff — confirm.
    private Map<String,Object> messageFormat( Map<String,Object> param) {
    // Formats the data taken from MQ and injects the longitude and latitude.
    //
    // param:      message entries; each key is looked up in the static sensors map,
    //             and the "mac" entry is read as a String.
    // deviceList: devices searched for one whose mac equals the message's mac.
    // returns:    a new map of sensor name -> value with the unit appended, plus "mac",
    //             plus the "纬度"/"经度" entries when a matching device is found.
    private Map<String,Object> messageFormat( Map<String,Object> param,List<Device> deviceList) {
        Map<String, Object> map = new HashMap<>();
        // NOTE(review): the forEach opening line appears twice (old/new diff lines) — confirm.
        param.forEach((key, value) -> { ;
        param.forEach((key, value) -> {
            Sensor sensor = sensors.get(key);
            // Keys for which no sensor is found are skipped.
            if (!ObjectUtils.isEmpty(sensor)) {
                // An empty unit becomes "", so only the value is rendered.
                String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit();
                // String concatenation: the stored value is the value followed by its unit.
                map.put(sensor.getName(),value+unit);
            }
        });
        // NOTE(review): the next line and the final map.put("mac",mac) both set "mac";
        // this looks like the old/new pair of a diff — confirm.
        map.put("mac",param.get("mac"));
        String mac = (String) param.get("mac");
        // NOTE(review): if param has no "mac" value, mac is null and mac.equals(...) below
        // throws NullPointerException; a null deviceList fails the same way in the loop.
        for (Device device : deviceList) {
            if(mac.equals(device.getMac())){
                // Keys are paired with getLatitude()/getLongitude() respectively.
                map.put("纬度",device.getLatitude());
                map.put("经度",device.getLongitude());
                // Only the first matching device is used.
                break;
            }
        }
        map.put("mac",mac);
        return map;
    }
src/main/resources/mapper/DeviceMapper.xml
@@ -484,7 +484,7 @@
    </select>
    <select id="selectDevicesAll" parameterType="java.util.Map" resultType="com.moral.entity.Device">
        SELECT dev.id,dev.mac,dev.device_version_id deviceVersionId from device dev
        SELECT dev.id,dev.mac,dev.longitude,dev.latitude,dev.device_version_id deviceVersionId from device dev
        left join monitor_point mpt on dev.monitor_point_id = mpt.id
        <where>
            <if test="@com.moral.common.bean.Constants@isNotSpecialOrgId(orgId)">