package com.moral.api.service.impl; 
 | 
  
 | 
import com.alibaba.fastjson.JSONObject; 
 | 
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; 
 | 
import com.moral.api.config.mybatis.MybatisPlusConfig; 
 | 
import com.moral.api.entity.HistoryHourly; 
 | 
import com.moral.api.entity.Sensor; 
 | 
import com.moral.api.mapper.HistoryHourlyMapper; 
 | 
import com.moral.api.mapper.HistoryMinutelyMapper; 
 | 
import com.moral.api.service.HistoryHourlyService; 
 | 
import com.moral.api.service.SensorService; 
 | 
import com.moral.constant.Constants; 
 | 
import com.moral.constant.RedisConstants; 
 | 
import com.moral.constant.SeparateTableType; 
 | 
import com.moral.util.AmendUtils; 
 | 
import com.moral.util.DateUtils; 
 | 
  
 | 
import com.moral.util.MybatisPLUSUtils; 
 | 
  
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
import org.springframework.data.redis.core.RedisTemplate; 
 | 
import org.springframework.stereotype.Service; 
 | 
import org.springframework.util.ObjectUtils; 
 | 
  
 | 
import java.util.ArrayList; 
 | 
import java.util.Date; 
 | 
import java.util.HashMap; 
 | 
import java.util.List; 
 | 
import java.util.Map; 
 | 
import java.util.OptionalDouble; 
 | 
import java.util.Set; 
 | 
import java.util.concurrent.atomic.AtomicInteger; 
 | 
import java.util.function.Supplier; 
 | 
import java.util.stream.Collectors; 
 | 
import java.util.stream.DoubleStream; 
 | 
import java.util.stream.Stream; 
 | 
  
 | 
/** 
 | 
 * <p> 
 | 
 * 已校准小时表 服务实现类 
 | 
 * </p> 
 | 
 * 
 | 
 * @author moral 
 | 
 * @since 2021-06-28 
 | 
 */ 
 | 
@Service 
 | 
public class HistoryHourlyServiceImpl implements HistoryHourlyService { 
 | 
  
 | 
    @Autowired 
 | 
    private HistoryHourlyMapper historyHourlyMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private RedisTemplate redisTemplate; 
 | 
  
 | 
    @Autowired 
 | 
    private HistoryMinutelyMapper historyMinutelyMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private SensorService sensorService; 
 | 
  
 | 
    @Override 
 | 
    public void createTable(String timeUnits) { 
 | 
        historyHourlyMapper.createTable(timeUnits); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void createTableComplete(String timeUnits) { 
 | 
        historyHourlyMapper.createTableComplete(timeUnits); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void insertHistoryHourly() { 
 | 
        //时间格式化:yyyy-MM-dd HH:mm 
 | 
        String format = DateUtils.yyyy_MM_dd_HH_EN; 
 | 
  
 | 
        //redis获取所有设备mac 
 | 
        Set<String> macs = redisTemplate.opsForHash().keys(RedisConstants.DEVICE); 
 | 
        //当前时间,到小时 
 | 
        Date now = new Date(); 
 | 
        String time = DateUtils.dateToDateString(now, format) + ":00:00"; 
 | 
  
 | 
        //查询数据库当前小时的条数,如果比macs小,说明需要数据补充 
 | 
        Map<String, Object> prop = new HashMap<>(); 
 | 
        prop.put("timeUnits", DateUtils.getDateStringOfMon(0, DateUtils.yyyyMM_EN)); 
 | 
        prop.put("time", time); 
 | 
        Integer count = historyHourlyMapper.selectCountByTime(prop); 
 | 
  
 | 
        if (macs.size() <= count) { 
 | 
            return; 
 | 
        } else { 
 | 
            macs.removeIf(mac -> { 
 | 
                prop.put("mac", mac); 
 | 
                Integer num = historyHourlyMapper.selectCountByTime(prop); 
 | 
                return num != 0; 
 | 
            }); 
 | 
        } 
 | 
  
 | 
        //开始时间 
 | 
        String dateString = DateUtils.getDateStringOfHour(-1, format); 
 | 
        Date start = DateUtils.getDate(dateString, format); 
 | 
  
 | 
        //结束时间 
 | 
        Date end = DateUtils.dataToTimeStampTime(now, format); 
 | 
  
 | 
        //从分钟表获取数据的分钟表后缀 
 | 
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN); 
 | 
  
 | 
        //因子 
 | 
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>(); 
 | 
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE); 
 | 
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper); 
 | 
  
 | 
        //从数据库数据库获取数据的参数 
 | 
        Map<String, Object> params = new HashMap<>(); 
 | 
        params.put("timeUnits", timeUnits); 
 | 
        params.put("start", start); 
 | 
        params.put("end", end); 
 | 
        params.put("macs", macs); 
 | 
        //获取所有设备的1小时内的数据 
 | 
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHistoryMinutelyData(params); 
 | 
        if (hourlyData.size() == 0) { 
 | 
            return; 
 | 
        } 
 | 
  
 | 
        //按mac分组 
 | 
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream() 
 | 
                .collect(Collectors.groupingBy(o -> (String) o.get("mac"))); 
 | 
  
 | 
        //存入数据库的结果集 
 | 
        List<Map<String, Object>> insertData = new ArrayList<>(); 
 | 
  
 | 
        data.forEach((key, value) -> { 
 | 
            Map<String, Object> historyHourly = new HashMap<>(); 
 | 
            historyHourly.put("mac", key); 
 | 
            historyHourly.put("time", end); 
 | 
  
 | 
            Map<String, Object> jsonMap = new HashMap<>(); 
 | 
            Map<String, Object> map = new HashMap<>(); 
 | 
            map.put("data", value); 
 | 
            map.put("type", "hour"); 
 | 
  
 | 
            for (Sensor sensor : sensors) { 
 | 
                String sensorCode = sensor.getCode(); 
 | 
  
 | 
                //风向上下限 
 | 
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) { 
 | 
                    if (sensor.getUpper() != null) { 
 | 
                        map.put("windDirUpper", sensor.getUpper()); 
 | 
                    } 
 | 
                    if (sensor.getLower() != null) { 
 | 
                        map.put("windDirLower", sensor.getLower()); 
 | 
                    } 
 | 
                } 
 | 
  
 | 
                //风速上下限 
 | 
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) { 
 | 
                    if (sensor.getUpper() != null) { 
 | 
                        map.put("windSpeedUpper", sensor.getUpper()); 
 | 
                    } 
 | 
                    if (sensor.getLower() != null) { 
 | 
                        map.put("windSpeedLower", sensor.getLower()); 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
  
 | 
            //风向均值计算并修约 
 | 
            Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map); 
 | 
            if (!ObjectUtils.isEmpty(windDirAvg)) { 
 | 
                jsonMap.putAll(windDirAvg); 
 | 
            } 
 | 
  
 | 
            //除风向外其他因子均值计算 
 | 
            sensors.forEach(sensor -> { 
 | 
                String sensorCode = sensor.getCode(); 
 | 
                Double upper = sensor.getUpper(); 
 | 
                Double lower = sensor.getLower(); 
 | 
                AtomicInteger size = new AtomicInteger(); 
 | 
                DoubleStream optionalDouble = value.parallelStream() 
 | 
                        .flatMapToDouble(v -> { 
 | 
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class); 
 | 
                            Object sensorValue = dataValue.get(sensorCode); 
 | 
                            //数据有效性标记位 
 | 
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY); 
 | 
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) { 
 | 
                                return null; 
 | 
                            } 
 | 
  
 | 
                            if (ObjectUtils.isEmpty(sensorValue)) { 
 | 
                                return null; 
 | 
                            } 
 | 
  
 | 
                            //风向单独计算 
 | 
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) { 
 | 
                                return null; 
 | 
                            } 
 | 
  
 | 
                            //剔除数据超过上下限的数据 
 | 
                            double aDouble = Double.parseDouble(sensorValue.toString()); 
 | 
                            if (!ObjectUtils.isEmpty(upper)) { 
 | 
                                if (aDouble < upper) { 
 | 
                                    return null; 
 | 
                                } 
 | 
                            } 
 | 
                            if (!ObjectUtils.isEmpty(lower)) { 
 | 
                                if (aDouble > lower) { 
 | 
                                    return null; 
 | 
                                } 
 | 
                            } 
 | 
                            size.getAndIncrement(); 
 | 
                            return DoubleStream.of(aDouble); 
 | 
                        }); 
 | 
                OptionalDouble average = optionalDouble.average(); 
 | 
                if (average.isPresent()) { 
 | 
                    //银行家算法修约 
 | 
                    double sciCal = AmendUtils.sciCal(average.getAsDouble(), 4); 
 | 
                    jsonMap.put(sensorCode, sciCal); 
 | 
                    //标志位 
 | 
                    if (size.get() >= 45) { 
 | 
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE); 
 | 
                    } else { 
 | 
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE); 
 | 
                    } 
 | 
                } 
 | 
            }); 
 | 
            historyHourly.put("version", value.get(0).get("version")); 
 | 
            historyHourly.put("value", JSONObject.toJSONString(jsonMap)); 
 | 
            insertData.add(historyHourly); 
 | 
        }); 
 | 
  
 | 
        //存入小时表 
 | 
        historyHourlyMapper.insertHistoryHourly(insertData); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public List<Map<String, Object>> selectDailyData(Map<String, Object> params) { 
 | 
        return historyHourlyMapper.selectDailyData(params); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * @Description: 查询一段时间内某一mac的数据 
 | 
     * @Param: [mac, startDate, endDate] 
 | 
     * @return: java.util.List<com.moral.api.entity.HistoryHourly> 
 | 
     * @Author: 陈凯裕 
 | 
     * @Date: 2021/9/23 
 | 
     */ 
 | 
    @Override 
 | 
    public List<HistoryHourly> getValueByMacAndTime(String mac, Date startDate, Date endDate) { 
 | 
        QueryWrapper<HistoryHourly> wrapper = new QueryWrapper<>(); 
 | 
        wrapper.eq("mac", mac); 
 | 
        wrapper.between("time", startDate, endDate); 
 | 
        List<String> tableNames = MybatisPLUSUtils.getTableNamesByWrapper(startDate, endDate, SeparateTableType.MONTH); 
 | 
        List<HistoryHourly> datas = multiTableQuery(wrapper, tableNames); 
 | 
        return datas; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void insertHistoryHourlyComplete() { 
 | 
        //时间格式化:yyyy-MM-dd HH:mm 
 | 
        String format = DateUtils.yyyy_MM_dd_HH_EN; 
 | 
        Date now = new Date(); 
 | 
  
 | 
        //从数据库获取数据参数 
 | 
        Map<String, Object> params = new HashMap<>(); 
 | 
        //开始时间 
 | 
        Date start = DateUtils.dataToTimeStampTime(DateUtils.addHours(now, -1), format); 
 | 
        //结束时间 
 | 
        Date end = DateUtils.dataToTimeStampTime(now, format); 
 | 
        params.put("start", start); 
 | 
        params.put("end", end); 
 | 
        //获取数据的分钟表后缀 
 | 
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN); 
 | 
        params.put("timeUnits", timeUnits); 
 | 
  
 | 
        //因子 
 | 
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>(); 
 | 
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE); 
 | 
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper); 
 | 
  
 | 
        //获取所有设备的一小时内的数据 
 | 
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHourlyData(params); 
 | 
        if (ObjectUtils.isEmpty(hourlyData)) { 
 | 
            return; 
 | 
        } 
 | 
  
 | 
        //按mac分组 
 | 
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream() 
 | 
                .collect(Collectors.groupingBy(o -> (String) o.get("mac"))); 
 | 
  
 | 
        //存入数据库的结果集 
 | 
        List<Map<String, Object>> insertData = new ArrayList<>(); 
 | 
  
 | 
        data.forEach((key, value) -> { 
 | 
            Map<String, Object> dataMap = new HashMap<>(); 
 | 
            dataMap.put("mac", key); 
 | 
            dataMap.put("time", start); 
 | 
            Map<String, Object> jsonMap = new HashMap<>(); 
 | 
  
 | 
  
 | 
            Map<String, Object> map = new HashMap<>(); 
 | 
            map.put("data", value); 
 | 
            map.put("type", "hourlyComplete"); 
 | 
            for (Sensor sensor : sensors) { 
 | 
                String sensorCode = sensor.getCode(); 
 | 
  
 | 
                //风向上下限 
 | 
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) { 
 | 
                    if (sensor.getUpper() != null) { 
 | 
                        map.put("windDirUpper", sensor.getUpper()); 
 | 
                    } 
 | 
                    if (sensor.getLower() != null) { 
 | 
                        map.put("windDirLower", sensor.getLower()); 
 | 
                    } 
 | 
                } 
 | 
  
 | 
                //风速上下限 
 | 
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) { 
 | 
                    if (sensor.getUpper() != null) { 
 | 
                        map.put("windSpeedUpper", sensor.getUpper()); 
 | 
                    } 
 | 
                    if (sensor.getLower() != null) { 
 | 
                        map.put("windSpeedLower", sensor.getLower()); 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
            Supplier<Stream<Map<String, Object>>> streamSupplier = () -> value.stream(); 
 | 
            //均值,最大值,最小值计算 
 | 
            sensors.forEach(sensor -> { 
 | 
                Double avg = calculatedValue(streamSupplier, sensor, "avg"); 
 | 
                if (Constants.SENSOR_CODE_WIND_DIR.equals(sensor.getCode())) { 
 | 
                    //风向均值计算并修约 
 | 
                    Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map); 
 | 
                    if (!ObjectUtils.isEmpty(windDirAvg)) { 
 | 
                        avg = (Double) windDirAvg.get(Constants.SENSOR_CODE_WIND_DIR); 
 | 
                    } 
 | 
                } 
 | 
                Double min = calculatedValue(streamSupplier, sensor, "min"); 
 | 
                Double max = calculatedValue(streamSupplier, sensor, "max"); 
 | 
                List<Double> doubles = new ArrayList<>(3); 
 | 
                if (avg != null && min != null && max != null) { 
 | 
                    doubles.add(avg); 
 | 
                    doubles.add(min); 
 | 
                    doubles.add(max); 
 | 
                    jsonMap.put(sensor.getCode(), doubles); 
 | 
                } 
 | 
            }); 
 | 
            dataMap.put("value", JSONObject.toJSONString(jsonMap)); 
 | 
            insertData.add(dataMap); 
 | 
        }); 
 | 
  
 | 
        //后缀 
 | 
        String insertTimeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN); 
 | 
        //存入数据库 
 | 
        historyHourlyMapper.insertHistoryHourlyComplete(insertData, insertTimeUnits); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * @Description: 多表查询,传入表名集合,以及条件wrapper,返回数据 
 | 
     * @Param: [wrapper, tableNames] 
 | 
     * @return: java.util.List<com.moral.api.entity.HistoryHourly> 
 | 
     * @Author: 陈凯裕 
 | 
     * @Date: 2021/9/23 
 | 
     */ 
 | 
    private List<HistoryHourly> multiTableQuery(QueryWrapper<HistoryHourly> wrapper, List<String> tableNames) { 
 | 
        List<HistoryHourly> result = new ArrayList<>(); 
 | 
        for (String tableName : tableNames) { 
 | 
            MybatisPlusConfig.tableName.set(tableName); 
 | 
            List<HistoryHourly> datas = historyHourlyMapper.selectList(wrapper); 
 | 
            result.addAll(datas); 
 | 
        } 
 | 
        MybatisPlusConfig.tableName.remove(); 
 | 
        return result; 
 | 
    } 
 | 
  
 | 
    //最大值,最小值,平均值计算 
 | 
    private Double calculatedValue(Supplier<Stream<Map<String, Object>>> supplier, Sensor sensor, String type) { 
 | 
        String sensorCode = sensor.getCode(); 
 | 
        Double upper = sensor.getUpper(); 
 | 
        Double lower = sensor.getLower(); 
 | 
        DoubleStream doubleStream = supplier.get() 
 | 
                .flatMapToDouble(v -> { 
 | 
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class); 
 | 
                    Object sensorValue = dataValue.get(sensorCode); 
 | 
                    //数据有效性标记位 
 | 
                    Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY); 
 | 
                    if (!Constants.MARKER_BIT_TRUE.equals(flag)) { 
 | 
                        return null; 
 | 
                    } 
 | 
  
 | 
                    if (ObjectUtils.isEmpty(sensorValue)) { 
 | 
                        return null; 
 | 
                    } 
 | 
                    //风向单独计算 
 | 
                    if ("avg".equals(type) && Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) { 
 | 
                        return null; 
 | 
                    } 
 | 
                    //剔除数据超过上下限的数据 
 | 
                    double aDouble = Double.parseDouble(sensorValue.toString()); 
 | 
                    if (!ObjectUtils.isEmpty(upper)) { 
 | 
                        if (aDouble > upper) { 
 | 
                            return null; 
 | 
                        } 
 | 
                    } 
 | 
                    if (!ObjectUtils.isEmpty(lower)) { 
 | 
                        if (aDouble < lower) { 
 | 
                            return null; 
 | 
                        } 
 | 
                    } 
 | 
                    return DoubleStream.of(aDouble); 
 | 
                }); 
 | 
        OptionalDouble average = null; 
 | 
        if ("avg".equals(type)) { 
 | 
            average = doubleStream.average(); 
 | 
        } else if ("min".equals(type)) { 
 | 
            average = doubleStream.min(); 
 | 
        } else if ("max".equals(type)) { 
 | 
            average = doubleStream.max(); 
 | 
        } 
 | 
  
 | 
        if (average.isPresent()) { 
 | 
            //银行家算法修约 
 | 
            return AmendUtils.sciCal(average.getAsDouble(), 4); 
 | 
        } 
 | 
        return null; 
 | 
    } 
 | 
} 
 |