jinpengyong
2021-08-05 faf8649ff22b8af12c758355725389204838e02a
小时缺失数据补充定时任务
1 files deleted
10 files modified
317 ■■■■ changed files
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 4 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryMinutelyMapper.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java 5 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryMinutelyService.java 6 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java 77 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 133 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java 7 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/DeleteHistoryHourlyTransitionTask.java 29 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java 25 ●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml 13 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml 16 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -17,8 +17,4 @@
    /**
     * Inserts the given hourly records; the mapper XML iterates {@code list} and writes each
     * item's mac, time, value and version as one value tuple.
     *
     * @param list hourly records to insert
     */
    void insertHistoryHourly(List<HistoryHourly> list);
    /**
     * Reads mac, time, value and version from {@code history_hourly_transition} for the rows
     * whose {@code create_time} equals {@code update_time}.
     *
     * @return the selected rows mapped to {@code HistoryHourly}
     */
    List<HistoryHourly> getHistoryHourlyTransition();
    /** Deletes all rows from {@code history_hourly_transition}; the statement has no WHERE clause. */
    void deleteHistoryHourlyTransition();
}
screen-job/src/main/java/com/moral/api/mapper/HistoryMinutelyMapper.java
@@ -7,5 +7,5 @@
    /**
     * Runs the mapper's table-creation statement for the minutely data table.
     *
     * @param timeUnits value handed to the statement; presumably the suffix of the table name
     *                  (the select below reads {@code history_minutely_${timeUnits}}) - confirm
     *                  against the full mapper XML
     */
    void createTable(String timeUnits);
    /**
     * Runs the mapper statement with id {@code getHistoryFiveMinutelyData}.
     * NOTE(review): in the mapper XML this statement appears to select mac and value from the
     * minutely table for a start/end time window - confirm against the full XML.
     *
     * @param params query parameters read by the statement
     * @return one map per selected row
     */
    List<Map<String, Object>> getHistoryFiveMinutelyData(Map<String,Object> params);
    /**
     * Selects mac, value and version from {@code history_minutely_${timeUnits}} for rows with
     * {@code start <= time < end}, additionally restricted to {@code macs} when that entry is
     * not null.
     *
     * @param params reads the keys {@code timeUnits}, {@code start}, {@code end} and, optionally,
     *               {@code macs}
     * @return one map per selected row
     */
    List<Map<String, Object>> getHistoryMinutelyData(Map<String, Object> params);
}
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -13,10 +13,7 @@
 */
public interface HistoryHourlyService extends IService<HistoryHourly> {
    // Migrate data from the intermediate hourly table into the official table
    // Supplement missing hourly data
    void insertHistoryHourly();
    // Delete the data in the intermediate hourly table
    void deleteHistoryHourlyTransition();
}
screen-job/src/main/java/com/moral/api/service/HistoryMinutelyService.java
@@ -1,14 +1,8 @@
package com.moral.api.service;
import java.util.List;
import java.util.Map;
public interface HistoryMinutelyService {
    // Create the minutely table; timeUnits is passed through to the mapper
    void createTable(String timeUnits);
    // Compute 5-minute data from the minutely table; returns one map per row
    List<Map<String, Object>> getHistoryFiveMinutelyData(Map<String, Object> params);
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java
@@ -19,8 +19,8 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryFiveMinutelyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryFiveMinutelyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
@@ -34,18 +34,18 @@
    private HistoryFiveMinutelyMapper historyFiveMinutelyMapper;
    @Autowired
    private HistoryMinutelyService historyMinutelyService;
    private HistoryMinutelyMapper historyMinutelyMapper;
    @Autowired
    private SensorService sensorService;
    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * Delegates to {@code historyFiveMinutelyMapper.createTable}, passing {@code timeUnits}
     * through unchanged.
     *
     * @param timeUnits value forwarded to the mapper's table-creation statement
     */
    @Override
    public void createTable(String timeUnits) {
        historyFiveMinutelyMapper.createTable(timeUnits);
    }
    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    @Transactional
@@ -69,8 +69,8 @@
        queryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(queryWrapper);
        //获取所有设备的5分钟数据
        List<Map<String, Object>> fiveMinutelyData = historyMinutelyService.getHistoryFiveMinutelyData(params);
        //获取所有设备的5分钟内的数据
        List<Map<String, Object>> fiveMinutelyData = historyMinutelyMapper.getHistoryMinutelyData(params);
        if (fiveMinutelyData.size() == 0) {
            return;
        }
@@ -125,68 +125,5 @@
        String insertTimeUnits = DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN);
        //存入数据库
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(insertData, insertTimeUnits);
    }
    public static void main(String[] args) {
       /* //模拟数据
        List<Map<String, Object>> list = new ArrayList<>();
        Map<String, Object> map1 = new HashMap<>();
        map1.put("mac", "p5dnd1234567");
        map1.put("value", "{\"a0001\": 10, \"a0002\": 8, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\"}");
        Map<String, Object> map2 = new HashMap<>();
        map2.put("mac", "p5dnd123456789");
        map2.put("value", "{\"a0001\": 12, \"a0002\": 12, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\"}");
        Map<String, Object> map3 = new HashMap<>();
        map3.put("mac", "p5dnd1234567");
        map3.put("value", "{\"a0001\": 6, \"a0002\": 20, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\", \"a0003\": 14}");
        Map<String, Object> map4 = new HashMap<>();
        map4.put("mac", "p5dnd1234567");
        map4.put("value", "{\"a0001\": 4, \"a0002\": 16, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\", \"a0003\": 16}");
        list.add(map1);
        list.add(map2);
        list.add(map3);
        list.add(map4);
        //数据按mac分组
        Map<String, List<Map<String, Object>>> data = list.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        System.out.println(data);
        //所有因子
        List<Object> sensors = new ArrayList<>();
        sensors.add("a0001");
        sensors.add("a0002");
        sensors.add("a0003");
        //插入数据库结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            sensors.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        System.out.println("666==" + insertData);*/
        double a = 5.5d;
        System.out.println(Math.round(a));
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,14 +1,33 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryHourlyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
/**
 * <p>
@@ -24,15 +43,113 @@
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
    @Override
    public void insertHistoryHourly() {
        //获取中间小时表数据
        List<HistoryHourly> list = historyHourlyMapper.getHistoryHourlyTransition();
        historyHourlyMapper.insertHistoryHourly(list);
    }
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private HistoryMinutelyMapper historyMinutelyMapper;
    @Autowired
    private SensorService sensorService;
    @Override
    public void deleteHistoryHourlyTransition() {
        historyHourlyMapper.deleteHistoryHourlyTransition();
    public void insertHistoryHourly() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_EN;
        //redis获取所有设备mac
        Set<String> macs = redisTemplate.opsForHash().keys(RedisConstants.DEVICE);
        //当前时间,到小时
        Date now = new Date();
        String time = DateUtils.dateToDateString(now, format) + ":00:00";
        QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("time", time);
        //查询数据库当前小时的条数,如果比macs小,说明需要数据补充
        Integer count = historyHourlyMapper.selectCount(queryWrapper);
        if (macs.size() > count) {
            macs.removeIf(mac -> {
                queryWrapper.clear();
                queryWrapper.eq("time", time);
                queryWrapper.eq("mac", mac);
                Integer num = historyHourlyMapper.selectCount(queryWrapper);
                return num != 0;
            });
        }
        Map<String, Object> params = new HashMap<>();
        //开始时间
        String dateString = DateUtils.getDateStringOfHour(-1, format);
        Date start = DateUtils.getDate(dateString, format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        //从分钟表获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(sensorQueryWrapper);
        params.put("timeUnits", timeUnits);
        params.put("start", start);
        params.put("end", end);
        params.put("macs", macs);
        //获取所有设备的1小时内的数据
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHistoryMinutelyData(params);
        if (hourlyData.size() == 0) {
            return;
        }
        //按mac分组
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<HistoryHourly> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            HistoryHourly historyHourly = new HistoryHourly();
            historyHourly.setMac(key);
            historyHourly.setTime(end);
            Map<String, Object> jsonMap = new HashMap<>();
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            }
            //除风向外其他因子均值计算
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向另外计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                }
            });
            historyHourly.setValue(JSONObject.toJSONString(jsonMap));
            historyHourly.setVersion((Integer) value.get(0).get("version"));
            insertData.add(historyHourly);
        });
        //存入小时表
        historyHourlyMapper.insertHistoryHourly(insertData);
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java
@@ -3,9 +3,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryMinutelyService;
@@ -20,8 +17,4 @@
        historyMinutelyMapper.createTable(timeUnits);
    }
    /**
     * Delegates to {@code historyMinutelyMapper.getHistoryFiveMinutelyData} and returns its
     * result unchanged.
     *
     * @param params query parameters forwarded to the mapper
     * @return the rows returned by the mapper, one map per row
     */
    @Override
    public List<Map<String, Object>> getHistoryFiveMinutelyData(Map<String, Object> params) {
        return historyMinutelyMapper.getHistoryFiveMinutelyData(params);
    }
}
screen-job/src/main/java/com/moral/api/task/DeleteHistoryHourlyTransitionTask.java
File was deleted
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -42,19 +42,6 @@
        return ReturnT.SUCCESS;
    }
    //小时数据从中间表迁入正式表
    @XxlJob("insertHistoryHourly")
    public ReturnT insertHistoryHourly() {
        try {
            historyHourlyService.insertHistoryHourly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
        }
        return ReturnT.SUCCESS;
    }
    //天数据统计
    @XxlJob("insertHistoryDaily")
    public ReturnT insertHistoryDaily() {
@@ -90,4 +77,16 @@
        }
        return ReturnT.SUCCESS;
    }
    //缺失小时数据补充
    @XxlJob("historyHourlySupplement")
    public ReturnT historyHourlySupplement() {
        try {
            historyHourlyService.insertHistoryHourly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
        }
        return ReturnT.SUCCESS;
    }
}
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -16,17 +16,4 @@
            (#{item.mac}, #{item.time}, #{item.value}, #{item.version})
        </foreach>
    </insert>
    <!-- Reads mac, time, value and version from history_hourly_transition for the rows whose
         create_time equals update_time; each row is mapped to com.moral.api.entity.HistoryHourly. -->
    <select id="getHistoryHourlyTransition" resultType="com.moral.api.entity.HistoryHourly">
        SELECT mac,
        `time`,
        `value`,
        `version`
        FROM history_hourly_transition
        WHERE create_time = update_time
    </select>
    <!-- Removes every row from history_hourly_transition (no WHERE clause). -->
    <delete id="deleteHistoryHourlyTransition">
        DELETE FROM history_hourly_transition
    </delete>
</mapper>
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml
@@ -8,19 +8,23 @@
            `time` datetime DEFAULT NULL COMMENT '数据时间',
            `value` json DEFAULT NULL COMMENT '数据',
            `version` INT (11) DEFAULT NULL COMMENT '型号',
            KEY `idx_mac` (`mac`),
            KEY `idx_time` (`time`),
            KEY `idx_mac_time` (`mac`,`time`)
            ) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '分钟数据表'
    </update>
    <select id="getHistoryFiveMinutelyData" resultType="java.util.LinkedHashMap">
    <select id="getHistoryMinutelyData" resultType="java.util.LinkedHashMap">
        SELECT
        mac, value
        mac, `value`, version
        FROM
        history_minutely_${timeUnits}
        WHERE time <![CDATA[>=]]> #{start}
        AND time <![CDATA[<]]> #{end}
        WHERE `time` <![CDATA[>=]]> #{start}
        AND `time` <![CDATA[<]]> #{end}
        <if test="macs != null">
            AND mac IN
            <foreach collection="macs" item="mac" open="(" close=")" separator=",">
                #{mac}
            </foreach>
        </if>
    </select>
</mapper>