jinpengyong
2021-08-17 6f4e852b84c577454a4876f83c7085bd360fe4fb
特殊设备数据insert
5 files added
21 files modified
456 ■■■■ changed files
screen-common/src/main/java/com/moral/constant/Constants.java 24 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/KafkaConstants.java 14 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/RedisConstants.java 11 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/AmendUtils.java 14 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/DateUtils.java 6 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 6 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/CreateTableTask.java 4 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryAqiInsertTask.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java 8 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/JudgeOffLineDeviceTask.java 2 ●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/controller/DeviceController.java 25 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/entity/HistorySecondSpecial.java 55 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java 59 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/mapper/HistorySecondSpecialMapper.java 16 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/mapper/SpecialDeviceMapper.java 6 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/DeviceService.java 3 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/HistorySecondSpecialService.java 21 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java 33 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 6 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java 24 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistorySecondSpecialServiceImpl.java 81 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java 10 ●●●● patch | view | raw | blame | history
screen-manage/src/main/resources/mapper/HistorySecondSpecialMapper.xml 14 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/resources/mapper/SpecialDeviceMapper.xml 8 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/Constants.java
@@ -120,11 +120,6 @@
    public static final String UN_ADJUST = "unadjust";
    /*
     * 小时中间表后缀
     * */
    public static final String TRANSITION = "transition";
    /*
     * 离线设备状态码
     * */
    public static final String DEVICE_STATE_OFFLINE = "0";
@@ -184,14 +179,25 @@
     */
    public static final String NULL_VALUE = "N/V";
    //数据有效的标记位,有效
    /*
    * 数据有效的标记位,有效
    * */
    public static final String MARKER_BIT_TRUE = "N";
    //数据有效的标记位,有效性不足
    /*
    * 数据有效的标记位,有效性不足
    * */
    public static final String MARKER_BIT_FALSE = "H";
    //数据中各个传感器代表标记位的key后缀
    public static final String MARKER_BIT_KEY = "-Flag";
    /*
    * 数据中各个传感器代表标记位的key后缀
    * */
    public static final String MARKER_BIT_KEY = "Flag";
    /*
     * 特殊设备数据批次
     * */
    public static final String SENSOR_CODE_SPECIAL_BATCH = "a00000";
}
screen-common/src/main/java/com/moral/constant/KafkaConstants.java
@@ -3,29 +3,29 @@
public class KafkaConstants {
    /**
     * 分钟数据主题
     * 普通设备分钟数据主题
     */
    public static final String TOPIC_MINUTE = "minute_data";
    /**
     * 小时数据主题
     * 普通设备小时数据主题
     */
    public static final String TOPIC_HOUR = "hour_data";
    /*
     * 秒数据主题
     * 普通设备秒数据主题
     * */
    public static final String TOPIC_SECOND = "second_data";
    /**
     * 小时主题消费组
     * 无人机,走航车等特殊设备秒数据主题
     */
    public static final String GROUP_HOUR = "hour";
    public static final String TOPIC_SECOND_SPECIAL = "second_data_special";
    /**
     * 分钟主题消费组
     * 存入数据库的消费组
     */
    public static final String GROUP_MINUTE = "minute";
    public static final String GROUP_INSERT = "insert";
    /**
     * 用于判断设备状态消费组
screen-common/src/main/java/com/moral/constant/RedisConstants.java
@@ -46,7 +46,7 @@
    /*
     * 设备校准公式前缀
     * */
    public static final String ADJUST = "adjust_";
    public static final String ADJUST = "adjust";
    /*
     * redis中秒数据key
@@ -69,5 +69,14 @@
     * */
    public static final String DATA_FIVE_MINUTES = "data_five_minutes";
    /*
     * 存储设备显示单位,报警等级以及单位转换公式
     * 数据类型:hash
     * key:mac
     * value:SpecialDevice
     * 接收类型:Map<String,Device>
     * */
    public static final String SPECIAL_DEVICE_INFO = "special_device_alarm_info";
}
screen-common/src/main/java/com/moral/util/AmendUtils.java
@@ -60,7 +60,7 @@
                Double o3 = Double.parseDouble(dataValue.get(Constants.SENSOR_CODE_O3).toString());
                //O3数据标记位
                Object flag = dataValue.get(Constants.SENSOR_CODE_O3 + Constants.MARKER_BIT_KEY);
                Object flag = dataValue.get(Constants.SENSOR_CODE_O3 + "-" + Constants.MARKER_BIT_KEY);
                //剔除有效性不足的数据
                if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                    continue;
@@ -123,8 +123,8 @@
            Map<String, Object> dataValue = JSONObject.parseObject((String) map.get("value"), Map.class);
            Object wind = dataValue.get(Constants.SENSOR_CODE_WIND_DIR);
            Object speed = dataValue.get(Constants.SENSOR_CODE_WIND_SPEED);
            Object flagDir = dataValue.get(Constants.SENSOR_CODE_WIND_DIR + Constants.MARKER_BIT_KEY);
            Object flagSpeed = dataValue.get(Constants.SENSOR_CODE_WIND_SPEED + Constants.MARKER_BIT_KEY);
            Object flagDir = dataValue.get(Constants.SENSOR_CODE_WIND_DIR + "-" + Constants.MARKER_BIT_KEY);
            Object flagSpeed = dataValue.get(Constants.SENSOR_CODE_WIND_SPEED + "-" + Constants.MARKER_BIT_KEY);
            if (!Constants.MARKER_BIT_TRUE.equals(flagDir) || !Constants.MARKER_BIT_TRUE.equals(flagSpeed)) {
                continue;
            }
@@ -186,9 +186,9 @@
            if ("hour".equals(type)) {
                //有效值>=45个,打标记位 N,<45打H   H:有效性不足
                if (size >= 45) {
                    result.put(Constants.SENSOR_CODE_WIND_DIR + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                    result.put(Constants.SENSOR_CODE_WIND_DIR + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                } else {
                    result.put(Constants.SENSOR_CODE_WIND_DIR + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                    result.put(Constants.SENSOR_CODE_WIND_DIR + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                }
            }
        }
@@ -224,7 +224,7 @@
        for (Map<String, Object> dataMap : list) {
            Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
            Object o = dataValue.get(Constants.SENSOR_CODE_CO);
            Object flag = dataValue.get(Constants.SENSOR_CODE_CO + Constants.MARKER_BIT_KEY);
            Object flag = dataValue.get(Constants.SENSOR_CODE_CO + "-" + Constants.MARKER_BIT_KEY);
            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                continue;
            }
@@ -262,7 +262,7 @@
        for (Map<String, Object> dataMap : list) {
            Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
            Object o = dataValue.get(Constants.SENSOR_CODE_O3);
            Object flag = dataValue.get(Constants.SENSOR_CODE_O3 + Constants.MARKER_BIT_KEY);
            Object flag = dataValue.get(Constants.SENSOR_CODE_O3 + "-" + Constants.MARKER_BIT_KEY);
            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                continue;
            }
screen-common/src/main/java/com/moral/util/DateUtils.java
@@ -1358,7 +1358,7 @@
    /*
     * 根据时间获取时间内时间点
     * 例:time=2021-08-04 就得到这天内每个小时时间点
     * 例:time=2021-08-04 就得到这天内每个小时时间点2021-08-04 00,2021-08-04 01。。。
     *    time=2021-08 就得到这月内每天时间点
     * */
    public static List<String> getTimeLag(String time) {
@@ -1406,9 +1406,5 @@
        StringBuilder stringBuffer = new StringBuilder(dateString);
        stringBuffer.replace(15, 16, String.valueOf(i));
        return getDate(stringBuffer.toString(), yyyy_MM_dd_HH_mm_EN);
    }
    public static void main(String[] args) {
        System.out.println(getTimeLag("2021"));
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
@@ -149,7 +149,7 @@
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode);
                            //数据有效性标记位
                            Object flag = dataValue.get(sensorCode + Constants.MARKER_BIT_KEY);
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                                return null;
                            }
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java
@@ -135,7 +135,7 @@
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode);
                            //数据有效性标记位
                            Object flag = dataValue.get(sensorCode + Constants.MARKER_BIT_KEY);
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                                return null;
                            }
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -163,7 +163,7 @@
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode);
                            //数据有效性标记位
                            Object flag = dataValue.get(sensorCode + Constants.MARKER_BIT_KEY);
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                                return null;
                            }
@@ -199,9 +199,9 @@
                    jsonMap.put(sensorCode, sciCal);
                    //标志位
                    if (size.get() >= 45) {
                        jsonMap.put(sensorCode + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                    } else {
                        jsonMap.put(sensorCode + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                    }
                }
            });
screen-job/src/main/java/com/moral/api/task/CreateTableTask.java
@@ -32,7 +32,7 @@
            historyMinutelyService.createTable(timeUnits + "_" + Constants.UN_ADJUST);
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
@@ -45,7 +45,7 @@
            historyFiveMinutelyService.createTable(timeUnits);
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
screen-job/src/main/java/com/moral/api/task/HistoryAqiInsertTask.java
@@ -21,7 +21,7 @@
            historyAqiService.insertHistoryAqi();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -37,7 +37,7 @@
            historyFiveMinutelyService.insertHistoryFiveMinutely();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
@@ -61,7 +61,7 @@
            historyWeeklyService.insertHistoryWeekly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
@@ -73,7 +73,7 @@
            historyMonthlyService.insertHistoryMonthly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
@@ -85,7 +85,7 @@
            historyHourlyService.insertHistoryHourly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
screen-job/src/main/java/com/moral/api/task/JudgeOffLineDeviceTask.java
@@ -21,7 +21,7 @@
            deviceService.judgeOffLineDevice();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
screen-manage/src/main/java/com/moral/api/controller/DeviceController.java
@@ -19,8 +19,10 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.Device;
import com.moral.api.entity.SpecialDevice;
import com.moral.api.entity.Version;
import com.moral.api.service.DeviceService;
import com.moral.api.service.SpecialDeviceService;
import com.moral.api.service.VersionService;
import com.moral.constant.Constants;
import com.moral.constant.ResponseCodeEnum;
@@ -39,6 +41,9 @@
    @Autowired
    private VersionService versionService;
    @Autowired
    private SpecialDeviceService specialDeviceService;
    @ApiOperation(value = "添加设备", notes = "添加设备")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String")
@@ -53,10 +58,14 @@
            return ResultMessage.fail(ResponseCodeEnum.PARAMETERS_IS_MISSING.getCode(),
                    ResponseCodeEnum.PARAMETERS_IS_MISSING.getMsg());
        }
        //判断mac是否已存在
        //判断mac是否已存在,普通设备表和特殊设备表都要判断
        QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("mac", device.getMac());
        if (deviceService.count(queryWrapper) > 0) {
        queryWrapper.eq("mac", device.getMac()).eq("is_delete", Constants.NOT_DELETE);
        QueryWrapper<SpecialDevice> specialDeviceQueryWrapper = new QueryWrapper<>();
        queryWrapper.eq("mac", device.getMac()).eq("is_delete", Constants.NOT_DELETE);
        if (deviceService.getOne(queryWrapper) != null || specialDeviceService.getOne(specialDeviceQueryWrapper) != null) {
            return ResultMessage.fail(ResponseCodeEnum.MAC_IS_EXIST.getCode(), ResponseCodeEnum.MAC_IS_EXIST.getMsg());
        }
        deviceService.insert(device);
@@ -88,10 +97,14 @@
                    ResponseCodeEnum.PARAMETERS_IS_MISSING.getMsg());
        }
        if (device.getMac() != null) {
            //判断mac是否已存在
            //判断mac是否已存在,普通设备表和特殊设备表都要判断
            QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("mac", device.getMac());
            if (deviceService.getOne(queryWrapper) != null) {
            queryWrapper.eq("mac", device.getMac()).eq("is_delete", Constants.NOT_DELETE);
            QueryWrapper<SpecialDevice> specialDeviceQueryWrapper = new QueryWrapper<>();
            queryWrapper.eq("mac", device.getMac()).eq("is_delete", Constants.NOT_DELETE);
            if (deviceService.getOne(queryWrapper) != null || specialDeviceService.getOne(specialDeviceQueryWrapper) != null) {
                return ResultMessage.fail(ResponseCodeEnum.MAC_IS_EXIST.getCode(), ResponseCodeEnum.MAC_IS_EXIST.getMsg());
            }
        }
screen-manage/src/main/java/com/moral/api/entity/HistorySecondSpecial.java
New file
@@ -0,0 +1,55 @@
package com.moral.api.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>
 * 特殊设备秒数据表
 * </p>
 *
 * @author moral
 * @since 2021-08-12
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HistorySecondSpecial extends Model<HistorySecondSpecial> {
    private static final long serialVersionUID = 1L;
    /**
     * 设备mac
     */
    private String mac;
    /**
     * 数据时间
     */
    private Date time;
    /**
     * 数据
     */
    private String value;
    /**
     * 此数据所属组织id
     */
    private Integer organizationId;
    /**
     * 数据批次
     */
    private Date batch;
    @Override
    protected Serializable pkVal() {
        return null;
    }
}
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -7,7 +7,7 @@
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.Iterator;
@@ -17,9 +17,13 @@
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.service.HistorySecondSpecialService;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
/*
 * 设备数据接入
 * */
@Component
@Slf4j
public class KafkaConsumer {
@@ -36,16 +40,19 @@
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private HistorySecondSpecialService historySecondSpecialService;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
@@ -77,15 +84,15 @@
    }
    //小时数据
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
@@ -118,24 +125,54 @@
    //秒数据,修改设备状态,缓存最新秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
    public void listenSecond(ConsumerRecord<String, String> record) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            //数据过滤
            data.remove("time");
            data.remove("entryTime");
            data.remove("ver");
            //数据校准
            data = deviceService.adjustDeviceData(data);
            //存入redis
            redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data);
            //判断并修改设备状态
            deviceService.judgeDeviceState(data);
        } catch (Exception e) {
            log.error("param{}" + msg);
        }
    }
    //特殊设备秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            //数据过滤
            data.remove("time");
            data.remove("entryTime");
            data.remove("ver");
            historySecondSpecialService.insertHistorySecond(data);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);
screen-manage/src/main/java/com/moral/api/mapper/HistorySecondSpecialMapper.java
New file
@@ -0,0 +1,16 @@
package com.moral.api.mapper;
import com.moral.api.entity.HistorySecondSpecial;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
 * 特殊设备秒数据表 Mapper 接口
 * </p>
 *
 * @author moral
 * @since 2021-08-12
 */
public interface HistorySecondSpecialMapper extends BaseMapper<HistorySecondSpecial> {
}
screen-manage/src/main/java/com/moral/api/mapper/SpecialDeviceMapper.java
@@ -1,14 +1,14 @@
package com.moral.api.mapper;
import com.moral.api.entity.Organization;
import com.moral.api.entity.SpecialDevice;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import java.util.List;
import java.util.Map;
/**
 * <p>
 *  Mapper 接口
 * Mapper 接口
 * </p>
 *
 * @author moral
@@ -18,4 +18,6 @@
    List<SpecialDevice> querySpecialDeviceUnitAlarmInfo();
    Organization selectOrgByMac(String mac);
}
screen-manage/src/main/java/com/moral/api/service/DeviceService.java
@@ -58,6 +58,9 @@
    //设备数据校准
    Map<String, Object> adjustDeviceData(Map<String, Object> deviceData);
    //特殊设备数据校准
    Map<String, Object> adjustSpecialDeviceData(Map<String, Object> deviceData);
    //判断并修改设备状态
    void judgeDeviceState(Map<String, Object> data);
screen-manage/src/main/java/com/moral/api/service/HistorySecondSpecialService.java
New file
@@ -0,0 +1,21 @@
package com.moral.api.service;
import java.util.Map;
import com.moral.api.entity.HistorySecondSpecial;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * 特殊设备秒数据表 服务类
 * </p>
 *
 * @author moral
 * @since 2021-08-12
 */
public interface HistorySecondSpecialService extends IService<HistorySecondSpecial> {
    //秒数据insert
    void insertHistorySecond(Map<String, Object> data);
}
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -84,6 +84,9 @@
    private VersionSensorUnitMapper versionSensorUnitMapper;
    @Autowired
    private SpecialDeviceMapper specialDeviceMapper;
    @Autowired
    private AdjustDataUtils adjustDataUtils;
    /*
@@ -177,7 +180,7 @@
            insertOrganizationUnitAlarm(newOrgId, newVersionId);
        }
        //从redis中删除设备信息
        delDeviceInfoFromRedis(mac);
        delDeviceInfoFromRedis(oldDevice.getMac());
        Map<String, Object> deviceInfo = selectDeviceInfoById(deviceId);
        //设备信息存入redis
        setDeviceInfoToRedis(mac, deviceInfo);
@@ -186,7 +189,7 @@
        //操作日志记录
        HttpServletRequest request = ((ServletRequestAttributes) Objects.requireNonNull(RequestContextHolder.getRequestAttributes())).getRequest();
        StringBuilder content = new StringBuilder();
        content.append("修改了设备:").append(mac).append(":");
        content.append("修改了设备:" + oldDevice.getMac()).append("==>").append(mac).append(":");
        Field[] fields = Device.class.getDeclaredFields();
        for (Field field : fields) {
            if (field.getName().equals("id")) {
@@ -405,7 +408,7 @@
    @Override
    public Map<String, Object> getDeviceByMac(String mac) {
        Map<String, Object> deviceInfo = getDeviceInfoFromRedis(mac);
        if (deviceInfo == null) {
        if (ObjectUtils.isEmpty(deviceInfo)) {
            QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("mac", mac).eq("is_delete", Constants.NOT_DELETE);
            Device device = deviceMapper.selectOne(queryWrapper);
@@ -419,9 +422,9 @@
    @Override
    public Map<String, Object> adjustDeviceData(Map<String, Object> deviceData) {
        String mac = deviceData.get("mac").toString();
        String mac = deviceData.remove("mac").toString();
        //从redis获取校准公式
        Map<String, Object> adjustFormula = redisTemplate.opsForHash().entries(RedisConstants.ADJUST + mac);
        Map<String, Object> adjustFormula = redisTemplate.opsForHash().entries(RedisConstants.ADJUST + "_" + mac);
        if (!ObjectUtils.isEmpty(adjustFormula)) {
            Map<String, Object> deviceInfo = getDeviceByMac(mac);
            Map<String, Object> monitorPoint = (Map<String, Object>) deviceInfo.get("monitorPoint");
@@ -434,6 +437,26 @@
            }
            return adjustDataUtils.adjust(deviceData, adjustFormula, ObjectUtils.isEmpty(aqiMap) ? null : aqiMap);
        }
        deviceData.remove("DataTime");
        return deviceData;
    }
    @Override
    public Map<String, Object> adjustSpecialDeviceData(Map<String, Object> deviceData) {
        String mac = deviceData.remove("mac").toString();
        //从redis获取校准公式
        Map<String, Object> adjustFormula = redisTemplate.opsForHash().entries(RedisConstants.ADJUST + "_" + mac);
        if (!ObjectUtils.isEmpty(adjustFormula)) {
            Organization organization = specialDeviceMapper.selectOrgByMac(mac);
            Integer areaCode = organization.getAreaCode();
            Integer cityCode = organization.getCityCode();
            Map<String, Object> aqiMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode);
            if (ObjectUtils.isEmpty(aqiMap)) {
                aqiMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode);
            }
            return adjustDataUtils.adjust(deviceData, adjustFormula, ObjectUtils.isEmpty(aqiMap) ? null : aqiMap);
        }
        deviceData.remove("DataTime");
        return deviceData;
    }
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -37,11 +37,12 @@
    @Override
    @Transactional
    public void insertHistoryHourly(Map<String, Object> data) {
        Integer version = (Integer) data.remove("ver");
        Map<String, Object> dataAdjust = new HashMap<>(data);
        String mac = data.remove("mac").toString();
        Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
        Integer version = (Integer) data.remove("ver");
        Map<String, Object> result = new HashMap<>();
        result.put("mac", mac);
        result.put("time", time);
@@ -53,9 +54,6 @@
        //数据校准
        dataAdjust = deviceService.adjustDeviceData(dataAdjust);
        dataAdjust.remove("mac");
        dataAdjust.remove("DataTime");
        dataAdjust.remove("ver");
        HistoryHourly historyHourly = new HistoryHourly();
        historyHourly.setMac(mac);
screen-manage/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java
@@ -28,38 +28,28 @@
    @Transactional
    public void insertHistoryMinutely(Map<String, Object> data) {
        Map<String, Object> result = new HashMap<>();
        Object ver = data.remove("ver");
        Map<String, Object> dataAdjust = new HashMap<>(data);
        Object mac = data.remove("mac");
        result.put("mac", mac);
        result.put("version", data.remove("ver"));
        result.put("version", ver);
        Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
        result.put("time", time);
        result.put("value", JSONObject.toJSONString(data));
        String timeUnits = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);
        result.put("timeUnits", tableSuffix(timeUnits, Constants.UN_ADJUST));
        result.put("timeUnits", timeUnits + "_" + Constants.UN_ADJUST);
        //原始数据(未校准)
        historyMinutelyMapper.insertHistoryMinutely(result);
        //数据校准
        dataAdjust = deviceService.adjustDeviceData(dataAdjust);
        dataAdjust.remove("mac");
        dataAdjust.remove("DataTime");
        dataAdjust.remove("ver");
        result.put("timeUnits", timeUnits);
        result.put("value", JSONObject.toJSONString(dataAdjust));
        historyMinutelyMapper.insertHistoryMinutely(result);
    }
    //表后缀
    private String tableSuffix(String... keys) {
        StringBuilder key = new StringBuilder(keys[0]);
        for (int i = 1; i < keys.length; i++) {
            key.append("_");
            key.append(keys[i]);
        }
        return key.toString();
    }
}
screen-manage/src/main/java/com/moral/api/service/impl/HistorySecondSpecialServiceImpl.java
New file
@@ -0,0 +1,81 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.moral.api.entity.Device;
import com.moral.api.entity.HistorySecondSpecial;
import com.moral.api.entity.Sensor;
import com.moral.api.entity.SpecialDevice;
import com.moral.api.mapper.HistorySecondSpecialMapper;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistorySecondSpecialService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * <p>
 * 特殊设备秒数据表 服务实现类
 * </p>
 *
 * @author moral
 * @since 2021-08-12
 */
@Service
public class HistorySecondSpecialServiceImpl extends ServiceImpl<HistorySecondSpecialMapper, HistorySecondSpecial> implements HistorySecondSpecialService {
    @Autowired
    private HistorySecondSpecialMapper historySecondSpecialMapper;
    @Autowired
    private DeviceService deviceService;
    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    public void insertHistorySecond(Map<String, Object> data) {
        Date batchTime = DateUtils.getDate((String) data.remove(Constants.SENSOR_CODE_SPECIAL_BATCH), DateUtils.yyyyMMddHHmmss_EN);
        Date time = DateUtils.getDate((String) data.get("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
        String mac = data.get("mac").toString();
        //根据mac从redis中获取因子
        SpecialDevice specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac);
        List<Sensor> sensors = specialDevice.getVersion().getSensors();
        //过滤因子
        data = data.entrySet().stream()
                .filter(m -> {
                    boolean flag = false;
                    String key = m.getKey();
                    if (!"mac".equals(key) && !"DataTime".equals(key)) {
                        for (Sensor sensor : sensors) {
                            if (sensor.getCode().equals(key)) {
                                flag = true;
                                break;
                            }
                        }
                        return flag;
                    }
                    return true;
                }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        //数据校准
        data = deviceService.adjustSpecialDeviceData(data);
        HistorySecondSpecial historySecondSpecial = new HistorySecondSpecial();
        historySecondSpecial.setMac(mac);
        historySecondSpecial.setTime(time);
        historySecondSpecial.setValue(JSONObject.toJSONString(data));
        historySecondSpecial.setBatch(batchTime);
        historySecondSpecialMapper.insert(historySecondSpecial);
    }
}
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java
@@ -21,17 +21,17 @@
@Component
public class AdjustDataUtils {
    /**
     * @param deviceData 设备数据
     * @param deviceData    设备数据
     * @param adjustFormula 校准公式
     * @param aqiMap 设备所在地区对应的墨迹aqi数据
     * @param aqiMap        设备所在地区对应的墨迹aqi数据
     * @return Map<String, Object> 校准后数据
     * */
     */
    public Map<String, Object> adjust(Map<String, Object> deviceData, Map<String, Object> adjustFormula, Map<String, Object> aqiMap) {
        try {
            Date time = DateUtils.getDate((String) deviceData.get("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
            Date time = DateUtils.getDate((String) deviceData.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
            long finalTime = DateUtils.dataToTimeStampTime(time, DateUtils.HH_mm_ss_EN).getTime();
            for (String key : deviceData.keySet()) {
                if (!key.equals("mac") && !key.equals("time") && !key.equals("DataTime") && !key.equals("ver") && !key.contains("Flag")) {
                if (!key.contains("Flag")) {
                    //测量值
                    Object measuredValue = deviceData.get(key);
                    //单个因子校准公式
screen-manage/src/main/resources/mapper/HistorySecondSpecialMapper.xml
New file
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistorySecondSpecialMapper">
    <!-- 通用查询映射结果 -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistorySecondSpecial">
        <result column="mac" property="mac"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
        <result column="organization_id" property="organizationId"/>
        <result column="batch" property="batch"/>
    </resultMap>
</mapper>
screen-manage/src/main/resources/mapper/SpecialDeviceMapper.xml
@@ -63,4 +63,12 @@
        where
            d.`is_delete` = 0;
    </select>
    <select id="selectOrgByMac" resultType="com.moral.api.entity.Organization">
        SELECT area_code, city_code
        FROM organization
        WHERE organization_id =
              (SELECT organization_id FROM special_device WHERE mac = #{mac})
    </select>
</mapper>