jinpengyong
2021-08-05 faf8649ff22b8af12c758355725389204838e02a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.moral.api.service.impl;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
 
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryFiveMinutelyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryFiveMinutelyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
 
@Service
public class HistoryFiveMinutelyServiceImpl implements HistoryFiveMinutelyService {
 
    @Autowired
    private HistoryFiveMinutelyMapper historyFiveMinutelyMapper;
 
    @Autowired
    private HistoryMinutelyMapper historyMinutelyMapper;
 
    @Autowired
    private SensorService sensorService;
 
    @Autowired
    private RedisTemplate redisTemplate;
 
    @Override
    public void createTable(String timeUnits) {
        historyFiveMinutelyMapper.createTable(timeUnits);
    }
 
    @Override
    @Transactional
    public void insertHistoryFiveMinutely() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_mm_EN;
        Date now = new Date();
        Map<String, Object> params = new HashMap<>();
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfMin(now, -5), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        params.put("start", start);
        params.put("end", end);
        //获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        params.put("timeUnits", timeUnits);
 
        //因子
        QueryWrapper<Sensor> queryWrapper = new QueryWrapper<>();
        queryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(queryWrapper);
 
        //获取所有设备的5分钟内的数据
        List<Map<String, Object>> fiveMinutelyData = historyMinutelyMapper.getHistoryMinutelyData(params);
        if (fiveMinutelyData.size() == 0) {
            return;
        }
 
        //按mac分组
        Map<String, List<Map<String, Object>>> data = fiveMinutelyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
 
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
 
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", end);
            Map<String, Object> jsonMap = new HashMap<>();
 
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            }
 
            //除风向外其他因子均值计算
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向另外计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            //存入redis
            redisTemplate.opsForHash().put(RedisConstants.DATA_FIVE_MINUTES, key, jsonMap);
            insertData.add(dataMap);
        });
 
        //5分钟表后缀
        String insertTimeUnits = DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN);
        //存入数据库
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(insertData, insertTimeUnits);
    }
}