jinpengyong
2021-11-03 2e5ad6b6a97104693564ebc9108f58642386a362
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.moral.api.service.impl;
 
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.CityAqi;
import com.moral.api.entity.CityAqiDaily;
import com.moral.api.mapper.CityAqiDailyMapper;
import com.moral.api.service.CityAqiDailyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.CityAqiService;
import com.moral.constant.Constants;
import com.moral.pojo.AQI;
import com.moral.util.AQIUtils;
import com.moral.util.AmendUtils;
import com.moral.util.ComprehensiveIndexUtils;
import com.moral.util.DateUtils;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
 
/**
 * <p>
 * 城市aqi日数据表 服务实现类
 * </p>
 *
 * @author moral
 * @since 2021-10-28
 */
@Service
public class CityAqiDailyServiceImpl extends ServiceImpl<CityAqiDailyMapper, CityAqiDaily> implements CityAqiDailyService {
 
    @Autowired
    private CityAqiDailyMapper cityAqiDailyMapper;
 
    @Autowired
    private CityAqiService cityAqiService;
 
    @Override
    public void insertCityAqiDaily() {
        List<String> sensors = Arrays.asList("PM2_5", "PM10", "SO2", "NO2", "CO", "O3", "AQI");
        String format = DateUtils.yyyy_MM_dd_EN;
        Date now = new Date();
        //开始时间,昨日
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format);
        //结束时间,今日
        Date end = DateUtils.dataToTimeStampTime(now, format);
        //获取所有城市aqi小时数据
        QueryWrapper<CityAqi> wrapper = new QueryWrapper<>();
        wrapper.select("city_code", "time", "value")
                .ge("time", DateUtils.dateToDateString(start))
                .le("time", DateUtils.dateToDateString(end));
        List<Map<String, Object>> dailyData = cityAqiService.listMaps(wrapper);
 
        if (dailyData.size() == 0) {
            return;
        }
        //按city_code分组
        Map<String, List<Map<String, Object>>> data = dailyData.parallelStream().collect(Collectors.groupingBy(o -> o.get("city_code").toString()));
 
        data.forEach((cityCode, value) -> {
            Map<String, Object> jsonMap = new HashMap<>();
            CityAqiDaily cityAqiDaily = new CityAqiDaily();
            cityAqiDaily.setCityCode(Integer.parseInt(cityCode));
            cityAqiDaily.setTime(start);
 
            //中间变量,用于计算除臭氧外其它因子
            List<Map<String, Object>> tempValue = new ArrayList<>(value);
 
            //移除第一天数据(0点的),O3滑动值第一条数据是从1点-8点
            value.removeIf(map -> ((Date) map.get("time")).getTime() == start.getTime());
 
            //臭氧日均值计算,滑动值算法
            Double o3OfDay = AmendUtils.o3OfDay(value);
            if (o3OfDay != null) {
                jsonMap.put("O3", o3OfDay);
            }
 
            //除臭氧外其他因子均值计算
            tempValue.removeIf(o -> ((Date) o.get("time")).getTime() == end.getTime());
 
            sensors.forEach(sensor -> {
                OptionalDouble optionalDouble = tempValue.parallelStream().flatMapToDouble(v -> {
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                    //臭氧单独计算
                    if ("O3".equals(sensor)) {
                        return null;
                    }
 
                    //aqi单独计算
                    if ("AQI".equals(sensor)) {
                        return null;
                    }
 
                    Object sensorValue = dataValue.get(sensor);
                    if (ObjectUtils.isEmpty(sensorValue)) {
                        return null;
                    }
                    double aDouble = Double.parseDouble(sensorValue.toString());
                    return DoubleStream.of(aDouble);
                }).average();
                if (optionalDouble.isPresent()) {
                    //银行家算法修约
                    double sciCal;
                    if ("CO".equals(sensor)) {
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 1);
                    } else {
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 0);
                    }
                    jsonMap.put(sensor, sciCal);
                }
            });
 
            //日aqi,首要污染物计算
            Map<String, Object> sixParamMap = new HashMap<>();
            sixParamMap.put(Constants.SENSOR_CODE_PM25, jsonMap.get("PM2_5"));
            sixParamMap.put(Constants.SENSOR_CODE_PM10, jsonMap.get("PM10"));
            sixParamMap.put(Constants.SENSOR_CODE_SO2, jsonMap.get("SO2"));
            sixParamMap.put(Constants.SENSOR_CODE_NO2, jsonMap.get("NO2"));
            sixParamMap.put(Constants.SENSOR_CODE_CO, jsonMap.get("CO"));
            sixParamMap.put(Constants.SENSOR_CODE_O3, jsonMap.get("O3"));
 
            AQI aqi = AQIUtils.dailyAQI(sixParamMap);
            jsonMap.put("AQI", aqi.getAQIValue());
            jsonMap.put("primaryPollutant", aqi.getPrimaryPollutantNames());
 
            //日综合指数计算
            Double compositeIndex = ComprehensiveIndexUtils.dailyData(jsonMap);
            jsonMap.put("compositeIndex", compositeIndex);
 
            cityAqiDaily.setValue(JSONObject.toJSONString(jsonMap));
            cityAqiDailyMapper.insert(cityAqiDaily);
        });
    }
}