package com.moral.api.service.impl; 
 | 
  
 | 
import com.alibaba.fastjson.JSONObject; 
 | 
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; 
 | 
import com.moral.api.entity.CityAqi; 
 | 
import com.moral.api.entity.CityAqiDaily; 
 | 
import com.moral.api.mapper.CityAqiDailyMapper; 
 | 
import com.moral.api.service.CityAqiDailyService; 
 | 
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; 
 | 
import com.moral.api.service.CityAqiService; 
 | 
import com.moral.constant.Constants; 
 | 
import com.moral.pojo.AQI; 
 | 
import com.moral.util.AQIUtils; 
 | 
import com.moral.util.AmendUtils; 
 | 
import com.moral.util.ComprehensiveIndexUtils; 
 | 
import com.moral.util.DateUtils; 
 | 
  
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
import org.springframework.stereotype.Service; 
 | 
import org.springframework.util.ObjectUtils; 
 | 
  
 | 
import java.util.ArrayList; 
 | 
import java.util.Arrays; 
 | 
import java.util.Date; 
 | 
import java.util.HashMap; 
 | 
import java.util.List; 
 | 
import java.util.Map; 
 | 
import java.util.OptionalDouble; 
 | 
import java.util.stream.Collectors; 
 | 
import java.util.stream.DoubleStream; 
 | 
  
 | 
/** 
 | 
 * <p> 
 | 
 * 城市aqi日数据表 服务实现类 
 | 
 * </p> 
 | 
 * 
 | 
 * @author moral 
 | 
 * @since 2021-10-28 
 | 
 */ 
 | 
@Service 
 | 
public class CityAqiDailyServiceImpl extends ServiceImpl<CityAqiDailyMapper, CityAqiDaily> implements CityAqiDailyService { 
 | 
  
 | 
    @Autowired 
 | 
    private CityAqiDailyMapper cityAqiDailyMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private CityAqiService cityAqiService; 
 | 
  
 | 
    @Override 
 | 
    public void insertCityAqiDaily() { 
 | 
        //需要均值计算的因子 
 | 
        List<String> sensors = Arrays.asList("PM2_5", "PM10", "SO2", "NO2", "CO"); 
 | 
        String format = DateUtils.yyyy_MM_dd_EN; 
 | 
        Date now = new Date(); 
 | 
        //开始时间,昨日 
 | 
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format); 
 | 
        //结束时间,今日 
 | 
        Date end = DateUtils.dataToTimeStampTime(now, format); 
 | 
        //获取所有城市aqi小时数据 
 | 
        QueryWrapper<CityAqi> wrapper = new QueryWrapper<>(); 
 | 
        wrapper.select("city_code", "time", "value") 
 | 
                .ge("time", start) 
 | 
                .lt("time", end); 
 | 
        List<Map<String, Object>> dailyData = cityAqiService.listMaps(wrapper); 
 | 
  
 | 
        if (dailyData.size() == 0) { 
 | 
            return; 
 | 
        } 
 | 
        //按city_code分组 
 | 
        Map<String, List<Map<String, Object>>> data = dailyData.stream() 
 | 
                .collect(Collectors.groupingBy(o -> o.get("city_code").toString())); 
 | 
  
 | 
        List<CityAqiDaily> cityAqiDailies = new ArrayList<>(); 
 | 
  
 | 
        data.forEach((cityCode, value) -> { 
 | 
            CityAqiDaily cityAqiDaily = new CityAqiDaily(); 
 | 
            Map<String, Object> jsonMap = new HashMap<>(); 
 | 
            cityAqiDaily.setCityCode(Integer.parseInt(cityCode)); 
 | 
            cityAqiDaily.setTime(start); 
 | 
            //O3日均值单独计算,滑动值算法 
 | 
            if (!ObjectUtils.isEmpty(value)) { 
 | 
                Double o3OfDay = AmendUtils.o3OfDay(value); 
 | 
                if (o3OfDay != null) { 
 | 
                    jsonMap.put("O3", o3OfDay); 
 | 
                } 
 | 
            } 
 | 
  
 | 
            sensors.forEach(sensor -> { 
 | 
                OptionalDouble optionalDouble = value.stream().flatMapToDouble(v -> { 
 | 
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class); 
 | 
                    Object sensorValue = dataValue.get(sensor); 
 | 
                    if (ObjectUtils.isEmpty(sensorValue)) { 
 | 
                        return null; 
 | 
                    } 
 | 
                    double aDouble = Double.parseDouble(sensorValue.toString()); 
 | 
                    return DoubleStream.of(aDouble); 
 | 
                }).average(); 
 | 
                if (optionalDouble.isPresent()) { 
 | 
                    //银行家算法修约 
 | 
                    double sciCal; 
 | 
                    if ("CO".equals(sensor)) { 
 | 
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 1); 
 | 
                    } else { 
 | 
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 0); 
 | 
                    } 
 | 
                    jsonMap.put(sensor, sciCal); 
 | 
                } 
 | 
            }); 
 | 
  
 | 
            //日aqi,首要污染物计算 
 | 
            Map<String, Object> sixParamMap = new HashMap<>(); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_PM25, jsonMap.get("PM2_5")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_PM10, jsonMap.get("PM10")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_SO2, jsonMap.get("SO2")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_NO2, jsonMap.get("NO2")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_CO, jsonMap.get("CO")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_O3, jsonMap.get("O3")); 
 | 
  
 | 
            AQI aqi = AQIUtils.dailyAQI(sixParamMap); 
 | 
            jsonMap.put("AQI", aqi.getAQIValue()); 
 | 
            jsonMap.put("primaryPollutant", aqi.getPrimaryPollutantNames()); 
 | 
  
 | 
            //日综合指数计算 
 | 
            Double compositeIndex = ComprehensiveIndexUtils.dailyData(jsonMap); 
 | 
            jsonMap.put("compositeIndex", compositeIndex); 
 | 
  
 | 
            cityAqiDaily.setValue(JSONObject.toJSONString(jsonMap)); 
 | 
            cityAqiDailies.add(cityAqiDaily); 
 | 
        }); 
 | 
        cityAqiDailyMapper.insertCityAqiDaily(cityAqiDailies); 
 | 
    } 
 | 
} 
 |