jinpengyong
2021-12-22 ebaaac167a229dd2d9115478e17bc46b7f7afb5f
设备最大值,最小值,平均值小时统计任务
8 files modified
195 ■■■■■ changed files
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 4 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryMinutelyMapper.java 2 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java 3 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 154 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java 12 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml 11 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml 7 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,5 +1,7 @@
package com.moral.api.mapper;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.moral.api.entity.HistoryHourly;
@@ -26,6 +28,8 @@
    List<Map<String, Object>> selectDailyData(Map<String, Object> params);
    void insertHistoryHourlyComplete(@Param("list") List<Map<String, Object>> list, @Param("timeUnits") String timeUnits);
}
screen-job/src/main/java/com/moral/api/mapper/HistoryMinutelyMapper.java
@@ -8,4 +8,6 @@
    void createTable(String timeUnits);
    List<Map<String, Object>> getHistoryMinutelyData(Map<String, Object> params);
    List<Map<String, Object>> getHourlyData(Map<String, Object> params);
}
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -37,4 +37,7 @@
     */
    List<HistoryHourly> getValueByMacAndTime(String mac, Date startDate, Date endDate);
    //设备小时数据,最大值,最小值,均值统计入表
    void insertHistoryHourlyComplete();
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java
@@ -177,7 +177,7 @@
        });
        //5分钟表后缀
        String insertTimeUnits = DateUtils.dateToDateString(DateUtils.getDateOfMin(now, -5), DateUtils.yyyyMM_EN);
        String insertTimeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        //存入数据库
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(insertData, insertTimeUnits);
    }
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -30,8 +30,10 @@
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;
/**
 * <p>
@@ -251,6 +253,106 @@
        return datas;
    }
    @Override
    public void insertHistoryHourlyComplete() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_EN;
        Date now = new Date();
        //从数据库获取数据参数
        Map<String, Object> params = new HashMap<>();
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.addHours(now, -1), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        params.put("start", start);
        params.put("end", end);
        //获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        params.put("timeUnits", timeUnits);
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE);
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper);
        //获取所有设备的一小时内的数据
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHourlyData(params);
        if (ObjectUtils.isEmpty(hourlyData)) {
            return;
        }
        //按mac分组
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", start);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> map = new HashMap<>();
            map.put("data", value);
            map.put("type", "hourlyComplete");
            for (Sensor sensor : sensors) {
                String sensorCode = sensor.getCode();
                //风向上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) {
                    if (sensor.getUpper() != null) {
                        map.put("windDirUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windDirLower", sensor.getLower());
                    }
                }
                //风速上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) {
                    if (sensor.getUpper() != null) {
                        map.put("windSpeedUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windSpeedLower", sensor.getLower());
                    }
                }
            }
            Supplier<Stream<Map<String, Object>>> streamSupplier = () -> value.stream();
            //均值,最大值,最小值计算
            sensors.forEach(sensor -> {
                Double avg = calculatedValue(streamSupplier, sensor, "avg");
                if (Constants.SENSOR_CODE_WIND_DIR.equals(sensor.getCode())) {
                    //风向均值计算并修约
                    Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map);
                    if (!ObjectUtils.isEmpty(windDirAvg)) {
                        avg = (Double) windDirAvg.get(Constants.SENSOR_CODE_WIND_DIR);
                    }
                }
                Double min = calculatedValue(streamSupplier, sensor, "min");
                Double max = calculatedValue(streamSupplier, sensor, "max");
                List<Double> doubles = new ArrayList<>(3);
                if (avg != null && min != null && max != null) {
                    doubles.add(avg);
                    doubles.add(min);
                    doubles.add(max);
                    jsonMap.put(sensor.getCode(), doubles);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //后缀
        String insertTimeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        //存入数据库
        historyHourlyMapper.insertHistoryHourlyComplete(insertData, insertTimeUnits);
    }
    /**
     * @Description: 多表查询,传入表名集合,以及条件wrapper,返回数据
     * @Param: [wrapper, tableNames]
@@ -268,4 +370,56 @@
        MybatisPlusConfig.tableName.remove();
        return result;
    }
    //最大值,最小值,平均值计算
    private Double calculatedValue(Supplier<Stream<Map<String, Object>>> supplier, Sensor sensor, String type) {
        String sensorCode = sensor.getCode();
        Double upper = sensor.getUpper();
        Double lower = sensor.getLower();
        DoubleStream doubleStream = supplier.get()
                .flatMapToDouble(v -> {
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                    Object sensorValue = dataValue.get(sensorCode);
                    //数据有效性标记位
                    Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                    if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                        return null;
                    }
                    if (ObjectUtils.isEmpty(sensorValue)) {
                        return null;
                    }
                    //风向单独计算
                    if ("avg".equals(type) && Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                        return null;
                    }
                    //剔除数据超过上下限的数据
                    double aDouble = Double.parseDouble(sensorValue.toString());
                    if (!ObjectUtils.isEmpty(upper)) {
                        if (aDouble > upper) {
                            return null;
                        }
                    }
                    if (!ObjectUtils.isEmpty(lower)) {
                        if (aDouble < lower) {
                            return null;
                        }
                    }
                    return DoubleStream.of(aDouble);
                });
        OptionalDouble average = null;
        if ("avg".equals(type)) {
            average = doubleStream.average();
        } else if ("min".equals(type)) {
            average = doubleStream.min();
        } else if ("max".equals(type)) {
            average = doubleStream.max();
        }
        if (average.isPresent()) {
            //银行家算法修约
            return AmendUtils.sciCal(average.getAsDouble(), 4);
        }
        return null;
    }
}
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -89,4 +89,16 @@
        }
        return ReturnT.SUCCESS;
    }
    //设备小时数据,最大值,最小值,均值统计任务
    @XxlJob("insertHistoryHourlyComplete")
    public ReturnT insertHistoryHourlyComplete(){
        try {
            historyHourlyService.insertHistoryHourlyComplete();
        } catch (Exception e) {
            e.printStackTrace();
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
        }
        return ReturnT.SUCCESS;
    }
}
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -13,7 +13,7 @@
    </update>
    <update id="createTableComplete" parameterType="String">
        CREATE TABLE IF NOT EXISTS `history_hourly_complete_${timeUnits}` (
        CREATE TABLE IF NOT EXISTS `history_hourly_${timeUnits}_complete` (
            `mac` VARCHAR (20) DEFAULT NULL COMMENT '设备mac',
            `time` datetime DEFAULT NULL COMMENT '数据时间',
            `value` json DEFAULT NULL COMMENT '数据',
@@ -41,4 +41,13 @@
        WHERE `time` <![CDATA[>=]]> #{start}
        AND `time` <![CDATA[<]]> #{end}
    </select>
    <insert id="insertHistoryHourlyComplete">
        INSERT INTO
        history_hourly_${timeUnits}_complete
        VALUES
        <foreach collection="list" item="item" separator=",">
            (#{item.mac}, #{item.time}, #{item.value})
        </foreach>
    </insert>
</mapper>
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml
@@ -27,4 +27,11 @@
        </if>
    </select>
    <select id="getHourlyData" resultType="java.util.Map">
        SELECT mac, `time`, `value`
        FROM history_minutely_${timeUnits}
        WHERE `time` <![CDATA[>=]]> #{start}
        AND `time` <![CDATA[<]]> #{end}
    </select>
</mapper>