kaiyu
2022-03-24 a827480e5e74cb6550c5f7b842bccac3023c5bdc
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -2,17 +2,20 @@
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.config.mybatis.MybatisPlusConfig;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryHourlyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.constant.SeparateTableType;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
import com.moral.util.MybatisPLUSUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
@@ -26,8 +29,11 @@
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;
/**
 * <p>
@@ -38,7 +44,7 @@
 * @since 2021-06-28
 */
@Service
public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
public class HistoryHourlyServiceImpl implements HistoryHourlyService {
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
@@ -53,6 +59,16 @@
    private SensorService sensorService;
    /**
     * Delegates to {@code historyHourlyMapper.createTable}, passing the argument through unchanged.
     *
     * @param timeUnits value forwarded to the mapper; presumably the table-name suffix
     *                  (other methods in this class build it with a yyyyMM pattern) — confirm against the mapper
     */
    @Override
    public void createTable(String timeUnits) {
        historyHourlyMapper.createTable(timeUnits);
    }
    /**
     * Delegates to {@code historyHourlyMapper.createTableComplete}, passing the argument through unchanged.
     *
     * @param timeUnits value forwarded to the mapper; presumably the table-name suffix
     *                  (other methods in this class build it with a yyyyMM pattern) — confirm against the mapper
     */
    @Override
    public void createTableComplete(String timeUnits) {
        historyHourlyMapper.createTableComplete(timeUnits);
    }
    @Override
    public void insertHistoryHourly() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_EN;
@@ -63,21 +79,21 @@
        Date now = new Date();
        String time = DateUtils.dateToDateString(now, format) + ":00:00";
        QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("time", time);
        //查询数据库当前小时的条数,如果比macs小,说明需要数据补充
        Integer count = historyHourlyMapper.selectCount(queryWrapper);
        if (macs.size() > count) {
        Map<String, Object> prop = new HashMap<>();
        prop.put("timeUnits", DateUtils.getDateStringOfMon(0, DateUtils.yyyyMM_EN));
        prop.put("time", time);
        Integer count = historyHourlyMapper.selectCountByTime(prop);
        if (macs.size() <= count) {
            return;
        } else {
            macs.removeIf(mac -> {
                queryWrapper.clear();
                queryWrapper.eq("time", time);
                queryWrapper.eq("mac", mac);
                Integer num = historyHourlyMapper.selectCount(queryWrapper);
                prop.put("mac", mac);
                Integer num = historyHourlyMapper.selectCountByTime(prop);
                return num != 0;
            });
        }
        Map<String, Object> params = new HashMap<>();
        //开始时间
        String dateString = DateUtils.getDateStringOfHour(-1, format);
@@ -91,9 +107,11 @@
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(sensorQueryWrapper);
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE);
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper);
        //从数据库数据库获取数据的参数
        Map<String, Object> params = new HashMap<>();
        params.put("timeUnits", timeUnits);
        params.put("start", start);
        params.put("end", end);
@@ -109,47 +127,299 @@
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<HistoryHourly> insertData = new ArrayList<>();
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            HistoryHourly historyHourly = new HistoryHourly();
            historyHourly.setMac(key);
            historyHourly.setTime(end);
            Map<String, Object> historyHourly = new HashMap<>();
            historyHourly.put("mac", key);
            historyHourly.put("time", end);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> map = new HashMap<>();
            map.put("data", value);
            map.put("type", "hour");
            for (Sensor sensor : sensors) {
                String sensorCode = sensor.getCode();
                //风向上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) {
                    if (sensor.getUpper() != null) {
                        map.put("windDirUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windDirLower", sensor.getLower());
                    }
                }
                //风速上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) {
                    if (sensor.getUpper() != null) {
                        map.put("windSpeedUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windSpeedLower", sensor.getLower());
                    }
                }
            }
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map);
            if (!ObjectUtils.isEmpty(windDirAvg)) {
                jsonMap.putAll(windDirAvg);
            }
            //除风向外其他因子均值计算
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
            sensors.forEach(sensor -> {
                String sensorCode = sensor.getCode();
                Double upper = sensor.getUpper();
                Double lower = sensor.getLower();
                AtomicInteger size = new AtomicInteger();
                DoubleStream optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            Object sensorValue = dataValue.get(sensorCode);
                            //数据有效性标记位
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                                return null;
                            }
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向另外计算
                            //风向单独计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                            //剔除数据超过上下限的数据
                            double aDouble = Double.parseDouble(sensorValue.toString());
                            if (!ObjectUtils.isEmpty(upper)) {
                                if (aDouble < upper) {
                                    return null;
                                }
                            }
                            if (!ObjectUtils.isEmpty(lower)) {
                                if (aDouble > lower) {
                                    return null;
                                }
                            }
                            size.getAndIncrement();
                            return DoubleStream.of(aDouble);
                        });
                OptionalDouble average = optionalDouble.average();
                if (average.isPresent()) {
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                    double sciCal = AmendUtils.sciCal(average.getAsDouble(), 4);
                    jsonMap.put(sensorCode, sciCal);
                    //标志位
                    if (size.get() >= 45) {
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                    } else {
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                    }
                }
            });
            historyHourly.setValue(JSONObject.toJSONString(jsonMap));
            historyHourly.setVersion((Integer) value.get(0).get("version"));
            historyHourly.put("version", value.get(0).get("version"));
            historyHourly.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(historyHourly);
        });
        //存入小时表
        historyHourlyMapper.insertHistoryHourly(insertData);
    }
    /**
     * Delegates to {@code historyHourlyMapper.selectDailyData} and returns its result as-is.
     *
     * @param params query parameters forwarded unchanged to the mapper; the expected keys are
     *               defined by the mapper and are not visible here
     * @return the rows returned by the mapper
     */
    @Override
    public List<Map<String, Object>> selectDailyData(Map<String, Object> params) {
        return historyHourlyMapper.selectDailyData(params);
    }
    /**
     * @Description: 查询一段时间内某一mac的数据
     * @Param: [mac, startDate, endDate]
     * @return: java.util.List<com.moral.api.entity.HistoryHourly>
     * @Author: 陈凯裕
     * @Date: 2021/9/23
     */
    @Override
    public List<HistoryHourly> getValueByMacAndTime(String mac, Date startDate, Date endDate) {
        QueryWrapper<HistoryHourly> wrapper = new QueryWrapper<>();
        wrapper.eq("mac", mac);
        wrapper.between("time", startDate, endDate);
        List<String> tableNames = MybatisPLUSUtils.getTableNamesByWrapper(startDate, endDate, SeparateTableType.MONTH);
        List<HistoryHourly> datas = multiTableQuery(wrapper, tableNames);
        return datas;
    }
    @Override
    public void insertHistoryHourlyComplete() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_EN;
        Date now = new Date();
        //从数据库获取数据参数
        Map<String, Object> params = new HashMap<>();
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.addHours(now, -1), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        params.put("start", start);
        params.put("end", end);
        //获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        params.put("timeUnits", timeUnits);
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE);
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper);
        //获取所有设备的一小时内的数据
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHourlyData(params);
        if (ObjectUtils.isEmpty(hourlyData)) {
            return;
        }
        //按mac分组
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", start);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> map = new HashMap<>();
            map.put("data", value);
            map.put("type", "hourlyComplete");
            for (Sensor sensor : sensors) {
                String sensorCode = sensor.getCode();
                //风向上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) {
                    if (sensor.getUpper() != null) {
                        map.put("windDirUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windDirLower", sensor.getLower());
                    }
                }
                //风速上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) {
                    if (sensor.getUpper() != null) {
                        map.put("windSpeedUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windSpeedLower", sensor.getLower());
                    }
                }
            }
            Supplier<Stream<Map<String, Object>>> streamSupplier = () -> value.stream();
            //均值,最大值,最小值计算
            sensors.forEach(sensor -> {
                Double avg = calculatedValue(streamSupplier, sensor, "avg");
                if (Constants.SENSOR_CODE_WIND_DIR.equals(sensor.getCode())) {
                    //风向均值计算并修约
                    Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map);
                    if (!ObjectUtils.isEmpty(windDirAvg)) {
                        avg = (Double) windDirAvg.get(Constants.SENSOR_CODE_WIND_DIR);
                    }
                }
                Double min = calculatedValue(streamSupplier, sensor, "min");
                Double max = calculatedValue(streamSupplier, sensor, "max");
                List<Double> doubles = new ArrayList<>(3);
                if (avg != null && min != null && max != null) {
                    doubles.add(avg);
                    doubles.add(min);
                    doubles.add(max);
                    jsonMap.put(sensor.getCode(), doubles);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //后缀
        String insertTimeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        //存入数据库
        historyHourlyMapper.insertHistoryHourlyComplete(insertData, insertTimeUnits);
    }
    /**
     * @Description: 多表查询,传入表名集合,以及条件wrapper,返回数据
     * @Param: [wrapper, tableNames]
     * @return: java.util.List<com.moral.api.entity.HistoryHourly>
     * @Author: 陈凯裕
     * @Date: 2021/9/23
     */
    private List<HistoryHourly> multiTableQuery(QueryWrapper<HistoryHourly> wrapper, List<String> tableNames) {
        List<HistoryHourly> result = new ArrayList<>();
        for (String tableName : tableNames) {
            MybatisPlusConfig.tableName.set(tableName);
            List<HistoryHourly> datas = historyHourlyMapper.selectList(wrapper);
            result.addAll(datas);
        }
        MybatisPlusConfig.tableName.remove();
        return result;
    }
    //最大值,最小值,平均值计算
    private Double calculatedValue(Supplier<Stream<Map<String, Object>>> supplier, Sensor sensor, String type) {
        String sensorCode = sensor.getCode();
        Double upper = sensor.getUpper();
        Double lower = sensor.getLower();
        DoubleStream doubleStream = supplier.get()
                .flatMapToDouble(v -> {
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                    Object sensorValue = dataValue.get(sensorCode);
                    //数据有效性标记位
                    Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                    if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                        return null;
                    }
                    if (ObjectUtils.isEmpty(sensorValue)) {
                        return null;
                    }
                    //风向单独计算
                    if ("avg".equals(type) && Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                        return null;
                    }
                    //剔除数据超过上下限的数据
                    double aDouble = Double.parseDouble(sensorValue.toString());
                    if (!ObjectUtils.isEmpty(upper)) {
                        if (aDouble > upper) {
                            return null;
                        }
                    }
                    if (!ObjectUtils.isEmpty(lower)) {
                        if (aDouble < lower) {
                            return null;
                        }
                    }
                    return DoubleStream.of(aDouble);
                });
        OptionalDouble average = null;
        if ("avg".equals(type)) {
            average = doubleStream.average();
        } else if ("min".equals(type)) {
            average = doubleStream.min();
        } else if ("max".equals(type)) {
            average = doubleStream.max();
        }
        if (average.isPresent()) {
            //银行家算法修约
            return AmendUtils.sciCal(average.getAsDouble(), 4);
        }
        return null;
    }
}